Java 类org.apache.camel.Processor 实例源码

项目:eds    文件:DemoInterceptor.java   
/**
 * Wraps {@code target} in a {@link DelegateAsyncProcessor} whose inner processor prints the
 * definition and the next target to standard output and then hands the exchange to
 * {@code target}.
 *
 * NOTE(review): the inner processor calls {@code target.process(exchange)} directly, which
 * looks synchronous from here - confirm that this is acceptable for asynchronous routes.
 *
 * @param context    the Camel context (not used by this implementation)
 * @param definition the definition being wrapped; only printed
 * @param target     the processor that receives the exchange
 * @param nextTarget the following processor; only printed
 * @return a delegating processor around the printing wrapper
 * @throws Exception as declared by the signature
 */
@Override
    public Processor wrapProcessorInInterceptors(CamelContext context,
            final ProcessorDefinition<?> definition, final Processor target,
            final Processor nextTarget) throws Exception {
        final Processor tracingProcessor = new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                // A guard that skipped processing while camelConfig was not running used to
                // live here; it is currently disabled.
                System.out.println("defainition :" + definition);
                System.out.println("nextTarget :" + nextTarget);
                target.process(exchange);
            }
        };
        return new DelegateAsyncProcessor(tracingProcessor);
    }
项目:syndesis    文件:SqlConnectorComponent.java   
/**
 * Builds the processor that runs after the producer and replaces the message body with its
 * JSON bean representation.
 * <p>
 * When the body converts to a {@link List}, only the first record (map) is serialized; an
 * empty list leaves the body as the empty string instead of failing with a
 * {@link java.util.NoSuchElementException}. When the body does not convert to a list it is
 * converted to a {@link Map} and serialized as is.
 *
 * @return the post-producer processor
 */
@Override
public Processor getAfterProducer() {
    @SuppressWarnings({"unchecked", "rawtypes"})
    final Processor processor = exchange -> {
        // Convert once; the original converted the body to a List twice.
        final List<Map> records = exchange.getIn().getBody(List.class);
        String jsonBean = "";
        if (records == null) {
            jsonBean = JSONBeanUtil.toJSONBean(exchange.getIn().getBody(Map.class));
        } else if (!records.isEmpty()) {
            // Only the first record (map) in the list is used.
            jsonBean = JSONBeanUtil.toJSONBean(records.iterator().next());
        }
        exchange.getIn().setBody(jsonBean);
    };
    return processor;
}
项目:syndesis    文件:SalesforceConnector.java   
/**
 * Creates the connector endpoint and installs the unmarshalling processors around the
 * producer: an {@link UnmarshallInputProcessor} before it and an
 * {@link UnmarshallOutputProcessor} after it. A processor that is already configured on the
 * endpoint is kept and placed behind the unmarshalling processor in a {@link Pipeline}.
 *
 * @param uri        the endpoint URI
 * @param remaining  the remaining part of the URI
 * @param parameters the endpoint parameters
 * @return the created endpoint with its before/after producers set
 * @throws Exception if the endpoint cannot be created
 */
@Override
protected final Endpoint createEndpoint(final String uri, final String remaining, final Map<String, Object> parameters)
    throws Exception {
    final DefaultConnectorEndpoint created = (DefaultConnectorEndpoint) super.createEndpoint(uri, remaining, parameters);

    // input side: unmarshal first, then whatever was configured before
    final UnmarshallProcessor inputUnmarshaller = new UnmarshallInputProcessor(created.getInputDataType());
    final Processor configuredBefore = created.getBeforeProducer();
    if (configuredBefore != null) {
        created.setBeforeProducer(Pipeline.newInstance(getCamelContext(), inputUnmarshaller, configuredBefore));
    } else {
        created.setBeforeProducer(inputUnmarshaller);
    }

    // output side: same arrangement for the after-producer
    final UnmarshallProcessor outputUnmarshaller = new UnmarshallOutputProcessor(created.getOutputDataType());
    final Processor configuredAfter = created.getAfterProducer();
    if (configuredAfter != null) {
        created.setAfterProducer(Pipeline.newInstance(getCamelContext(), outputUnmarshaller, configuredAfter));
    } else {
        created.setAfterProducer(outputUnmarshaller);
    }

    return created;
}
项目:syndesis    文件:SalesforceConnectorTest.java   
/**
 * Verifies that creating the endpoint keeps processors that were already configured: both the
 * before- and the after-producer must be two-step pipelines whose first step is the
 * unmarshalling processor and whose second step is the pre-existing processor.
 */
