我们从Python开源项目中，提取了以下5个代码示例，用于说明如何使用celery.chord()。
def loads(payload):
    """Build a celery canvas from a payload description.

    The payload is a dict read with the following keys:

    * ``type``: must be ``'normal'``; anything else is rejected.
    * ``tasks``: optional list of task specs. Each spec must provide ``ids``
      and may provide ``args`` and ``on_error``. Every spec becomes one
      chain with one step per id.
    * ``callback``: optional; when truthy the chains become the header of a
      chord whose body is loaded from this value.

    :param dict payload: description of the tasks to load.
    :returns: a ``chord`` when a callback is given, otherwise a ``group``
        of the per-spec chains.
    :raises Exception: if the payload type is not ``'normal'``.
    """
    if payload.get('type') != 'normal':
        raise Exception('celery task loader only support normal mode')

    chains = []
    for spec in payload.get('tasks', []):
        steps = []
        for position, task_id in enumerate(spec['ids']):
            # Only the first step of a chain is given the spec's args;
            # every later step is loaded with None in that slot.
            step_args = spec.get('args') if position == 0 else None
            steps.append(load(task_id, step_args, spec.get('on_error')))
        chains.append(chain(steps))

    callback = payload.get('callback')
    header = group(chains)
    if not callback:
        return header
    return chord(header=header, body=func.load(callback).s())
def test_create_chord_exclude_body(self):
    """A chord body that is not a UserTask should be cleanly left out of the status records."""
    header_signatures = [
        sample_task.s(self.user.id, '1', user_task_name='Chord: 1 & 2'),
        sample_task.s(self.user.id, '2', user_task_name='I should be ignored'),
    ]
    chord(header_signatures)(normal_task.s('3'))

    # Expected records: the chord, its group, and the two header tasks.
    assert UserTaskStatus.objects.count() == 4

    # The chord is the top-level container.
    chord_record = UserTaskStatus.objects.get(task_class='celery.chord')
    assert chord_record.task_id
    assert chord_record.parent is None
    assert chord_record.is_container
    assert chord_record.name == 'Chord: 1 & 2'
    assert chord_record.total_steps == 2
    verify_state(chord_record, False)

    # The group is a container nested under the chord.
    group_record = UserTaskStatus.objects.get(task_class='celery.group')
    assert group_record.task_id
    assert group_record.parent_id == chord_record.id
    assert group_record.is_container
    assert group_record.name == 'Chord: 1 & 2'
    assert group_record.total_steps == 2
    verify_state(group_record, False)

    # The header tasks are single-step children of the group.
    header_records = UserTaskStatus.objects.filter(parent=group_record)
    assert len(header_records) == 2
    expected_names = ['SampleTask: 1', 'SampleTask: 2']
    for record in header_records:
        assert record.task_id
        assert record.parent_id == group_record.id
        assert not record.is_container
        assert record.name in expected_names
        assert record.total_steps == 1
        verify_state(record, False)
def produce_hot_repo_report(period, ref_date=None):
    """Run the hot-repo report workflow and block until its result is ready.

    Five ``fetch_hot_repos`` signatures form the chord header and
    ``build_report_task`` is the chord body.

    :param period: forwarded to ``strf_date`` to produce the date string.
    :param ref_date: optional reference date, forwarded to ``strf_date``.
    :returns: whatever ``.get()`` on the applied chord returns.
    """
    # 1. parse date
    date_key = strf_date(period, ref_date=ref_date)

    # 2. fetch and join
    # One fetch per value 1..5 of the last argument.
    # NOTE(review): 100 looks like a page size and the last argument a page
    # number -- confirm against fetch_hot_repos.
    page_fetches = group([
        fetch_hot_repos.s(date_key, 100, page_number)
        for page_number in range(1, 6)
    ])

    # 3. group by language and
    # 4. create csv
    workflow = chord(page_fetches)
    pending_report = workflow(build_report_task.s(date_key))
    return pending_report.get()
