Python celery 模块,shared_task() 实例源码

我们从Python开源项目中,提取了以下4个代码示例,用于说明如何使用celery.shared_task()

项目:django-tmpl    作者:jarrekk    | 项目源码 | 文件源码
def shared_task_email(func):
    """
    Replacement for @shared_task decorator that emails admins if an exception is raised.
    """
    @wraps(func)
    def new_func(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except:
            subject = "Celery task failure"
            message = traceback.format_exc()
            mail_admins(subject, message)
            raise
    return shared_task(new_func)
项目:django-icekit    作者:ic-labs    | 项目源码 | 文件源码
def shared_task(f):
        f.delay = f
        return f
项目:django-remote-submission    作者:ornl-ndav    | 项目源码 | 文件源码
def shared_task(func):
        """Naive wrapper in case Celery does not exist."""
        def delay(*args, **kwargs):
            return func(*args, **kwargs)

        func.delay = delay
        return func
项目:api-django    作者:lafranceinsoumise    | 项目源码 | 文件源码
def create_geocoder(model):
    def geocode_model(pk):
        try:
            item = model.objects.get(pk=pk)
        except model.DoesNotExist:
            return

        if geocode_element(item):
            item.save()

    geocode_model.__name__ = "geocode_{}".format(model.__name__.lower())

    return shared_task(geocode_model)