视频中是使用Redis来实现的消息队列,但是一般来说很少业务使用redis来实现消息队列,一般使用更合适的专门的消息队列中间件
从上述看的出来redis的消息队列是很难比专门的消息队列中间件有优势的,同时为了让自己的点评项目有更多的亮点,对应的业务点使用更匹配的技术,所以使用RabbitMq来实现消息队列
搭建RabbitMQ过程
在虚拟机上docker实现搭建RabbitMQ,如何搭建docker?这里就不详细讲诉,不过还是要有虚拟机的
首先拉取镜像
docker run --privileged=true -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
mkdir -p /usr/local/docker/rabbitmq
再创建rabbitmq容器,下面的命令已经能够创建之后直接启动了,同时建议不要去掉–privileged=true这个参数,可能导致rabbitmq无法启动成功
docker run --privileged=true -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
启动之后,在浏览器中输入
http://192.168.200.130:15672/#/
->也就是http://你的虚拟机地址:15672
登录的密码以及账号默认都是guest
进入后页面是这样的
创建交换机以及queues
随后首先创建一个交换机(Exchange),命名为seckill.direct
随后点击queues再创建queues,名字为seckill.order.queue
创建好queue以及交换机后,回到交换机处,为交换机绑定一个queue,以便以后交换机接受到指定Rounting key的消息后,能够将信息交给指定queue消费信息,实现消息队列(也就是将订单消息交给交换机,交换机将消息给到queue,queue收到消息后就能消费信息,使用订单信息完成扣减库存等)
- 首先点击你创建的交换机
- 点击binding,指定要绑定的queue以及Routing Key
好了,基础环境搭建好了,现在能够测试一下了
首先在spring项目中添加依赖以及做好配置
- 导入RabbitMQ的场景启动器
org.springframework.boot
spring-boot-starter-amqp
- 配置连接信息
rabbitmq:
host: 192.168.200.130
port: 5672
virtual-host: /
测试消息队列是否正常工作
在utils包下创建监听指定queue的类,一旦有消息交给交换机,交换机就会根据RountingKey分发给指定的queue来消费消息
package com.hmdp.utils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SeckillOrderListener1 {
@RabbitListener(queues = {"seckill.order.queue"})
public void recieveMessage(Object message){
System.out.println("监听到了"+message);
}
}
当然了,创建了有人消费,就要有人生产消息
随后为该类创建测试类,给交换机发送消息即可
@SpringBootTest
@RunWith(SpringRunner.class)
public class SeckillOrderListenerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage(){
for(int i=0;i<100;i++){
rabbitTemplate.convertAndSend("seckill.direct","seckill.order","测试发送消息"+i);
}
}
}
启动服务并且运行测试类
可以看到可以正常收消息以及发送
添加序列化配置
自定义RabbitMQ中json格式的消息的序列化机制
@Configuration
public class RabbitMQConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
还有注意,需要配置先开启手动提交,否则业务执行失败,监听到的消息就丢失了
rabbitmq:
host: 192.168.200.130
port: 5672
virtual-host: /
listener:
simple:
acknowledge-mode: manual
修改业务代码
消息队列功能正常之后就很方便了
首先就是直接将订单交给阻塞队列的部分直接变成交给消息队列RabbitMQ,首先先向服务里注入RabbitmqTemplate
@Resource
private RabbitTemplate rabbitTemplate;
在seckillVoucher方法中修改订单交给消息队列,其他的不用的可以直接注释了,包括原有的创建的线程,init()方法,以及BlockingQueue orderTasks
public Result seckillVoucher(Long voucherId) {
// 1、执行Lua脚本,判断用户是否具有秒杀资格
Long result = null;
try {
result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
UserHolder.getUser().getId().toString()
);
} catch (Exception e) {
log.error("Lua脚本执行失败");
throw new RuntimeException(e);
}
if (result != null && !result.equals(0L)) {
// result为1表示库存不足,result为2表示用户已下单
int r = result.intValue();
return Result.fail(r == 2 ? "不能重复下单" : "库存不足");
}
//有购买之歌
Long orderId = redisIdWorker.nextId("order");
//TODO 保存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherId);
// 将订单保存到阻塞队列中
rabbitTemplate.convertAndSend("seckill.direct","seckill.order",voucherOrder);
// orderTasks.add(voucherOrder);
// IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
// this.proxy=proxy;
return Result.ok(orderId);
}
而且还要还要注意handleVoucherOrder的逻辑是否正确,现在不再需要代理对象来调用createVoucher方法(也许?毕竟已经有lua脚本了)
public void handleVoucherOrder(VoucherOrder voucherOrder){
Long voucherId = voucherOrder.getVoucherId();
//2.扣减库存
boolean success = iSeckillVoucherService.update().setSql("stock=stock-1")
.eq("voucher_id", voucherId)
//======判断当前库存是否大于0就可以决定是否能抢池子中的券了
.gt("stock", 0)
.update();
//3.创建订单
save(voucherOrder);
}
创建消费者
创建一个Listener文件夹,创建消费者专门监听交换器,收到信息传来的订单就执行处理订单方法,调用VoucherOrderService中的handleVoucherOrder来处理订单,下单并修改数据库
@Component
public class SeckillOrderListener {
@Autowired
VoucherOrderServiceImpl voucherOrderService;
@RabbitListener(queues = {"seckill.order.queue"})
public void recieveMessage(Message message, Channel channel, VoucherOrder voucherOrder){
try {
voucherOrderService.handleVoucherOrder(voucherOrder);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
ok了,改造完成,下面就是下单的测试结果了
下单前
下单后
数据库下单后
redis下单后