折腾Open edX趣味之一是它很杂。

大量的工具和架构设计在这里都有具体的场景,于是从使用中学习,先跑起来,然后修修改改,如果系统没有坏掉,就继续探索。

我很喜欢这种增量式学习,在这里以实战的方式学习linux command 、数据库、Python 、require.js、任务队列、缓存、REST、OAuth…

今天是Celery

#Celery入门 ##What 我喜欢将Celery视为执行异步任务的工具,这个观念让我容易把握它。

Celery是一个专注于实时处理的任务队列,同时也支持任务调度。

Celery 是用 Python 编写的,但协议可以用任何语言实现

###典型使用场景 * 我们可能需要执行一段费时的任务, 这一时间远超用户能忍受的范围. 当这一任务不需要立即执行时, 我们就可以使用Celery在后台执行, 而不影响当前用户浏览网页 * 定期执行某些任务,类似linux中定时任务:crontab. 比如每天半夜2点执行一下数据分析, 然后将数据储存到数据库中. 我们可以编写这一任务, 然后让Celery来定时完成执行任务.

上述的任务即是task,对应为一段python代码

##基础概念 * 任务队列:是一种在线程或机器间分发任务的机制。 * Broker:中文意思是 经纪人 ,其实就是消息队列 ,用来发送和接受消息。这个Broker有几个方案可供选择:RabbitMQ,Redis(丢数据),数据库(不推荐) * Backend:在Celery的配置中的一个配置项 CELERY_RESULT_BACKEND ,作用是保存结果和状态,如果你需要跟踪任务的状态,那么需要设置这一项

Broker和Backend可以都使用redis

##一个案例 ###安装依赖 * pip install celery * sudo apt-get install rabbitmq-server(启动:rabbitmq-server)

###创建tasks.py

1
2
3
4
5
6
7
8
from celery import Celery

app = Celery('tasks', broker='amqp://celery:celery@localhost//') #edx环境中rabbitmq已经默认安装,用户名和密码均为celery:celery

@app.task
def add(x, y):
        print "before running x+y"
        return x + y

###启动一个worker 在tasks.py同级目录下执行:celery -A tasks worker --loglevel=info

在生产环境中一般使用supervisor把它丢到后台运行

###调度任务 使用ipython进入交互模式

1
2
3
:::text
>>> from tasks import add
>>> add.delay(1, 2)

我们可以在worker进程中看到

1
2
3
4
5
6
7
8
9
:::text
[2015-11-23 16:13:26,341: INFO/MainProcess] Connected to amqp://celery:**@127.0.0.1:5672//
[2015-11-23 16:13:26,355: INFO/MainProcess] mingle: searching for neighbors
[2015-11-23 16:13:27,379: INFO/MainProcess] mingle: sync with 7 nodes
[2015-11-23 16:13:27,379: INFO/MainProcess] mingle: sync complete
[2015-11-23 16:13:27,387: WARNING/MainProcess] celery@vultr.guest ready.
[2015-11-23 16:13:30,303: INFO/MainProcess] Received task: tasks.add[af9f8162-5af0-4cd3-846b-4b6d4da3a4fc]
[2015-11-23 16:13:30,303: WARNING/Worker-1] before running x+y
[2015-11-23 16:13:30,304: INFO/MainProcess] Task tasks.add[af9f8162-5af0-4cd3-846b-4b6d4da3a4fc] succeeded in 0.000959093216807s: 3

上边的例子有返回值(执行结果),我们为了知道任务的执行结果和状态。需要设置backend。于是变为

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from celery import Celery
from time import sleep
app = Celery('tasks', broker='amqp://celery:celery@localhost//')

app.conf.update(
        CELERY_RESULT_BACKEND='cache+memcached://127.0.0.1:11211/',
            )

@app.task
def add(x, y):
        print "before running x+y"
        sleep(10)
        return x + y

进入ipython

1
2
3
4
5
6
7
:::text
from tasks import add
r=add.delay(1, 2)
r.ready() #False
r.result #为空,等十秒
r.ready() #True
r.result #3

##分布式 Celery 系统可包含多个职程和中间人,以此获得高可用性和横向扩展能力。

具体可参考:

#Celery在Open edX中的使用 在Open edX中,使用RabbitMQ作为Broker,默认的用户名和密码都为celery,

用CACHES作为Celery的backend(”django.core.cache.backends.memcached.MemcachedCache”)

具体的使用场景主要是各类耗时的任务,典型的有成绩单生成,发送大量提醒邮件,任务的源码分布在各个django app的tasks.py文件里,诸如

celery的启用代码

参考/edx/app/supervisor/conf.d/workers.conf

形如:

1
2
:::text
command=/edx/app/edxapp/venvs/edxapp/bin/python /edx/app/edxapp/edx-platform/manage.py cms --settings=aws celery worker --loglevel=info --queues=edx.cms.core.low --hostname=edx.cms.core.low.%%h --concurrency=1

#参考 * celery/celery * Using Celery with Django * Celery - Distributed Task Queue * Celery - 分布式任务队列 * 基于 Celery 的后台任务 * Celery的安装和使用 * 任务调度利器:Celery


补遗

###相关工具与主题 * 监控 * get task result * _update_instructor_task AsyncResult