Python celery 模块,chord() 实例源码

我们从 Python 开源项目中，提取了以下 5 个代码示例，用于说明如何使用 celery.chord()。

项目:falsy    作者:pingf    | 项目源码 | 文件源码
def loads(payload):
    """Build a celery canvas from a task-description payload.

    Each entry of ``payload['tasks']`` is turned into a ``chain`` with one
    step per element of the entry's ``'ids'``. Only the first step of a chain
    is loaded with the entry's ``'args'``; later steps are loaded with
    ``None``. Every step is loaded with the entry's ``'on_error'`` value.

    :param payload: mapping with ``'type'`` and optional ``'tasks'`` and
                    ``'callback'`` keys.
    :returns: a ``chord`` whose header is the group of chains and whose body
              is the loaded callback when ``'callback'`` is truthy, otherwise
              the plain ``group`` of chains.
    :raises Exception: if ``payload['type']`` is not ``'normal'``.
    """
    if payload.get('type') != 'normal':
        raise Exception('celery task loader only support normal mode')

    chains = []
    for spec in payload.get('tasks', []):
        steps = []
        for position, task_id in enumerate(spec['ids']):
            # Explicit args are only handed to the head of the chain.
            step_args = spec.get('args') if position == 0 else None
            steps.append(load(task_id, step_args, spec.get('on_error')))
        chains.append(chain(steps))

    callback = payload.get('callback')
    if not callback:
        return group(chains)
    return chord(header=group(chains), body=func.load(callback).s())
项目:django-user-tasks    作者:edx    | 项目源码 | 文件源码
def test_create_chord_exclude_body(self):
    """A chord body that is not a UserTask should be cleanly left out of the status records.

    Four records are expected: one for the chord, one for the header group,
    and one for each of the two header tasks.
    """
    expected_name = 'Chord: 1 & 2'
    pending_chord = chord([
        sample_task.s(self.user.id, '1', user_task_name=expected_name),
        sample_task.s(self.user.id, '2', user_task_name='I should be ignored'),
    ])
    pending_chord(normal_task.s('3'))
    assert UserTaskStatus.objects.count() == 4

    # Top-level container record for the chord itself.
    chord_record = UserTaskStatus.objects.get(task_class='celery.chord')
    assert chord_record.task_id
    assert chord_record.parent is None
    assert chord_record.is_container
    assert chord_record.name == expected_name
    assert chord_record.total_steps == 2
    verify_state(chord_record, False)

    # The header group is a container nested under the chord.
    group_record = UserTaskStatus.objects.get(task_class='celery.group')
    assert group_record.task_id
    assert group_record.parent_id == chord_record.id
    assert group_record.is_container
    assert group_record.name == expected_name
    assert group_record.total_steps == 2
    verify_state(group_record, False)

    # Each header task is a leaf record under the group.
    header_records = UserTaskStatus.objects.filter(parent=group_record)
    assert len(header_records) == 2
    for leaf in header_records:
        assert leaf.task_id
        assert leaf.parent_id == group_record.id
        assert not leaf.is_container
        assert leaf.name in ('SampleTask: 1', 'SampleTask: 2')
        assert leaf.total_steps == 1
        verify_state(leaf, False)
项目:toptal-blog-celery-toy-ex    作者:Rustem    | 项目源码 | 文件源码
def produce_hot_repo_report(period, ref_date=None):
    """Run the fetch tasks in parallel, then build the report from their results.

    The reference date string comes from ``strf_date(period, ref_date=ref_date)``.
    Five ``fetch_hot_repos`` signatures are grouped as a chord header and
    ``build_report_task`` is the chord body. The call blocks on ``.get()``
    until the body's result is available.

    :param period: forwarded to ``strf_date``.
    :param ref_date: optional reference date forwarded to ``strf_date``.
    :returns: the result of the ``build_report_task`` chord body.
    """
    # 1. parse date
    date_key = strf_date(period, ref_date=ref_date)

    # 2. fetch and join
    # NOTE(review): 100 and 1..5 look like a page size and page numbers -- confirm
    # against fetch_hot_repos.
    header = group([fetch_hot_repos.s(date_key, 100, page) for page in range(1, 6)])

    # 3. group by language and
    # 4. create csv
    pending = chord(header)
    async_result = pending(build_report_task.s(date_key))
    return async_result.get()