@Test
public void shouldNotRemoveExistingProcessors() throws Exception {
    final DefaultConnectorEndpoint created = (DefaultConnectorEndpoint) connectorWithExistingProcessors
        .createEndpoint("salesforce-connector");

    final Processor before = created.getBeforeProducer();
    assertThat(before).isInstanceOf(Pipeline.class);
    final Pipeline beforeSteps = (Pipeline) before;
    assertThat(beforeSteps.getProcessors()).isInstanceOf(List.class).hasSize(2);
    final List<Processor> beforeList = (List<Processor>) beforeSteps.getProcessors();
    assertThat(beforeList.get(0)).isInstanceOf(UnmarshallInputProcessor.class);
    assertThat(beforeList.get(1)).isSameAs(beforeProcessor);

    final Processor after = created.getAfterProducer();
    assertThat(after).isInstanceOf(Pipeline.class);
    final Pipeline afterSteps = (Pipeline) after;
    assertThat(afterSteps.getProcessors()).isInstanceOf(List.class).hasSize(2);
    final List<Processor> afterList = (List<Processor>) afterSteps.getProcessors();
    assertThat(afterList.get(0)).isInstanceOf(UnmarshallOutputProcessor.class);
    assertThat(afterList.get(1)).isSameAs(afterProcessor);
}
项目:connectors    文件:TimerSqlStoredRoute.java   
/**
 * Defines a route that starts from a timer endpoint (period=1000), sets a fixed JSON body,
 * sends it to the sql-stored connector calling {@code DEMO_ADD} and finally prints the class
 * and the value of the resulting body to standard output.
 */
@Override
public void configure() throws Exception {
    final String payload = "{\"a\":20,\"b\":30}";
    final String storedProcedureUri = "sql-stored-connector:DEMO_ADD( "
            + "INTEGER ${body[a]}, "
            + "INTEGER ${body[b]}, "
            + "OUT INTEGER c)";

    // prints the type and the content of whatever the connector produced
    final Processor printBody = new Processor() {
        public void process(Exchange exchange) throws Exception {
            final Object result = exchange.getIn().getBody();
            System.out.println(result.getClass());
            System.out.println(result);
        }
    };

    from("timer://myTimer?period=1000")
        .setBody().constant(payload)
        .to(storedProcedureUri)
        .process(printBody);
}
项目:connectors    文件:SqlStoredStartRoute.java   
/**
 * Defines a route that consumes from the sql-stored-start connector calling {@code DEMO_OUT}
 * (schedulerPeriod=5000) and prints the class and the value of each received body to standard
 * output.
 */
@Override
public void configure() throws Exception {
    final String sourceUri = "sql-stored-start-connector:DEMO_OUT( "
            + "OUT INTEGER C )?schedulerPeriod=5000";

    // prints the type and the content of the body delivered by the connector
    final Processor printBody = new Processor() {
        public void process(Exchange exchange) throws Exception {
            final Object result = exchange.getIn().getBody();
            System.out.println(result.getClass());
            System.out.println(result);
        }
    };

    from(sourceUri).process(printBody);
}
项目:Camel    文件:MailConvertersTest.java   
/**
 * Sends a message with a plain body and an alternative-body header through {@code direct:a}
 * and checks that converting the received multipart content to a string yields the
 * alternative body.
 */
@Test
public void testMultipartToString() throws Exception {
    final MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
    resultEndpoint.expectedMessageCount(1);

    final Processor withAlternativeBody = new Processor() {
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setBody("Hello World");
            exchange.getIn().setHeader(MailConstants.MAIL_ALTERNATIVE_BODY, "Alternative World");
        }
    };
    template.send("direct:a", withAlternativeBody);

    assertMockEndpointsSatisfied();

    final Exchange received = resultEndpoint.getReceivedExchanges().get(0);
    final Message mailMessage = received.getIn().getBody(MailMessage.class).getMessage();
    assertNotNull(mailMessage);

    final Multipart multipart = assertIsInstanceOf(Multipart.class, mailMessage.getContent());
    assertEquals("Alternative World", MailConverters.toString(multipart));
}
项目:Camel    文件:FileAsyncStressReadLockLockFileTest.java   
/**
 * Builds the stress route: files are polled with a file lock (at most 50 per poll), handed to
 * a pool of 10 threads, delayed for a random time and sent to {@code mock:result}. The route
 * has id "foo" and is not started automatically.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // pretends to work for a random 10..259 ms
            final Processor simulatedWork = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    final int pauseMillis = new Random().nextInt(250) + 10;
                    Thread.sleep(pauseMillis);
                }
            };

            // Limiting each poll to 50 files forces repeated polling, so a later poll may see
            // files that are still being processed.
            from("file:target/filestress?maxMessagesPerPoll=50&readLock=fileLock").routeId("foo").noAutoStartup()
                .threads(10)
                .process(simulatedWork)
                .to("mock:result");
        }
    };
}
项目:Camel    文件:HBaseProducerTest.java   
/**
 * Runs the multi-column put test and, when the system is ready, issues a GET for every column
 * of the first row and checks that the returned values match what was stored.
 */
