博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Celery中文翻译-Application
阅读量:6458 次
发布时间:2019-06-23

本文共 4556 字,大约阅读时间需要 15 分钟。

Celery在使用前必须实例化,称为application或app。app是线程安全的,具有不同配置、组件、task的多个Celery应用可以在同一个进程空间共存。

# 创建Celery应用>>> from celery import Celery>>> app = Celery()>>> app

最后一行文本化显示了Celery应用:包含应用所属类的名称,当前主模块名,以及内存地址。唯一重要的信息是模块名称。

Main Name

在Celery中发送task消息时,该消息仅包含要执行的task的名称。每一个worker维护一个task名称和对应函数的映射,这称为task registry

当定义一个task时,该task将注册到本地:

>>> @app.task... def add(x, y):...     return x + y>>> add<@task: __main__.add>>>> add.name__main__.add>>> app.tasks['__main__.add']<@task: __main__.add>

当Celery无法检测task函数属于哪个模块时,使用main模块名生成初始task名称。

这种方式仅适用于以下两种场景:

  1. 定义task的模块作为程序运行
  2. app在python shell中创建
# tasks.pyfrom celery import Celeryapp = Celery()@app.taskdef add(x, y): return x + yif __name__ == '__main__':    app.worker_main()

如果直接运行tasks.py,task名将以__main__为前缀,但如果tasks.py被其他程序导入,task名将以tasks为前缀。如下:

>>> from tasks import add>>> add.nametasks.add

也可以直接指定主模块名:

>>> app = Celery('tasks')>>> app.main'tasks'>>> @app.task... def add(x, y):...     return x + y>>> add.nametasks.add

Configuration

可以通过直接设置,或使用专用配置模块对Celery进行配置。

通过app.conf属性查看或直接设置配置:

>>> app.conf.timezone'Europe/London'>>> app.conf.enable_utc = True

或用app.conf.update方法一次更新多个配置:

>>> app.conf.update(...     enable_utc=True,...     timezone='Europe/London',...)

config_from_object

app.config_from_object()方法从配置模块或对象中导入配置。需要注意的是:调用config_from_object()方法将重置在这之前配置的任何设置

使用模块名

app.config_from_object()方法接收python模块的完全限定名(fully qualified name)或具体到其中的某个属性名,例如"celeryconfig", "myproj.config.celery", 或"myproj.config:CeleryConfig":

from celery import Celeryapp = Celery()app.config_from_object('celeryconfig')

只要能够正常执行import celeryconfig,app就能正常配置。

使用模块对象

也可以传入一个已导入的模块对象,但不建议这样做。

import celeryconfigfrom celery import Celeryapp = Celery()app.config_from_object(celeryconfig)

更推荐使用模块名的方式,因为这样在使用prefork pool时不需要序列化该模块。如果在实际应用中出现配置问题或序列化错误,请尝试使用模块名的方式。

使用配置类或对象

from celery import Celeryapp = Celery()class Config:    enable_utc = True    timezone = 'Europe/London'app.config_from_object(Config)

config_from_envvar

app.config_from_envvar()方法从环境变量中接收配置模块名。

import osfrom celery import Celery#: Set default configuration module nameos.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')app = Celery()app.config_from_envvar('CELERY_CONFIG_MODULE')

通过环境变量指定配置模块:

$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l info

Censored configuration

如果要显示Celery配置,可能需要过滤某些敏感信息如密码、密钥等。Celery提供了几种用于帮助显示配置的实用方法。

humanize()

该方法返回列表字符串形式的配置,默认只包含改动过的配置,如果要显示内置的默认配置,设置with_defaults参数为True:

>>> app.conf.humanize(with_defaults=False, censored=True)

table()

该方法返回字典形式的配置:

>>> app.conf.table(with_defaults=False, censored=True)

