RabbitMQ架构

RabbitMQ

RabbitMQ是一个高可用的消息中间件,支持多种协议和集群扩展。并且支持消息持久化和镜像队列,适用于对消息可靠性较高的场合,基本模型如下。

img

其客户端使用方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from kombu import Connection, Exchange, Queue
media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
def process_media(body, message):
print body
message.ack()
# connections
with Connection('amqp://guest:guest@localhost//') as conn:
# produce
producer = conn.Producer(serializer='json')
producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
exchange=media_exchange, routing_key='video',
declare=[video_queue])
# the declare above, makes sure the video queue is declared
# so that the messages can be delivered.
# It's a best practice in Kombu to have both publishers and
# consumers declare the queue. You can also declare the
# queue manually using:
# video_queue(conn).declare()
# consume
with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
# Process messages and handle events on all channels
while True:
conn.drain_events()

示例中的发布端和消费端是同一方,而实际中的使用方式一般有多种场景,topic模式、fanout模式、direct模式和RPC模式。

img
img

  1. topic模式,按照设置的路由信息(routing key)将消息路由到一个或者多个消费端,而消息只能由一个消费者消费一次。一个消费者可以设置多个路由信息,可以同时获取多个消费者发送的消息;
  2. fanout模式,与topic模式唯一的区别是同一消息会发送到订阅(binding)的多个消费者;
  3. direct模式,一对一模式,实际中比较少用;
  4. RPC模式,结合topic和direct模式,发送消息的同时指定要接受的消息。

RabbitMQ监控树

为了高可靠,Erlang中实际的工作进程(Erlang进程,并不是系统进程)都有一个监控进程,监控进程负责(一个或多个)工作进程的创建、销毁和重启。监控进程和工作进程的关系如图。

img

  1. 方块图是监控进程;
  2. 圆圈是工作进程;
  3. 方块中的”1“(one_for_one)和”a“(one_for_all)代表不同的监控策略

one_for_one 监控策略,一个工作进程崩溃,则只重启崩溃的工作进程。

img

one_for_all监控策略,一个工作进程崩溃,则销毁并重启所有工作进程

img
在RabbitMQ中还有一种simple_one_for_one监控策略,与one_for_one监控策略相同,只不过重启工作进程时的启动参数是固定的。RabbitMQ网络框架也遵循该原则。

img

RabbitMQ消息架构

当client端链接服务器时,RabbitMQ会启动一系列监控和工作进程来处理网络连接。

img
为了降低TCP链接数量,多个消费者共享同一个链接Connection,但是每个消费者独享一个管道channel,用consumer_tag标识。consumer_tag在Connection唯一,从1开始累加,当重连接时需要匹配该tag。每个消费者对应独立的一套rabbit_channel_sup_sup->rabbit_channel_sup->rabbit_channel|rabbit_writer|rabbit_limiter系列进程。

RabbitMQ网络框架时序图

img
client建立链接后,RabbitMQ通过tcp_acceptor进程处理accept成功后返回的clientfd。

img
rabbit_reader从TCP链接中读取数据,然后根据协议回调函数处理客户端的各种请求。

RabbitMQ消息处理流程

img
RabbitMQ先验证权限;然后检查Exchange是否存在,不存在则创建;检查消息是否合法以及是否需要confirm等;根据路由信息选择消费队列;检查消费队列是否存在,有则将消息发送给消息队列;检查消费者是否存在,存在则将消息发送给消费者client端。

img
RabbitMQ会根据不同的消息的不同类型做不同的处理:

  1. 不持久化消息,如果没有消费者则直接丢掉,不会入消费队列;如果有,则先入消息队列,按照入队顺序依次发送给消费者。
  2. 持久化消息,将消息持久化成功后才给发送端发ack,然后再发送给消费者。

(完)