Java 类org.apache.camel.processor.aggregate.AggregationStrategy 实例源码

项目:Camel    文件:MulticastDefinition.java   
protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception {
    final AggregationStrategy strategy = createAggregationStrategy(routeContext);

    boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing();
    boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork();
    boolean isStreaming = getStreaming() != null && getStreaming();
    boolean isStopOnException = getStopOnException() != null && getStopOnException();
    boolean isParallelAggregate = getParallelAggregate() != null && getParallelAggregate();

    boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing);
    ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", this, isParallelProcessing);

    long timeout = getTimeout() != null ? getTimeout() : 0;
    if (timeout > 0 && !isParallelProcessing) {
        throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
    }
    if (onPrepareRef != null) {
        onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
    }

    MulticastProcessor answer = new MulticastProcessor(routeContext.getCamelContext(), list, strategy, isParallelProcessing,
                                  threadPool, shutdownThreadPool, isStreaming, isStopOnException, timeout, onPrepare, isShareUnitOfWork, isParallelAggregate);
    return answer;
}
项目:Camel    文件:PollEnrichDefinition.java   
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {

    // if no timeout then we should block, and there use a negative timeout
    long time = timeout != null ? timeout : -1;
    boolean isIgnoreInvalidEndpoint = getIgnoreInvalidEndpoint() != null && getIgnoreInvalidEndpoint();
    Expression exp = getExpression().createExpression(routeContext);

    PollEnricher enricher = new PollEnricher(exp, time);

    AggregationStrategy strategy = createAggregationStrategy(routeContext);
    if (strategy == null) {
        enricher.setDefaultAggregationStrategy();
    } else {
        enricher.setAggregationStrategy(strategy);
    }
    if (getAggregateOnException() != null) {
        enricher.setAggregateOnException(getAggregateOnException());
    }
    if (getCacheSize() != null) {
        enricher.setCacheSize(getCacheSize());
    }
    enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint);

    return enricher;
}
项目:Camel    文件:TarFileMultipleFilesSplitterTest.java   
private AggregationStrategy updateHeader() {
    return new AggregationStrategy() {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange != null) {
                String processedFiles = oldExchange.getIn().getHeader(PROCESSED_FILES_HEADER_NAME, String.class);
                if (processedFiles == null) {
                    processedFiles = oldExchange.getIn().getHeader(TarIterator.TARFILE_ENTRY_NAME_HEADER, String.class);
                }
                processedFiles = processedFiles + "," + newExchange.getIn().getHeader(TarIterator.TARFILE_ENTRY_NAME_HEADER, String.class);
                newExchange.getIn().setHeader(PROCESSED_FILES_HEADER_NAME, processedFiles);
            }
            return newExchange;
        }

    };
}
项目:Camel    文件:EnrichDefinition.java   
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {

    Expression exp = getExpression().createExpression(routeContext);
    boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork();
    boolean isIgnoreInvalidEndpoint = getIgnoreInvalidEndpoint() != null && getIgnoreInvalidEndpoint();

    Enricher enricher = new Enricher(exp);
    enricher.setShareUnitOfWork(isShareUnitOfWork);
    enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint);
    AggregationStrategy strategy = createAggregationStrategy(routeContext);
    if (strategy != null) {
        enricher.setAggregationStrategy(strategy);
    }
    if (aggregateOnException != null) {
        enricher.setAggregateOnException(aggregateOnException);
    }
    return enricher;
}
项目:Camel    文件:CassandraAggregationTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            AggregationStrategy aggregationStrategy = new AggregationStrategy() {
                @Override
                public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                    if (oldExchange == null) {
                        return newExchange;
                    }
                    String oldBody = oldExchange.getIn().getBody(String.class);
                    String newBody = newExchange.getIn().getBody(String.class);
                    oldExchange.getIn().setBody(oldBody + "," + newBody);
                    return oldExchange;
                }
            };
            from("direct:input")
                    .aggregate(header("aggregationId"), aggregationStrategy)
                    .completionSize(3).completionTimeout(3000L)
                    .aggregationRepository(aggregationRepository)
                    .to("mock:output");
        }
    };
}
项目:Camel    文件:Splitter.java   
@Override
public boolean process(Exchange exchange, final AsyncCallback callback) {
    final AggregationStrategy strategy = getAggregationStrategy();

    // if no custom aggregation strategy is being used then fallback to keep the original
    // and propagate exceptions which is done by a per exchange specific aggregation strategy
    // to ensure it supports async routing
    if (strategy == null) {
        AggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true);
        if (isShareUnitOfWork()) {
            original = new ShareUnitOfWorkAggregationStrategy(original);
        }
        setAggregationStrategyOnExchange(exchange, original);
    }

    return super.process(exchange, callback);
}
项目:Camel    文件:MulticastProcessor.java   
public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
                          boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming,
                          boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork,
                          boolean parallelAggregate) {
    notNull(camelContext, "camelContext");
    this.camelContext = camelContext;
    this.processors = processors;
    this.aggregationStrategy = aggregationStrategy;
    this.executorService = executorService;
    this.shutdownExecutorService = shutdownExecutorService;
    this.streaming = streaming;
    this.stopOnException = stopOnException;
    // must enable parallel if executor service is provided
    this.parallelProcessing = parallelProcessing || executorService != null;
    this.timeout = timeout;
    this.onPrepare = onPrepare;
    this.shareUnitOfWork = shareUnitOfWork;
    this.parallelAggregate = parallelAggregate;
}
项目:Camel    文件:MulticastProcessor.java   
protected AggregationStrategy getAggregationStrategy(Exchange exchange) {
    AggregationStrategy answer = null;

    // prefer to use per Exchange aggregation strategy over a global strategy
    if (exchange != null) {
        Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
        Map<Object, AggregationStrategy> map = CastUtils.cast(property);
        if (map != null) {
            answer = map.get(this);
        }
    }
    if (answer == null) {
        // fallback to global strategy
        answer = getAggregationStrategy();
    }
    return answer;
}
项目:Camel    文件:MulticastProcessor.java   
/**
 * Sets the given {@link org.apache.camel.processor.aggregate.AggregationStrategy} on the {@link Exchange}.
 *
 * @param exchange            the exchange
 * @param aggregationStrategy the strategy
 */
