黑马点评day4:使用RabbitMQ实现消息队列

视频中是使用Redis来实现的消息队列,但是一般来说很少业务使用redis来实现消息队列,一般使用更合适的专门的消息队列中间件

从上述看的出来redis的消息队列是很难比专门的消息队列中间件有优势的,同时为了让自己的点评项目有更多的亮点,对应的业务点使用更匹配的技术,所以使用RabbitMq来实现消息队列

搭建RabbitMQ过程

在虚拟机上docker实现搭建RabbitMQ,如何搭建docker?这里就不详细讲诉,不过还是要有虚拟机的

首先拉取镜像

docker run --privileged=true -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
mkdir -p /usr/local/docker/rabbitmq

再创建rabbitmq容器,下面的命令已经能够创建之后直接启动了,同时建议不要去掉–privileged=true这个参数,可能导致rabbitmq无法启动成功

docker run --privileged=true -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

启动之后,在浏览器中输入

http://192.168.200.130:15672/#/
->也就是http://你的虚拟机地址:15672

登录的密码以及账号默认都是guest

进入后页面是这样的

创建交换机以及queues

随后首先创建一个交换机(Exchange),命名为seckill.direct

随后点击queues再创建queues,名字为seckill.order.queue

创建好queue以及交换机后,回到交换机处,为交换机绑定一个queue,以便以后交换机接受到指定Rounting key的消息后,能够将信息交给指定queue消费信息,实现消息队列(也就是将订单消息交给交换机,交换机将消息给到queue,queue收到消息后就能消费信息,使用订单信息完成扣减库存等)

  • 首先点击你创建的交换机
  • 点击binding,指定要绑定的queue以及Routing Key

好了,基础环境搭建好了,现在能够测试一下了

首先在spring项目中添加依赖以及做好配置

  1. 导入RabbitMQ的场景启动器

    org.springframework.boot
    spring-boot-starter-amqp

  1. 配置连接信息
  rabbitmq:
    host: 192.168.200.130
    port: 5672
    virtual-host: /

测试消息队列是否正常工作

在utils包下创建监听指定queue的类,一旦有消息交给交换机,交换机就会根据RountingKey分发给指定的queue来消费消息

package com.hmdp.utils;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SeckillOrderListener1 {
    @RabbitListener(queues = {"seckill.order.queue"})
    public void recieveMessage(Object message){
        System.out.println("监听到了"+message);
    }
}

当然了,创建了有人消费,就要有人生产消息

随后为该类创建测试类,给交换机发送消息即可

@SpringBootTest
@RunWith(SpringRunner.class)
public class SeckillOrderListenerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendMessage(){
        for(int i=0;i<100;i++){
            rabbitTemplate.convertAndSend("seckill.direct","seckill.order","测试发送消息"+i);

        }
    }
}

启动服务并且运行测试类

可以看到可以正常收消息以及发送

添加序列化配置

自定义RabbitMQ中json格式的消息的序列化机制

@Configuration
public class RabbitMQConfig {
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

还有注意,需要配置先开启手动提交,否则业务执行失败,监听到的消息就丢失了

  rabbitmq:
    host: 192.168.200.130
    port: 5672
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual

修改业务代码

消息队列功能正常之后就很方便了

首先就是直接将订单交给阻塞队列的部分直接变成交给消息队列RabbitMQ,首先先向服务里注入RabbitmqTemplate


    @Resource
    private RabbitTemplate rabbitTemplate;

在seckillVoucher方法中修改订单交给消息队列,其他的不用的可以直接注释了,包括原有的创建的线程,init()方法,以及BlockingQueue orderTasks

   public Result seckillVoucher(Long voucherId) {
        // 1、执行Lua脚本,判断用户是否具有秒杀资格
        Long result = null;
        try {
            result = stringRedisTemplate.execute(
                    SECKILL_SCRIPT,
                    Collections.emptyList(),
                    voucherId.toString(),
                    UserHolder.getUser().getId().toString()
            );
        } catch (Exception e) {
            log.error("Lua脚本执行失败");
            throw new RuntimeException(e);
        }
        if (result != null && !result.equals(0L)) {
            // result为1表示库存不足,result为2表示用户已下单
            int r = result.intValue();
            return Result.fail(r == 2 ? "不能重复下单" : "库存不足");
        }
        //有购买之歌
        Long orderId = redisIdWorker.nextId("order");
        //TODO 保存到阻塞队列
        VoucherOrder voucherOrder = new VoucherOrder();
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(UserHolder.getUser().getId());
        voucherOrder.setVoucherId(voucherId);
        // 将订单保存到阻塞队列中
        rabbitTemplate.convertAndSend("seckill.direct","seckill.order",voucherOrder);
//        orderTasks.add(voucherOrder);
//        IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
//        this.proxy=proxy;
        return Result.ok(orderId);
    }

而且还要还要注意handleVoucherOrder的逻辑是否正确,现在不再需要代理对象来调用createVoucher方法(也许?毕竟已经有lua脚本了)

    public void handleVoucherOrder(VoucherOrder voucherOrder){
        Long voucherId = voucherOrder.getVoucherId();
        //2.扣减库存
        boolean success = iSeckillVoucherService.update().setSql("stock=stock-1")
                .eq("voucher_id", voucherId)
                //======判断当前库存是否大于0就可以决定是否能抢池子中的券了
                .gt("stock", 0)
                .update();
        //3.创建订单
        save(voucherOrder);
    }

创建消费者

创建一个Listener文件夹,创建消费者专门监听交换器,收到信息传来的订单就执行处理订单方法,调用VoucherOrderService中的handleVoucherOrder来处理订单,下单并修改数据库

@Component
public class SeckillOrderListener {
    @Autowired
    VoucherOrderServiceImpl voucherOrderService;
    @RabbitListener(queues = {"seckill.order.queue"})
    public void recieveMessage(Message message, Channel channel, VoucherOrder voucherOrder){
        try {
            voucherOrderService.handleVoucherOrder(voucherOrder);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

ok了,改造完成,下面就是下单的测试结果了

下单前

下单后

数据库下单后

redis下单后

版权声明

   站内部分内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供网络资源分享服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请联系我们一经核实,立即删除。并对发布账号进行永久封禁处理。在为用户提供最好的产品同时,保证优秀的服务质量。


本站仅提供信息存储空间,不拥有所有权,不承担相关法律责任。
大数据

mercalli使用教程(mercari使用教程)

2025-3-5 10:23:58

大数据

hadoop分布式搭建步骤(分布式 hadoop)

2025-3-5 10:24:00

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