Python django.conf.settings 模块,CELERY_ALWAYS_EAGER 实例源码

我们从 Python 开源项目中提取了以下 6 个代码示例，用于说明如何使用 django.conf.settings.CELERY_ALWAYS_EAGER。

项目:munch-core    作者:crunchmail    | 项目源码 | 文件源码
def worker(**options):
    """Run background worker instance."""
    # NOTE(review): the docstring text is kept verbatim; this looks like a
    # click command (decorators are not visible here), in which case the
    # docstring doubles as the command's help text.
    #
    # ``options`` must contain ``worker_type`` (an iterable of strings) and
    # ``pool``; everything left over is forwarded to ``app.Worker``.
    from django.conf import settings

    # A missing setting is treated the same as a falsy one.
    eager = getattr(settings, 'CELERY_ALWAYS_EAGER', False)
    if eager:
        raise click.ClickException(
            'Disable CELERY_ALWAYS_EAGER in your '
            'settings file to spawn workers.')

    from munch.core.celery import app

    # Publish the requested worker types through the environment as a
    # lower-cased, comma-separated list.
    worker_types = options.pop('worker_type')
    os.environ['WORKER_TYPE'] = ','.join(worker_types).lower()

    pool = options.pop('pool')
    instance = app.Worker(
        pool_cls=pool, queues=settings.CELERY_DEFAULT_QUEUE, **options)
    instance.start()

    try:
        code = instance.exitcode
    except AttributeError:
        # `exitcode` was added in a newer version of Celery:
        # https://github.com/celery/celery/commit/dc28e8a5
        # Without it there is nothing to report, so just return.
        return
    sys.exit(code)
项目:django-filer-celery    作者:nephila    | 项目源码 | 文件源码
def setUp(self):
        """Run the parent ``setUp``, then turn on ``CELERY_ALWAYS_EAGER``.

        The flag is set to ``True`` on both ``settings`` and
        ``current_app.conf``.
        """
        super(BaseFilerCelery, self).setUp()
        # NOTE(review): presumably `settings` is django.conf.settings and
        # `current_app` is Celery's current app proxy -- imports are not
        # visible here. The flag is written to both objects, and nothing in
        # this method restores the previous values.
        settings.CELERY_ALWAYS_EAGER = True
        current_app.conf.CELERY_ALWAYS_EAGER = True
项目:videofront    作者:openfun    | 项目源码 | 文件源码
def send_task(name, args=None, kwargs=None, **opts):
    """
    Send a task by name.

    Contrary to app.send_task, this function respects the
    CELERY_ALWAYS_EAGER setting, which is necessary in tests. As a
    consequence, when that setting is enabled it works only for registered
    tasks.

    ``name`` is the task name; ``args`` and ``kwargs`` are the positional and
    keyword arguments for the task. Any extra keyword options (``opts``) are
    forwarded unchanged to ``task.apply`` in eager mode and to
    ``app.send_task`` otherwise.

    Returns whatever the underlying ``apply`` / ``send_task`` call returns.
    """
    if settings.CELERY_ALWAYS_EAGER:
        # Raises a NotRegistered exception for unregistered tasks
        task = app.tasks[name]
        return task.apply(args=args, kwargs=kwargs, **opts)
    # Forward the execution options here as well: previously they were
    # silently dropped on this path, so options were honoured in eager mode
    # but ignored when the task was actually sent.
    return app.send_task(name, args=args, kwargs=kwargs, **opts)
项目:ecs    作者:ecs-org    | 项目源码 | 文件源码
def basic_test():
    """Log the current CELERY_ALWAYS_EAGER value and return ``'success'``."""
    # The logger is obtained from the function object itself.
    # NOTE(review): presumably `get_logger` is attached by a task decorator
    # that is not visible here -- confirm.
    task_logger = basic_test.get_logger()
    eager_flag = str(settings.CELERY_ALWAYS_EAGER)
    message = (
        "celery is running task, we write to the celery logger, "
        "and by the way, CELERY_ALWAYS_EAGER is %s" % eager_flag
    )
    task_logger.info(message)
    return 'success'
项目:munch-core    作者:crunchmail    | 项目源码 | 文件源码
def cron(**options):
    """Run periodic task dispatcher."""
    # NOTE(review): the docstring text is kept verbatim; this looks like a
    # click command (decorators are not visible here), in which case the
    # docstring doubles as the command's help text.
    #
    # All ``options`` are forwarded as-is to ``app.Beat``.
    from django.conf import settings

    # A missing setting is treated the same as a falsy one.
    if getattr(settings, 'CELERY_ALWAYS_EAGER', False):
        raise click.ClickException(
            'Disable CELERY_ALWAYS_EAGER in your '
            'settings file to spawn workers.')

    from munch.core.celery import app

    beat = app.Beat(**options)
    beat.run()
项目:Sentry    作者:NetEaseGame    | 项目源码 | 文件源码
def TaskRunner():
    """Set CELERY_ALWAYS_EAGER to True around a single ``yield``.

    The flag is set to ``True`` on both ``settings`` and
    ``current_app.conf`` before yielding, and set back to ``False`` on both
    afterwards -- including when an exception is raised at the ``yield``
    point or the generator is closed early.

    NOTE(review): presumably this generator is wrapped with
    ``contextlib.contextmanager`` (the decorator is not visible here) and
    used as ``with TaskRunner(): ...`` -- confirm. The flags are reset to
    ``False`` rather than to their previous values, as before.
    """
    settings.CELERY_ALWAYS_EAGER = True
    current_app.conf.CELERY_ALWAYS_EAGER = True
    try:
        yield
    finally:
        # Always undo the override; without this an exception in the
        # managed body would leave eager mode on for everything that
        # runs afterwards.
        current_app.conf.CELERY_ALWAYS_EAGER = False
        settings.CELERY_ALWAYS_EAGER = False