protected void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) {
    Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
    Map<Object, AggregationStrategy> map = CastUtils.cast(property);
    if (map == null) {
        map = new ConcurrentHashMap<Object, AggregationStrategy>();
    } else {
        // it is not safe to use the map directly as the exchange doesn't have the deep copy of it's properties
        // we just create a new copy if we need to change the map
        map = new ConcurrentHashMap<Object, AggregationStrategy>(map);
    }
    // store the strategy using this processor as the key
    // (so we can store multiple strategies on the same exchange)
    map.put(this, aggregationStrategy);
    exchange.setProperty(Exchange.AGGREGATION_STRATEGY, map);
}
项目:Camel    文件:EnricherAsyncUnhandledExceptionTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        public void configure() throws Exception {
            from("direct:in")
                .to("mock:pickedUp")
                // using the async utility component to ensure that the async routing engine kicks in
                .enrich("async:out?reply=Reply", new AggregationStrategy() {
                    @Override
                    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                        throw new RuntimeException("Bang! Unhandled exception");
                    }
                });

        }
    };
}
项目:Camel    文件:AggregatorConcurrencyTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        public void configure() throws Exception {
            from(uri)
                .aggregate(constant(true), new AggregationStrategy() {
                    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                        Exchange answer = oldExchange != null ? oldExchange : newExchange;
                        COUNTER.getAndIncrement();

                        Integer newIndex = newExchange.getIn().getHeader("index", Integer.class);
                        int total = SUM.addAndGet(newIndex);
                        answer.getIn().setHeader("total", total);

                        LOG.debug("Index: " + newIndex + ". Total so far: " + total);
                        return answer;
                    }
                }).completionTimeout(60000).completionPredicate(property(Exchange.AGGREGATED_SIZE).isEqualTo(100))
                .to("direct:foo");

            from("direct:foo").setBody().header("total").to("mock:result");
        }
    };
}
项目:Camel    文件:AggregatorAndOnExceptionTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            errorHandler(deadLetterChannel("mock:error"));

            onException(CamelException.class).maximumRedeliveries(2);

            from("seda:start")
                .aggregate(header("id"),
                    new AggregationStrategy() {
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            return newExchange;
                        }
                    }).completionSize(2).completionTimeout(500L)
                .to("mock:result");
        }
    };
}
项目:Camel    文件:CassandraAggregationSerializedHeadersTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            AggregationStrategy aggregationStrategy = new AggregationStrategy() {
                @Override
                public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                    if (oldExchange == null) {
                        return newExchange;
                    }
                    String oldBody = oldExchange.getIn().getBody(String.class);
                    String newBody = newExchange.getIn().getBody(String.class);
                    oldExchange.getIn().setBody(oldBody + "," + newBody);
                    return oldExchange;
                }
            };
            from("direct:input")
                    .aggregate(header("aggregationId"), aggregationStrategy)
                    .completionSize(3).completionTimeout(3000L)
                    .aggregationRepository(aggregationRepository)
                    .to("mock:output");
        }
    };
}
项目:Camel    文件:AggregratedJmsRouteTest.java   
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        public void configure() throws Exception {
            from(timeOutEndpointUri).to("jms:queue:test.b");

            from("jms:queue:test.b").aggregate(header("cheese"), new AggregationStrategy() {
                public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        LOG.error("aggregration delay sleep inturrepted", e);
                        fail("aggregration delay sleep inturrepted");
                    }
                    return newExchange;
                }
            }).completionTimeout(2000L).to("mock:result");

            from(multicastEndpointUri).to("jms:queue:point1", "jms:queue:point2", "jms:queue:point3");
            from("jms:queue:point1").process(new MyProcessor()).to("jms:queue:reply");
            from("jms:queue:point2").process(new MyProcessor()).to("jms:queue:reply");
            from("jms:queue:point3").process(new MyProcessor()).to("jms:queue:reply");
            from("jms:queue:reply").aggregate(header("cheese"), new UseLatestAggregationStrategy()).completionSize(3)
                .to("mock:reply");
        }
    };
}
项目:Camel    文件:ProcessorDefinition.java   
/**
 * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
 * Creates an aggregator allowing you to combine a number of messages together into a single message.
 *
 * @param aggregationStrategy the strategy used for the aggregation
 * @return the expression clause to be used as builder to configure the correlation expression
 */
