Java 类org.apache.camel.processor.Splitter 实例源码

项目:Camel    文件:RouteBuilderTest.java   
/**
 * Checks the routes returned by {@code buildSplitter()}: exactly one route is
 * expected, it must consume from {@code direct://a}, and the processor found
 * behind its unwrapped channel must be a {@link Splitter}.
 *
 * @throws Exception if building or inspecting the routes fails
 */
public void testSplitter() throws Exception {
        final List<Route> builtRoutes = buildSplitter();
        log.debug("Created routes: " + builtRoutes);

        assertEquals("Number routes created", 1, builtRoutes.size());

        for (final Route builtRoute : builtRoutes) {
            // The route must start at the expected direct endpoint.
            assertEquals("From endpoint", "direct://a", builtRoute.getEndpoint().getEndpointUri());

            // Unwrap the channel around the route's processor and check what follows it.
            final EventDrivenConsumerRoute consumerRoute =
                    assertIsInstanceOf(EventDrivenConsumerRoute.class, builtRoute);
            final Channel unwrapped = unwrapChannel(consumerRoute.getProcessor());
            assertIsInstanceOf(Splitter.class, unwrapped.getNextProcessor());
        }
    }
项目:syndesis    文件:RecordSplitterEndpoint.java   
/**
 * Creates a consumer that splits the incoming List of Maps into one exchange per
 * Map and converts each Map to JSON before handing it to the given processor.
 *
 * <p>The message body is evaluated as a {@link List}; each split part runs through
 * a {@code ToJSONProcessor} followed by {@code processor}. No aggregation strategy
 * is supplied to the splitter ({@code null} is passed).
 *
 * @param processor the downstream processor that receives each converted part
 * @return the consumer obtained from the wrapped endpoint for the splitter
 * @throws Exception if the pipeline or the consumer cannot be created
 */
@Override
public Consumer createConsumer(final Processor processor) throws Exception {
    // Conversion step followed by the caller's processor, applied to every split part.
    final ToJSONProcessor mapToJson = new ToJSONProcessor();
    final Processor convertThenDeliver = Pipeline.newInstance(getCamelContext(), mapToJson, processor);

    // Split on the message body, read as a List.
    final Expression listBody = ExpressionBuilder.bodyExpression(List.class);

    return endpoint.createConsumer(
            new Splitter(getCamelContext(), listBody, convertThenDeliver, null));
}
项目:ipf-flow-manager    文件:SplitterCopyAspectTest.java   
/**
 * Prepares the fixture: resets {@code messages} to an empty list and builds the
 * {@code splitter} under test on a fresh {@link DefaultCamelContext}, using the
 * test expression, the test processor and a {@link UseLatestAggregationStrategy}.
 *
 * @throws Exception if the fixture cannot be created
 */
@Before
public void setUp() throws Exception {
    final CamelContext camelContext = new DefaultCamelContext();
    messages = new ArrayList<>();

    // Collaborators are created in the same order the splitter receives them.
    final TestExpression splitExpression = new TestExpression();
    final TestProcessor partProcessor = new TestProcessor();
    final UseLatestAggregationStrategy useLatest = new UseLatestAggregationStrategy();

    splitter = new Splitter(camelContext, splitExpression, partProcessor, useLatest);
}
项目:Camel    文件:SplitDefinition.java   
/**
 * Builds the {@link Splitter} described by this definition.
 *
 * <p>As a side effect this assigns the {@code aggregationStrategy} field and, when
 * {@code onPrepareRef} is set, resolves it through a mandatory lookup and assigns
 * the {@code onPrepare} field.
 *
 * @param routeContext the route context used to create the child processor, the
 *                     aggregation strategy, the thread pool and the split expression
 * @return the configured splitter
 * @throws IllegalArgumentException if a positive timeout is configured while
 *                                  parallel processing is not enabled
 * @throws Exception if any collaborator cannot be created or looked up
 */
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
    final Processor child = this.createChildProcessor(routeContext, true);
    aggregationStrategy = createAggregationStrategy(routeContext);

    // Unset (null) options count as disabled.
    final boolean parallel = Boolean.TRUE.equals(getParallelProcessing());
    final boolean streaming = Boolean.TRUE.equals(getStreaming());
    final boolean sharedUnitOfWork = Boolean.TRUE.equals(getShareUnitOfWork());
    final boolean parallelAggregate = Boolean.TRUE.equals(getParallelAggregate());

    final boolean shutdownPool =
            ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, parallel);
    final ExecutorService executor =
            ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Split", this, parallel);

    // An unset timeout is treated as 0; a positive timeout requires parallel processing.
    final long effectiveTimeout = getTimeout() == null ? 0 : getTimeout();
    if (!isParallelDisabledOk(parallel, effectiveTimeout)) {
        throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
    }

    if (onPrepareRef != null) {
        onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
    }

    final Expression splitOn = getExpression().createExpression(routeContext);

    return new Splitter(
            routeContext.getCamelContext(), splitOn, child, aggregationStrategy,
            parallel, executor, shutdownPool, streaming, isStopOnException(),
            effectiveTimeout, onPrepare, sharedUnitOfWork, parallelAggregate);
}

/** Returns {@code false} only when a positive timeout is combined with non-parallel processing. */
private static boolean isParallelDisabledOk(boolean parallel, long timeout) {
    return parallel || timeout <= 0;
}
项目:Camel    文件:ManagedSplitter.java   
/**
 * Creates the managed wrapper for a {@link Splitter}.
 *
 * <p>All three arguments are forwarded to the superclass constructor; the
 * splitter is additionally kept in this class's own {@code processor} field.
 *
 * @param context    the Camel context passed on to the superclass
 * @param processor  the splitter being managed
 * @param definition the split definition passed on to the superclass
 */
public ManagedSplitter(CamelContext context, Splitter processor, SplitDefinition definition) {
    super(context, processor, definition);
    this.processor = processor;
}