The result of tag: (3 results)

RabbitMQ消息队列:工作队列

by LauCyun Oct 07,2017 09:35:17 8,884 views

RabbitMQ消息队列:"Hello, World!"中,已经写了一个从已知队列中发送和获取消息的程序。本文将创建一个工作队列(Work Queue),它会发送一些耗时的任务给多个工作者(Worker)。

工作队列(又称:任务队列——Task Queues)是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。

这个概念在网络应用中是非常有用的,它可以在短暂的HTTP请求中处理一些复杂的任务。

1 准备

之前的RabbitMQ消息队列:"Hello, World!"中,我们发送了一个包含“Hello World!”的字符串消息。现在,我们将发送一些字符串,把这些字符串当作复杂的任务。我们没有真实的例子,例如图片缩放、pdf文件转换。所以使用time.sleep()函数来模拟这种情况。我们在字符串中加上点号(.)来表示任务的复杂程度,一个点(.)将会耗时1秒钟。比如"Hello..."就会耗时3秒钟。

我们对之前教程的producing.py做些简单的调整,以便可以发送随意的消息。这个程序会按照计划发送任务到我们的工作队列中。我们把它命名为new_task.py

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print " [x] Sent %r" % (message,)

我们的旧脚本consuming.py同样需要做一些改动:它需要为消息体中每一个点号(.)模拟1秒钟的操作。它会从队列中获取消息并执行,我们把它命名为worker.py

import time

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"

2 循环调度

使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。

首先,我们先同时运行两个worker.py脚本,它们都会从队列中获取消息,到底是不是这样呢?我们看看。

你需要打开三个终端,两个用来运行worker.py脚本,这两个终端就是我们的两个消费者(consumers)—— C1 和 C2。

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C

第三个终端,我们用来发布新任务。你可以发送一些消息给消费者(consumers):

shell3$ python new_task.py First message.
shell3$ python new_task.py Second message..
shell3$ python new_task.py Third message...
shell3$ python new_task.py Fourth message....
shell3$ python new_task.py Fifth message.....

看看到底发送了什么给我们的工作者(workers):

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)

3 消息确认

当处理一个比较耗时得任务的时候,你也许想知道消费者(consumers)是否运行到一半就挂掉。当前的代码中,当消息被RabbitMQ发送给消费者(consumers)之后,马上就会在内存中移除。这种情况,你只要把一个工作者(worker)停止,正在处理的消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。

我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者(worker)。

为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。

如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,及时工作者(workers)偶尔的挂掉,也不会丢失消息。

消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。

消息响应默认是开启的。之前的例子中我们可以使用no_ack=True标识把它关闭。是时候移除这个标识了,当工作者(worker)完成了任务,就发送一个响应。

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello')

运行上面的代码,我们发现即使使用CTRL+C杀掉了一个工作者(worker)进程,消息也不会丢失。当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。

忘记确认

一个很容易犯的错误就是忘了basic_ack,后果很严重。消息在你的程序退出之后就会重新发送,如果它不能够释放没响应的消息,RabbitMQ就会占用越来越多的内存。

为了排除这种错误,你可以使用rabbitmqctl命令,输出messages_unacknowledged字段:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

4 消息持久化

如果你没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化

首先,为了不让队列消失,需要把队列声明为持久化(durable):

channel.queue_declare(queue='hello', durable=True)

尽管这行代码本身是正确的,但是仍然不会正确运行。因为我们已经定义过一个叫hello的非持久化队列。RabbitMq不允许你使用不同的参数重新定义一个队列,它会返回一个错误。但我们现在使用一个快捷的解决方法——用不同的名字,例如task_queue

channel.queue_declare(queue='task_queue', durable=True)

这个queue_declare必须在生产者(producer)和消费者(consumer)对应的代码中修改。

这时候,我们就可以确保在RabbitMq重启之后queue_declare队列不会丢失。另外,我们需要把我们的消息也要设为持久化——将delivery_mode的属性设为2

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

注意:消息持久化

将消息设为持久化并不能完全保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间还是有一个很小的间隔时间。因为RabbitMq并不是所有的消息都使用fsync(2)——它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的持久化,但已经足够应付我们的简单工作队列。如果你一定要保证持久化,你需要改写你的代码来支持事务(transaction)。

5 公平调度

你应该已经发现,它仍旧没有按照我们期望的那样进行分发。比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。

这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。

我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。

channel.basic_qos(prefetch_count=1)

关于队列大小

如果所有的工作者都处理繁忙状态,你的队列就会被填满。你需要留意这个问题,要么添加更多的工作者(workers),要么使用其他策略。