public ExpressionClause<AggregateDefinition> aggregate(AggregationStrategy aggregationStrategy) {
    AggregateDefinition answer = new AggregateDefinition();
    ExpressionClause<AggregateDefinition> clause = new ExpressionClause<AggregateDefinition>(answer);
    answer.setExpression(clause);
    answer.setAggregationStrategy(aggregationStrategy);
    addOutput(answer);
    return clause;
}
项目:Camel    文件:Splitter.java   
@Deprecated
public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
                boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
                boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean useSubUnitOfWork) {
    this(camelContext, expression, destination, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
            streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork, false);
}
项目:Camel    文件:Splitter.java   
public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
                boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
                boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean useSubUnitOfWork,
                boolean parallelAggregate) {
    super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService,
            shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork, parallelAggregate);
    this.expression = expression;
    notNull(expression, "expression");
    notNull(destination, "destination");
}
项目:Camel    文件:MulticastProcessor.java   
@Deprecated
public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
                          boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
                          boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) {
    this(camelContext, processors, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
            streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, false);
}
项目:Camel    文件:MulticastProcessor.java   
@Override
public void run() {
    AggregationStrategy strategy = getAggregationStrategy(null);
    if (strategy instanceof DelegateAggregationStrategy) {
        strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
    }
    if (strategy instanceof TimeoutAwareAggregationStrategy) {
        // notify the strategy we timed out
        Exchange oldExchange = result.get();
        if (oldExchange == null) {
            // if they all timed out the result may not have been set yet, so use the original exchange
            oldExchange = original;
        }
        ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated.intValue(), total.intValue(), timeout);
    } else {
        // log a WARN we timed out since it will not be aggregated and the Exchange will be lost
        LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated.intValue());
    }
    LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated.intValue());
    timedOut.set(true);

    // mark that index as timed out, which allows us to try to retrieve
    // any already completed tasks in the next loop
    if (completion instanceof SubmitOrderedCompletionService) {
        ((SubmitOrderedCompletionService<?>) completion).timeoutTask();
    }

    // we timed out so increment the counter
    aggregated.incrementAndGet();
}
项目:Camel    文件:MulticastProcessor.java   
/**
 * Removes the associated {@link org.apache.camel.processor.aggregate.AggregationStrategy} from the {@link Exchange}
 * which must be done after use.
 *
 * @param exchange the current exchange
 */
