Python celery.schedules 模块,crontab() 实例源码

我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用celery.schedules.crontab()

项目:gitmate-2    作者:GitMateIO    | 项目源码 | 文件源码
def scheduler(cls,
                  interval: (crontab, float),
                  *args,
                  queue: Enum = TaskQueue.SHORT,
                  **kwargs):  # pragma: no cover
        """
        Registers the decorated function as a periodic task. The task should
        not accept any arguments.

        :param interval:    Periodic interval in seconds as float or crontab
                            object specifying task trigger time. See
                            http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab
        :param queue:       Queue to use for the scheduled task.
        :param args:        Arguments to pass to scheduled task.
        :param kwargs:      Keyword arguments to pass to scheduled task.
        """
        def _wrapper(function: Callable):
            task = celery.task(function,
                               base=ExceptionLoggerTask,
                               queue=queue.value)
            celery.add_periodic_task(interval, task.s(), args, kwargs)
            return function
        return _wrapper
项目:celery-beatx    作者:mixkorshun    | 项目源码 | 文件源码
def decode_schedule(obj):
    if obj is None:
        return None

    _type = obj['__type__']
    value = obj['__value__']

    if _type == 'datetime':
        return decode_datetime(value)
    elif _type == 'crontab':
        return crontab(*value.split('\t'))
    elif _type == 'solar':
        return solar(**value)
    elif _type == 'schedule':
        return schedule(**value)
    else:
        raise NotImplementedError(
            'Cannot deserialize schedule %(type)s type' % {
                'type': _type
            }
        )
项目:flask-celery3-boilerplate    作者:sdg32    | 项目源码 | 文件源码
def install_default_entries(self, data):
        entries = {}
        if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
            # Add backend clean up
            entries.setdefault(
                'celery.backend_cleanup', {
                    'task': 'celery.backend_cleanup',
                    'schedule': schedules.crontab('0', '4', '*')
                }
            )

        self.update_from_dict(entries)
项目:flask-celery3-boilerplate    作者:sdg32    | 项目源码 | 文件源码
def schedule(self):
        return schedules.crontab(
            minute=self.minute,
            hour=self.hour,
            day_of_week=self.day_of_week,
            day_of_month=self.day_of_month,
            month_of_year=self.month_of_year
        )
项目:flask-celery3-boilerplate    作者:sdg32    | 项目源码 | 文件源码
def schedule(self):
        if self.crontab:
            return self.crontab.schedule
        if self.interval:
            return self.interval.schedule
项目:gitmate-2    作者:GitMateIO    | 项目源码 | 文件源码
def scheduled_responder(cls,
                            plugin: str,
                            interval: (crontab, float),
                            queue: Enum = TaskQueue.SHORT,
                            **kwargs):
        """
        Registers the decorated function as responder and register
        `run_plugin_for_all_repos` as periodic task with plugin name and
        a responder event as arguments.

        :param plugin: Name of plugin with which responder will be registered.
        :param interval: Periodic interval in seconds as float or crontab
                object specifying task trigger time.
                See http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab
        :param queue: Queue to use for the scheduled_responder's tasks.
        :param kwargs: Keyword arguments to pass to `run_plugin_for_all_repos`.

        >>> from gitmate_hooks.utils import ResponderRegistrar
        >>> @ResponderRegistrar.scheduled_responder('test', 10.0)
        ... def test_responder(igitt_repo):
        ...     print('Hello, World!')

        This will register a `test.test_responder` responder and schedule
        `run_plugin_for_all_repos` with arguments `('test',
        'test.test_responder')` with 10 seconds interval.
        """
        def _wrapper(function: Callable):
            action = '{}.{}'.format(plugin, function.__name__)
            periodic_task_args = (plugin, action)
            function = cls.responder(plugin, action)(function)
            task = celery.task(run_plugin_for_all_repos,
                               base=ExceptionLoggerTask,
                               queue=queue.value)
            celery.add_periodic_task(
                interval, task.s(), periodic_task_args, kwargs)
            return function
        return _wrapper
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def schedule(self):
        return schedules.crontab(minute=self.minute,
                                 hour=self.hour,
                                 day_of_week=self.day_of_week,
                                 day_of_month=self.day_of_month,
                                 month_of_year=self.month_of_year)
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def __str__(self):
        fmt = '{0.name}: {0.crontab}'
        return fmt.format(self)
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def schedule(self):
        if self.crontab:
            return self.crontab.schedule
        if self.interval:
            return self.interval.schedule
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def install_default_entries(self, data):
        entries = {}
        if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
            entries.setdefault(
                'celery.backend_cleanup', {
                    'task': 'celery.backend_cleanup',
                    'schedule': schedules.crontab('*/5', '*', '*'),
                    'options': {'expires': 12 * 3600},
                },
            )
        self.update_from_dict(entries)
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def install_default_entries(self, data):
        entries = {}
        if self.app.conf.result_expires:
            entries.setdefault(
                'celery.backend_cleanup', {
                    'task': 'celery.backend_cleanup',
                    'schedule': schedules.crontab('0', '4', '*'),
                    'options': {'expires': 12 * 3600},
                },
            )
        self.update_from_dict(entries)
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def schedule(self):
        return schedules.crontab(minute=self.minute,
                                 hour=self.hour,
                                 day_of_week=self.day_of_week,
                                 day_of_month=self.day_of_month,
                                 month_of_year=self.month_of_year,
                                 nowfun=lambda: make_aware(now()))
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def validate_unique(self, *args, **kwargs):
        super(PeriodicTask, self).validate_unique(*args, **kwargs)
        if not self.interval and not self.crontab and not self.solar:
            raise ValidationError({
                'interval': [
                    'One of interval, crontab, or solar must be set.'
                ]
            })
        if self.interval and self.crontab and self.solar:
            raise ValidationError({
                'crontab': [
                    'Only one of interval, crontab, or solar must be set'
                ]
            })
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def __str__(self):
        fmt = '{0.name}: {{no schedule}}'
        if self.interval:
            fmt = '{0.name}: {0.interval}'
        if self.crontab:
            fmt = '{0.name}: {0.crontab}'
        if self.solar:
            fmt = '{0.name}: {0.solar}'
        return fmt.format(self)
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def schedule(self):
        if self.interval:
            return self.interval.schedule
        if self.crontab:
            return self.crontab.schedule
        if self.solar:
            return self.solar.schedule