6 测试

new_task.py的完整代码:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print " [x] Sent %r" % (message,)
connection.close()

worker.py:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

使用消息响应和prefetch_count你就可以搭建起一个工作队列了。这些持久化的选项使得在RabbitMQ重启之后仍然能够恢复。

7 参考

...

Tags Read More


RabbitMQ消息队列:"Hello, World!"

by LauCyun Sep 24,2017 22:19:34 10,666 views

RabbitMQ是一个消息代理。它的工作就是接收和转发消息。你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ就扮演着邮箱、邮局以及邮递员的角色。

RabbitMQ和邮局的主要区别在于,它处理纸张,而是接收、存储和发送消息(message)这种二进制数据。

下面是RabbitMQ和消息所涉及到的一些术语。

  • 生产(Producing)的意思就是发送。发送消息的程序就是一个生产者(producer)。我们一般用"P"来表示:

  • 队列(queue)就是存在于RabbitMQ中邮箱的名称。虽然消息的传输经过了RabbitMQ和你的应用程序,但是它只能被存储于队列当中。实质上队列就是个巨大的消息缓冲区,它的大小只受主机内存和硬盘限制。多个生产者(producers)可以把消息发送给同一个队列,同样,多个消费者(consumers)也能够从同一个队列(queue)中获取数据。队列可以绘制成这样(图上是队列的名称):

  • 在这里,消费(Consuming)和接收(receiving)是同一个意思。一个消费者(consumer)就是一个等待获取消息的程序。我们把它绘制为"C":

接下来我们用Python写两个小程序。一个发送单条消息的生产者(producer)和一个接收消息并将其输出的消费者(consumer)。传递的消息是Hello World

1 前言

下图中,“P”代表生产者,“C”代表消费者,中间的盒子代表为消费者保留的消息缓冲区,也就是我们的队列。

生产者(producer)把消息发送到一个名为hello的队列中。消费者(consumer)从这个队列中获取消息。

RabbitMQ库

RabbitMQ使用的是AMQP 0.9.1协议。这是一个用于消息传递的开放、通用的协议。针对不同编程语言有大量的RabbitMQ客户端可用。在这个系列教程中,RabbitMQ团队推荐使用Pika这个Python客户端。大家可以通过pip这个包管理工具进行安装:

pip install pika

2 生产者

我们第一个程序producing.py会发送一个消息到队列中。首先要做的事情就是建立一个到RabbitMQ服务器的连接。

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host="172.17.0.201",
        port=5672,
        credentials=credentials
    )
)
channel = connection.channel()

现在我们已经跟本地机器的代理建立了连接。

接下来,在发送消息之前,我们需要确认服务于消费者的队列已经存在。如果将消息发送给一个不存在的队列,RabbitMQ会将消息丢弃掉。下面我们创建一个名为hello的队列用来将消息投递进去。

channel.queue_declare(queue="hello", durable=True)  # 队列持久化

这时候我们就可以发送消息了,我们第一条消息只包含了Hello World!字符串,我们打算把它发送到hello队列。

在RabbitMQ中,消息是不能直接发送到队列中的,这个过程需要通过交换机(exchange)来进行。但是为了不让细节拖累我们的进度,这里我们只需要知道如何使用由空字符串表示的默认交换机即可。默认交换机比较特别,它允许我们指定消息究竟需要投递到哪个具体的队列中,队列名字需要在routing_key参数中指定。

channel.basic_publish(
    exchange='',
    routing_key="hello",
    body="hello world!"
    properties=pika.BasicProperties(delivery_mode=2, )
)

在退出程序之前,我们需要确认网络缓冲已经被刷写、消息已经投递到RabbitMQ。通过安全关闭连接可以做到这一点。

print("Sent 'Hello World!'")
connection.close()

发送不成功!

如果这是你第一次使用RabbitMQ,并且没有看到“Sent”消息出现在屏幕上,你可能会抓耳挠腮不知所以。这也许是因为没有足够的磁盘空间给代理使用所造成的(代理默认需要200MB的空闲空间),所以它才会拒绝接收消息。查看一下代理的日志文件进行确认,如果需要的话也可以减少限制。配置文件文档会告诉你如何更改磁盘空间限制(disk_free_limit)。

3 消费者

我们的第二个程序consuming.py,将会从队列中获取消息并将其打印到屏幕上。

这次我们还是需要要先连接到RabbitMQ服务器。连接服务器的代码和之前是一样的。

下一步也和之前一样,我们需要确认队列是存在的。我们可以多次使用queue_declare命令来创建同一个队列,但是只有一个队列会被真正的创建。