@Test
public void testPutAndGetMultiColumns() throws Exception {
    testPutMultiColumns();
    if (!systemReady) {
        return;
    }

    // one row-id/family/qualifier header triple per column, numbered from 1
    final Processor getAllColumns = new Processor() {
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET);
            for (int i = 0; i < column[0].length; i++) {
                final int position = i + 1;
                exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(position), key[0]);
                exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(position), family[0]);
                exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(position), column[0][i]);
            }
        }
    };
    final Exchange reply = template.request("direct:start", getAllColumns);

    for (int i = 0; i < column[0].length; i++) {
        final Object actual = reply.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(i + 1));
        assertEquals(body[0][i][0], actual);
    }
}
项目:Camel    文件:HttpsRouteTest.java   
/**
 * Configures the jetty component with the test keystore and passwords and defines three HTTPS
 * routes: {@code /test} on the first port to {@code mock:a}, {@code /hello} on the first port
 * answering with a fixed HTML snippet, and {@code /test} on the second port to {@code mock:b}.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        public void configure() throws URISyntaxException {
            // SSL setup of the jetty component
            JettyHttpComponent jetty = (JettyHttpComponent) context.getComponent("jetty");
            jetty.setSslPassword(pwd);
            jetty.setSslKeyPassword(pwd);
            URL keyStore = this.getClass().getClassLoader().getResource("jsse/localhost.ks");
            jetty.setKeystore(keyStore.toURI().getPath());

            final String firstServer = "jetty:https://localhost:" + port1;
            final String secondServer = "jetty:https://localhost:" + port2;

            from(firstServer + "/test").to("mock:a");

            Processor helloReply = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    exchange.getOut().setBody("<b>Hello World</b>");
                }
            };
            from(firstServer + "/hello").process(helloReply);

            from(secondServer + "/test").to("mock:b");
        }
    };
}
项目:Camel    文件:JettyXsltTest.java   
/**
 * Picks a free port (searching from 8000) and builds a jetty route on {@code /test} that
 * answers with the content of the classpath resource named by the mandatory {@code name}
 * header, resolved below {@code org/apache/camel/itest/jetty/}, as {@code text/xml}.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    port = AvailablePortFinder.getNextAvailable(8000);

    return new RouteBuilder() {
        public void configure() {
            final Processor serveResource = new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    final String requested = exchange.getIn().getHeader("name", String.class);
                    ObjectHelper.notNull(requested, "name");

                    // load the resource and turn it into a String via the type converter
                    final String location = "org/apache/camel/itest/jetty/" + requested;
                    final InputStream stream = ResourceHelper.resolveMandatoryResourceAsInputStream(
                            exchange.getContext().getClassResolver(), location);
                    final String content = exchange.getContext().getTypeConverter().convertTo(String.class, stream);

                    exchange.getOut().setBody(content);
                    exchange.getOut().setHeader(Exchange.CONTENT_TYPE, "text/xml");
                }
            };

            from("jetty:http://localhost:" + port + "/test").process(serveResource);
        }
    };
}
项目:Ardulink-2    文件:MqttCamelRouteBuilder.java   
/**
 * Creates a processor that replaces the numeric IN body with that body divided by the value
 * the given builder evaluates to (as an {@code Integer}) for the exchange. Both operands are
 * converted to {@link BigDecimal} through their string form and the division is performed
 * with the {@code HALF_UP} rounding constant.
 *
 * @param valueBuilder supplies the divisor for each exchange
 * @return the dividing processor; it fails via {@code checkNotNull} when the body or the
 *         divisor is missing
 */
private Processor divideByValueOf(final ValueBuilder valueBuilder) {
    return new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            Message message = exchange.getIn();

            String dividendText = checkNotNull(
                    message.getBody(Number.class), "Body of %s is null", message)
                    .toString();
            BigDecimal dividend = new BigDecimal(dividendText);

            String divisorText = checkNotNull(
                    valueBuilder.evaluate(exchange, Integer.class),
                    "No %s set in exchange %s", valueBuilder, exchange)
                    .toString();
            BigDecimal divisor = new BigDecimal(divisorText);

            BigDecimal quotient = dividend.divide(divisor, HALF_UP);
            message.setBody(quotient);
        }

    };
}
项目:Camel    文件:HttpJettyProducerRecipientListCustomThreadPoolTest.java   
/**
 * Defines a recipient-list route driven by the {@code slip} header and a jetty route on
 * {@code /myapp} that answers with the {@code foo} header doubled and the body
 * {@code "Bye " + bar}.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:a").recipientList(header("slip"));

            final Processor doubleFooAndSayBye = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    int requestedFoo = exchange.getIn().getHeader("foo", Integer.class);
                    String requestedBar = exchange.getIn().getHeader("bar", String.class);

                    int doubled = requestedFoo * 2;
                    exchange.getOut().setHeader("foo", doubled);
                    exchange.getOut().setBody("Bye " + requestedBar);
                }
            };
            from("jetty://http://localhost:{{port}}/myapp").process(doubleFooAndSayBye);
        }
    };
}
项目:Camel    文件:NettyUDPObjectSyncTest.java   
/**
 * Defines a synchronous netty4 UDP route that sets the poet on the received {@code Poetry}
 * object and returns it as the OUT body.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            final Processor attributePoem = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    final Object received = exchange.getIn().getBody();
                    final Poetry poem = (Poetry) received;
                    poem.setPoet("Dr. Sarojini Naidu");
                    exchange.getOut().setBody(poem);
                }
            };
            from("netty4:udp://localhost:{{port}}?sync=true").process(attributePoem);
        }
    };
}
项目:Camel    文件:FileBatchConsumerMemoryLeakTest.java   
/**
 * Manual memory experiment: writes 100 files, then consumes them through a sorted file
 * endpoint while replacing each exchange body with a large string. Something similar happens
 * with many large files and {@code convertBodyTo(String.class)}: the exchanges become large.
 * If all exchanges of a batch are kept in a list during batch processing they stay referenced
 * and cannot be garbage collected, so memory usage grows.
 * <p/>
 * This is not a real integration test: it only waits and neither fails nor succeeds quickly.
 */