def make_signature(self, info, required_kwargs, args=()):
    """Calculates the required signature to execute a step in the pipeline.

    :param dict info: info dict generated by pipeline describing a task
    :param dict required_kwargs: Keyword arguments that :func:`@pipeline`
        elements might require. Only the names listed under
        ``info['required_kwarg_names']`` are passed on.
    :param args: Optional positional arguments forwarded to the mapper and
        reducer signatures of a map/reduce step. Defaults to no arguments.
    :returns: celery.Signature that will run the task as described. For
        map/reduce tasks this is the mapper signature piped into a
        celery.chord.
    :raises MissingArgument: if a required keyword argument is not supplied.
    :raises DependencyError: if only one of mapper/reducer is named, or a
        named mapper/reducer is not registered on this instance.
    """
    required_names = info.get('required_kwarg_names', [])
    new_kwargs = {k: v for k, v in required_kwargs.items()
                  if k in required_names}

    # Sorted so the error message is deterministic.
    missing_kwargs = sorted(set(required_names) - set(new_kwargs))
    if missing_kwargs:
        raise MissingArgument(
            '{} requires {} keyword arguments specified'.format(
                info['func'],
                ', '.join(missing_kwargs),
            ),
        )

    task = info['func'].s(**new_kwargs)

    # Check for mapper
    mapper_name = info.get('mapper')
    reducer_name = info.get('reducer')

    # If only one is defined, this is an error
    if bool(mapper_name) != bool(reducer_name):
        raise DependencyError(
            'Both mapper and reducer are required together info="{}"'.format(info))

    if not mapper_name:
        # Plain task: nothing to map/reduce.
        return task

    # This is a map/reduce task (a truthy mapper_name implies reducer_name).
    # Avoid circular import - only needed for map/reduce tasks
    from .tasks import lazy_async_apply_map

    try:
        mapper = self.mappers[mapper_name]
    except KeyError:
        raise DependencyError('Missing mapper "{}"'.format(mapper_name))

    try:
        reducer = self.reducers[reducer_name]
    except KeyError:
        raise DependencyError('Missing reducer "{}"'.format(reducer_name))

    # lazy_async_apply_map must always be called in a chord for now, see:
    # https://github.com/celery/celery/issues/2722
    # `args` was previously an undefined name here; it is now an explicit
    # parameter that defaults to no positional arguments.
    return (
        mapper.s(*args, **new_kwargs) |
        chord(lazy_async_apply_map.s(task), reducer.s(*args, **new_kwargs))
    )
def _create_chord(self, eager):
    """Launch a celery chord and check the status records created for it.

    :param eager: passed unchanged to every ``verify_state`` call.
    """
    header_signatures = [
        sample_task.s(self.user.id, '1'),
        sample_task.s(self.user.id, '2', user_task_name='Chord: 1 & 2, then 3'),
    ]
    chord(header_signatures)(sample_task.s(self.user.id, '3'))

    # Expected records: the chord, its group, two header tasks, and the body.
    assert UserTaskStatus.objects.count() == 5

    # The chord is the top-level container.
    chord_record = UserTaskStatus.objects.get(task_class='celery.chord')
    assert chord_record.task_id
    assert chord_record.parent is None
    assert chord_record.is_container
    assert chord_record.name == 'Chord: 1 & 2, then 3'
    assert chord_record.total_steps == 3
    verify_state(chord_record, eager)

    # The group is a container nested under the chord.
    group_record = UserTaskStatus.objects.get(task_class='celery.group')
    assert group_record.task_id
    assert group_record.parent_id == chord_record.id
    assert group_record.is_container
    assert group_record.name == 'Chord: 1 & 2, then 3'
    assert group_record.total_steps == 2
    verify_state(group_record, eager)

    # The header tasks are single-step children of the group.
    header_records = UserTaskStatus.objects.filter(parent=group_record)
    assert len(header_records) == 2
    expected_names = ['SampleTask: 1', 'SampleTask: 2']
    for record in header_records:
        assert record.task_id
        assert record.parent_id == group_record.id
        assert not record.is_container
        assert record.name in expected_names
        assert record.total_steps == 1
        verify_state(record, eager)

    # The body is the only non-container child directly under the chord.
    body_record = UserTaskStatus.objects.get(parent=chord_record, is_container=False)
    assert body_record.task_id
    assert body_record.name == 'SampleTask: 3'
    assert body_record.total_steps == 1
    verify_state(body_record, eager)