
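Integrating Spring Boot with ElasticSearch, ActiveMQ, and RocketMQ, plus Multi-Environment Configuration, the Reactive WebFlux Framework, Server-Sent Events (SSE), Production Deployment, and Actuator Monitoring

Author: Internet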

1. Integrating Spring Boot with ElasticSearch

(1) Main search options: MySQL, Solr, ElasticSearch

(2) Key features of ElasticSearch

(3) New in ElasticSearch 6.x

Concept mapping between MySQL and ElasticSearch:

MySQL:          database    table                       record
ElasticSearch:  index       type (only one per index)   document

(4) Quick deployment of ElasticSearch 5.6.x

  1. Install and configure JDK 1.8.
  2. Download the ElasticSearch package with wget:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.8.tar.gz
  3. Extract the package:
tar -zxvf elasticsearch-5.6.8.tar.gz
  4. Allow access from other hosts:
~]$ vim elasticsearch/config/elasticsearch.yml

network.host: 0.0.0.0
  5. Error 1:
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c5330000, 986513408, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 986513408 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /usr/local/software/temp/elasticsearch-6.2.2/hs_err_pid1912.log

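Solution: the machine does not have enough memory. Cloud instances (Alibaba Cloud, Tencent Cloud, AWS) can be resized to add memory; alternatively, lower the JVM heap size (-Xms/-Xmx) in config/jvm.options.
  6. Error 2: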

[root@iZwz95j86y235aroi85ht0Z bin]# ./elasticsearch
[2018-02-22T20:14:04,870][WARN ][o.e.b.ElasticsearchUncaughtExceptionHandler] [] uncaught exception in thread [main]
org.elasticsearch.bootstrap.StartupException: java.lang.RuntimeException: can not run elasticsearch as root
	at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:125) ~[elasticsearch-6.2.2.jar:6.2.2]
	at org.elasticsearch.bootstrap.Elasticsearch.execute(Elasticsearch.java:112) ~[elasticsearch-6.2.2.jar:6.2.2]
	at org.elasticsearch.cli.EnvironmentAwareCommand.execute(EnvironmentAwareCommand.java:86) ~[elasticsearch-6.2.2.jar:6.2.2]
	at org.elasticsearch.cli.Command.mainWithoutErrorHandling(Command.java:124) ~[elasticsearch-cli-6.2.2.jar:6.2.2]

Solution: ElasticSearch refuses to run as root, so create a regular user and start it as that user:

useradd -m es
passwd es
  7. Error 3:
~]$ ./elasticsearch
Exception in thread "main" java.nio.file.AccessDeniedException: /usr/local/software/temp/elasticsearch-6.2.2/config/jvm.options

Solution: the current user lacks permissions on the ElasticSearch directory:

chmod 777 -R <elasticsearch-directory>
  8. Cluster test
    Test tool: Postman
    Cluster health: localhost:9200/_cat/health?v
    Index list: localhost:9200/_cat/indices?v

(5) Integrating Spring Boot with ElasticSearch

  1. Add the Maven dependency to pom.xml
<dependency>  
	<groupId>org.springframework.boot</groupId>  
	<artifactId>spring-boot-starter-data-elasticsearch</artifactId>  
</dependency>
  2. application.properties configuration
spring.data.elasticsearch.cluster-name=elasticsearch
spring.data.elasticsearch.cluster-nodes=127.0.0.1:9300
spring.data.elasticsearch.repositories.enabled=true
  3. Article.java
package net.xdclass.base_project.domain;

import java.io.Serializable;
import org.springframework.data.elasticsearch.annotations.Document;

/**
 * Description: article entity
 */
// The index and type names ("blog", "article") must be lowercase
@Document(indexName = "blog", type = "article")
public class Article implements Serializable{

	private static final long serialVersionUID = 1L;
	private long id;
	private String title;
	private String summary;
	private String content;
	private int pv;

	public long getId() {
		return id;
	}
	public void setId(long id) {
		this.id = id;
	}
	public String getTitle() {
		return title;
	}
	public void setTitle(String title) {
		this.title = title;
	}
	public String getSummary() {
		return summary;
	}
	public void setSummary(String summary) {
		this.summary = summary;
	}
	public String getContent() {
		return content;
	}
	public void setContent(String content) {
		this.content = content;
	}
	public int getPv() {
		return pv;
	}
	public void setPv(int pv) {
		this.pv = pv;
	}
}
  4. ArticleRepository.java
package net.xdclass.base_project.repository;

import net.xdclass.base_project.domain.Article;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Repository;

@Component 
//@Repository
public interface ArticleRepository extends ElasticsearchRepository<Article, Long> {

}
  5. ArticleController.java
package net.xdclass.base_project.controller;

import net.xdclass.base_project.domain.Article;
import net.xdclass.base_project.domain.JsonData;
import net.xdclass.base_project.repository.ArticleRepository;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1/article")
public class ArticleController {

	@Autowired
	private ArticleRepository articleRepository;
	
	@GetMapping("save")
	public Object save(long id,String title){
		Article article = new Article();
		article.setId(id);
		article.setPv(123);
		article.setContent("Spring Boot integration with ElasticSearch, new version recorded in 2018");
		article.setTitle(title);
		article.setSummary("Search framework integration");
		articleRepository.save(article);
		return JsonData.buildSuccess();
	}
	
	@GetMapping("search")
	public Object search(String title){
		// Match all documents:
		// QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
		// Match query: find documents whose title matches the given text
		QueryBuilder queryBuilder = QueryBuilders.matchQuery("title", title); 
		Iterable<Article> list =  articleRepository.search(queryBuilder);
		return JsonData.buildSuccess(list);
	}
}
  6. Inspect the data stored in ElasticSearch:
    Index list: http://localhost:9200/_cat/indices?v
    Structure of one index: http://localhost:9200/blog
    A single document: http://localhost:9200/blog/article/1
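
The URLs above follow the ElasticSearch REST layout `/{index}/{type}/{id}`, which mirrors the database/table/record mapping shown earlier. As a rough sketch, the same checks could be scripted with the JDK alone instead of Postman. The class `EsUrls` and its method names are hypothetical, invented for illustration, and the base address assumes a local node.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; not part of Spring Data or the ElasticSearch client.
class EsUrls {

    // _cat endpoints such as "health" or "indices"; "?v" asks for column headers.
    static String catUrl(String base, String endpoint) {
        return base + "/_cat/" + endpoint + "?v";
    }

    // A document is addressed as /{index}/{type}/{id}.
    static String documentUrl(String base, String index, String type, long id) {
        return base + "/" + index + "/" + type + "/" + id;
    }

    // Plain HTTP GET that returns the response body as text.
    static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line).append('\n');
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        String base = "http://localhost:9200"; // assumption: a node running locally
        System.out.println(catUrl(base, "health"));
        System.out.println(catUrl(base, "indices"));
        System.out.println(documentUrl(base, "blog", "article", 1));
    }
}
```

With a node running, `EsUrls.get(EsUrls.documentUrl(base, "blog", "article", 1))` would return the JSON of the article saved through the `save` endpoint.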

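2. Integrating Spring Boot with ActiveMQ

(1) JMS (Java Message Service) is the Java platform's interface to message-oriented middleware. It is a vendor-neutral API for creating, sending, and receiving messages, much as JDBC (Java Database Connectivity) is a vendor-neutral API for accessing many different relational databases.
Example: WeChat Pay combined with a message queue:

  1. Basic concepts:
  2. Programming model (the classes used when working with an MQ):

  3. Characteristics:
    Cross-platform, multi-language, and multi-project use; decoupling; distributed transactions; flow control; eventual consistency; RPC-style calls (upstream and downstream integration, notifying downstream systems when a data source changes)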

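(2) ActiveMQ 5.x message queue

Features:

  1. Supports a variety of cross-language clients and protocols from Java, C, C++, C#, Ruby, Perl, Python, and PHP
  2. Supports many advanced features such as message groups, virtual destinations, wildcards, and composite destinations
  3. Fully supports JMS 1.1 and J2EE 1.4, with transient, persistent, transactional, and XA messaging
  4. Spring support: ActiveMQ can be embedded easily into Spring applications and configured with Spring's XML configuration mechanism
  5. Tested inside popular J2EE servers such as TomEE, Geronimo, JBoss, GlassFish, and WebLogic
  6. Very fast persistence using JDBC together with a high-performance journal

Installation and startup:

  1. Download: http://activemq.apache.org/activemq-5153-release.html
  2. Getting started: http://activemq.apache.org/getting-started.html
  3. On Windows, double-click activemq.bat in the win32 directory on a 32-bit machine, or in the win64 directory on a 64-bit machine
  4. Otherwise start it from the bin directory, choosing the build that matches your OS and word size; start command: activemq start
  5. After startup, open the console at http://127.0.0.1:8161/
  6. The default username and password are both admin

Queue columns in the web console:

  1. Name: the queue name
  2. Number Of Pending Messages: messages waiting to be consumed
  3. Number Of Consumers: consumers currently connected
  4. Messages Enqueued: total messages that have entered the queue, both already dequeued and still pending; this number only increases
  5. Messages Dequeued: messages that have been consumed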
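
The console counters and the two JMS delivery models (point-to-point queues and publish/subscribe topics) can be modelled in a few lines of plain Java. This is only an in-memory sketch under simplifying assumptions: the classes `DemoQueue` and `DemoTopic` are hypothetical, and a real broker adds persistence, acknowledgements, and networking.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// In-memory model of a queue with the counters shown in the web console.
class DemoQueue {
    private final Deque<String> pending = new ArrayDeque<>();
    private long enqueued; // only ever increases
    private long dequeued;

    void send(String message) {
        pending.addLast(message);
        enqueued++;
    }

    // Point-to-point: each message is handed to exactly one caller.
    String receive() {
        String message = pending.pollFirst();
        if (message != null) {
            dequeued++;
        }
        return message;
    }

    int pendingCount() { return pending.size(); }
    long enqueuedCount() { return enqueued; }
    long dequeuedCount() { return dequeued; }
}

// In-memory model of a topic.
class DemoTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Publish/subscribe: every current subscriber gets its own copy.
    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

class MessagingModelsDemo {
    public static void main(String[] args) {
        DemoQueue queue = new DemoQueue();
        queue.send("order-1");
        queue.send("order-2");
        System.out.println("consumer A got " + queue.receive());
        System.out.println("pending=" + queue.pendingCount()
                + " enqueued=" + queue.enqueuedCount()
                + " dequeued=" + queue.dequeuedCount());

        DemoTopic topic = new DemoTopic();
        List<String> received = new ArrayList<>();
        topic.subscribe(m -> received.add("receive1=" + m));
        topic.subscribe(m -> received.add("receive2=" + m));
        topic.publish("video-1");
        System.out.println(received);
    }
}
```

After one of two queued messages is consumed, the pending count is 1 while the enqueued count stays at 2, which matches how the console columns relate to each other.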

(3) Spring Boot with ActiveMQ: point-to-point messaging and publish/subscribe

## pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>net.xdclass</groupId>
  <artifactId>base_project</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.1.RELEASE</version>
	</parent>
	  
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<!-- ActiveMQ message queue integration -->
		<dependency>  
            <groupId>org.springframework.boot</groupId>  
            <artifactId>spring-boot-starter-activemq</artifactId>  
        </dependency>
        <!-- Required only when the connection pool is enabled -->
        <dependency>  
            <groupId>org.apache.activemq</groupId>  
            <artifactId>activemq-pool</artifactId>  
        </dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>
## application.properties

# JMS integration test; if the broker runs on another machine, open the firewall and the port
spring.activemq.broker-url=tcp://127.0.0.1:61616

# Cluster configuration
# spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)

spring.activemq.user=admin
spring.activemq.password=admin
# The settings below require the activemq-pool dependency
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100

# Point-to-point is the default; uncomment to make publish/subscribe the default
# spring.jms.pub-sub-domain=true
## XdclassApplication.java

package net.xdclass.base_project;

import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

@SpringBootApplication
@EnableJms				// Enable JMS support
public class XdclassApplication {

	@Bean
	public Topic topic(){
		return new ActiveMQTopic("video.topic");
	}
	
	@Bean
	public Queue queue(){
		return new ActiveMQQueue("common.queue");
	}

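	// Listener container factory with pub/sub enabled, so @JmsListener methods can
	// subscribe to topics while the default factory keeps serving queues
	@Bean
	public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {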
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
	}
	
	public static void main(String[] args) {
		SpringApplication.run(XdclassApplication.class, args);
	}
}
## CommonConsumer.java

package net.xdclass.base_project.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class CommonConsumer {

	@JmsListener(destination="common.queue")
	public void receiveQueue(String text){
		System.out.println("CommonConsumer received: " + text);
	}
}
## OrderConsumer.java

package net.xdclass.base_project.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class OrderConsumer {

	@JmsListener(destination="order.queue")
	public void receiveQueue(String text){
		System.out.println("OrderConsumer received: " + text);
	}
}
## TopicSub.java

package net.xdclass.base_project.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class TopicSub {

	@JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic")
	public void receive1(String text){
		System.out.println("video.topic consumer: receive1=" + text);
	}
	
	@JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic")
	public void receive2(String text){
		System.out.println("video.topic consumer: receive2=" + text);
	}
	
	@JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic")
	public void receive3(String text){
		System.out.println("video.topic consumer: receive3=" + text);
	}
}
## ProducerService.java

package net.xdclass.base_project.service;

import javax.jms.Destination;

/**
 * Description: message producer
 */
public interface ProducerService {

	/**
	 * Sends a message to the given destination.
	 * @param destination target queue or topic
	 * @param message message body
	 */
	public void sendMessage(Destination destination, final String message);
	
	/**
	 * Sends a message to the default queue.
	 * @param message message body
	 */
	public void sendMessage( final String message);
	
	/**
	 * Publishes a message to the topic.
	 * @param msg message body
	 */
	public void publish(String msg);
}
## ProducerServiceImpl.java

package net.xdclass.base_project.service.impl;

import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;
import net.xdclass.base_project.service.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

/**
 * Description: message producer
 */
@Service
public class ProducerServiceImpl implements ProducerService{

	// Default queue (the "common.queue" bean)
	@Autowired
	private Queue queue;
	
	// Template used to send messages to the broker
	@Autowired
	private JmsMessagingTemplate jmsTemplate;
	
	// Sends the message to the given destination
	@Override
	public void sendMessage(Destination destination, String message) {
		jmsTemplate.convertAndSend(destination, message);
	}
	
	// Sends the message to the default queue; without an explicit destination the
	// template would need spring.jms.template.default-destination to be configured
	@Override
	public void sendMessage(final String message) {
		jmsTemplate.convertAndSend(this.queue, message);
	}

	//======= Publish/subscribe =========
	@Autowired
	private Topic topic;
	
	@Override
	public void publish(String msg) {
		this.jmsTemplate.convertAndSend(this.topic, msg);
	}
}
## OrderController.java

package net.xdclass.base_project.controller;

import javax.jms.Destination;
import net.xdclass.base_project.domain.JsonData;
import net.xdclass.base_project.service.ProducerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Description: simulates the WeChat Pay callback
 */
@RestController
@RequestMapping("/api/v1")
public class OrderController {
	
	@Autowired
	private ProducerService producerService;
	
	/**
	 * WeChat Pay callback endpoint; sends the message to order.queue.
	 * @param msg payment information
	 * @return
	 */
	@GetMapping("order")
	public Object order(String msg){
		Destination destination = new ActiveMQQueue("order.queue");
		producerService.sendMessage(destination, msg);
		return JsonData.buildSuccess();
	}
	
	// Sends the message to the default queue
	@GetMapping("common")
	public Object common(String msg){
		producerService.sendMessage(msg);
		return JsonData.buildSuccess();
	}
	
	/**
	 * Publishes the message to video.topic through ProducerService.publish.
	 * The endpoint name "topic" is an assumption.
	 * @param msg message body
	 * @return
	 */
	@GetMapping("topic")
	public Object topic(String msg){
		producerService.publish(msg);
		return JsonData.buildSuccess();
	}
}

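3. Integrating Spring Boot with RocketMQ

(1) RocketMQ
Apache RocketMQ is a high-performance, high-throughput distributed messaging middleware open-sourced by Alibaba.
Features:

Concepts:

Official site: http://rocketmq.apache.org/.

(2) Deploying RocketMQ locally

Start the name server from the bin directory and follow its log:

nohup sh mqnamesrv & 
tail -f nohup.out

A final log line of "The Name Server boot success. serializeType=JSON" means the name server started successfully.

Start the broker, pointing it at the name server:

nohup sh mqbroker -n 127.0.0.1:9876 &

Shut down the name server and the broker:

sh mqshutdown namesrv
sh mqshutdown broker

(3) RocketMQ web console

Build the console project with Maven, skipping tests:

mvn clean package -Dmaven.test.skip=true

Set the name server address in the console's configuration:

rocketmq.config.namesrvAddr=192.168.0.101:9876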



(4) Integrating Spring Boot with RocketMQ

## pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>net.xdclass</groupId>
  <artifactId>base_project</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.1.RELEASE</version>
	</parent>

	<properties>
		<rocketmq.version>4.1.0-incubating</rocketmq.version>
	</properties>
	  
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<!-- RocketMQ integration -->
		<dependency>  
	    	<groupId>org.apache.rocketmq</groupId>  
	    	<artifactId>rocketmq-client</artifactId>  
	    	<version>${rocketmq.version}</version>  
		</dependency>  
		<dependency>  
	    	<groupId>org.apache.rocketmq</groupId>  
	    	<artifactId>rocketmq-common</artifactId>  
	    	<version>${rocketmq.version}</version>  
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>
## application.properties

# Trigger file that controls when devtools hot-reloads the application
spring.devtools.restart.trigger-file=trigger.txt

# Path of the custom startup banner file
spring.banner.location=banner.txt

# Consumer group name
apache.rocketmq.consumer.PushConsumer=orderConsumer
# Producer group name
apache.rocketmq.producer.producerGroup=Producer
# NameServer address
apache.rocketmq.namesrvAddr=127.0.0.1:9876
## MsgProducer.java

package net.xdclass.base_project.jms;

import javax.annotation.PostConstruct;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class MsgProducer {
    // Producer group name
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    // NameServer address
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private DefaultMQProducer producer;

    public DefaultMQProducer getProducer(){
        return this.producer;
    }

    @PostConstruct
    public void init() {
        // Create the producer with its group name
        producer = new DefaultMQProducer(producerGroup);
        // NameServer address; separate multiple addresses with ";", e.g.
        // producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
        producer.setNamesrvAddr(namesrvAddr);
        producer.setVipChannelEnabled(false);

        try {
            // start() must be called once, and only once, before the producer is used
            producer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }

        // producer.shutdown() is normally called when the application context closes, e.g. from a context listener
    }
}
## MsgConsumer.java

package net.xdclass.base_project.jms;

import javax.annotation.PostConstruct;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class MsgConsumer {
    // Consumer group name
    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String consumerGroup;

    // NameServer address
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQPushConsumer() {
        // Create the consumer with its group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        // NameServer address; separate multiple addresses with ";"
        consumer.setNamesrvAddr(namesrvAddr);
        
        try {
            // Topic and tags to subscribe to; "*" means all tags
            consumer.subscribe("testTopic", "*");
          
            // CONSUME_FROM_LAST_OFFSET (default): start at the end of the queue and skip historical messages
            // CONSUME_FROM_FIRST_OFFSET: start at the beginning and consume all historical messages still stored on the broker
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                       
            // MessageListenerOrderly consumes messages in order
            // MessageListenerConcurrently consumes in parallel without ordering, which is much faster
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {           
                        System.out.println("messageExt: " + messageExt); // print the message metadata
                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);                        
                        System.out.println("Consumed: msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody); // print the message body
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER; // retry later
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // consumed successfully
            });            
            
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
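
The second argument of `consumer.subscribe("testTopic", "*")` in the listing above is a tag expression: `*` accepts every tag, and several tags can be joined with `||`. The following sketch models that matching rule in plain Java. It is a simplified illustration, not RocketMQ's own implementation, and the class name `TagFilterDemo` is hypothetical.

```java
// Simplified model of tag matching for a subscription expression.
class TagFilterDemo {

    // subExpression: "*" (or null/empty) for all tags, or tags joined with "||".
    static boolean matches(String subExpression, String tag) {
        if (subExpression == null) {
            return true;
        }
        String expression = subExpression.trim();
        if (expression.isEmpty() || expression.equals("*")) {
            return true;
        }
        if (tag == null) {
            return false;
        }
        for (String candidate : expression.split("\\|\\|")) {
            if (candidate.trim().equals(tag)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "add"));
        System.out.println(matches("add || delete", "delete"));
        System.out.println(matches("add", "delete"));
    }
}
```

In the controller that follows, the `tag` request parameter plays the role of the tag being matched here.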
## OrderController.java

package net.xdclass.base_project.controller;

import java.io.UnsupportedEncodingException;
import net.xdclass.base_project.domain.JsonData;
import net.xdclass.base_project.jms.MsgProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Description: simulates the WeChat Pay callback
 */
@RestController
@RequestMapping("/api/v1")
public class OrderController {
	
	@Autowired
	private MsgProducer msgProducer;
	
	/**
	 * WeChat Pay callback endpoint.
	 * @param msg payment information
	 * @param tag message tag (second-level category under the topic)
	 * @return
	 */
	@GetMapping("order")
	public Object order(String msg, String tag) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException{
	  
	   // Build a message from a topic, a tag, and a body
       Message message = new Message("testTopic",tag, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));       
       SendResult result = msgProducer.getProducer().send(message);       
       System.out.println("Send result: MsgId: " + result.getMsgId() + ", status: " + result.getSendStatus());     
       return JsonData.buildSuccess();
	}

	/**
	 * Sends a comment message to commentTopic with the tag "add".
	 * @param msg comment content
	 * @return
	 */
	@GetMapping("comment")
	public Object comment(String msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException{
	  
	  // Build a message from a topic, a tag, and a body
      Message message = new Message("commentTopic","add", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));      
      // Synchronous send of a normal message; the call returns a SendResult
      SendResult result = msgProducer.getProducer().send(message);      
      System.out.println("Send result: MsgId: " + result.getMsgId() + ", status: " + result.getSendStatus());    
      return JsonData.buildSuccess();
	}
}
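  1. Set the following on the producer:
producer.setVipChannelEnabled(false);
  2. Edit the RocketMQ configuration file broker.conf (replace the IPs below with your own):
namesrvAddr = 192.168.0.101:9876
brokerIP1 = 192.168.0.101
  3. Raise the broker's disk usage threshold through JAVA_OPT:
JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"

This sets the disk protection ratio to 98% (default 0.9), so the broker refuses producer messages only once disk usage reaches 98%.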

4. Spring Boot multi-environment configuration

## application-dev.properties

test.url=dev.com
## application-test.properties

test.url=test.com
## application.properties

test.url=local

# Select the active profile
#spring.profiles.active=dev
## OrderController.java

package net.xdclass.base_project.controller;

import net.xdclass.base_project.domain.JsonData;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1")
public class OrderController {
	
	@Value("${test.url}")
	private String domain;

	/**
	 * Returns the test.url value resolved for the active profile.
	 * @param msg unused
	 * @return
	 */
	@GetMapping("order")
	public Object order(String msg){
       return JsonData.buildSuccess(domain);
	}	
}
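
The files above rely on a simple override rule: values in `application-{profile}.properties` replace values with the same key in `application.properties` when that profile is active. The sketch below models only that rule with `java.util.Properties`; Spring Boot's real property resolution has more sources and precedence levels, and the class `ProfileConfigDemo` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Simplified model of profile-specific overrides.
class ProfileConfigDemo {

    // Start from application.properties, then let application-{profile}.properties override it.
    static Properties resolve(Properties base, Map<String, Properties> profiles, String activeProfile) {
        Properties merged = new Properties();
        merged.putAll(base);
        Properties overrides = activeProfile == null ? null : profiles.get(activeProfile);
        if (overrides != null) {
            merged.putAll(overrides);
        }
        return merged;
    }

    // Convenience for building a one-entry Properties object.
    static Properties single(String key, String value) {
        Properties p = new Properties();
        p.setProperty(key, value);
        return p;
    }

    public static void main(String[] args) {
        Properties base = single("test.url", "local");
        Map<String, Properties> profiles = new HashMap<>();
        profiles.put("dev", single("test.url", "dev.com"));
        profiles.put("test", single("test.url", "test.com"));

        System.out.println(resolve(base, profiles, null).getProperty("test.url"));
        System.out.println(resolve(base, profiles, "dev").getProperty("test.url"));
    }
}
```

In the real application the active profile is chosen with `spring.profiles.active=dev` in application.properties, or with `--spring.profiles.active=dev` on the command line when starting the jar.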

5. Spring Boot reactive framework: WebFlux

## pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>net.xdclass</groupId>
  <artifactId>base_project</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.1.RELEASE</version>
	</parent>  
	<dependencies>			
		<!-- <dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>  -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>
## UserService.java

package net.xdclass.base_project.service;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import net.xdclass.base_project.domain.User;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class UserService {
	
	private static final Map<String, User> dataMap = new HashMap<>();
	
	static{
		dataMap.put("1", new User("1", "Teacher X"));
		dataMap.put("2", new User("2", "Teacher D"));
		dataMap.put("3", new User("3", "Teacher C"));
		dataMap.put("4", new User("4", "Teacher L"));
		dataMap.put("5", new User("5", "Teacher A"));
		dataMap.put("6", new User("6", "Teacher S"));
		dataMap.put("7", new User("7", "Teacher S"));
	}
	
	/**
	 * Returns the list of users.
	 * @return
	 */
	public Flux<User> list(){
		Collection<User> list = UserService.dataMap.values();
		return Flux.fromIterable(list);
	}
	
	/**
	 * Finds a user by id.
	 * @param id
	 * @return
	 */
	public Mono<User> getById(final String id){
		return Mono.justOrEmpty(UserService.dataMap.get(id));
	}
	
	/**
	 * Deletes a user by id.
	 * @param id
	 * @return
	 */
	public Mono<User> del(final String id){
		return Mono.justOrEmpty(UserService.dataMap.remove(id));
	}
}
## UserController.java

package net.xdclass.base_project.controller;

import java.time.Duration;
import net.xdclass.base_project.domain.User;
import net.xdclass.base_project.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/api/v1/user")
public class UserController {
	
	//@Autowired
	//private UserService userService;
	
	private final UserService userService;
	
	public UserController(final UserService userService) {
		this.userService = userService;
	}
	
	@GetMapping("/test")
	public Mono<String> test(){
		return Mono.just("hello 小D课堂");
	}
	
	/**
	 * Finds a user by id.
	 * @param id
	 * @return
	 */
	@GetMapping("find")
	public Mono<User> findByid(final String id){
		return userService.getById(id);
	}
	
	/**
	 * Deletes a user.
	 * @param id
	 * @return
	 */
	@GetMapping("del")
	public Mono<User> del(final String id){
		return userService.del(id);
	}
	
	/**
	 * Streams the user list, emitting one element every two seconds.
	 * @return
	 */
	@GetMapping(value="list",produces=MediaType.APPLICATION_STREAM_JSON_VALUE)
	public Flux<User> list(){
		return userService.list().delayElements(Duration.ofSeconds(2));
	}
}
## WebClientTest.java

package base_project.base;

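import net.xdclass.base_project.XdclassApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

// These tests call http://localhost:8080, so the application must already be listening there
@RunWith(SpringRunner.class)  							// JUnit 4 runner (SpringJUnit4ClassRunner under the hood)
@SpringBootTest(classes={XdclassApplication.class})		// loads the whole Spring Boot application context
public class WebClientTest {

	@Test
  	public void testBase(){
    	Mono<String> resp = WebClient.create()
        	.get()
        	.uri("http://localhost:8080/api/v1/user/find?id=1")
        	.accept(MediaType.APPLICATION_JSON)
        	.retrieve()
        	.bodyToMono(String.class);
    	System.out.println(resp.block());
  	}
	
	@Test
 	public void testPlaceHolder(){
    	Mono<String> resp = WebClient.create()
        	.get()
        	// URI template with a placeholder; several values can also be passed in a Map
        	// whose keys match the placeholder names
        	.uri("http://localhost:8080/api/v1/user/find?id={id}",1) 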
        	.accept(MediaType.APPLICATION_JSON)
        	.retrieve()
        	.bodyToMono(String.class);
    	System.out.println(resp.block());	 
	}
}

6. Spring Boot server push with SSE

Common ways to get fresh data to the browser: periodic Ajax polling, WebSocket, and SSE

  1. Client polling: Ajax requests on a timer
  2. Server push: WebSocket
  3. Server push: SSE (Server-Sent Events)
## SSEController.java

package net.xdclass.base_project.controller;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/sse")
public class SSEController {
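	// The response content type must be text/event-stream for the browser to treat it as SSE.
	// Each request returns a single event; EventSource reconnects automatically after the response ends.
    @RequestMapping(value = "/get_data", produces = "text/event-stream;charset=UTF-8")
    public String push() {
    	  
          try {
              Thread.sleep(1000); 
              // Call a third-party data source here
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
          return "data:xdclass quote " + Math.random() + "\n\n";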
    }
}
## index.html

<!DOCTYPE html>
<html>
	<head>
		<meta charset="UTF-8">
		<title>Insert title here</title>
		<script type="text/javascript">  
			// Check that the browser supports EventSource first (see the W3C documentation)
			var source = new EventSource('sse/get_data');
			source.onmessage = function (event) {
  				console.info(event.data);
  				document.getElementById('result').innerText = event.data
			};
		</script>  
	</head>
	<body>
		Simulated stock quotes
 		<div>xdclass test</div>  
    	<div id="result"></div>  
	</body>
</html>

URL: http://localhost:8080/index.html
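
The string returned by `push()` is one frame of the `text/event-stream` format: field lines such as `data:`, optionally `id:` and `event:`, followed by a blank line that ends the event. As an illustrative sketch (the class `SseFrames` is hypothetical and only builds the text; it does not send anything), a frame could be assembled like this:

```java
// Builds the text/event-stream wire format by hand; illustrative only.
class SseFrames {

    // id and event may be null; multi-line data becomes one "data:" line per line.
    static String format(String id, String event, String data) {
        StringBuilder frame = new StringBuilder();
        if (id != null) {
            frame.append("id:").append(id).append('\n');
        }
        if (event != null) {
            frame.append("event:").append(event).append('\n');
        }
        for (String line : data.split("\n", -1)) {
            frame.append("data:").append(line).append('\n');
        }
        frame.append('\n'); // a blank line terminates the event
        return frame.toString();
    }

    public static void main(String[] args) {
        System.out.print(format(null, null, "xdclass quote 0.42"));
        System.out.print(format("7", "quote", "line one\nline two"));
    }
}
```

Note that when an `event:` name is set, the browser dispatches the event to listeners registered with `addEventListener` for that name rather than to `onmessage`.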

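7. Deploying Spring Boot to production on an Alibaba Cloud server

  1. Remove dependencies that production does not need, such as the spring-boot-devtools hot-reload module.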
## pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>net.xdclass</groupId>
  <artifactId>base_project</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.1.RELEASE</version>
	</parent>
	  
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>
  2. Build the jar locally with Maven (tests skipped):
mvn clean package -Dmaven.test.skip=true
  3. Package with a specific configuration file
  4. Install and configure the JDK on the server
vim /etc/profile
	
export JAVA_HOME=/usr/local/software/jdk8
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME PATH CLASSPATH
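  5. Upload the jar to the server. Upload tools: WinSCP or SecureCRT (Windows), FileZilla (macOS).
  6. Log in to the server remotely: ssh root@120.79.160.143
  7. Run the jar: java -jar xxxx.jar. It can be run as a daemon process, as a system service, or from a shell script.
  8. URL: http://120.79.160.143:8080/api/v1/user/find.
  9. Reasons the service may be unreachable:

Deployment pipeline a mature internet company should have:

  1. Push production code from the local machine to the GitLab repository
  2. Jenkins builds it automatically
  3. Operations or development staff release it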

8. Spring Boot Actuator monitoring

<!-- Spring Boot monitoring dependency -->
<dependency>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-actuator</artifactId>  
</dependency>
# Expose actuator endpoints over HTTP (application.properties)
# Expose all endpoints
management.endpoints.web.exposure.include=*
# Expose a single endpoint
# management.endpoints.web.exposure.include=metrics
# Hide a single endpoint
# management.endpoints.web.exposure.exclude=metrics
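
The include and exclude settings combine as follows: an endpoint is exposed when it is included (`*` includes everything) and not excluded, so an exclusion wins over an inclusion. The sketch below models only this rule; it is not Spring Boot's own code, and the class `ExposureDemo` is hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Simplified model of the include/exclude exposure rule.
class ExposureDemo {

    // An endpoint is exposed when it is included ("*" includes all) and not excluded.
    static boolean isExposed(Set<String> include, Set<String> exclude, String endpointId) {
        if (exclude.contains("*") || exclude.contains(endpointId)) {
            return false;
        }
        return include.contains("*") || include.contains(endpointId);
    }

    public static void main(String[] args) {
        Set<String> include = new HashSet<>(Arrays.asList("*"));
        Set<String> exclude = new HashSet<>(Arrays.asList("metrics"));
        System.out.println("health exposed: " + isExposed(include, exclude, "health"));
        System.out.println("metrics exposed: " + isExposed(include, exclude, "metrics"));
    }
}
```

In Spring Boot 2.x the exposed endpoints are served under the /actuator path, for example http://localhost:8080/actuator/health.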

Source: https://blog.csdn.net/gongxifacai_believe/article/details/115784236