channel.queue_declare(queue='hello')

你也许要问: 为什么要重复声明队列呢 —— 我们已经在前面的代码中声明过它了。如果我们确定了队列是已经存在的,那么我们可以不这么做,比如此前预先运行了send.py程序。可是我们并不确定哪个程序会首先运行。这种情况下,在程序中重复将队列重复声明一下是种值得推荐的做法。

列出所有队列

你也许希望查看RabbitMQ中有哪些队列、有多少消息在队列中。此时你可以使用rabbitmqctl工具(使用有权限的用户):

sudo rabbitmqctl list_queues

(在Windows中不需要sudo命令)

rabbitmqctl list_queues

从队列中获取消息相对来说稍显复杂。需要为队列定义一个回调(callback)函数。当我们获取到消息的时候,Pika库就会调用此回调函数。这个回调函数会将接收到的消息内容输出到屏幕上。

def callback(ch, method, properties, body):
    print("Received %r" % body)

下一步,我们需要告诉RabbitMQ这个回调函数将会从名为"hello"的队列中接收消息:

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

要成功运行这些命令,我们必须保证队列是存在的,我们的确可以确保它的存在——因为我们之前已经使用queue_declare将其声明过了。

no_ack参数稍后会进行介绍。

最后,我们运行一个用来等待消息数据并且在需要的时候运行回调函数的无限循环。

print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

4 测试

producing.py的完整代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host="172.17.0.201",
        port=5672,
        credentials=credentials
    )
)
channel = connection.channel()

channel.queue_declare(queue="hello", durable=True)  # 队列持久化

channel.basic_publish(
    exchange='',
    routing_key="hello",
    body="hello world!"
    properties=pika.BasicProperties(delivery_mode=2, )
)

print("Sent 'Hello World!'")
connection.close()

consuming.py的完整代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host="172.17.0.201",
        port=5672,
        credentials=credentials
    )
)
channel = connection.channel()

channel.queue_declare(queue="hello", durable=True)  # 队列持久化

def callback(ch, method, properties, body):
    print("Received %r" % body)

channel.basic_consume(
    callback,
    queue="hello",
    no_ack=True
)

print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

现在我们可以在终端中尝试一下我们的程序了。

首先我们启动生产者,生产者程序每次执行后都会停止运行。

$ python producing.py
Sent 'Hello World!'

启动一个消费者,它会持续的运行来等待投递到达。

$ python consuming.py
Waiting for messages. To exit press CTRL+C
Received 'Hello World!'

成功了!我们已经通过RabbitMQ发送第一条消息。你也许已经注意到了,consuming.py程序并没有退出。它一直在准备获取消息,你可以通过Ctrl-C来中止它。

试下在新的终端中再次运行producing.py

我们已经学会如何发送消息到一个已知队列中并接收消息。

5 参考

...

Tags Read More


RabbitMQ消息队列:安装与配置

by LauCyun Sep 22,2017 16:38:44 23,759 views

RabbitMQ是一个在 AMQP 基础上完整的,可复用的企业消息系统。他遵循 Mozilla Public License 开源协议。RabbitMQ相关资料:

本文主要介绍RabbitMQ的安装和基础配置,先介绍一下环境:

  • OS:CentOS 6.5
  • RabbitMQ:3.6.12
  • Erlang:20.0

1 安装Erlang

Erlang是一种通用的面向并发的编程语言,具体介绍:Erlang (programming language) - Wikipedia

方法1(使用Erlang Solutions安装):

Erlang Solutions:https://packages.erlang-solutions.com/erlang/

将Erlang Solutions仓库添加到系统中:

wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
# Erlang Solutions key
rpm --import https://packages.erlang-solutions.com/rpm/erlang_solutions.asc

/etc/yum.repos.d/erlang_solutions.repo的内容如下(则Erlang Solutions安装成功):

[erlang-solutions]
name=Centos $releasever - $basearch - Erlang Solutions
baseurl=https://packages.erlang-solutions.com/rpm/centos/$releasever/$basearch
gpgcheck=1
gpgkey=https://packages.erlang-solutions.com/rpm/erlang_solutions.asc
enabled=1

安装erlange:

sudo yum install erlang

方法2:

RabbitMQ官网提供Erlang安装包,下载地址:http://www.rabbitmq.com/releases/erlang/

下载好之后,安装下面两个文件:

yum localinstall -y erlang-19.0.4-1.el6.x86_64.rpm
yum localinstall -y esl-erlang-compat-18.1-1.noarch.rpm

方法3(源码安装):

当然,也可以通过源码来安装Erlang,先到www.erlang.org/download.html找到适合自己机器运行的版本,将Erlang下载到本地:

