小编典典

初始化具有不同值的不同Celery Workers

flask

我正在使用celery在Hadoop上运行长时间运行的任务。每个任务都会在Hadoop上执行Pig脚本,该脚本运行大约30分钟-2小时。

我当前的Hadoop设置有4个队列a,b,c和默认队列。当前,所有任务都由一个工人执行,该工人将作业提交到单个队列中。

我想再添加3个将作业提交到其他队列的工作程序,每个队列一个工作程序。

问题是队列当前是硬编码的,我希望为每个工作人员设置此变量。

我进行了很多搜索,但无法找到一种方法来传递每个Celery Workers不同的队列值并在任务中访问它。

我像这样开始我的Celery Workers。

celery -A app.celery worker

我希望在命令行本身中传递一些其他参数,并在我的任务中访问它,但是celery抱怨它不理解我的自定义参数。

我计划通过设置–concurrency=3参数在同一主机上运行所有工作线程。有什么解决办法吗?

当前场景是这样的。我每次尝试执行任务print_something时都说tasks.print_something.delay()只打印队列C。

@celery.task()
def print_something():
    print "C"

我需要让工人根据我在启动时传递给他们的值来打印可变字母。

@celery.task()
def print_something():
    print "<Variable Value Per Worker Here>"

阅读 884

收藏
2020-04-07

共2个答案

小编典典

第一步涉及在celery中添加对自定义参数的支持。如果不这样做,celery将抱怨它不理解该参数。

由于我用Flask运行celery,所以我像这样初始化celery。

def configure_celery():
    app.config.update(
        CELERY_BROKER_URL='amqp://:@localhost:5672',
        RESULT_BACKEND='db+mysql://root:@localhost:3306/<database_name>'            
    )
    celery = Celery(app.import_name, backend=app.config['RESULT_BACKEND'],
                    broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

我调用此函数来初始化celery并将其存储在一个名为celery的变量中。

celery = configure_celery()

要添加自定义参数,你需要执行以下操作。

def add_hadoop_queue_argument_to_worker(parser):
    parser.add_argument(
        '--hadoop-queue', help='Hadoop queue to be used by the worker'
    )

下面使用的celery是我们从上述步骤中获得的celery。

celery.user_options['worker'].add(add_hadoop_queue_argument_to_worker)

下一步将是使该参数在工作程序中可访问。为此,请按照下列步骤操作。

class HadoopCustomWorkerStep(bootsteps.StartStopStep):

    def __init__(self, worker, **kwargs):
        worker.app.hadoop_queue = kwargs['hadoop_queue']

通知celery使用此类来创建工人。

celery.steps['worker'].add(HadoopCustomWorkerStep)

现在,任务应该能够访问变量了。

@app.task(bind=True)
def print_hadoop_queue_from_config(self):
    print self.app.hadoop_queue

通过在命令行上运行worker进行验证。

celery -A app.celery worker --concurrency=1 --hadoop-queue=A -n aworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=B -n bworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=C -n cworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=default -n defaultworker@%h
2020-04-07
小编典典

我通常要做的是,在另一个脚本(例如manage.py)中启动工作程序(未执行任务)后,我添加带有参数的命令以启动特定任务或具有不同参数的任务。

在manager.py中:

from tasks import some_task

@click.command
def run_task(params):
    some_task.apply_async(params)

这将根据需要启动任务。

2020-04-07