RabbitMQ消息队列

作者: 炒焖煎糖板栗

RabbitMQ简介

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现 image-20211016103857772

核心概念

Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可 能需要持久性存储)等。 Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。 Exchange类型

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 Exchange有4种类型:direct(默认),fanout,topic,和headers,不同类型的Exchange转发消息的策略有所区别 Direct Exchange

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为"dog”,则只转发 routingkey 标记为"dog"的消息,不会转发"dog.puppy”,也不会转发"dog.guard” 等等。它是完全匹配、单播的模式。 image-20211017114901124 Fanout Exchange

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。 image-20211017115035522 Topic Exchange topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。 它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开 。它同样也会识别两个通配符:符号”“和符号 。匹配0个或多个单词,匹配一个单词。 image-20211017115210823 Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直 在队列里面,等待消费者连接到这个队列将其取走。 Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交 换器理解成一个由绑定构成的路由表。 Exchange和Queue的绑定可以是多对多的关系。 Connection

网络连接,比如一个TCP连接。 Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道 发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都 是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。 Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加 密环境的独立服务器域。每个 vhost 本质上就是一个mini版的RabbitMQ 服务器,拥 有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时 指定,RabbitMQ 默认的vhost是/。 Broker

表示消息队列服务器实体

Docker安装RabbitMQ

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p  25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

访问15672端口 image-20211016103739247 https://www.rabbitmq.com/networking.html

SpringCloud整合RabbitMQ

引入RabbitMQ包

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

引入RabbitMQ,RabbitAutoConfiguration就会自动生效

给容器中自动配置了RabbitTemplate、AmqpAdmin等等 配置文件

spring.rabbitmq.host=192.168.195.100
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

测试类

@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {
    @Autowired
    AmqpAdmin amqpAdmin;
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    void sendMessageTest() {
        //因为存到rabbit中是经过序列化的,所以加上配置转成json发出去
        OrderReturnReasonEntity orderReturnReasonEntity=new OrderReturnReasonEntity();
        orderReturnReasonEntity.setId(1L);
        orderReturnReasonEntity.setCreateTime(new Date());
        rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderReturnReasonEntity);
        log.info("消息发送成功");
    }
    @Test
    void createExchange() {
        //创建了一个Direct类型的交换机  是否持久化 是否自动删除
        DirectExchange directExchange=new DirectExchange("hello-java-exchange",true,false);
        amqpAdmin.declareExchange(directExchange);
        log.info("Exchange创建成功");
    }
    @Test
    void createQueue() {
        Queue queue=new Queue("hello-java-Queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        log.info("Queue创建成功");
    }
    @Test
    void createBinding() {
        //将exchange指定的交换机和Directnation目的地进行绑定,使用routingkey作为路由键
        Binding binding=new Binding("hello-java-Queue",
                Binding.DestinationType.QUEUE,
                "hello-java-exchange",
                "hello-java",null);
        amqpAdmin.declareBinding(binding);
        log.info("Binding绑定成功");
    }
}

先创建交换机,然后创建对队列,绑定路由键,利用rabbitTemplate发送消息 image-20211017102318987 @RabbitListenter\&@RabbitHandler接收消息 @RabbitListenter监听消息

    @RabbitListener(queues = )
    public  void  recieveMessage(Message message, OrderReturnReasonEntity content, Channel channel)
    {
        System.out.println("接收到消息内容:"+message+"内容==》"+content);
    }

image-20211017110606120

如果有多个客户端,只有一个会收到消息,并且只有当一个消息处理完才会收到下一个消息

如果需要监听一个队列里的多个消息,消息的类型都不一样利用@RabbitHandler

监听hello-java-Queue队列里不同的消息

@RabbitListener(queues = )
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
    @RabbitHandler
    public  void  recieveMessage(Message message, OrderReturnReasonEntity content)
    {
        System.out.println("接收到消息内容:"+message+"内容==》"+content);
    }
    @RabbitHandler
    public  void  recieveMessage2(OrderEntity orderEntity)
    {
        System.out.println("接收到消息内容:"+orderEntity);
    }
}

控制器

@Slf4j
@Controller
public class RabbitController {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @GetMapping("/sendMq")
    public String sendMessageTest() {
        for (int i = 0; i < 10; i++) {
            if(i%2==0) {
                //因为存到rabbit中是经过序列化的,所以加上配置转成json发出去
                OrderReturnReasonEntity orderReturnReasonEntity=new OrderReturnReasonEntity();
                orderReturnReasonEntity.setId(1L);
                orderReturnReasonEntity.setCreateTime(new Date());
                rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderReturnReasonEntity);
                log.info("消息发送成功");
            }
            else {
                OrderEntity orderEntity=new OrderEntity();
                orderEntity.setOrderSn(UUID.randomUUID().toString());
                rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderEntity);
                log.info("消息发送成功");
            }
        }
        return  "";
    }
}

image-20211017112512949

RabbitMQ消息确认机制-可靠抵达

保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制 publisher confirmCallback 确认模式 publisher returnCallback 未投递到 queue 退回模式 consumer ack机制 image-20211017121332850 可靠抵达-ConfirmCallback 如果要使用confirmCallback ,需要配置

开启发送端确认
spring.rabbitmq.publisher-confirm-type=correlated
  1. 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启 confirmcallback 。
  2. CorrelationData:用来表示当前消息唯一性。
  3. 生产者只要把消息发送给Broker,消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback。
  4. 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback 。
    @PostConstruct
    public  void initRabbitTemplate()
    {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /*
            * @param correlationData 当前消息的唯一关联数据(消息的唯一id)
            * @param ack 消息是否成功收到
            * @param cuase 失败的原因
            * */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cuase) {
                System.out.println("confirm... cottrlationData"+correlationData+",[ack]"+ack+",cuase"+cuase);
            }
        });
    }
    
    可靠抵达-ReturnCallback

