MQ 消息队列
队列具有特性:
- 解藕
- 异步执行
- 消峰
遵循AMQP协议
RabbitMQ 支持事务操作
支持模式:(点对点、发布订阅)
- 生产者-队列-消费者
- 生产者-队列-多消费者
- 生产者-交换机-队列-消费者
- 交换机模式 fanout 广播所有消费者
- Direct 指定routekey,匹配消费者发布
- Topic 指定routekey,指定通配符的形式,匹配消费者发布
生产者:
生产者连接到RabbitMQ Broker,建立一个连接(Connection)开启一个信道(Channel)
生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等
生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
生产者通过路由键将交换器和队列绑定起来
生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息
相应的交换器根据接收到的路由键查找相匹配的队列如果找到,则将从生产者发送过来的消息存入相应的队列中如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
关闭信道
关闭连接
消费者:
消费者连接到RabbitMQ Broker ,建立一个连接(Connection),开启一个信道(Channel)
消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数
等待RabbitMQ Broker回应并投递相应队列中的消息,消费者接收消息
消费者确认(ack) 接收到的消息
RabbitMQ从队列中删除相应己经被确认的消息
关闭信道
关闭连接信道
信道
TCP连接的“昂贵”性
假设消费者要消费消息,并根据服务需求合理调度线程,若只进行TCP连接,那么当高并发的时候,每秒可能都有成千上万的TCP连接,不仅仅是对TCP连接的浪费,也很快会超过操作系统每秒所能建立连接的数量。 如果能在一条TCP连接上操作,又能保证各个线程之间的私密性就完美了,于是信道的概念出现了。
信道是建立在Connection之上的虚拟连接。当应用程序与Rabbit Broker建立TCP连接的时候,客户端紧接着可以创建一个AMQP信道(Channel) ,每个信道都会被指派一个唯一的ID。 RabbitMQ 处理的每条AMQP指令都是通过信道完成的。信道就像电缆里的光纤束,一条电缆内含有许多光纤束,允许所有的连接通过多条光线束进行传输和接收。
关于生产者消费者我们需要了解几个概念:
Producer:生产者,即消息投递者一方。
消息:消息一般分两个部分——消息体(payload)和标签。标签用来描述这条消息,如:一个交换器的名称或者一个路由Key,Rabbit通过解析标签来确定消息的去向,payload是消息内容可以使一个json、数组等等。
Consumer:消费者,就是接收消息的一方。消费者订阅RabbitMQ的队列,当消费者消费一条消息时,只是消费消息的消息体。在消息路由的过程中,会丢弃标签,存入到队列中的只有消息体。
Broker:消息中间件的服务节点
Queue:队列,是RabbitMQ的内部对象,用于存储消息。RabbitMQ中消息只能存储在队列中,生产者投递消息到队列,消费者从队列中获取消息并消费。多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(轮询)给多个消费者进行 消费,而不是每个消费者都收到所有的消息进行消费。
注意:RabbitMQ不支持队列层面的广播消费,如果需要广播消费,可以采用一个交换器通过路由Key绑定多个队列,由多个消费者来订阅这些队列的方式。
Exchange:交换器。在RabbitMQ中,生产者并非直接将消息投递到队列中。真实情况是,生产者将消息发送到Exchange(交换器),由交换器将消息路由到一个或多个队列中。如果路由不到,或返回给生产者,或直接丢弃,或做其它处理。
RoutingKey:路由Key。生产者将消息发送给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则。这个路由Key需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。在交换器类型和绑定键固定的情况下, 生产者可以在发送消息给交换器时通过指定RoutingKey来决定消息流向哪里。
Binding:RabbitMQ通过绑定将交换器和队列关联起来,在绑定的时候一般会指定一个绑定键,这样RabbitMQ就可以指定如何正确的路由到队列了。
从这里我们可以看到在RabbitMQ中交换器和队列实际上可以是一对多,也可以是多对多关系。交换器和队列就像我们关系数据库中的两张表。它们同归BindingKey做关联(多对多关系表)。在我们投递消息时,可以通过Exchange和RoutingKey (对应BindingKey)就可以找到相对应的队列。
了解协议 AMQP
https://baike.baidu.com/item/AMQP/8354716?fr=aladdin
了解队列
应用场景
https://www.cnblogs.com/jajian/p/10257555.html
-
复杂系统的解耦 系统A产生核心数据需要同步成百个系统,由队列路由转发就可以了。
-
复杂链路的异步调用
-
瞬时高峰的削峰处理
RabbitMQ基础知识 https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html
参考: https://segmentfault.com/a/1190000012016574
https://www.cnblogs.com/oskyhg/p/8521705.html
RabbitMQ持久化消息 RabbitMQ实现持久化消息需满足以下3个条件:
delivery_mode=2 使用durable=True声明exchange是持久化 使用durable=True声明queue是持久化
delivery_mode delivery_mode=2指明message为持久的. delivery_mode 投递消息模式 1 . ram 2 . disc 设置为disc后能从AMQP服务器崩溃中恢复消息–持久化 但效率比 ram:disc = 3:1 durable durable (默认false) rabbitmq重启后queue和Exchange会被清除,包括数据。
basic.qos函数来进行流量控制:$channel->basic_qos(null, 1, null); 第二个参数prefetch_count来控制每次消费数量。
代码参考:https://github.com/WalkingSun/Jump/blob/master/commands/RabbitmqController.php
各大队列使用优缺:
- RabbitMQ的开源社区很活跃,较高频率的迭代版本,来修复发现的bug以及进行各种优化,因此综合考虑过后,公司采取了RabbitMQ
- Kafka的优势在于专为超高吞吐量的实时日志采集、实时数据同步、实时数据计算等场景来设计
- RocketMQ是基于Java语言开发的,适合深入阅读源码,有需要可以站在源码层面解决线上生产问题,包括源码的二次开发和改造。
持久化 持久化
ack 业务消费服务器收到队列消息宕机,队列系统已经删了数据,如果是减库存的操作,这个单子就会被遗漏,丢失
性能优化
rabbitmq 队列除了发布和消费吞吐量外,有个评价MQ队列效率的重要指标 Consumer utillsation
在最佳利用率情况下,这个值能够达到100%,并且生产中总是应该达到100%,为了达到这个利用率,需要增加消费者的数量。
但在很多系统中,这个值可能低于5%。这一般是由于下列原因所致
- 消费者太少;
- 消费端的ack太慢;
- 消费者太多;
- 非自动ack;
- 增加fetch值(这个根据我们的实际经验,lan内影响很有限)