项目:settei    作者:spoqa    | 项目源码 | 文件源码
def test_worker_schedule():
    # timedelta
    conf = WorkerConfiguration({
        'worker': {
            'broker_url': 'redis://',
            'celery_result_backend': 'redis://',
            'celerybeat_schedule': {
                'add-every-30-seconds': {
                    'task': 'tasks.add',
                    'schedule': 'timedelta(seconds=30)',
                },
            },
        }
    })
    assert conf.worker_schedule == {
        'add-every-30-seconds': {
            'task': 'tasks.add',
            'schedule': datetime.timedelta(seconds=30),
            'args': (),
        },
    }
    assert conf.worker_config['CELERYBEAT_SCHEDULE'] == conf.worker_schedule
    # crontab
    conf2 = WorkerConfiguration({
        'worker': {
            'broker_url': 'redis://',
            'celery_result_backend': 'redis://',
            'celerybeat_schedule': {
                'add-every-minute': {
                    'task': 'tasks.add',
                    'schedule': "crontab(minute='*')",
                    'args': [16, 16],
                },
            },
        }
    })
    assert conf2.worker_schedule == {
        'add-every-minute': {
            'task': 'tasks.add',
            'schedule': crontab(minute='*'),
            'args': (16, 16),
        },
    }
    assert conf2.worker_config['CELERYBEAT_SCHEDULE'] == conf2.worker_schedule
    # import path
    conf3 = WorkerConfiguration({
        'worker': {
            'broker_url': 'redis://',
            'celery_result_backend': 'redis://',
            'celerybeat_schedule': {
                'add-every-minute': {
                    'task': 'tasks.add',
                    'schedule': "celery.schedules:crontab(minute='*')",
                    'args': [16, 16],
                },
            },
        }
    })
    assert conf3.worker_schedule == conf2.worker_schedule
    assert conf3.worker_config['CELERYBEAT_SCHEDULE'] == conf3.worker_schedule
项目:celery-beatx    作者:mixkorshun    | 项目源码 | 文件源码
def encode_schedule(value):
    if value is None:
        return None
    elif isinstance(value, datetime):
        return {
            '__type__': 'datetime',
            '__value__': encode_datetime(value)
        }
    elif isinstance(value, crontab):
        return {
            '__type__': 'crontab',
            '__value__': '%(minute)s\t%(hour)s\t%(day_of_week)s\t'
                         '%(day_of_month)s\t%(month_of_year)s' % {
                             'minute': value._orig_minute,
                             'hour': value._orig_hour,
                             'day_of_week': value._orig_day_of_week,
                             'day_of_month': value._orig_day_of_month,
                             'month_of_year': value._orig_month_of_year,
                         }
        }
    elif isinstance(value, solar):
        return {
            '__type__': 'solar',
            '__value__': {
                'event': value.event,
                'lat': value.lat,
                'lon': value.lon
            }
        }
    elif isinstance(value, schedule):
        return {
            '__type__': 'schedule',
            '__value__': {
                'run_every': value.run_every.total_seconds(),
                'relative': bool(value.relative),
            }
        }
    else:
        raise NotImplementedError(
            'Cannot serialize schedule %(type)s type' % {
                'type': type(value).__name__
            }
        )