public void xxxtestMemoryLeak() throws Exception {
    // meant to be run by hand while watching memory usage (e.g. the Statistics tab in IDEA)

    deleteDirectory("target/filesorter/archiv");
    for (int fileNo = 0; fileNo < 100; fileNo++) {
        final String fileName = fileNo + ".dat";
        template.sendBodyAndHeader(fileUrl + "c", "test", Exchange.FILE_NAME, fileName);
    }
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // swaps the body for a 1,000,000 character string (builder capacity 10,000,000)
            final Processor inflateBody = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    StringBuilder padding = new StringBuilder(10000000);
                    padding.setLength(1000000);
                    String largeBody = padding.toString();
                    exchange.getIn().setBody(largeBody);
                }
            };
            from(fileUrl + "c/?sortBy=ignoreCase:file:name")
                    .process(inflateBody)
                    .to("file:target/filesorter/archiv");
        }
    });
    context.start();

    Thread.sleep(30 * 1000L);
}
项目:Camel    文件:NettyHttpProducerSimpleTest.java   
/**
 * Sends "Hello World" to the netty-http endpoint and checks that the reply carries status
 * code 200, both when read as a {@code NettyHttpMessage} and when read as an
 * {@code HttpResponse} body.
 */
@Test
public void testHttpSimpleExchange() throws Exception {
    getMockEndpoint("mock:input").expectedBodiesReceived("Hello World");

    final Processor helloWorld = new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setBody("Hello World");
        }
    };
    final Exchange reply = template.request("netty-http:http://localhost:{{port}}/foo", helloWorld);
    assertNotNull(reply);
    assertTrue(reply.hasOut());

    // the response is available as the typed OUT message ...
    final NettyHttpMessage typedResponse = reply.getOut(NettyHttpMessage.class);
    assertNotNull(typedResponse);
    assertEquals(200, typedResponse.getHttpResponse().getStatus().getCode());

    // ... and also as the OUT body
    final HttpResponse responseBody = reply.getOut().getBody(HttpResponse.class);
    assertNotNull(responseBody);
    assertEquals(200, responseBody.getStatus().getCode());

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:MulticastParallelStreamingTest.java   
/**
 * Sends ten messages to {@code direct:start} and expects every message arriving at
 * {@code mock:result} to have the body "BA".
 */