Celery可能不会移除所有的敏感信息,因为它使用正则表达式匹配键并判断是否移除。如果用户添加了包含敏感信息的自定义配置,可以使用Celery可能标记为敏感配置的名称来命名(API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE)。

Laziness

应用实例是惰性的。

创建Celery实例只会执行以下操作:

  1. 创建用于event的logical clock instance
  2. 创建task registry
  3. 设置为当前应用(除非禁用了set_as_current参数)
  4. 调用app.on_init()回调函数(默认不执行任何操作)

app.task()装饰器不会在task定义时立即创建task,而是在task使用时或finalized应用后创建。

下例说明了在使用task或访问其属性前,都不会创建task:

>>> @app.task>>> def add(x, y):...    return x + y>>> type(add)
>>> add.__evaluated__()False>>> add # <-- causes repr(add) to happen<@task: __main__.add>>>> add.__evaluated__()True

应用的Finalization指显式地调用app.finalize()方法或隐式地访问app.tasks属性。

finalized应用将会:

  1. 复制必须在应用间共享的task。task默认是共享的,但如果禁用了task装饰器的shared属性,将属于应用私有。
  2. 评估所有待处理的task装饰器
  3. 确保所有task绑定到当前应用。将task绑定到某个应用,以便可以从配置中读取默认值。

Breaking the chain

虽然可以依赖于当前应用,但最佳实践是将应用实例传递给任何需要它的对象,这个行为可以称为app chain

# 依赖于当前应用(bad)from celery import current_appclass Scheduler(object):    def run(self):        app = current_app
# 传递应用实例(good)class Scheduler(object):    def __init__(self, app):        self.app = app

在开发模式设置CELERY_TRACE_APP环境变量,可以在应用链断开时抛出异常:

$ CELERY_TRACE_APP=1 celery worker -l info

Abstract Tasks

使用task()装饰器创建的task都继承自celery.app.task模块的Task基类。继承该类可以自定义task类:

from celery import Task# 或者 from celery.app.task import Taskclass DebugTask(Task):    def __call__(self, *args, **kwargs):        print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))        return super(DebugTask, self).__call__(*args, **kwargs)

如果要重写__call__()方法,记得调用super。这样在task直接调用时会执行基类的默认事件。

Task基类是特殊的,因为它并未绑定到任何特定的应用。一旦task绑定到应用,它将读取配置以设置默认值等。

  1. 通过base参数指定基类

    @app.task(base=DebugTask)def add(x, y):    return x + y
  2. 通过app.Task属性指定基类

    >>> from celery import Celery, Task>>> app = Celery()>>> class MyBaseTask(Task):...    queue = 'hipri'>>> app.Task = MyBaseTask>>> app.Task
    >>> @app.task... def add(x, y):... return x + y>>> add<@task: __main__.add>>>> add.__class__.mro()[
    >,
    ,
    ,
    ]

转载地址:http://ssizo.baihongyu.com/

你可能感兴趣的文章
memcache数据库和redis数据库的区别(理论)
查看>>
我的友情链接
查看>>
MyBatis+Spring结合
查看>>
Office 365之SkyDrive Pro
查看>>
无缝滚动实现原理分析【公告栏】
查看>>
Java Web 高性能开发
查看>>
CentOS 4.4双网卡绑定,实现负载均衡
查看>>
Scala之柯里化和隐式转换
查看>>
获取androdmanifest里面的meta-data
查看>>
mysql拷贝表的几种方式
查看>>
用设计模式去掉没必要的状态变量 —— 状态模式
查看>>
健忘的正则
查看>>
[转]CMake快速入门教程:实战
查看>>
IntelliJ IDEA创建JavaWeb工程及配置Tomcat部署
查看>>
Markdown用法
查看>>
求最大值及其下标
查看>>
轮播插件swiper.js?
查看>>
网路流24题总结
查看>>
15 个 Android 通用流行框架大全
查看>>
IE8兼容@media和mp4视频的解决方案
查看>>