标签:return rabbitmq 接口 goodsId 秒杀 private 06 message public
秒杀项目06-接口优化
- 上一部分回顾
- 思路
- 1. Redis预减库存减少数据库访问
- 2. 内存标记减少Redis访问
- 3. 请求先入队缓冲,异步下单,增强用户体验
- 4. RabbitMQ安装与Spring Boot集成
- 5. Nginx水平扩展
- 6. 压测
上一部分回顾
超卖问题
- 数据库加唯一索引: 防止用户重复购买
- SQL加库存数量判断: 防止库存变成负数
思路
- 系统初始化,把商品库存数量加载到Redis
- 收到请求,Redis预减库存,库存不足,直接返回,否则进入3
- 请求入队,立即返回排队中
- 请求出队,生成订单,减少库存
- 客户端轮询,是否秒杀成功
1. Redis预减库存减少数据库访问
程序初始化时就把每一个秒杀商品的库存信息放到redis中,当有秒杀请求时,我们先通过redis中的库存信息进行预减库存,当redis中的库存<0时,就直接返回了,后面的请求对服务器的压力就很小了。
2. 内存标记减少Redis访问
通过一个Map来存储一个秒杀商品和秒杀商品是否秒杀结束的映射,当redis缓存中的对应的商品库存信息小于0时,我们就对map进行添加映射,来表示该商品已经秒杀结束,当后面的请求过来时,直接从map中取商品是否秒杀结束的信息,秒杀结束直接返回,这样对服务器的压力就更小了。
3. 请求先入队缓冲,异步下单,增强用户体验
通过RabbitMQ消息队列进行异步下单,对用户的秒杀请求进行入队缓冲。
MiaoshaController.java
@Autowired
private GoodsService goodsService;
@Autowired
private RedisService redisService;
@Autowired
private OrderService orderService;
@Autowired
private MiaoshaService miaoshaService;
@Autowired
private MQSender sender;
private Map<Long, Boolean> localOverMap = new HashMap<>();
@RequestMapping(value = "/do_miaosha", method= RequestMethod.POST)
@ResponseBody
public Result<Integer> doMiaosha(Model model, MiaoshaUser user, @RequestParam("goodsId")long goodsId) {
model.addAttribute("user", user);
if (user == null) {
return Result.fail(CodeMsg.SESSION_ERROR);
}
//内存标记,减少redis访问
boolean over = localOverMap.get(goodsId);
if(over) {
return Result.fail(CodeMsg.MIAO_SHA_OVER);
}
//预减库存
long stock = redisService.decr(GoodsKey.getMiaoshaGoodsStock, ""+goodsId);//10
if(stock < 0) {
localOverMap.put(goodsId, true);
return Result.fail(CodeMsg.MIAO_SHA_OVER);
}
//判断是否已经秒杀到了
MiaoshaOrder order = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(), goodsId);
if(order != null) {
return Result.fail(CodeMsg.REPEAT_MIAOSHA);
}
//入队
MiaoshaMessage mm = new MiaoshaMessage();
mm.setUser(user);
mm.setGoodsId(goodsId);
sender.sendMiaoshaMessage(mm);
return Result.success(0);//排队中
/*// 判断库存
GoodsVo goods = goodsService.getGoodsVoByGoodsId(goodsId);
int stock = goods.getStockCount();
if (stock <= 0) {
return Result.fail(CodeMsg.MIAO_SHA_OVER);
}
// 判断是否已经秒杀到了
MiaoshaOrder order = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(), goodsId);
if (order != null) {
return Result.fail(CodeMsg.REPEAT_MIAOSHA);
}
// 减库存 下订单 写入秒杀订单
OrderInfo orderInfo = miaoshaService.miaosha(user, goods);
return Result.success(orderInfo);*/
}
/**
* Controller实现了InitializingBean,系统初始化会调用该方法
**/
@Override
public void afterPropertiesSet() throws Exception {
List<GoodsVo> goodsList = goodsService.listGoodsVo();
if(goodsList == null) {
return;
}
for(GoodsVo goods : goodsList) {
redisService.set(GoodsKey.getMiaoshaGoodsStock, ""+goods.getId(), goods.getStockCount());
localOverMap.put(goods.getId(), false);
}
}
/**
* orderId:成功
* -1:秒杀失败
* 0: 排队中
*
* 该请求获取秒杀结果信息
**/
@RequestMapping(value="/result", method=RequestMethod.GET)
@ResponseBody
public Result<Long> miaoshaResult(Model model,MiaoshaUser user,@RequestParam("goodsId")long goodsId) {
model.addAttribute("user", user);
if(user == null) {
return Result.fail(CodeMsg.SESSION_ERROR);
}
long result =miaoshaService.getMiaoshaResult(user.getId(), goodsId);
return Result.success(result);
}
MQSender.java
@Service
public class MQSender {
private static final Logger log = LoggerFactory.getLogger(MQSender.class);
@Autowired
private AmqpTemplate amqpTemplate;
public void sendMiaoshaMessage(MiaoshaMessage mm) {
String msg = RedisService.beanToString(mm);
log.info("send message: " + msg);
amqpTemplate.convertAndSend(MQConfig.MIAOSHA_QUEUE, msg);
}
}
MQReceiver.java
@Service
public class MQReceiver {
private static Logger log = LoggerFactory.getLogger(MQReceiver.class);
@Autowired
private RedisService redisService;
@Autowired
private GoodsService goodsService;
@Autowired
private OrderService orderService;
@Autowired
private MiaoshaService miaoshaService;
@RabbitListener(queues = MQConfig.MIAOSHA_QUEUE)
public void receive(String message) {
log.info("receive miaoshamessage: " + message);
MiaoshaMessage mm = redisService.stringToBean(message, MiaoshaMessage.class);
MiaoshaUser user = mm.getUser();
long goodsId = mm.getGoodsId();
GoodsVo goods = goodsService.getGoodsVoByGoodsId(goodsId);
int stock = goods.getStockCount();
if(stock <= 0) {
return;
}
//判断是否已经秒杀到了
/*MiaoshaOrder order = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(), goodsId);
if(order != null) {
return;
}*/
//减库存 下订单 写入秒杀订单
miaoshaService.miaosha(user, goods);
}
}
MQConfig.java
@Configuration
public class MQConfig {
public static final String MIAOSHA_QUEUE = "miaosha.queue";
/**
* Direct模式 交换机Exchange
* */
@Bean
public Queue queue() {
//return new Queue(QUEUE, true);
return new Queue(MIAOSHA_QUEUE, true);
}
}
MiaoshaService.java
@Service
public class MiaoshaService {
@Autowired
private GoodsService goodsService;
@Autowired
private OrderService orderService;
@Autowired
private RedisService redisService;
@Transactional
public OrderInfo miaosha(MiaoshaUser user, GoodsVo goods) {
// 减库存 下订单 写入秒杀订单
boolean success = goodsService.reduceStock(goods);
if (success) {
// order_info miaosha_order
return orderService.createOrder(user, goods);
}
setGoodsOver(goods.getId());
return null;
}
private void setGoodsOver(Long goodsId) {
redisService.set(MiaoshaKey.isGoodsOver, ""+goodsId, true);
}
private boolean getGoodsOver(long goodsId) {
return redisService.exists(MiaoshaKey.isGoodsOver, ""+goodsId);
}
public long getMiaoshaResult(Long userId, long goodsId) {
MiaoshaOrder order = orderService.getMiaoshaOrderByUserIdGoodsId(userId, goodsId);
if(order != null) {//秒杀成功
return order.getOrderId();
}else {
boolean isOver = getGoodsOver(goodsId);
if(isOver) {
return -1;
}else {
return 0;
}
}
}
}
OrderService.java
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private RedisService redisService;
public MiaoshaOrder getMiaoshaOrderByUserIdGoodsId(long userId, long goodsId) {
//return orderMapper.getMiaoshaOrderByUserIdGoodsId(userId, goodsId);
return redisService.get(OrderKey.getMiaoshaOrderByUidGid, ""+userId+"_"+goodsId, MiaoshaOrder.class);
}
@Transactional
public OrderInfo createOrder(MiaoshaUser user, GoodsVo goods) {
OrderInfo orderInfo = new OrderInfo();
orderInfo.setCreateDate(new Date());
orderInfo.setDeliveryAddrId(0L);
orderInfo.setGoodsCount(1);
orderInfo.setGoodsId(goods.getId());
orderInfo.setGoodsName(goods.getGoodsName());
orderInfo.setGoodsPrice(goods.getMiaoshaPrice());
orderInfo.setOrderChannel(1);
orderInfo.setStatus(1);
orderInfo.setStatus(0);
orderInfo.setUserId(user.getId());
//insert方法会把自增的主键设置到对象属性中
orderMapper.insert(orderInfo);
MiaoshaOrder miaoshaOrder = new MiaoshaOrder();
miaoshaOrder.setGoodsId(goods.getId());
miaoshaOrder.setOrderId(orderInfo.getId());
miaoshaOrder.setUserId(user.getId());
orderMapper.insertMiaoshaOrder(miaoshaOrder);
redisService.set(OrderKey.getMiaoshaOrderByUidGid, ""+user.getId()+"_"+goods.getId(), miaoshaOrder);
return orderInfo;
}
public OrderInfo getOrderById(long orderId) {
return orderMapper.selectById(orderId);
}
}
4. RabbitMQ安装与Spring Boot集成
4.1 RabbitMQ安装
Erlang与RabbitMQ版本对应关系
安装Erlang
RabbitMQ下载
上传到服务器上
开始安装erlang
- 添加依赖
yum -y install ncurses-devel
- 解压
tar -xzvf otp_src_xxx.tar.gz
- 对erlang进行配置
./configure --prefix=/usr/local/erlang --without-javac
- 编译安装erlang
make & make install
- 将erlang中命令添加到环境变量并刷新配置
echo 'export PATH=$PATH:/usr/local/erlang/bin' >> /etc/profile
source /etc/profile
开始安装RabbitMQ
- 安装依赖
yum -y install xz
yum -y install python
yum -y install xmlto
yum -y install python-simplejson
- 解压
xz -d rabbitmqxxxx
tar -xvzf rabbitmqxxxx
- 启动rabbitmq并查看启动日志
# 后台启动
./rabbitmq-server -detached
tail -f /usr/local/rabbitmq//var/log/rabbitmq/rabbit@hmx.log
4. 配置rabbitmq环境变量
echo 'export PATH=$PATH:/usr/local/rabbitmq/sbin' >> /etc/profile
- 查看rabbitmq启动状态
netstat -nap | grep 5672
4.2 SpringBoot集成RabbitMQ上
1. 添加依赖spring-boot-starter-amqp
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.5.2</version>
</dependency>
2. 添加配置文件
#rabbitmq
spring.rabbitmq.host=192.168.174.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# 消费者数量
spring.rabbitmq.listener.simple.concurrency= 10
# 消费者最大数量
spring.rabbitmq.listener.simple.max-concurrency= 10
# 每个消费者可以未确认的最大未确认消息数。
spring.rabbitmq.listener.simple.prefetch= 1
# 是否在启动时自动启动容器。
spring.rabbitmq.listener.simple.auto-startup=true
# 默认情况下是否重新排队被拒绝的交货。
spring.rabbitmq.listener.simple.default-requeue-rejected= true
# 是否启用发布重试。
spring.rabbitmq.template.retry.enabled=true
# 第一次和第二次尝试传递消息之间的持续时间。
spring.rabbitmq.template.retry.initial-interval=1000
# 最大重试次数
spring.rabbitmq.template.retry.max-attempts=3
# 最大重试间隔
spring.rabbitmq.template.retry.max-interval=10000
# 应用于前一个重试间隔的乘数。
spring.rabbitmq.template.retry.multiplier=1.0
允许guest用户被远程访问
- 在/rabbitmq/etc/rabbitmq/目录下创建rabbitmq.conf配置文件,并添加如下内容
loopback_users = none
- 重启rabbitmq
# 停止rabbitmq
rabbitmqctl stop
# 启动rabbitmq
rabbitmq-server
3. 创建消息接收者
MQConfig.java
@Configuration
public class MQConfig {
public static final String QUEUE = "queue";
@Bean
public Queue queue() {
return new Queue(QUEUE,true);
}
}
MQReceiver.java
@Service
public class MQReceiver {
private static Logger log = LoggerFactory.getLogger(MQReceiver.class);
@RabbitListener(queues = MQConfig.QUEUE)
public void receive(String message) {
log.info("receive message: " + message);
}
}
4. 创建消息发送者
将RedisService中的beanToString和stringToBean改成静态方法
MQSender.java
@Service
public class MQSender {
private static Logger log = LoggerFactory.getLogger(MQSender.class);
@Autowired
private AmqpTemplate amqpTemplate;
public void send(Object message) {
String msg = RedisService.beanToString(message);
log.info("send message: " + msg);
amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
}
}
DemoController.java
@Controller
@RequestMapping("/demo")
public class DemoController {
@Autowired
private UserService userService;
@Autowired
private RedisService redisService;
@Autowired
MQSender sender;
@RequestMapping("/mq")
@ResponseBody
public Result<String> mq() {
sender.send("Hello, World");
return Result.success("Hello, World");
}
}
启动应用,访问接口
4.3 SpringBoot集成RabbitMQ下-四种交换机(exchange)
Direct Exchange
处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “abc”,则只有被标记为“abc”的消息才被转发,不会转发abc.def,也不会转发dog.ghi,只会转发abc。
Fanout Exchange
不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
Topic Exchange
将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.*” 只会匹配到“abc.def”。
Headers Exchange
不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。
匹配规则x-match有下列两种类型:
x-match = all :表示所有的键值对都匹配才能接受到消息
x-match = any :表示只要有键值对匹配就能接受到消息
MQConfig.java
@Configuration
public class MQConfig {
public static final String MIAOSHA_QUEUE = "miaosha.queue";
public static final String QUEUE = "queue";
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
public static final String HEADER_QUEUE = "header.queue";
public static final String TOPIC_EXCHANGE = "topicExchage";
public static final String FANOUT_EXCHANGE = "fanoutxchage";
public static final String HEADERS_EXCHANGE = "headersExchage";
/**
* Direct模式 交换机Exchange
* */
@Bean
public Queue queue() {
return new Queue(QUEUE, true);
}
/**
* Topic模式 交换机Exchange
* */
@Bean
public Queue topicQueue1() {
return new Queue(TOPIC_QUEUE1, true);
}
@Bean
public Queue topicQueue2() {
return new Queue(TOPIC_QUEUE2, true);
}
@Bean
public TopicExchange topicExchage(){
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchage()).with("topic.key1");
}
@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchage()).with("topic.#");
}
/**
* Fanout模式 交换机Exchange
* */
@Bean
public FanoutExchange fanoutExchage(){
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean
public Binding FanoutBinding1() {
return BindingBuilder.bind(topicQueue1()).to(fanoutExchage());
}
@Bean
public Binding FanoutBinding2() {
return BindingBuilder.bind(topicQueue2()).to(fanoutExchage());
}
/**
* Header模式 交换机Exchange
* */
@Bean
public HeadersExchange headersExchage(){
return new HeadersExchange(HEADERS_EXCHANGE);
}
@Bean
public Queue headerQueue1() {
return new Queue(HEADER_QUEUE, true);
}
@Bean
public Binding headerBinding() {
Map<String, Object> map = new HashMap<String, Object>();
map.put("header1", "value1");
map.put("header2", "value2");
return BindingBuilder.bind(headerQueue1()).to(headersExchage()).whereAll(map).match();
}
}
MQSender.java
@Service
public class MQSender {
private static Logger log = LoggerFactory.getLogger(MQSender.class);
@Autowired
private AmqpTemplate amqpTemplate;
public void send(Object message) {
String msg = RedisService.beanToString(message);
log.info("send message: " + msg);
amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
}
public void sendTopic(Object message) {
String msg = RedisService.beanToString(message);
log.info("send topic message:"+msg);
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key1", msg+"1");
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key2", msg+"2");
}
public void sendFanout(Object message) {
String msg = RedisService.beanToString(message);
log.info("send fanout message:"+msg);
amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", msg);
}
public void sendHeader(Object message) {
String msg = RedisService.beanToString(message);
log.info("send fanout message:"+msg);
MessageProperties properties = new MessageProperties();
properties.setHeader("header1", "value1");
properties.setHeader("header2", "value2");
Message obj = new Message(msg.getBytes(), properties);
amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", obj);
}
}
MQReceiver.java
@Service
public class MQReceiver {
private static Logger log = LoggerFactory.getLogger(MQReceiver.class);
@RabbitListener(queues = MQConfig.QUEUE)
public void receive(String message) {
log.info("receive message: " + message);
}
@RabbitListener(queues=MQConfig.TOPIC_QUEUE1)
public void receiveTopic1(String message) {
log.info(" topic queue1 message:"+message);
}
@RabbitListener(queues=MQConfig.TOPIC_QUEUE2)
public void receiveTopic2(String message) {
log.info(" topic queue2 message:"+message);
}
@RabbitListener(queues=MQConfig.HEADER_QUEUE)
public void receiveHeaderQueue(byte[] message) {
log.info(" header queue message:"+new String(message));
}
}
DemoController.java
@Controller
@RequestMapping("/demo")
public class DemoController {
@Autowired
private UserService userService;
@Autowired
private RedisService redisService;
@Autowired
private GoodsService goodsService;
@Autowired
MQSender sender;
@RequestMapping("/mq/header")
@ResponseBody
public Result<String> mqHeaders() {
sender.sendHeader("Hello, World");
return Result.success("Hello, World");
}
@RequestMapping("/mq/fanout")
@ResponseBody
public Result<String> mqFanout() {
sender.sendFanout("Hello, World");
return Result.success("Hello, World");
}
@RequestMapping("/mq/topic")
@ResponseBody
public Result<String> mqTopic() {
sender.sendTopic("Hello, World");
return Result.success("Hello, World");
}
@RequestMapping("/mq")
@ResponseBody
public Result<String> mq() {
sender.send("Hello, World");
return Result.success("Hello, World");
}
}
测试
- Direct Exchange
- Topic Exchange
- Fanout Exchange
- Headers Exchange
5. Nginx水平扩展
可以将项目的jar包放到多台器上运行,配置一下反向代理的路径,负载均衡。达到高可用的效果。
6. 压测
将项目更新后的新的jar包放到服务器上运行,然后进行压测,在使用这么多的优化手段后,可以感觉到qps高了很多。
标签:return,rabbitmq,接口,goodsId,秒杀,private,06,message,public
来源: https://blog.csdn.net/qq_43478625/article/details/120863297
本站声明:
1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。