项目:capillary    作者:celery-capillary    | 项目源码 | 文件源码
def make_signature(self, info, required_kwargs):
    """Calculates the required signature to execute a step in the pipeline.

    :param dict info: info dict generated by pipeline describing a task
    :param dict required_kwargs: Keyword arguments that :func:`@pipeline`
                                 elements might require.

    :returns: celery.Signature that will run the task as described.
              For map/reduce tasks this is the mapper signature piped
              (``|``) into a celery.chord of ``lazy_async_apply_map`` and
              the reducer.
    :raises MissingArgument: if a name listed in
        ``info['required_kwarg_names']`` is absent from ``required_kwargs``.
    :raises DependencyError: if only one of mapper/reducer is named in
        ``info``, or a named mapper/reducer is not registered on this
        instance.
    """
    # Avoid circular import - used for map/reduce tasks
    from .tasks import lazy_async_apply_map

    required_names = info.get('required_kwarg_names', [])
    new_kwargs = {k: v for k, v in required_kwargs.items() if k in required_names}

    # sorted() keeps the error message deterministic from run to run
    missing_kwargs = sorted(set(required_names) - set(new_kwargs))
    if missing_kwargs:
        raise MissingArgument(
            '{} requires {} keyword arguments specified'.format(
                info['func'],
                ', '.join(missing_kwargs),
            ),
        )

    task = info['func'].s(**new_kwargs)

    # Check for mapper
    mapper_name = info.get('mapper')
    reducer_name = info.get('reducer')
    # If only one is defined, this is an error
    if bool(mapper_name) != bool(reducer_name):
        raise DependencyError(
            'Both mapper and reducer are required together info="{}"'.format(info))

    if mapper_name:  # implies reducer_name as well
        # This is a map/reduce task
        try:
            mapper = self.mappers[mapper_name]
        except KeyError:
            raise DependencyError('Missing mapper "{}"'.format(mapper_name))

        try:
            reducer = self.reducers[reducer_name]
        except KeyError:
            raise DependencyError('Missing reducer "{}"'.format(reducer_name))

        # lazy_async_apply_map must always be called in a chord for now, see:
        # https://github.com/celery/celery/issues/2722
        #
        # Only keyword arguments are forwarded: this method receives no
        # positional ``args``, so the previous ``*args`` expansion referred
        # to an undefined name and raised NameError on this branch.
        task = (
            mapper.s(**new_kwargs) |
            chord(lazy_async_apply_map.s(task), reducer.s(**new_kwargs))
        )
    return task
项目:django-user-tasks    作者:edx    | 项目源码 | 文件源码
def _create_chord(self, eager):
    """Launch a celery chord and check the status records created for it.

    Five records are expected: the chord, its header group, the two header
    tasks and the body task.

    :param eager: forwarded unchanged to ``verify_state`` for every record
                  (presumably whether tasks run eagerly -- confirm against
                  ``verify_state``).
    """
    expected_name = 'Chord: 1 & 2, then 3'
    pending_chord = chord([
        sample_task.s(self.user.id, '1'),
        sample_task.s(self.user.id, '2', user_task_name=expected_name),
    ])
    pending_chord(sample_task.s(self.user.id, '3'))
    assert UserTaskStatus.objects.count() == 5

    # Top-level container record for the chord itself.
    chord_record = UserTaskStatus.objects.get(task_class='celery.chord')
    assert chord_record.task_id
    assert chord_record.parent is None
    assert chord_record.is_container
    assert chord_record.name == expected_name
    assert chord_record.total_steps == 3
    verify_state(chord_record, eager)

    # The header group is a container nested under the chord.
    group_record = UserTaskStatus.objects.get(task_class='celery.group')
    assert group_record.task_id
    assert group_record.parent_id == chord_record.id
    assert group_record.is_container
    assert group_record.name == expected_name
    assert group_record.total_steps == 2
    verify_state(group_record, eager)

    # Each header task is a leaf record under the group.
    header_records = UserTaskStatus.objects.filter(parent=group_record)
    assert len(header_records) == 2
    for leaf in header_records:
        assert leaf.task_id
        assert leaf.parent_id == group_record.id
        assert not leaf.is_container
        assert leaf.name in ('SampleTask: 1', 'SampleTask: 2')
        assert leaf.total_steps == 1
        verify_state(leaf, eager)

    # The body task is the chord's only non-container child.
    body_record = UserTaskStatus.objects.get(parent=chord_record, is_container=False)
    assert body_record.task_id
    assert body_record.name == 'SampleTask: 3'
    assert body_record.total_steps == 1
    verify_state(body_record, eager)