SpringBoot与ElasticSearch、ActiveMQ、RocketMQ的整合及多环境配置、响应式框架WebFlux、服务器端主动推送SSE技术、生产环境部署、Actuator监控平台
作者:互联网
1、SpringBoot 与 ElasticSearch 框架的整合
(1)主要的搜索框架:MySQL、Solr、ElasticSearch
- MySQL:使用 like 进行模糊查询,存在性能问题
- Solr:底层使用 Lucene,适用于中小规模数据量场景
- ElasticSearch:适用于数据量特别大的场景,PB、TB 级别。使用纯 Java开发,ElasticSearch 从 4 版本升级到 5 版本改动较大,但是 5 版本后,改动不大
(2)ElasticSearch 主要特点
- 全文检索、 结构化检索
- 数据统计、分析
- 接近实时处理,分布式搜索,可部署数百台服务器,处理 PB 级别的数据
- 搜索纠错,自动完成
- 适用场景:日志搜索,数据聚合,数据监控,报表统计分析
- 应用实例:维基百科、Stack Overflow、GitHub
(3)ElasticSearch 6.x 新特性
- 6.2.x 版本基于 Lucene 7.x,更快,性能进一步提升。其中对应的序列化组件,升级到 Jackson 2.8。6.x 版本不再支持一个索引库里面多个type,所以一个 index 索引库只能存在 1 个 type。MySQL 与 ElasticSearch 的对应关系如下:
MySQL: database table record
ElasticSearch: index type(只能存在一个) document
- 推荐使用 5.0 版本推出的 Java REST/HTTP 客户端,依赖少,比 Transport 使用更方便,在基准测试中,性能并不输于 Transport 客户端。推荐使用这种方式进行开发使用,在节点故障和特定响应代码的情况下进行故障转移,失败的连接处罚(失败的节点是否重试取决于失败的连续次数,失败的次数越多,客户端再次尝试同一节点之前等待的时间越长)。
(4)快速部署 ElasticSearch 5.6.x
- 配置 JDK 1.8
- 使用 wget 下载 ElasticSearch 安装包:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.8.tar.gz
- 解压安装包:
tar -zxvf elasticsearch-5.6.8.tar.gz
- 外网访问配置:
~]$ vim elasticsearch/conf/elasticsearch.yml
network.host: 0.0.0.0
- 报错一:
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c5330000, 986513408, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 986513408 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /usr/local/software/temp/elasticsearch-6.2.2/hs_err_pid1912.log
解决方案:内存不够,购买阿里云、腾讯云,亚马逊云的机器可以动态增加内存
6. 报错二:
[root@iZwz95j86y235aroi85ht0Z bin]# ./elasticsearch
[2018-02-22T20:14:04,870][WARN ][o.e.b.ElasticsearchUncaughtExceptionHandler] [] uncaught exception in thread [main]
org.elasticsearch.bootstrap.StartupException: java.lang.RuntimeException: can not run elasticsearch as root
at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:125) ~[elasticsearch-6.2.2.jar:6.2.2]
at org.elasticsearch.bootstrap.Elasticsearch.execute(Elasticsearch.java:112) ~[elasticsearch-6.2.2.jar:6.2.2]
at org.elasticsearch.cli.EnvironmentAwareCommand.execute(EnvironmentAwareCommand.java:86) ~[elasticsearch-6.2.2.jar:6.2.2]
at org.elasticsearch.cli.Command.mainWithoutErrorHandling(Command.java:124) ~[elasticsearch-cli-6.2.2.jar:6.2.2]
解决方案:只能使用非 root 用户运行 ElasticSearch,需要添加普通用户:
useradd -m es
passwd es
- 报错三:
~]$ ./elasticsearch
Exception in thread "main" java.nio.file.AccessDeniedException: /usr/local/software/temp/elasticsearch-6.2.2/config/jvm.options
解决方案:ElasticSearch 目录权限不够:
chmod 777 -R 当前es目录
- 集群测试
测试工具:Postman 工具
查看集群状态:localhost:9200/_cat/health?v
查看索引列表:localhost:9200/_cat/indices?v
(5)SpringBoot 整合 ElasticSearch
- pom.xml 文件中添加 Maven 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
- application.properties 配置文件
spring.data.elasticsearch.cluster-name=elasticsearch
spring.data.elasticsearch.cluster-nodes=127.0.0.1:9300
spring.data.elasticsearch.repositories.enabled=true
- Article.java
package net.xdclass.base_project.domain;
import java.io.Serializable;
import org.springframework.data.elasticsearch.annotations.Document;
/**
* 功能描述:文章对象
*/
// "blog" 与 "article" 都要小写
@Document(indexName = "blog", type = "article")
public class Article implements Serializable{
private static final long serialVersionUID = 1L;
private long id;
private String title;
private String summary;
private String content;
private int pv;
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getSummary() {
return summary;
}
public void setSummary(String summary) {
this.summary = summary;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public int getPv() {
return pv;
}
public void setPv(int pv) {
this.pv = pv;
}
}
- ArticleRepository.java
package net.xdclass.base_project.repository;
import net.xdclass.base_project.domain.Article;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Repository;
@Component
//@Repository
public interface ArticleRepository extends ElasticsearchRepository<Article, Long> {
}
- ArticleController.java
package net.xdclass.base_project.controller;
import net.xdclass.base_project.domain.Article;
import net.xdclass.base_project.domain.JsonData;
import net.xdclass.base_project.repository.ArticleRepository;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/v1/article")
public class ArticleController {
@Autowired
private ArticleRepository articleRepository;
@GetMapping("save")
public Object save(long id,String title){
Article article = new Article();
article.setId(id);
article.setPv(123);
article.setContent("springboot整合elasticsearch,这个是新版本 2018年录制");
article.setTitle(title);
article.setSummary("搜索框架整合");
articleRepository.save(article);
return JsonData.buildSuccess();
}
@GetMapping("search")
public Object search(String title){
// 搜索全部文档
// QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
// 单个匹配,搜索标题为 title 的文档
QueryBuilder queryBuilder = QueryBuilders.matchQuery("title", title);
Iterable<Article> list = articleRepository.search(queryBuilder);
return JsonData.buildSuccess(list);
}
}
- 查看 ElasticSearch 中存放的数据:
查看索引信息:http://localhost:9200/_cat/indices?v
查看某个索引库结构:http://localhost:9200/blog
查看某个对象:http://localhost:9200/blog/article/1
2、SpringBoot 与 ActiveMQ 的整合
(1)JMS(Java Message Service,Java 消息服务),Java 平台中关于面向消息中间件的接口,是一种与厂商无关的 API,用来访问消息、收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系型数据库的接口。
示例:微信支付加消息队列:
- 基本概念:
- JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
- JMS生产者(Message Producer)
- JMS消费者(Message Consumer)
- JMS消息
- JMS队列
- JMS主题
- 编程模型(MQ中需要用的一些类):
- ConnectionFactory:连接工厂,JMS 用它创建连接
- Connection:JMS 客户端到 JMS Provider 的连接
- Session:一个发送或接收消息的线程
- Destination:消息的目的地,消息发送给谁.
- MessageConsumer / MessageProducer:消息的接收者,消费者
-
JMS消息通常有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)
-
点对点类型:
-
发布/订阅类型:
- 特点:
跨平台、多语言、多项目、解耦、分布式事务、流量控制、最终一致性、RPC调用(上下游对接,数据源变动后通知下游)
(2)ActiveMQ 5.x 消息队列
- ActiveMQ 5.x 特点:
- 支持来自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各种跨语言客户端和协议
- 支持许多高级功能,如消息组,虚拟目标,通配符和复合目标
- 完全支持JMS 1.1和J2EE 1.4,支持瞬态,持久,事务和XA消息
- Spring支持,ActiveMQ可以轻松嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置
- 支持在流行的J2EE服务器(如TomEE,Geronimo,JBoss,GlassFish和WebLogic)中进行测试
- 使用JDBC和高性能日志支持非常快速的持久化
- ActiveMQ 5.x 安装:
- 下载地址:http://activemq.apache.org/activemq-5153-release.html
- 快速开始:http://activemq.apache.org/getting-started.html
- 如果是32位的机器,就双击 win32 目录下的 activemq.bat,如果是 64 位机器,则双击 win64 目录下的 activemq.bat
- bin 目录里面启动,选择对应的系统版本和位数,启动命令:
activeMQ start
- 启动后访问路径:http://127.0.0.1:8161/
- 用户名和密码默认都是 admin
- ActiveMQ 5.x 面板:
- Name:队列名称
- Number Of Pending Messages:等待消费的消息个数
- Number Of Consumers:当前连接的消费者数目
- Messages Enqueued:进入队列的消息总个数,包括出队列的和待消费的,这个数量只增不减
- Messages Dequeued:已经消费的消息数量
(3)SpringBoot 整合 ActiveMQ 之点对点消息和发布订阅模式
- pom.xml 中加入 Maven 依赖
## pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.xdclass</groupId>
<artifactId>base_project</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 整合消息队列ActiveMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- 如果配置线程池则加入 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- application.properties 配置
## application.properties
# 整合jms测试,安装在别的机器,防火墙和端口号记得开放
spring.activemq.broker-url=tcp://127.0.0.1:61616
# 集群配置
# spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)
spring.activemq.user=admin
spring.activemq.password=admin
# 下列配置要增加依赖
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100
# default point to point
# spring.jms.pub-sub-domain=true
- 各部分代码
## XdclassApplication.java
package net.xdclass.base_project;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
@SpringBootApplication
@EnableJms // 开启支持 JMS
public class XdclassApplication {
@Bean
public Topic topic(){
return new ActiveMQTopic("video.topic");
}
@Bean
public Queue queue(){
return new ActiveMQQueue("common.queue");
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
public static void main(String[] args) {
SpringApplication.run(XdclassApplication.class, args);
}
}
## CommonConsumer.java
package net.xdclass.base_project.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class CommonConsumer {
@JmsListener(destination="common.queue")
public void receiveQueue(String text){
System.out.println("CommonConsumer收到的报文为:"+text);
}
}
## OrderConsumer.java
package net.xdclass.base_project.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class OrderConsumer {
@JmsListener(destination="order.queue")
public void receiveQueue(String text){
System.out.println("OrderConsumer收到的报文为:"+text);
}
}
## TopicSub.java
package net.xdclass.base_project.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class TopicSub {
@JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic")
public void receive1(String text){
System.out.println("video.topic 消费者:receive1="+text);
}
@JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic")
public void receive2(String text){
System.out.println("video.topic 消费者:receive2="+text);
}
@JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic")
public void receive3(String text){
System.out.println("video.topic 消费者:receive3="+text);
}
}
## ProducerService.java
package net.xdclass.base_project.service;
import javax.jms.Destination;
/**
* 功能描述:消息生产
*/
public interface ProducerService {
/**
* 功能描述:指定消息队列,还有消息
* @param destination
* @param message
*/
public void sendMessage(Destination destination, final String message);
/**
* 功能描述:使用默认消息队列, 发送消息
* @param message
*/
public void sendMessage( final String message);
/**
* 功能描述:消息发布者
* @param msg
*/
public void publish(String msg);
}
## ProducerServiceImpl.java
package net.xdclass.base_project.service.impl;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;
import net.xdclass.base_project.service.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
/**
* 功能描述:消息生产者
*/
@Service
public class ProducerServiceImpl implements ProducerService{
@Autowired
private Queue queue;
// 用来发送消息到broker的对象
@Autowired
private JmsMessagingTemplate jmsTemplate;
// 发送消息,destination是发送到的队列,message是待发送的消息
@Override
public void sendMessage(Destination destination, String message) {
jmsTemplate.convertAndSend(destination, message);
}
// 发送消息,destination是发送到的队列,message是待发送的消息
@Override
public void sendMessage(final String message) {
jmsTemplate.convertAndSend( message);
}
//=======发布订阅相关代码=========
@Autowired
private Topic topic;
@Override
public void publish(String msg) {
this.jmsTemplate.convertAndSend(this.topic, msg);
}
}
## OrderController.java
package net.xdclass.base_project.controller;
import javax.jms.Destination;
import net.xdclass.base_project.domain.JsonData;
import net.xdclass.base_project.service.ProducerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 功能描述:模拟微信支付回调
*/
@RestController
@RequestMapping("/api/v1")
public class OrderController {
@Autowired
private ProducerService producerService;
/**
* 功能描述:微信支付回调接口
* @param msg 支付信息
* @return
*/
@GetMapping("order")
public Object order(String msg){
Destination destination = new ActiveMQQueue("order.queue");
producerService.sendMessage(destination, msg);
return JsonData.buildSuccess();
}
@GetMapping("common")
public Object common(String msg){
producerService.sendMessage(msg);
return JsonData.buildSuccess();
}
/**
* 功能描述:微信支付回调接口
* @param msg 支付信息
* @return
*/
@GetMapping("comment")
public Object comment(String msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException{
// 创建一个消息实例,包含 topic、tag 和 消息体
Message message = new Message("commentTopic","add", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
//同步的方式,会有返回结果,发送的是普通消息
SendResult result = msgProducer.getProducer().send(message);
System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
return JsonData.buildSuccess();
}
}
- 模拟请求:http://localhost:8080/api/v1/order?msg=12312321321312
3、SpringBoot 与 RocketMQ 的整合
(1)RocketMQ
Apache RocketMQ 是阿里开源的一款高性能、高吞吐量的分布式消息中间件。
特点:
- 在高压下 1 毫秒内的响应延迟超过99.6%
- 适合金融类业务,高可用性跟踪和审计功能
- 支持发布订阅模式和点对点模式
- 支持拉 pull 和推 push 两种消息模式
- 单一队列支持百万消息访问量
- 支持单 master 节点,多 master 节点,多 master 多 slave 节点
概念:
- Producer:消息生产者
- Producer Group:消息生产者组,发送同类消息的一个消息生产者组
- Consumer:消费者
- Consumer Group:消费同个消息的多个实例
- Tag:标签,子主题(二级分类),用于区分同一个主题下的不同业务的消息
- Topic:主题
- Message:消息
- Broker:MQ程序,接收生产的消息,提供给消费者消费的程序
- Name Server:给生产和消费者提供路由信息,提供轻量级的服务发现和路由
官网地址:http://rocketmq.apache.org/。
(2)RocketMQ 本地部署
- 解压压缩包
- 进入 bin 目录,启动 namesrv:
nohup sh mqnamesrv &
- 查看日志:
tail -f nohup.out
结尾:The Name Server boot success. serializeType=JSON 表示启动成功。
- 启动 broker:
nohup sh mqbroker -n 127.0.0.1:9876 &
- 关闭 nameserver broker:
sh mqshutdown namesrv
sh mqshutdown broker
(3)RocketMQ 可视化控制台
- 下载源码:https://github.com/apache/rocketmq-externals
- 编译打包:
mvn clean package -Dmaven.test.skip=true
- target 目录通过
java -jar
的方式运行 - 如果无法连接获取 broker 的信息,则修改配置文件中 namesrvAddr 为本地 IP 地址及端口号,并在程序中 src/main/resources/application.properties 添加如下配置:
rocketmq.config.namesrvAddr=192.168.0.101:9876
- 访问地址:http://localhost:8080
- 注意:在阿里云,腾讯云或者虚拟机,记得检查端口号和防火墙是否启动
(4)SpringBoot 整合 RocketMQ
## pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.xdclass</groupId>
<artifactId>base_project</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<properties>
<rocketmq.version>4.1.0-incubating</rocketmq.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 整合RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>${rocketmq.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
## application.properties
#通过触发器,去控制什么时候进行热加载部署新的文件
spring.devtools.restart.trigger-file=trigger.txt
#自定义启动banner文件的路径
spring.banner.location=banner.txt
# 消费者的组名
apache.rocketmq.consumer.PushConsumer=orderConsumer
# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer
# NameServer地址
apache.rocketmq.namesrvAddr=127.0.0.1:9876
## MsgProducer.java
package net.xdclass.base_project.jms;
import javax.annotation.PostConstruct;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class MsgProducer {
// 生产者的组名
@Value("${apache.rocketmq.producer.producerGroup}")
private String producerGroup;
// NameServer 地址
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
private DefaultMQProducer producer ;
public DefaultMQProducer getProducer(){
return this.producer;
}
@PostConstruct
public void init() {
//生产者的组名
producer = new DefaultMQProducer(producerGroup);
//指定NameServer地址,多个地址以 ; 隔开
//如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
producer.setNamesrvAddr(namesrvAddr);
producer.setVipChannelEnabled(false);
try {
// Producer对象在使用之前必须要调用start初始化,只能初始化一次
producer.start();
} catch (Exception e) {
e.printStackTrace();
}
// producer.shutdown(); 一般在应用上下文,关闭的时候进行关闭,用上下文监听器
}
}
## MsgConsumer.java
package net.xdclass.base_project.jms;
import javax.annotation.PostConstruct;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class MsgConsumer {
// 消费者的组名
@Value("${apache.rocketmq.consumer.PushConsumer}")
private String consumerGroup;
// NameServer 地址
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQPushConsumer() {
//消费者的组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
//指定NameServer地址,多个地址以 ; 隔开
consumer.setNamesrvAddr(namesrvAddr);
try {
//设置consumer所订阅的Topic和Tag,*代表全部的Tag
consumer.subscribe("testTopic", "*");
//CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,跳过历史消息
//CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//MessageListenerOrderly 这个是有序的
//MessageListenerConcurrently 这个是无序的,并行的方式处理,效率高很多
consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
try {
for (MessageExt messageExt : list) {
System.out.println("messageExt: " + messageExt);//输出消息内容
String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//输出消息内容
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
});
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
## OrderController.java
package net.xdclass.base_project.controller;
import java.io.UnsupportedEncodingException;
import net.xdclass.base_project.domain.JsonData;
import net.xdclass.base_project.jms.MsgProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 功能描述:模拟微信支付回调
*/
@RestController
@RequestMapping("/api/v1")
public class OrderController {
@Autowired
private MsgProducer msgProducer;
/**
* 功能描述:微信支付回调接口
* @param msg 支付信息
* @param tag 消息二级分类
* @return
*/
@GetMapping("order")
public Object order(String msg, String tag) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException{
// 创建一个消息实例,包含 topic、tag 和 消息体
Message message = new Message("testTopic",tag, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = msgProducer.getProducer().send(message);
System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
return JsonData.buildSuccess();
}
/**
* 功能描述:微信支付回调接口
* @param msg 支付信息
* @return
*/
@GetMapping("comment")
public Object comment(String msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException{
// 创建一个消息实例,包含 topic、tag 和 消息体
Message message = new Message("commentTopic","add", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
//同步的方式,会有返回结果,发送的是普通消息
SendResult result = msgProducer.getProducer().send(message);
System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
return JsonData.buildSuccess();
}
}
- 报错一:org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [chenyaowudeMacBook-Air.local, chenyaowudeMacBook-Air.local, chenyaowudeMacBook-Air.local]
解决方案:多网卡问题处理。
- 在 producer 部分设置:
producer.setVipChannelEnabled(false);
- 编辑 RocketMQ 配置文件:broker.conf(下列 IP 为自己的 IP)
namesrvAddr = 192.168.0.101:9876
brokerIP1 = 192.168.0.101
- 报错二:DESC: service not available now, maybe disk full, CL:
解决方案:修改启动脚本 runbroker.sh,在里面增加一句话即可:
JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"
即将磁盘保护的百分比设置成 98%(默认 0.9),只有磁盘空间使用率达到 98% 时才拒绝接收 producer 消息。
4、SpringBoot 多环境配置
- 不同环境使用不同配置,例如数据库配置,在开发的时候,我们一般用开发数据库,而在生产环境的时候,我们用正式的数据库
- 配置文件存放路径,可以存放在 classpath 根目录的 “/config” 包下,也可以存放在 classpath 的根目录下
- SpringBoot 允许通过命名约定按照一定的格式(application-{profile}.properties)来定义多个配置文件
## application-dev.properties
test.url=dev.com
## application-test.properties
test.url=test.com
## application.properties
test.url=local
#指定哪个profile
#spring.profiles.active=dev
## OrderController.java
package net.xdclass.base_project.controller;
import net.xdclass.base_project.domain.JsonData;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/v1")
public class OrderController {
@Value("${test.url}")
private String domain;
/**
* 功能描述:微信支付回调接口
* @param msg 支付信息
* @return
*/
@GetMapping("order")
public Object order(String msg){
return JsonData.buildSuccess(domain);
}
}
5、SpringBoot 响应式框架 WebFlux
- Spring WebFlux 是 Spring Framework 5.0 中引入的新的反应式 Web 框架,与 SpringMVC不同,它不需要 Servlet API,完全异步和非阻塞,并通过 Reactor 实现 Reactive Streams规范。
- Flux 和 Mono 就简单业务而言:和其他普通对象差别不大,而对复杂请求业务,就可以提升性能。
- Mono 表示的是包含 0 或者 1 个元素的异步序列,
Mono<User>
表示单一对象 User
Flux 表示的是包含 0 到 N 个元素的异步序列,Flux<User>
表示List<User>
Flux 和 Mono 之间可以进行转换。 - Spring WebFlux 有两种风格:基于功能和基于注解的。基于注解非常接近 SpringMVC模型。
- Spring WebFlux 项目不严格依赖于Servlet API,因此不能作为 war 文件部署,也不能使用src/main/webapp目录。
- Spring WebFlux 可以整合多个模板引擎,除了 REST Web 服务外,还可以使用 Spring WebFlux 提供动态 HTML 内容。Spring WebFlux 支持各种模板技术,包括Thymeleaf,FreeMarker。
- WebFlux 中,请求和响应不再是 WebMVC 中的 ServletRequest 和 ServletResponse,而是 ServerRequest 和 ServerResponse。
- pom.xml 中的依赖,如果同时存在 spring-boot-starter-web,则会优先用 spring-boot-starter-web。
## pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.xdclass</groupId>
<artifactId>base_project</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<dependencies>
<!-- <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency> -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
## UserService.java
package net.xdclass.base_project.service;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import net.xdclass.base_project.domain.User;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Service
public class UserService {
private static final Map<String, User> dataMap = new HashMap<>();
static{
dataMap.put("1", new User("1", "小X老师"));
dataMap.put("2", new User("2", "小D老师"));
dataMap.put("3", new User("3", "小C老师"));
dataMap.put("4", new User("4", "小L老师"));
dataMap.put("5", new User("5", "小A老师"));
dataMap.put("6", new User("6", "小S老师"));
dataMap.put("7", new User("7", "小S老师"));
}
/**
* 功能描述:返回用户列表
* @return
*/
public Flux<User> list(){
Collection<User> list = UserService.dataMap.values();
return Flux.fromIterable(list);
}
/**
* 功能描述:根据id查找用户
* @param id
* @return
*/
public Mono<User> getById(final String id){
return Mono.justOrEmpty(UserService.dataMap.get(id));
}
/**
* 功能描述:根据id删除用户
* @param id
* @return
*/
public Mono<User> del(final String id){
return Mono.justOrEmpty(UserService.dataMap.remove(id));
}
}
## UserController.java
package net.xdclass.base_project.controller;
import java.time.Duration;
import net.xdclass.base_project.domain.User;
import net.xdclass.base_project.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/api/v1/user")
public class UserController {
//@Autowired
//private UserService userService;
private final UserService userService;
public UserController(final UserService userService) {
this.userService = userService;
}
@GetMapping("/test")
public Mono<String> test(){
return Mono.just("hello 小D课堂");
}
/**
* 功能描述:根据id找用户
* @param id
* @return
*/
@GetMapping("find")
public Mono<User> findByid(final String id){
return userService.getById(id);
}
/**
* 功能描述:删除用户
* @param id
* @return
*/
@GetMapping("del")
public Mono<User> del(final String id){
return userService.del(id);
}
/**
* 功能描述:列表
* @return
*/
@GetMapping(value="list",produces=MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<User> list(){
return userService.list().delayElements(Duration.ofSeconds(2));
}
}
- 启动方式默认是Netty,8080端口,访问地址:http://localhost:8080/api/v1/user/test。
- SpringBoot WebFlux 响应式客户端 WebClient:
## WebClientTest.java
package base_project.base;
import org.junit.Test;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
@RunWith(SpringRunner.class) // 底层用junit SpringJUnit4ClassRunner
@SpringBootTest(classes={XdclassApplication.class}) // 启动整个springboot工程
public class WebClientTest {
@Test
public void testBase(){
Mono<String> resp = WebClient.create()
.get()
// 多个参数也可以直接放到map中,参数名与placeholder对应上即可
.uri("http://localhost:8080/api/v1/user/find?id=1")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(String.class);
System.out.println(resp.block());
}
@Test
public void testPlaceHolder(){
Mono<String> resp = WebClient.create()
.get()
// 使用占位符
.uri("http://localhost:8080/api/v1/user/find?id={id}",1)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(String.class);
System.out.println(resp.block());
}
}
6、SpringBoot 服务器端主动推送 SSE 技术
服务器端常用推送技术介绍:Ajax 定时拉取,WebSocket,SSE 轮询
- 客户端轮询:ajax定时拉取
- 服务端主动推送:WebSocket
- 全双工的,本质上是一个额外的 TCP 连接,建立和关闭时握手使用 HTTP 协议,其他数据传输不使用 HTTP 协议
- 更加复杂一些,适用于需要进行复杂双向数据通讯的场景
- 服务端主动推送:SSE(Server Send Event)
- HTML5 新标准,用来从服务端实时推送数据到浏览器端
- 直接建立在当前 HTTP 连接上,本质上是保持一个 HTTP 长连接,轻量协议
- 简单的服务器数据推送的场景,使用服务器推送事件
## SSEController.java
package net.xdclass.base_project.controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/sse")
public class SSEController {
// 需要把 response 的类型改为 text/event-stream,才是 SSE 的类型
@RequestMapping(value = "/get_data", produces = "text/event-stream;charset=UTF-8")
public String push() {
try {
Thread.sleep(1000);
//第三方数据源调用
} catch (InterruptedException e) {
e.printStackTrace();
}
return "data:xdclass 行情" + Math.random() + "\n\n";
}
}
## index.html
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
<script type="text/javascript">
// 需要判断浏览器支不支持,可以去w3c进行查看
var source = new EventSource('sse/get_data');
source.onmessage = function (event) {
console.info(event.data);
document.getElementById('result').innerText = event.data
};
</script>
</head>
<body>
模拟股票行情
<div>xdclass test</div>
<div id="result"></div>
</body>
</html>
访问地址:http://localhost:8080/index.html
7、SpringBoot 阿里云服务器生产环境部署
- 去除相关生产环境不需要的 jar 包,比如热部署 dev-tool。
## pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.xdclass</groupId>
<artifactId>base_project</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- 本地 maven 打包成 jar 包
mvn clean package -Dmaven.test.skip=true 跳过测试
- 打包指定配置文件
- 使用 Maven 的 profiles
- 使用 SpringBoot 的 profile=active
- 服务器端安装及配置 JDK
- Linux 下使用 wget 下载 JDK8。
- 配置环境变量:
vim /etc/profile
export JAVA_HOME=/usr/local/software/jdk8
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME PATH CLASSPATH
- 使用
source /etc/profile
,让配置立刻生效。
- 向服务器上传 jar 包。上传工具:(windows)WinSCP、SecurityCRT、(mac)Filezilla。
- 远程登录服务器:
ssh root@120.79.160.143
。 - 运行 jar 包:
java -jar xxxx.jar
,运行方式可以是:守护进程、系统服务、shell 脚本。 - 访问路径:http://120.79.160.143:8080/api/v1/user/find。
- 服务无法访问的原因:
- 阿里云防火墙是否开启,可以选择关闭,关闭是不安全的,可以选择开放端口
- 阿里云的安全访问组,开启对应的端口,如果应用是以 80 端口启动,则默认可以访问
成熟的互联网公司应该有的架构:
- 本地提交生产代码到 gitlab仓库
- Jenkins 自动化构建
- 运维或者开发人员发布
8、SpringBoot Actuator 监控平台
- Spring Boot包含许多附加功能,可帮助您在将应用程序投入生产时监视和管理应用程序。 可以选择使用 HTTP 端点或 JMX 来管理和监控您的应用程序,自动应用于审计,健康和指标收集,一句话,SpringBoot 提供用于监控和管理生产环境的模块。
- pom.xml 中加入依赖
<!-- SpringBoot监控依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
- 出于安全考虑,除 /actuator/health 和 /actuator/info 之外的所有执行器默认都是禁用的。management.endpoints.web.exposure.include 属性可用于启用执行器。
在 application.properties 中加入如下配置:
#开启监控端点
management.endpoints.web.exposure.include=*
- 在设置 management.endpoints.web.exposure.include 之前,要确保暴露的执行器不包含敏感信息,或通过将其放置在防火墙进行控制,不对外进行使用。禁用的端点将从应用程序上下文中完全删除,可以只暴露部分访问接口:
management.endpoints.web.exposure.include=* // 开启全部
management.endpoints.web.exposure.include=metrics // 开启某个
management.endpoints.web.exposure.exclude=metrics // 关闭某个
- 或者可以使用 Spring-Boot-Admin 进行管理,相关资料:https://www.cnblogs.com/ityouknow/p/8440455.html。
- 或者自己编写脚本监控 CPU、内存、磁盘使用状况和 Nginx 服务器的 HTTP 响应状态码:200,404,5xx。
- 常用的 URL:
/actuator/health:查看应用健康指标
/actuator/metrics:查看应用基本指标列表
/actuator/metrics/{name}:通过上述列表查看具体指标
/actuator/env:显示来自 Spring 的 ConfigurableEnvironment 属性值
标签:服务器端,boot,WebFlux,springframework,String,org,import,public,SpringBoot 来源: https://blog.csdn.net/gongxifacai_believe/article/details/115784236