public void testMulticastParallel() throws Exception {
    final MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
    resultEndpoint.expectedMessageCount(10);

    // B is faster than A, so every aggregated body is expected to be "BA"
    final Processor expectBA = new Processor() {
        public void process(Exchange exchange) throws Exception {
            final String aggregated = exchange.getIn().getBody(String.class);
            assertEquals("BA", aggregated);
        }
    };
    resultEndpoint.whenAnyExchangeReceived(expectBA);

    int sent = 0;
    while (sent < 10) {
        template.sendBody("direct:start", "Hello");
        sent++;
    }

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:MulticastStopOnExceptionWithOnExceptionIssueTest.java   
/**
 * Makes {@code mock:end2} throw and checks the outcome: {@code mock:end1} and
 * {@code mock:end4} each receive one message, {@code mock:end3} receives none, and the caller
 * gets "Stop!" as the reply.
 */
public void testEnd2FailureTest() throws Exception {
    final Processor alwaysFail = new Processor() {
        public void process(Exchange exchange) throws Exception {
            throw new RuntimeException("Simulated Exception");
        }
    };
    final MockEndpoint failingEndpoint = getMockEndpoint("mock:end2");
    failingEndpoint.whenAnyExchangeReceived(alwaysFail);

    getMockEndpoint("mock:end1").expectedMessageCount(1);
    getMockEndpoint("mock:end3").expectedMessageCount(0);
    getMockEndpoint("mock:end4").expectedMessageCount(1);

    final String reply = template.requestBody("direct:start", "Hello World!", String.class);
    assertEquals("Stop!", reply);

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:HttpFilterCamelHeadersTest.java   
/**
 * Calls the HTTP test endpoint and checks that the reply is "Hi Claus" and that, apart from
 * the HTTP response code/text headers, no header starting with "camel" (case-insensitive) is
 * present on the OUT message.
 */
@Test
public void testFilterCamelHeaders() throws Exception {
    final Processor request = new Processor() {
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setBody("Claus");
            exchange.getIn().setHeader("bar", 123);
        }
    };
    final Exchange reply = template.send("http://localhost:{{port}}/test/filter", request);

    assertNotNull(reply);
    assertEquals("Hi Claus", reply.getOut().getBody(String.class));

    // only the response code/text headers may survive; for those the code must be 200
    final Map<String, Object> replyHeaders = reply.getOut().getHeaders();
    for (String name : replyHeaders.keySet()) {
        final boolean responseHeader = name.equalsIgnoreCase(Exchange.HTTP_RESPONSE_CODE)
            || name.equalsIgnoreCase(Exchange.HTTP_RESPONSE_TEXT);
        if (responseHeader) {
            assertEquals(200, replyHeaders.get(Exchange.HTTP_RESPONSE_CODE));
            continue;
        }
        assertTrue("Should not contain any Camel internal headers", !name.toLowerCase().startsWith("camel"));
    }
}
项目:Camel    文件:ProcessorBuilder.java   
/**
 * Creates a processor that calls {@code removeHeaders(pattern, exceptionPatterns)} on the OUT
 * message when the exchange has one, otherwise on the IN message.
 *
 * @param pattern           passed through as the pattern of headers to remove
 * @param exceptionPatterns passed through as the patterns that are exempt from removal
 * @return the header-removing processor
 */
public static Processor removeHeaders(final String pattern, final String... exceptionPatterns) {
    return new Processor() {
        public void process(Exchange exchange) {
            if (!exchange.hasOut()) {
                exchange.getIn().removeHeaders(pattern, exceptionPatterns);
                return;
            }
            exchange.getOut().removeHeaders(pattern, exceptionPatterns);
        }

        @Override
        public String toString() {
            final String exceptions = Arrays.toString(exceptionPatterns);
            return "removeHeaders(" + pattern + ", " + exceptions + ")";
        }
    };
}
项目:Camel    文件:CustomProcessorWithNamespacesTest.java   
/**
 * Loads the Spring application context, sends one message (header {@code name=James} plus
 * the test body) to {@code direct:start} and checks that the {@code myProcessor} bean
 * received exactly one exchange.
 */
public void testXMLRouteLoading() throws Exception {
    applicationContext = createApplicationContext();

    final SpringCamelContext camelContext =
            applicationContext.getBeansOfType(SpringCamelContext.class).values().iterator().next();
    assertValidContext(camelContext);

    // send a single message through a short-lived producer template
    final ProducerTemplate producer = camelContext.createProducerTemplate();
    producer.start();
    final Processor fillMessage = new Processor() {
        public void process(Exchange exchange) {
            final Message message = exchange.getIn();
            message.setHeader("name", "James");
            message.setBody(body);
        }
    };
    producer.send("direct:start", fillMessage);
    producer.stop();

    final MyProcessor recorder = applicationContext.getBean("myProcessor", MyProcessor.class);
    final List<Exchange> recorded = recorder.getExchanges();
    assertEquals("Should have received a single exchange: " + recorded, 1, recorded.size());
}
项目:Camel    文件:CxfJavaOnlyPayloadModeTest.java   
/**
 * Builds a route from {@code url} that always answers with a fixed
 * {@code GetPersonResponse} XML document as the OUT body.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            final Processor cannedResponse = new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    final String responseXml = "<GetPersonResponse xmlns=\"http://camel.apache.org/wsdl-first/types\">"
                            + "<personId>123</personId><ssn>456</ssn><name>Donald Duck</name>"
                            + "</GetPersonResponse>";

                    // parse the text into a DOM document through the type converter
                    final Document responseDocument = context.getTypeConverter().convertTo(Document.class, responseXml);
                    exchange.getOut().setBody(responseDocument);
                }
            };
            from(url).process(cannedResponse);
        }
    };
}
项目:Camel    文件:NettyHttpHeaderCaseTest.java   
/**
 * Builds a netty4-http route that checks the case of the received header names (after copying
 * the headers into a plain {@link LinkedHashMap}) and answers "Bye World" with two mixed-case
 * headers.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        public void configure() throws Exception {
            final Processor verifyHeaderCase = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    // header names must arrive in exactly the case they were sent in
                    Map<String, Object> received = new LinkedHashMap<String, Object>(exchange.getIn().getHeaders());

                    assertEquals("123", received.get("OTHER"));
                    assertEquals(null, received.get("other"));
                    assertEquals("Carlsberg", received.get("beer"));
                    assertEquals(null, received.get("Beer"));

                    exchange.getOut().setBody("Bye World");
                    exchange.getOut().setHeader("MyCaseHeader", "aBc123");
                    exchange.getOut().setHeader("otherCaseHeader", "456DEf");
                }
            };
            from("netty4-http:http://localhost:{{port}}/myapp/mytest").process(verifyHeaderCase);
        }
    };
}
项目:Camel    文件:IdempotentConsumerDefinition.java   
/**
 * Creates the {@link IdempotentConsumer} for this definition from the child processor, the
 * resolved message id repository (which must not be null) and the configured expression.
 * Unset flags fall back to their defaults: eager, skipDuplicate and removeOnFailure are true,
 * completionEager is false.
 */
@Override
@SuppressWarnings("unchecked")
public Processor createProcessor(RouteContext routeContext) throws Exception {
    Processor child = this.createChildProcessor(routeContext, true);

    IdempotentRepository<String> repository =
            (IdempotentRepository<String>) resolveMessageIdRepository(routeContext);
    ObjectHelper.notNull(repository, "idempotentRepository", this);

    Expression messageIdExpression = getExpression().createExpression(routeContext);

    // flags that are on unless explicitly switched off
    Boolean eagerSetting = getEager();
    boolean eager = eagerSetting == null || eagerSetting;
    Boolean skipDuplicateSetting = getSkipDuplicate();
    boolean skipDuplicate = skipDuplicateSetting == null || skipDuplicateSetting;
    Boolean removeOnFailureSetting = getRemoveOnFailure();
    boolean removeOnFailure = removeOnFailureSetting == null || removeOnFailureSetting;

    // flag that is off unless explicitly switched on
    Boolean completionEagerSetting = getCompletionEager();
    boolean completionEager = completionEagerSetting != null && completionEagerSetting;

    return new IdempotentConsumer(messageIdExpression, repository, eager, completionEager,
            skipDuplicate, removeOnFailure, child);
}
项目:Camel    文件:FileConsumerBeginRenameStrategyTest.java   
/**
 * Builds a file route that pre-moves each file into {@code ../inprogress} and checks, before
 * sending to {@code mock:report}, that the file on the exchange has "inprogress" in its
 * relative path.
 */
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        public void configure() throws Exception {
            final Processor assertPreMoved = new Processor() {
                @SuppressWarnings("unchecked")
                public void process(Exchange exchange) throws Exception {
                    GenericFile<File> consumed = (GenericFile<File>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
                    assertNotNull(consumed);
                    assertTrue(consumed.getRelativeFilePath().contains("inprogress"));
                }
            };

            from("file://target/reports?preMove=../inprogress/${file:name}&consumer.delay=5000")
                    .process(assertPreMoved)
                    .to("mock:report");
        }
    };
}
项目:Camel    文件:CxfConsumerPayloadFaultTest.java   
/**
 * Builds a route from {@code fromURI} that always fails the exchange with a SOAP "Server"
 * fault whose detail element is parsed from {@code DETAILS}.
 */