wget http://erlang.org/download/otp_src_20.0.tar.gz

解压并安装:

tar -zxvf otp_src_20.0.tar.gz
cd otp_src_20.0
./configure
make && make install

注意,这里是使用默认的路径进行安装,如有需要可以自行更改。

安装完后输入erl以下提示即为安装成功:

[root@localhost ~]# erl
Erlang/OTP 20 [erts-9.0] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V9.0  (abort with ^G)
1>

2 安装RabbitMQ

首先为了避免各种签名错误,我们把公钥加入可信任的列表:

# centos
rpm --import https://www.rabbitmq.com/rabbitmq-signing-key-public.asc
# ubuntu
wget http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
apt-key add rabbitmq-signing-key-public.asc

然后我们开始更新和安装RabbitMQ:

# centos
wget https://dl.bintray.com/rabbitmq/rabbitmq-server-rpm/rabbitmq-server-3.6.12-1.el6.noarch.rpm
yum install -y rabbitmq-server-3.6.12-1.el6.noarch.rpm
# ubuntu
apt-get update
apt-get install rabbitmq-server

如果安装出现如下错误:

Loaded plugins: fastestmirror
Loading mirror speeds from cached hostfile
Setting up Install Process
Examining rabbitmq-server-3.6.12-1.el6.noarch.rpm: rabbitmq-server-3.6.12-1.el6.noarch
Marking rabbitmq-server-3.6.12-1.el6.noarch.rpm to be installed
Resolving Dependencies
--> Running transaction check
---> Package rabbitmq-server.noarch 0:3.6.12-1.el6 will be installed
--> Processing Dependency: socat for package: rabbitmq-server-3.6.12-1.el6.noarch
--> Finished Dependency Resolution
Error: Package: rabbitmq-server-3.6.12-1.el6.noarch (/rabbitmq-server-3.6.12-1.el6.noarch)
           Requires: socat
 You could try using --skip-broken to work around the problem
 You could try running: rpm -Va --nofiles --nodigest

解决方法:

yum -y install socat

此时会报错没有socat包或是找不到socat包,解决方法安装centos的epel的扩展源:

yum -y install epel-release

之后重新安装socat

3 启动RabbitMQ

先看下自己的主机名:hostname,我的主机名是:laucyun

先修改一下 hosts 文件:vim /etc/hosts,添加一行:

127.0.0.1 laucyun

启动:

service rabbitmq-server start

启动一般都比较慢,所以别急

停止:

service rabbitmq-server stop

重启:

service rabbitmq-server restart

设置开机启动:

chkconfig rabbitmq-server on

4 配置RabbitMQ

4.1 修改配置文件rabbitmq.config

查找默认配置位置:find / -name "rabbitmq.config.example",搜索结果是:

$ find / -name "rabbitmq.config.example"
/usr/share/doc/rabbitmq-server-3.6.12/rabbitmq.config.example

复制默认配置:

$ cp /usr/share/doc/rabbitmq-server-3.6.12/rabbitmq.config.example /etc/rabbitmq/

修改配置文件名:

$ cd /etc/rabbitmq
$ mv rabbitmq.config.example rabbitmq.config

编辑配置文件,开启用户远程访问:vim rabbitmq.config

在 64 行:%% {loopback_users, []},(注意:该语句最后有一个逗号,等下是要去掉的),将其改为:{loopback_users, []}

开启Web界面管理:

$ rabbitmq-plugins enable rabbitmq_management

重启RabbitMQ服务:

$ service rabbitmq-server restart

开放防火墙端口:

$ iptables -I INPUT -p tcp -m tcp --dport 15672 -j ACCEPT
$ iptables -I INPUT -p tcp -m tcp --dport 5672 -j ACCEPT
$ service iptables save
$ service iptables restart

浏览器访问:http://192.168.0.149:15672 默认管理员账号:guest默认管理员密码:guest,如图1:


图1 RabbitMQ登录界面

4.2 添加新授权用户

Admin > User > Add a user中添加新用户,如图2:


图2 添加新授权用户

注意:用户的Tags是可以通过下面那行快捷输入的。

同样也可以通过命令行添加新授权用户,如下:

$ rabbitmqctl add_user admin admin
Creating user "admin"
$ rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator]
$ rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/"
$ rabbitmqctl list_users
Listing users
admin   [administrator]

4.3 添加Host

Admin > Virtual Hosts > Add a new virtual host 中添加新Host,如图3:


图3 添加Host

给添加的Host设置权限:


图4 添加的 Host 设置权限

5 参考

...

Tags Read More