protected void removeAggregationStrategyFromExchange(Exchange exchange) {
    Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
    Map<Object, AggregationStrategy> map = CastUtils.cast(property);
    if (map == null) {
        return;
    }
    // remove the strategy using this processor as the key
    map.remove(this);
}
项目:Camel    文件:RecipientListProcessor.java   
@Deprecated
public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
                              boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
                              boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) {
    super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
            streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, false);
    this.producerCache = producerCache;
    this.iter = iter;
}
项目:Camel    文件:RecipientListProcessor.java   
public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
                              boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
                              boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate) {
    super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
            streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, parallelAggregate);
    this.producerCache = producerCache;
    this.iter = iter;
}
项目:Camel    文件:AggregationStrategies.java   
/**
 * Creates a {@link AggregationStrategyBeanAdapter} for using a POJO as the aggregation strategy.
 */
public static AggregationStrategy beanAllowNull(Object bean, String methodName) {
    AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(bean, methodName);
    adapter.setAllowNullOldExchange(true);
    adapter.setAllowNullNewExchange(true);
    return adapter;
}
项目:Camel    文件:AggregationStrategies.java   
/**
 * Creates a {@link AggregationStrategyBeanAdapter} for using a POJO as the aggregation strategy.
 */
public static AggregationStrategy beanAllowNull(Class<?> type, String methodName) {
    AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(type, methodName);
    adapter.setAllowNullOldExchange(true);
    adapter.setAllowNullNewExchange(true);
    return adapter;
}
项目:Camel    文件:SplitWithCustomAggregationStrategyTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start").
                    setBody().simple("<search><key>foo-${id}</key><key>bar-${id}</key><key>baz-${id}</key></search>").
                    to("direct:splitInOut").
                    to("mock:result");

            from("direct:splitInOut").
                    setHeader("com.example.id").simple("${id}").
                    split(xpath("/search/key"), new AggregationStrategy() {
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if (oldExchange == null) {
                                return newExchange;
                            }

                            String oldBody = oldExchange.getIn().getBody(String.class);
                            String newBody = newExchange.getIn().getBody(String.class);
                            oldExchange.getIn().setBody(oldBody + newBody);

                            return oldExchange;
                        }
                    }).parallelProcessing().streaming().
                        to("direct:processLine").
                    end().
                    transform().simple("<results>${in.body}</results>");

            from("direct:processLine").
                    to("log:line").
                    transform().simple("<index>${in.header.CamelSplitIndex}</index>${in.body}");
        }
    };
}
项目:Camel    文件:MulticastParallelStressTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
                .multicast(new AggregationStrategy() {
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if (oldExchange == null) {
                                return newExchange;
                            }

                            String body = oldExchange.getIn().getBody(String.class);
                            oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                            return oldExchange;
                        }
                    }).parallelProcessing()
                        .to("direct:a", "direct:b", "direct:c", "direct:d")
                // use end to indicate end of multicast route
                .end()
                .to("mock:result");

            from("direct:a").delay(20).setBody(body().append("A"));

            from("direct:b").setBody(body().append("B"));

            from("direct:c").delay(50).setBody(body().append("C"));

            from("direct:d").delay(10).setBody(body().append("D"));
        }
    };
}
项目:Camel    文件:MulticastParallelTwoTimeoutTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
                .multicast(new AggregationStrategy() {
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if (oldExchange == null) {
                                return newExchange;
                            }

                            String body = oldExchange.getIn().getBody(String.class);
                            oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                            return oldExchange;
                        }
                    })
                    .parallelProcessing().timeout(1000).to("direct:a", "direct:b", "direct:c")
                // use end to indicate end of multicast route
                .end()
                .to("mock:result");

            from("direct:a").delay(3000).setBody(constant("A"));

            from("direct:b").setBody(constant("B"));

            from("direct:c").delay(4000).setBody(constant("C"));
        }
    };
}
项目:Camel    文件:JettySimplifiedHandle404Test.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // disable error handling
            errorHandler(noErrorHandler());

            // START SNIPPET: e1
            // We set throwExceptionOnFailure to false to let Camel return any response from the remove HTTP server without thrown
            // HttpOperationFailedException in case of failures.
            // This allows us to handle all responses in the aggregation strategy where we can check the HTTP response code
            // and decide what to do. As this is based on an unit test we assert the code is 404
            from("direct:start").enrich("http://localhost:{{port}}/myserver?throwExceptionOnFailure=false&user=Camel", new AggregationStrategy() {
                public Exchange aggregate(Exchange original, Exchange resource) {
                    // get the response code
                    Integer code = resource.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class);
                    assertEquals(404, code.intValue());
                    return resource;
                }
            }).to("mock:result");

            // this is our jetty server where we simulate the 404
            from("jetty://http://localhost:{{port}}/myserver")
                    .process(new Processor() {
                        public void process(Exchange exchange) throws Exception {
                            exchange.getOut().setBody("Page not found");
                            exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 404);
                        }
                    });
            // END SNIPPET: e1
        }
    };
}
项目:Camel    文件:MulticastParallelTimeout3Test.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // START SNIPPET: e1
            from("direct:start")
                .multicast(new AggregationStrategy() {
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if (oldExchange == null) {
                                return newExchange;
                            }

                            String body = oldExchange.getIn().getBody(String.class);
                            oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                            return oldExchange;
                        }
                    })
                    .parallelProcessing().timeout(250).to("direct:a", "direct:b", "direct:c")
                // use end to indicate end of multicast route
                .end()
                .to("mock:result");

            from("direct:a").to("mock:A").setBody(constant("A"));

            from("direct:b").to("mock:B").setBody(constant("B"));

            from("direct:c").delay(1000).to("mock:C").setBody(constant("C"));
            // END SNIPPET: e1
        }
    };
}
项目:Camel    文件:MulticastParallelStreamingTimeoutTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
                .multicast(new AggregationStrategy() {
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if (oldExchange == null) {
                                return newExchange;
                            }

                            String body = oldExchange.getIn().getBody(String.class);
                            oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                            return oldExchange;
                        }
                    })
                    .parallelProcessing().streaming().timeout(2000).to("direct:a", "direct:b", "direct:c")
                // use end to indicate end of multicast route
                .end()
                .to("mock:result");

            from("direct:a").delay(3000).setBody(constant("A"));

            from("direct:b").delay(500).setBody(constant("B"));

            from("direct:c").setBody(constant("C"));
        }
    };
}
项目:Camel    文件:MulticastParallelStreamingTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
                .multicast(new AggregationStrategy() {
                    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                        if (oldExchange == null) {
                            return newExchange;
                        }

                        String body = oldExchange.getIn().getBody(String.class);
                        oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                        return oldExchange;
                    }
                }).parallelProcessing().streaming()
                    .to("direct:a", "direct:b")
                // use end to indicate end of multicast route
                .end()
                .to("mock:result");

            from("direct:a").delay(500).setBody(constant("A"));

            from("direct:b").setBody(constant("B"));
        }
    };
}
项目:Camel    文件:MulticastParallelTwoTimeoutMiddleTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // START SNIPPET: e1
            from("direct:start")
                .multicast(new AggregationStrategy() {
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if (oldExchange == null) {
                                return newExchange;
                            }

                            String body = oldExchange.getIn().getBody(String.class);
                            oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                            return oldExchange;
                        }
                    })
                    .parallelProcessing().timeout(250).to("direct:a", "direct:b", "direct:c", "direct:d", "direct:e")
                // use end to indicate end of multicast route
                .end()
                .to("mock:result");

            from("direct:a").to("mock:A").setBody(constant("A"));

            from("direct:b").delay(1000).to("mock:B").setBody(constant("B"));

            from("direct:c").to("mock:C").setBody(constant("C"));

            from("direct:d").delay(1000).to("mock:D").setBody(constant("D"));

            from("direct:e").to("mock:E").setBody(constant("E"));
        }
    };
}
项目:Camel    文件:MulticastParallelTimeout2Test.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // START SNIPPET: e1
            from("direct:start")
                .multicast(new AggregationStrategy() {
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if (oldExchange == null) {
                                return newExchange;
                            }

                            String body = oldExchange.getIn().getBody(String.class);
                            oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                            return oldExchange;
                        }
                    })
                    .parallelProcessing().timeout(250).to("direct:a", "direct:b", "direct:c")
                // use end to indicate end of multicast route
                .end()
                .to("mock:result");

            from("direct:a").to("mock:A").setBody(constant("A"));

            from("direct:b").delay(1000).to("mock:B").setBody(constant("B"));

            from("direct:c").to("mock:C").setBody(constant("C"));
            // END SNIPPET: e1
        }
    };
}
项目:Camel    文件:MulticastParallelMiddleTimeoutTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
                .multicast(new AggregationStrategy() {
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if (oldExchange == null) {
                                return newExchange;
                            }

                            String body = oldExchange.getIn().getBody(String.class);
                            oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                            return oldExchange;
                        }
                    })
                    .parallelProcessing().timeout(2000).to("direct:a", "direct:b", "direct:c")
                // use end to indicate end of multicast route
                .end()
                .to("mock:result");

            from("direct:a").setBody(constant("A"));

            from("direct:b").delay(4000).setBody(constant("B"));

            from("direct:c").delay(500).setBody(constant("C"));
        }
    };
}
项目:Camel    文件:RecipientListParallelTimeoutTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
                .recipientList(header("slip")).aggregationStrategy(
                        new AggregationStrategy() {
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if (oldExchange == null) {
                                return newExchange;
                            }

                            String body = oldExchange.getIn().getBody(String.class);
                            oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                            return oldExchange;
                        }
                    })
                    .parallelProcessing().timeout(1000)
                .to("mock:result");

            from("direct:a").delay(5000).setBody(constant("A"));

            from("direct:b").setBody(constant("B"));

            from("direct:c").delay(500).setBody(constant("C"));
        }
    };
}
项目:Camel    文件:MulticastParallelTimeoutTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // START SNIPPET: e1
            from("direct:start")
                .multicast(new AggregationStrategy() {
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if (oldExchange == null) {
                                return newExchange;
                            }

                            String body = oldExchange.getIn().getBody(String.class);
                            oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                            return oldExchange;
                        }
                    })
                    .parallelProcessing().timeout(250).to("direct:a", "direct:b", "direct:c")
                // use end to indicate end of multicast route
                .end()
                .to("mock:result");

            from("direct:a").delay(1000).to("mock:A").setBody(constant("A"));

            from("direct:b").to("mock:B").setBody(constant("B"));

            from("direct:c").to("mock:C").setBody(constant("C"));
            // END SNIPPET: e1
        }
    };
}
项目:Camel    文件:MulticastThreadPoolProfileTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // register thread pool profile
            ThreadPoolProfile profile = new ThreadPoolProfileBuilder("myProfile").poolSize(5).maxPoolSize(10).maxQueueSize(20).build();
            context.getExecutorServiceManager().registerThreadPoolProfile(profile);

            from("direct:start")
                .multicast(new AggregationStrategy() {
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if (oldExchange == null) {
                                return newExchange;
                            }

                            String body = oldExchange.getIn().getBody(String.class);
                            oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                            return oldExchange;
                        }
                    })
                    // and refer to the profile here
                    .parallelProcessing().executorServiceRef("myProfile").to("direct:a", "direct:b")
                // use end to indicate end of multicast route
                .end()
                .to("mock:result");

            from("direct:a").delay(100).setBody(constant("A"));

            from("direct:b").setBody(constant("B"));
        }
    };
}
项目:Camel    文件:MulticastParallelStreamingTwoTimeoutTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
                .multicast(new AggregationStrategy() {
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if (oldExchange == null) {
                                return newExchange;
                            }

                            String body = oldExchange.getIn().getBody(String.class);
                            oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                            return oldExchange;
                        }
                    })
                    .parallelProcessing().streaming().timeout(2000).to("direct:a", "direct:b", "direct:c")
                // use end to indicate end of multicast route
                .end()
                .to("mock:result");

            from("direct:a").delay(3000).setBody(constant("A"));

            from("direct:b").setBody(constant("B"));

            from("direct:c").delay(4000).setBody(constant("C"));
        }
    };
}
项目:Camel    文件:AggregatorExceptionInPredicateTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            onException(IllegalArgumentException.class).handled(true).to("mock:handled");

            from("direct:start")
                .aggregate(header("id"))
                .completionTimeout(500)
                .aggregationStrategy(new AggregationStrategy() {

                    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                        Object body = newExchange.getIn().getBody();
                        if ("Damn".equals(body)) {
                            throw new IllegalArgumentException();
                        }
                        return newExchange;
                    }
                })
                .to("mock:result");

            from("direct:predicate")
                .aggregate(new Expression() {

                    public <T> T evaluate(Exchange exchange, Class<T> type) {
                        if (exchange.getIn().getBody().equals("Damn")) {
                            throw new IllegalArgumentException();
                        }
                        return ExpressionBuilder.headerExpression("id").evaluate(exchange, type);
                    }
                }, new UseLatestAggregationStrategy())
                .completionTimeout(500)
                .to("mock:result");
        }
    };
}
项目:Camel    文件:AggregateProcessorTest.java   
public void testAggregateProcessorCompletionPredicateEager() throws Exception {
    MockEndpoint mock = getMockEndpoint("mock:result");
    mock.expectedBodiesReceived("A+B+END");
    mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "predicate");

    Processor done = new SendProcessor(context.getEndpoint("mock:result"));
    Expression corr = header("id");
    AggregationStrategy as = new BodyInAggregatingStrategy();
    Predicate complete = body().isEqualTo("END");

    AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
    ap.setCompletionPredicate(complete);
    ap.setEagerCheckCompletion(true);
    ap.start();

    Exchange e1 = new DefaultExchange(context);
    e1.getIn().setBody("A");
    e1.getIn().setHeader("id", 123);

    Exchange e2 = new DefaultExchange(context);
    e2.getIn().setBody("B");
    e2.getIn().setHeader("id", 123);

    Exchange e3 = new DefaultExchange(context);
    e3.getIn().setBody("END");
    e3.getIn().setHeader("id", 123);

    Exchange e4 = new DefaultExchange(context);
    e4.getIn().setBody("D");
    e4.getIn().setHeader("id", 123);

    ap.process(e1);
    ap.process(e2);
    ap.process(e3);
    ap.process(e4);

    assertMockEndpointsSatisfied();

    ap.stop();
}