@Override
protected RouteBuilder createRouteBuilder() {
    return new RouteBuilder() {
        public void configure() {
            final Processor raiseSoapFault = new Processor() {
                public void process(final Exchange exchange) throws Exception {
                    final QName serverCode = new QName("http://schemas.xmlsoap.org/soap/envelope/", "Server");
                    final SoapFault serverFault = new SoapFault("Get the null value of person name", serverCode);

                    // attach the fault detail parsed from the DETAILS text
                    final Element detail = StaxUtils.read(new StringReader(DETAILS)).getDocumentElement();
                    serverFault.setDetail(detail);

                    exchange.setException(serverFault);
                }
            };
            from(fromURI).process(raiseSoapFault);
        }
    };
}
项目:Camel    文件:SplitterTest.java   
/**
 * Sends five comma-separated names to {@code direct:parallel} and checks that five messages
 * reach {@code mock:result}, that the {@code foo} header is on the OUT message and that the
 * {@code aggregated} exchange property equals 5.
 */
public void testSplitterWithAggregationStrategyParallel() throws Exception {
    final MockEndpoint splitResults = getMockEndpoint("mock:result");
    splitResults.expectedMessageCount(5);

    final Processor fiveNames = new Processor() {
        public void process(Exchange exchange) {
            final Message request = exchange.getIn();
            request.setBody("James,Guillaume,Hiram,Rob,Roman");
            request.setHeader("foo", "bar");
        }
    };
    final Exchange reply = template.request("direct:parallel", fiveNames);

    assertMockEndpointsSatisfied();

    final Message replyMessage = reply.getOut();
    assertMessageHeader(replyMessage, "foo", "bar");

    final Integer aggregatedCount = reply.getProperty("aggregated", Integer.class);
    assertEquals((Integer) 5, aggregatedCount);
}
项目:Camel    文件:JmsClientAckTest.java   
/**
 * Builds a route from {@code activemq:queue:foo} that checks the JMS session is in client
 * acknowledge mode, acknowledges the JMS message explicitly and forwards to
 * {@code mock:result}.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            final Processor acknowledgeManually = new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    final JmsMessage incoming = exchange.getIn(JmsMessage.class);
                    assertNotNull(incoming);

                    final Session jmsSession = incoming.getJmsSession();
                    assertNotNull("Should have JMS session", jmsSession);
                    assertEquals("Should be client ACK mode", Session.CLIENT_ACKNOWLEDGE, jmsSession.getAcknowledgeMode());

                    // acknowledge the message by hand
                    incoming.getJmsMessage().acknowledge();
                }
            };

            from("activemq:queue:foo")
                    .process(acknowledgeManually)
                    .to("mock:result");
        }
    };
}
项目:Camel    文件:SwfComponentSpringTest.java   
/**
 * Sends an exchange with a workflow id header to {@code direct:start} and checks that the
 * exchange recorded by the result endpoint carries both a workflow id and a run id header.
 */
