Python异步任务队列

在开发的过程中,有时会需要用到类似下面的这些操作

  • 用户注册时发送认证邮件
  • 带有Web界面的爬虫
  • 定时计划任务

这些任务的共同特点是执行所需的时间较长,但是我们又不希望其阻塞后续的操作。因此我们将这些任务放进任务队列里来运行。

Python常见的异步任务队列实现有功能较丰富的Celery和轻量级的RQ,本文以celery为例。

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统。
Celery经常和RabbitMQ同时提起,但实际上,RabbitMQ和celery并不是同一层面的东西。Celery需要存储介质来存储任务(称为broker),可选的broker有RabbitMQ, redis, mysql, mongodb等。

声明任务

将作为celery任务的函数使用@app.task修饰器进行修饰。其中@app是一个celery实例。

#tasks.py
from celery import Celery

app = Celery('tasks', broker='redis://localhost/')

@app.task
def celery_task(x, y):
    return x + y

启动worker

使用如下命令启动单个worker,其中tasks表示包含celery任务的python模块

celery -A tasks worker --loglevel=info

启动后出现如下界面,表示celery已经开始监听任务。

或使用celery multi命令启动多个worker

celery multi start worker1 worker2 worker3 worker4 worker5 -A tasks -l info -c10 --pidfile=pid/task%n.pid --logfile=log/task%n.log

如果需要关闭worker,将上面命令中的start换成stopwait

调用celery任务

from tasks import celery_task
celery_task.apply_async(args=(arg,))
#或
celery_task.delay(arg)

广播任务

正常情况下,一个任务只能有一个消费者,也就是说,当一个任务被一个worker取走后,这个任务就在队列中不再存在了,其他worker无法消费该任务。如果需要让一个任务被所有worker执行,就需要用到广播。
官方文档中称RabbitMQ和Redis均可支持广播。但是,广播在使用Redis作为broker时会遇到一系列莫名其妙的问题。因此,如果需要使用广播,请尽量使用RabbitMQ作为broker。

首先我们需要配置一条队列作为广播队列。

from celery import Celery
from kombu.common import Broadcast

celery = Celery('tasks', broker='amqp://guest:guest@localhost//')
celery.conf.update(
    CELERY_QUEUES=(
        Broadcast('broadcast'), #此处设置消息队列broadcast为广播模式,及该队列上的消息会发送至所有监听它的worker
    ),
    CELERY_ROUTES={
        '*': {
            'queue': 'broadcast'
        }
    }
)

需要执行任务时,将队列指定为刚才配置的广播队列即可。

celery_task.apply_async(args=(arg,),queue='broadcast')

终止任务

celery_task.revoke(terminate=True)

如果上述命令不能有效地终止任务,可以添加一个signal='SIGKILL'参数。如果不加这个参数,默认发送的信号是SIGTERM

celery_task.revoke(terminate=True,signal='SIGKILL')

RQ

除了celery之外,我们还有一种更轻量级的选择。
RQ(Redis Queue)是一个基于Redis的轻量级任务队列库,可以轻松地与Python应用进行集成。
它的用法非常简单
创建一个任务

import requests
from redis import Redis
from rq import Queue

def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())

q = Queue(connection=Redis())
result = q.enqueue(count_words_at_url, 'http://nvie.com')

在项目目录下启动一个worker,以开始执行已入队的任务。

rq worker

此时rq已经在后台监听新任务,运行上面的python脚本,即可得到如下的结果:

*** Listening for work on default
Got count_words_at_url('http://nvie.com') from default
Job result = 818
*** Listening for work on default
Show CommentsClose Comments

Leave a comment