开启发送消息抵达队列的确认

spring.rabbitmq.publisher-returns=true

只要抵达队列,以异步发动有限回调们这个returnconfig
spring.rabbitmq.template.mandatory=true

只有当消息没有抵达队列才会触发方法

  @PostConstruct
    public  void initRabbitTemplate()
    {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /*
            * @param correlationData 当前消息的唯一关联数据(消息的唯一id)
            * @param ack 消息是否成功收到
            * @param cuase 失败的原因
            * */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cuase) {
                System.out.println("confirm... cottrlationData"+correlationData+",[ack]"+ack+",cuase"+cuase);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            //投递失败的详细信息 回复的状态码 回复的文本内容 当时这个消息给给哪个交换机 当时消息的路由键
            @Override
            public void returnedMessage(Message message, int replaycode, String replytext, String exchange, String routekey) {
                System.out.println("Fail...message"+message+",[replaycode]"+replaycode+",[replytext]"+replytext+",[exchange]"+exchange+",[routekey]"+routekey);
            }
        });
    }

可靠抵达-Ack消息确认机制

在不开启手动确认的时候,发送消息突然服务器关机会导致消息丢失,因此需要开启手动模式保证消息的可达性

手动确认消息达到
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消费者手动确认模式下 只要没有明确确认消息,就一直是unached状态,即使关机 消息也不会丢失,会重新变为Ready

    @RabbitHandler
    public  void  recieveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws IOException {
        System.out.println("接收到消息内容:"+message+"内容==》"+content);
        long deliveryTag=message.getMessageProperties().getDeliveryTag();
        //签收消息
        channel.basicAck(deliveryTag,false);//true就是重新发回服务器
        System.out.println("消息签收"+deliveryTag);
    }
    @RabbitHandler
    public  void  recieveMessage2(Message message,OrderEntity orderEntity,Channel channel) throws IOException {
        System.out.println("接收到消息内容:"+orderEntity);
        long deliveryTag=message.getMessageProperties().getDeliveryTag();
        //签收消息
        channel.basicNack(deliveryTag,false,true);// 退货  true就是重新发回服务器
        System.out.println("没有签收"+deliveryTag);
    }

消息处理成功,ack(),接受下一个消息,此消息broker就会移除

消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack

消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人 如何签收

channel.basicAck(deliveryTag,false) 签收
channel.basicNack(deliveryTag,false,true); 拒签

原文创作:炒焖煎糖板栗

原文链接:https://www.cnblogs.com/cg-ww/p/15422014.html

更多推荐

更多
  • Pulsar消息队列-一套高可用实时消息系统实现 实时消息【即时通信】系统,有群聊和单聊两种方式,其形态异于消息队列:1 大量的 group 信息变动,群聊形式的即时通信系统在正常服务形态下,瞬时可能有大量用户登入登出。2 ...
  • Pulsar消息队列-Pulsar对比Kafka笔记 很多人查看 Pulsar 之前可能对 Kafka 很熟悉,参照上图可见二者内部结构的区别,Pulsar 和 Kafka 都是以 Topic 描述一个基本的数据集合,Topic 数据又分为若干 Partition,即对数据进行逻辑上的 ...
  • Pulsar消息队列-对 2017 年一套 IM 系统的反思 信系统的开发,前前后后参与或者主导了六七个 IM 系统的研发。上一次开发的 IM 系统的时间点还是 2018 年,关于该系统的详细描述见 [一套高可用实时消息系统实现][1] ...
  • Apache InLong编译-如何编译 下载源码,编译二进制文件,编译Docker镜像,下载源码load/main/)下载源码. 编译二进制文件mvn clean install -DskipTests,编译Docker镜像 mvn clean package -...
  • Apache InLong编译-入库 Hive 示例 安装 Hive,安装 InLong,新建接入,审批接入,配置 Agent 采集文件, 本节用一个简单的示例,帮助您快速体验 InLong 的完整流程。 安装 Hivee,这里推荐使用 Docker 进行快速安装,在开始之前,我们需要安装 ...
  • Apache InLong编译-使用 Pulsar 示例 安装 Pulsar,安装 Hive,安装 InLong,创建数据接入,数据接入审批,配置 Agent 采集文件,数据落地检查,问题排查,配置数据流 Group 信息,配置数据流,配置文件 Agent,配置数据格式,配置 Hive ...
  • Apache InLong文档-Sort 插件 总览,扩展 Extract Node,扩展 Load Node,集成 Extract 和 Load Node 到 InLong Sort 主流程,总览 InLong Sort 是一个基于 Apache Flink SQL 的 ETL ...
  • Apache InLong文档-Dashboard 插件 总览,集成新的 Load Node 到 InLong Dashboard 的主流程,总览 本文面向 InLong新增一个 Load Node,让插件开发变得简单。 InLong Dashboard 本身作为前端控制台,Inlong ...
  • Apache InLong文档-Manager 插件 总览,扩展读取节点,扩展写入节点,总览 -, **Apache Kafka** , **ClickHouse** 等, 详细内容可参考 [数据节点] 扩展读取节点 -
  • Apache InLong文档-Agent 插件 总览,概念和模型,流程图示,开发流程,接口,任务配置,Message,Reader,Sink,Source,总览 在 Standard Architecture 支持以插件的方式扩展新的采集类型,本文将指导开发者如何自定义新的 ...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多