@Test
public void sendInOut() throws Exception {
    result.expectedMessageCount(1);

    final Processor withWorkflowId = new Processor() {
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setHeader(SWFConstants.WORKFLOW_ID, "123");
        }
    };
    template.send("direct:start", withWorkflowId);

    assertMockEndpointsSatisfied();

    final Exchange recorded = result.getExchanges().get(0);
    final Object workflowId = recorded.getIn().getHeader(SWFConstants.WORKFLOW_ID);
    assertNotNull(workflowId);
    final Object runId = recorded.getIn().getHeader(SWFConstants.RUN_ID);
    assertNotNull(runId);
}
项目:Camel    文件:HttpBasicAuthTest.java   
/**
 * Builds a jetty route protected by {@code myAuthHandler} that checks the request carries the
 * user principal "donald" and then answers "Bye World".
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            final Processor assertAuthenticatedAsDonald = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    final HttpServletRequest request = exchange.getIn().getBody(HttpServletRequest.class);
                    assertNotNull(request);

                    final Principal caller = request.getUserPrincipal();
                    assertNotNull(caller);
                    assertEquals("donald", caller.getName());
                }
            };

            from("jetty://http://localhost:{{port}}/test?handlers=myAuthHandler")
                .process(assertAuthenticatedAsDonald)
                .transform(constant("Bye World"));
        }
    };
}
项目:Camel    文件:OnExceptionHandleAndTransformTest.java   
/**
 * Adds a route whose processor always throws {@code MyFunctionalException} and checks that
 * the caller receives the constant body "Sorry" set by the {@code onException} clause.
 */
public void testOnExceptionTransformConstant() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // dead letter channel to mock:error with maximumRedeliveries(0)
            errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(0));

            // NOTE(review): the SNIPPET markers below look like anchors for external
            // documentation tooling - keep them unchanged.
            // START SNIPPET: e1
            // we catch MyFunctionalException and want to mark it as handled (= no failure returned to client)
            // but we want to return a fixed text response, so we transform OUT body as Sorry.
            onException(MyFunctionalException.class)
                    .handled(true)
                    .transform().constant("Sorry");
            // END SNIPPET: e1

            // the route itself always fails with the exception handled above
            from("direct:start").process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    throw new MyFunctionalException("Sorry you cannot do this");
                }
            });
        }
    });

    // the request must come back with the transformed constant
    Object out = template.requestBody("direct:start", "Hello World");
    assertEquals("Sorry", out);
}
项目:Camel    文件:NettyRequestTimeoutTest.java   
/**
 * Builds a synchronous netty4 textline route that sleeps for 3000 ms when the incoming text
 * contains "Camel" and always answers "Bye World".
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            final Processor delayCamelRequests = new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    final String text = exchange.getIn().getBody(String.class);
                    final boolean slowRequest = text.contains("Camel");
                    if (slowRequest) {
                        Thread.sleep(3000);
                    }
                }
            };

            from("netty4:tcp://localhost:{{port}}?textline=true&sync=true")
                .process(delayCamelRequests)
                .transform().constant("Bye World");

        }
    };
}
项目:Camel    文件:CamelContextFactoryBeanTest.java   
/**
 * Loads {@code camelContextFactoryBean.xml}, looks up the {@code camel2} context and checks
 * that it contains exactly one event-driven consumer route with a processor, consuming from
 * {@code seda://test.c}.
 */
public void testXMLRouteLoading() throws Exception {
    applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/spring/camelContextFactoryBean.xml");

    final CamelContext camel2 = applicationContext.getBean("camel2", CamelContext.class);
    assertNotNull("No context found!", camel2);

    final List<Route> loadedRoutes = camel2.getRoutes();
    LOG.debug("Found routes: " + loadedRoutes);

    assertNotNull("Should have found some routes", loadedRoutes);
    assertEquals("One Route should be found", 1, loadedRoutes.size());

    for (Route loaded : loadedRoutes) {
        final Endpoint source = loaded.getEndpoint();
        final EventDrivenConsumerRoute eventDriven = assertIsInstanceOf(EventDrivenConsumerRoute.class, loaded);
        final Processor routeProcessor = eventDriven.getProcessor();
        assertNotNull(routeProcessor);

        assertEndpointUri(source, "seda://test.c");
    }
}
项目:Camel    文件:Mina2Consumer.java   
/**
 * Creates the consumer, forces the endpoint's exchange pattern to InOut and runs the setup
 * routine that matches the configured protocol.
 *
 * @param endpoint  the endpoint this consumer belongs to; its configuration is stored
 * @param processor the processor passed on to the superclass constructor
 * @throws Exception as declared by the signature; presumably raised by the setup routines
 */
