2024/05/14

MQ 消息队列

队列具有特性:

  • 解藕
  • 异步执行
  • 消峰

遵循AMQP协议

RabbitMQ 支持事务操作

支持模式:(点对点、发布订阅)

  • 生产者-队列-消费者
  • 生产者-队列-多消费者
  • 生产者-交换机-队列-消费者
    • 交换机模式 fanout 广播所有消费者
    • Direct 指定routekey,匹配消费者发布
    • Topic 指定routekey,指定通配符的形式,匹配消费者发布

生产者:

生产者连接到RabbitMQ Broker,建立一个连接(Connection)开启一个信道(Channel)

生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等

生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等

生产者通过路由键将交换器和队列绑定起来

生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息

相应的交换器根据接收到的路由键查找相匹配的队列如果找到,则将从生产者发送过来的消息存入相应的队列中如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者

关闭信道

关闭连接

消费者:

消费者连接到RabbitMQ Broker ,建立一个连接(Connection),开启一个信道(Channel)

消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数

等待RabbitMQ Broker回应并投递相应队列中的消息,消费者接收消息

消费者确认(ack) 接收到的消息

RabbitMQ从队列中删除相应己经被确认的消息

关闭信道

关闭连接信道

信道

TCP连接的“昂贵”性

假设消费者要消费消息,并根据服务需求合理调度线程,若只进行TCP连接,那么当高并发的时候,每秒可能都有成千上万的TCP连接,不仅仅是对TCP连接的浪费,也很快会超过操作系统每秒所能建立连接的数量。 如果能在一条TCP连接上操作,又能保证各个线程之间的私密性就完美了,于是信道的概念出现了。

信道是建立在Connection之上的虚拟连接。当应用程序与Rabbit Broker建立TCP连接的时候,客户端紧接着可以创建一个AMQP信道(Channel) ,每个信道都会被指派一个唯一的ID。 RabbitMQ 处理的每条AMQP指令都是通过信道完成的。信道就像电缆里的光纤束,一条电缆内含有许多光纤束,允许所有的连接通过多条光线束进行传输和接收。

关于生产者消费者我们需要了解几个概念:

Producer:生产者,即消息投递者一方。

消息:消息一般分两个部分——消息体(payload)和标签。标签用来描述这条消息,如:一个交换器的名称或者一个路由Key,Rabbit通过解析标签来确定消息的去向,payload是消息内容可以使一个json、数组等等。

Consumer:消费者,就是接收消息的一方。消费者订阅RabbitMQ的队列,当消费者消费一条消息时,只是消费消息的消息体。在消息路由的过程中,会丢弃标签,存入到队列中的只有消息体。

Broker:消息中间件的服务节点

Queue:队列,是RabbitMQ的内部对象,用于存储消息。RabbitMQ中消息只能存储在队列中,生产者投递消息到队列,消费者从队列中获取消息并消费。多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(轮询)给多个消费者进行 消费,而不是每个消费者都收到所有的消息进行消费。

注意:RabbitMQ不支持队列层面的广播消费,如果需要广播消费,可以采用一个交换器通过路由Key绑定多个队列,由多个消费者来订阅这些队列的方式。

Exchange:交换器。在RabbitMQ中,生产者并非直接将消息投递到队列中。真实情况是,生产者将消息发送到Exchange(交换器),由交换器将消息路由到一个或多个队列中。如果路由不到,或返回给生产者,或直接丢弃,或做其它处理。

RoutingKey:路由Key。生产者将消息发送给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则。这个路由Key需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。在交换器类型和绑定键固定的情况下, 生产者可以在发送消息给交换器时通过指定RoutingKey来决定消息流向哪里。

Binding:RabbitMQ通过绑定将交换器和队列关联起来,在绑定的时候一般会指定一个绑定键,这样RabbitMQ就可以指定如何正确的路由到队列了。

从这里我们可以看到在RabbitMQ中交换器和队列实际上可以是一对多,也可以是多对多关系。交换器和队列就像我们关系数据库中的两张表。它们同归BindingKey做关联(多对多关系表)。在我们投递消息时,可以通过Exchange和RoutingKey (对应BindingKey)就可以找到相对应的队列。

了解协议 AMQP

https://baike.baidu.com/item/AMQP/8354716?fr=aladdin

了解队列

应用场景

https://www.cnblogs.com/jajian/p/10257555.html

https://mp.weixin.qq.com/s?__biz=MzU0OTk3ODQ3Ng==&mid=2247484149&idx=1&sn=98186297335e13ec7222b3fd43cfae5a&chksm=fba6eaf6ccd163e0c2c3086daa725de224a97814d31e7b3f62dd3ec763b4abbb0689cc7565b0&scene=21#wechat_redirect

  • 复杂系统的解耦 系统A产生核心数据需要同步成百个系统,由队列路由转发就可以了。

  • 复杂链路的异步调用

  • 瞬时高峰的削峰处理

RabbitMQ基础知识 https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html

参考: https://segmentfault.com/a/1190000012016574

https://www.cnblogs.com/oskyhg/p/8521705.html

RabbitMQ持久化消息 RabbitMQ实现持久化消息需满足以下3个条件:

delivery_mode=2 使用durable=True声明exchange是持久化 使用durable=True声明queue是持久化

delivery_mode delivery_mode=2指明message为持久的. delivery_mode 投递消息模式 1 . ram 2 . disc 设置为disc后能从AMQP服务器崩溃中恢复消息–持久化 但效率比 ram:disc = 3:1 durable durable (默认false) rabbitmq重启后queue和Exchange会被清除,包括数据。

basic.qos函数来进行流量控制:$channel->basic_qos(null, 1, null); 第二个参数prefetch_count来控制每次消费数量。

代码参考:https://github.com/WalkingSun/Jump/blob/master/commands/RabbitmqController.php

各大队列使用优缺:

  • RabbitMQ的开源社区很活跃,较高频率的迭代版本,来修复发现的bug以及进行各种优化,因此综合考虑过后,公司采取了RabbitMQ
  • Kafka的优势在于专为超高吞吐量的实时日志采集、实时数据同步、实时数据计算等场景来设计
  • RocketMQ是基于Java语言开发的,适合深入阅读源码,有需要可以站在源码层面解决线上生产问题,包括源码的二次开发和改造。

image

持久化 持久化

ack 业务消费服务器收到队列消息宕机,队列系统已经删了数据,如果是减库存的操作,这个单子就会被遗漏,丢失

https://mp.weixin.qq.com/s?__biz=MzU0OTk3ODQ3Ng==&mid=2247484204&idx=1&sn=6fc43b0620857b653dbef20693d1c6c6&chksm=fba6eb2fccd16239056e4b52dc0895585292b830bfd2652dea81b7360556fe36aceac0951761&scene=21

性能优化

rabbitmq 队列除了发布和消费吞吐量外,有个评价MQ队列效率的重要指标 Consumer utillsation

在最佳利用率情况下,这个值能够达到100%,并且生产中总是应该达到100%,为了达到这个利用率,需要增加消费者的数量。

但在很多系统中,这个值可能低于5%。这一般是由于下列原因所致

  • 消费者太少;
  • 消费端的ack太慢;
  • 消费者太多;
  • 非自动ack;
  • 增加fetch值(这个根据我们的实际经验,lan内影响很有限)

Search

    Table of Contents