RabbitMQ 学习
一、概要
一个不错的入门教程: http://blog.csdn.net/linvo/article/details/5750987 写的挺好的,只是刚开始看可能不太懂,模模糊糊,多看几遍,试着写点代码之后,再看。就比较清晰了。
官方文档使用了 using the pika 0.9.8 Python client 。本文使用 http://github.com/celery/py-amqp amqp 1.4.6
至于安装,自己找下教程吧。不难,先安装 Erlang,再安装RabbitMQ。然后配置一下,有个web控制台。之后就是python编程使用了。
再加一个不错的中文资料: http://blog.chinaunix.net/topic/surpershi/
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。
Broker:简单来说就是消息队列服务器实体。 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息队列载体,每个消息都会被投入到一个或多个队列。 Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。 Routing Key:路由关键字,exchange根据这个关键字进行消息投递。 vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。 producer:消息生产者,就是投递消息的程序。 consumer:消息消费者,就是接受消息的程序。 channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
消息队列的使用过程大概如下:
(1)客户端连接到消息队列服务器,打开一个channel。 (2)客户端声明一个exchange,并设置相关属性。 (3)客户端声明一个queue,并设置相关属性。 (4)客户端使用routing key,在exchange和queue之间建立好绑定关系。 (5)客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了 routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符 号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还 有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分: (1)exchange持久化,在声明时指定durable => 1 (2)queue持久化,在声明时指定durable => 1 (3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。
二、基本使用
入门教程看会之后,就差不多了。
下面示例代码:
consumer 消费者
# amqp_consumer.py
# -*- coding: utf-8 -*-
__author__ = 'lpe234'
__date__ = '2014-12-15'
import amqp
conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
chan = conn.channel()
chan.queue_declare(queue="po_box", durable=True, exclusive=False, auto_delete=False)
chan.exchange_declare(exchange="sorting_room", type="direct", durable=True, auto_delete=False, )
chan.queue_bind(queue="po_box", exchange="sorting_room", routing_key="1111")
def receive_callback(msg):
print 'Received: ' + msg.body + ' from channel #' + str(msg.channel.channel_id)
chan.basic_consume(queue='po_box', no_ack=True, callback=receive_callback, consumer_tag="consumer")
while True:
chan.wait()
chan.basic_cancel("consumer")
chan.close()
conn.close()
producer 生产者
# amqp_publisher.py
# -*- coding: utf-8 -*-
__author__ = 'lpe234'
__date__ = '2014-12-15'
import amqp
import json
conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
chan = conn.channel()
for x in xrange(10):
msg = json.dumps({'id': str(x)+'111', 'lists': [{'id': 12345}, {'id': 12345}, {'id': 15656}, {'id': '4545'}, ]})
print msg
msg = amqp.Message(msg)
msg.properties["delivery_mode"] = 2
chan.basic_publish(msg, exchange="sorting_room", routing_key="1111")
chan.close()
conn.close()
代码基本都是在 csdn 那个博客里面弄下来的。稍微的修改了以下。
启动时,先运行 consumer 消费者进程,它会先连接, 并创建 Queue和 Exchange ,然后一直等待队列中的消息。
然后,启动 publisher ,它会先连接,然后向指定 Exchange 交换机推送带有特定 routing_key 路由键的消息。
如果消费者对应的 Queue 队列与 Exchange 交换机 的 routing_key 路由键 相对应的话。那么消费者就会接收到相应消息。至此,整个传递过程结束。
三、补充
注释代码
# -*- coding: utf-8 -*-
__author__ = 'lpe234'
__date__ = '2014-12-15'
import amqp
"""
amqp rabbitmq DEMO测试
先启动 amqp_consumer.py 消费者,创建
"""
conn = amqp.Connection(host='localhost:5672', userid='guest', password='guest', virtual_host='/', insist=False)
# 每个channel都被分配了一个整数标识,自动由Connection()类的.channel()方法维护。可以使用.channel(x)来指定channel标识。
chan = conn.channel(channel_id=1)
# 当多个 channel_id 相同时,实际为同一 channel
# 现在已经有了一个可用的连接和channel。
# 现在将代码分为两类,生产者(producer)和消费者(consumer)。
# 创建一个消费者程序,会创建一个"po_box"的队列和一个叫"sorting_room"的交换机。
chan.queue_declare(queue='po_box', durable=True, exclusive=False, auto_delete=False)
chan.exchange_declare(exchange='sorting_room', type='direct', durable=True, auto_delete=False)
# 创建了"po_box" 的队列,durable重启之后会重新建立,auto_delete=False最后一个消费者断开之后不会自动删除,exclusive私有队列
# 创建了"sorting_room"的交换机,type指定交换机类型,
# 现在已经有了一个可以接收消息的队列和一个可以发送消息的交换机。不过还需要创建一个绑定
chan.queue_bind(queue='po_box', exchange='sorting_room', routing_key='jason')
# 这个绑定非常直接,任何送到交换机"sorting_room"的具有路由键"jason"的消息都被路由到"po_box" 队列
# 现在有两个方法,从队列中取出消息。
# 第一个是调用 chan.basic_get(), 主动从队列中拉出下一条消息(若没有则返回 None)
# msg = chan.basic_get(queue='po_box')
# if msg:
# print msg.body
# chan.basic_ack(msg.delivery_tag)
# 第二种
def receive_callback(msg):
print msg.body
chan.basic_consume(queue='po_box', no_ack=True, callback=receive_callback, consumer_tag='testtag')
while True:
chan.wait()
chan.basic_cancel('testtag')
# chan.wait() 放在无限循环里面,这个函数会等待在队列上,知道下一个消息到达队列。
# chan.basic_cancel() 用来注销该回调函数
# no_ack 这个参数,可以传给 chan.basic_get(), chan.basic_consume。是否等待回馈,
其他的后续再补充吧
- 基于PhalApi的Redis拓展
- PhalApi-Image -- 图像处理
- 为什么微信红包长这样?
- Visual Studio 2008 每日提示(三十六)
- zephir-(6)运算符
- zephir-(12)php函数和异常处理
- phalapi-进阶篇7(使用缓存以及用redis拓展解决实际问题)
- 5个不为人知的Java API使用技巧
- zephir-(3)你的第一个PHP拓展
- zephir-(7)数组
- phalapi-进阶篇5(数据库读写分离以及多库使用)
- phalapi-进阶篇4(notrom进阶以及事务操作)
- 自动机器学习:利用遗传算法优化递归神经网络
- zephir-(8)类和对象1
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- PAT (Basic Level) Practice (中文)1006 换个格式输出整数
- PAT (Basic Level) Practice (中文)1008 数组元素循环右移问题
- Ceph分布式存储日常运维管理手册
- MyBatis为了解决二级缓存脏读问题,究竟做了那些骚操作!
- PAT (Basic Level) Practice (中文)1009 说反话
- PAT (Basic Level) Practice (中文)1011 A+B 和 C
- PAT (Basic Level) Practice (中文)1013 数素数
- PAT (Basic Level) Practice (中文)1012 数字分类
- PAT (Basic Level) Practice (中文)1016 部分A+B
- PAT (Basic Level) Practice (中文)1086 就不告诉你
- PAT (Basic Level) Practice (中文)1061 判断题
- 使用IDEA写Python之pytest环境搭建及第一个程序编写
- PAT (Basic Level) Practice (中文)1026 程序运行时间
- PAT (Basic Level) Practice (中文)1091 N-自守数
- PAT (Basic Level) Practice (中文)1007 素数对猜想