public Mina2Consumer(final Mina2Endpoint endpoint, Processor processor) throws Exception {
    super(endpoint, processor);
    this.configuration = endpoint.getConfiguration();
    //
    // All mina2 endpoints are InOut. The endpoints are asynchronous.
    // Endpoints can send "n" messages and receive "m" messages.
    //
    this.getEndpoint().setExchangePattern(ExchangePattern.InOut);

    // Dispatch on the protocol: tcp (client or server mode), datagram, or vm.
    // NOTE(review): any other protocol value falls through without any setup - confirm that
    // this is intended.
    String protocol = configuration.getProtocol();
    if (protocol.equals("tcp")) {
        if (configuration.isClientMode()) {
            setupClientSocketProtocol(protocol, configuration);
        } else {
            setupSocketProtocol(protocol, configuration);
        }
    } else if (configuration.isDatagramProtocol()) {
        setupDatagramProtocol(protocol, configuration);
    } else if (protocol.equals("vm")) {
        setupVmProtocol(protocol, configuration);
    }
}
项目:Camel    文件:InOnlyConsumerAsyncTrueTest.java   
/**
 * Builds an sjms route (synchronous=false) that logs before and after a processor which
 * sleeps for 2000 ms when the body equals "Hello Camel", then sends to {@code MOCK_RESULT}.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            final Processor delayHelloCamel = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    final String text = exchange.getIn().getBody(String.class);
                    final boolean slowMessage = text.equals("Hello Camel");
                    if (slowMessage) {
                        Thread.sleep(2000);
                    }
                }
            };

            from("sjms:queue:in.only.consumer.async?synchronous=false")
                .to("log:before")
                .process(delayHelloCamel)
                .to("log:after")
                .to(MOCK_RESULT);
        }
    };
}
项目:Camel    文件:EnrichDefinition.java   
/**
 * Creates the {@link Enricher} for this definition from the configured expression. The
 * shareUnitOfWork and ignoreInvalidEndpoint flags default to false when unset; an aggregation
 * strategy and the aggregateOnException setting are applied only when they are present.
 */
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
    Expression expression = getExpression().createExpression(routeContext);

    // unset (null) flags count as false
    Boolean shareSetting = getShareUnitOfWork();
    boolean shareUnitOfWork = shareSetting != null && shareSetting;
    Boolean ignoreSetting = getIgnoreInvalidEndpoint();
    boolean ignoreInvalidEndpoint = ignoreSetting != null && ignoreSetting;

    Enricher result = new Enricher(expression);
    result.setShareUnitOfWork(shareUnitOfWork);
    result.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint);

    // optional settings are left untouched on the enricher when not configured
    AggregationStrategy aggregationStrategy = createAggregationStrategy(routeContext);
    if (aggregationStrategy != null) {
        result.setAggregationStrategy(aggregationStrategy);
    }
    if (aggregateOnException != null) {
        result.setAggregateOnException(aggregateOnException);
    }
    return result;
}
项目:Camel    文件:RouteService.java   
/**
 * Adds to {@code services} the error handlers of all {@link Channel}s found in that set,
 * provided the error handler is itself a {@link Service}. Nothing is added when the route
 * definition reports a context scoped error handler.
 *
 * @param services the services of the route; extended in place
 * @param route    the route whose Camel context is used for the scope check
 */
private void doGetRouteScopedErrorHandler(Set<Service> services, Route route) {
    // context scoped error handlers are not gathered here
    boolean contextScoped = routeDefinition.isContextScopedErrorHandler(route.getRouteContext().getCamelContext());
    if (contextScoped) {
        return;
    }

    // collect first and add afterwards, so the set is not modified while iterating over it
    List<Service> routeScoped = new ArrayList<Service>();
    for (Service candidate : services) {
        if (!(candidate instanceof Channel)) {
            continue;
        }
        Processor errorHandler = ((Channel) candidate).getErrorHandler();
        // instanceof is false for null, so no separate null check is needed
        if (errorHandler instanceof Service) {
            routeScoped.add((Service) errorHandler);
        }
    }

    if (routeScoped.isEmpty()) {
        return;
    }
    services.addAll(routeScoped);
}
项目:syndesis    文件:SqlStartConnectorComponent.java   
/**
 * Builds the processor that runs before the producer: when the IN body converts to a
 * non-null string, it is parsed as a JSON bean and replaced by the resulting
 * {@link Properties}; otherwise the body is left untouched.
 *
 * @return the pre-producer processor
 */
@Override
public Processor getBeforeProducer() {
    return exchange -> {
        final String json = exchange.getIn().getBody(String.class);
        if (json == null) {
            return;
        }
        final Properties parsed = JSONBeanUtil.parsePropertiesFromJSONBean(json);
        exchange.getIn().setBody(parsed);
    };
}
项目:syndesis    文件:RecordSplitterEndpoint.java   
/**
 * Creates a consumer that splits a {@link List} body into one exchange per element and runs
 * each element through a {@code ToJSONProcessor} before handing it to the given processor.
 *
 * @param processor the downstream processor that receives the individual records
 * @return the consumer created by the wrapped endpoint around the splitter
 * @throws Exception if the wrapped endpoint cannot create the consumer
 */
@Override
public Consumer createConsumer(final Processor processor) throws Exception {
    // per record: convert to JSON first, then continue with the caller's processor
    final Processor perRecord = Pipeline.newInstance(getCamelContext(), new ToJSONProcessor(), processor);

    // split on the body viewed as a List
    final Expression listBody = ExpressionBuilder.bodyExpression(List.class);
    final Splitter recordSplitter = new Splitter(getCamelContext(), listBody, perRecord, null);

    return endpoint.createConsumer(recordSplitter);
}
项目:syndesis    文件:SqlConnectorComponent.java   
/**
 * Returns a processor that converts a JSON bean string in the IN body into
 * {@link Properties}. A body that does not convert to a string (null) is left as it is.
 *
 * @return the processor to run before the producer
 */
@Override
public Processor getBeforeProducer() {
    final Processor jsonToProperties = exchange -> {
        final String jsonBean = exchange.getIn().getBody(String.class);
        if (jsonBean != null) {
            exchange.getIn().setBody(JSONBeanUtil.parsePropertiesFromJSONBean(jsonBean));
        }
    };
    return jsonToProperties;
}