Java 类org.apache.camel.component.leveldb.LevelDBAggregationRepository 实例源码

项目:camelinaction2    文件:AggregateABCLevelDBTest.java   
/**
 * Builds the route under test: files consumed from {@code target/inbox} are
 * converted to {@code String}, aggregated into a single group backed by a
 * LevelDB repository, and published to {@code mock:result} once three
 * messages have been collected.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // LevelDB-backed store so that in-flight aggregations are persisted
            final String repositoryName = "myrepo";
            final String persistentFile = "data/myrepo.dat";
            final LevelDBAggregationRepository repository =
                new LevelDBAggregationRepository(repositoryName, persistentFile);

            from("file:target/inbox")
                // log each file as it is picked up
                .log("Consuming file ${file:name}")
                .convertBodyTo(String.class)
                // constant(true) as the correlation expression: every message joins the same group
                .aggregate(constant(true), new MyAggregationStrategy())
                    // keep the aggregated state in the LevelDB repository
                    .aggregationRepository(repository)
                    // the group is complete after 3 messages
                    .completionSize(3)
                    // log the aggregated message before publishing it
                    .log("Sending out ${body}")
                    // hand the result over to the mock endpoint
                    .to("mock:result");
        }
    };
}
项目:camelinaction2    文件:AggregateABCRecoverTest.java   
/**
 * Builds a route that aggregates messages from {@code direct:start} by the
 * {@code myId} header into a LevelDB repository with recovery enabled. The
 * route deliberately throws after {@code mock:aggregate} so that the
 * completed aggregate has to be recovered; once the redelivery attempts are
 * used up it is sent to {@code mock:dead}.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            final LevelDBAggregationRepository repository =
                new LevelDBAggregationRepository("myrepo", "data/myrepo.dat");
            // turn recovery on explicitly
            repository.setUseRecovery(true);
            // give up after at most 4 redelivery attempts
            repository.setMaximumRedeliveries(4);
            // exhausted exchanges are routed to mock:dead
            repository.setDeadLetterUri("mock:dead");
            // retry every 3rd second (value 3000)
            repository.setRecoveryInterval(3000);

            from("direct:start")
                // log the incoming body together with its correlation key
                .log("Sending ${body} with correlation key ${header.myId}")
                // correlate on the myId header and merge the messages
                // with MyAggregationStrategy
                .aggregate(header("myId"), new MyAggregationStrategy())
                    // persist the aggregation state in LevelDB
                    .aggregationRepository(repository)
                    // a group is complete once it holds 3 messages
                    .completionSize(3)
                    // log the aggregated message that is being published
                    .log("Sending out ${body}")
                    // mock endpoint used to observe the recovery attempts
                    .to("mock:aggregate")
                    // fail on purpose so the message has to be recovered
                    .throwException(new IllegalArgumentException("Damn does not work"))
                    // never reached, because the exception above is always thrown
                    .to("mock:result");
        }
    };
}
项目:wildfly-camel    文件:LevelDBIntegrationTest.java   
/**
 * Sends five messages sharing the same {@code id} header through a route
 * that aggregates into a LevelDB repository with a completion size of 5,
 * and expects the single aggregated body {@code "ABCDE"} on
 * {@code mock:aggregated}.
 */
@Test
public void testLevelDBAggregate() throws Exception {

    final CamelContext context = new DefaultCamelContext();
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {

            final LevelDBAggregationRepository levelDbRepository =
                new LevelDBAggregationRepository("repo1", "target/leveldb/leveldb.dat");

            from("direct:start")
                .aggregate(header("id"), new MyAggregationStrategy())
                    .completionSize(5)
                    .aggregationRepository(levelDbRepository)
                    .to("mock:aggregated");
        }
    });

    // register the expectation before the context is started
    final MockEndpoint aggregatedMock = context.getEndpoint("mock:aggregated", MockEndpoint.class);
    aggregatedMock.expectedBodiesReceived("ABCDE");

    context.start();
    try {
        final ProducerTemplate producer = context.createProducerTemplate();

        // all five bodies carry the same correlation id, sent in this order
        final String[] bodies = {"A", "B", "C", "D", "E"};
        for (String body : bodies) {
            producer.sendBodyAndHeader("direct:start", body, "id", 123);
        }

        // wait up to 30 seconds for the mock expectations to be met
        MockEndpoint.assertIsSatisfied(context, 30, TimeUnit.SECONDS);
    } finally {
        // always shut the context down, even when the assertion fails
        context.stop();
    }
}
项目:camel-example    文件:AggregatorTest.java   
/**
 * Creates the LevelDB aggregation repository and then runs the parent
 * set-up.
 */
@Override
@Before
public void setUp() throws Exception {
    final String repositoryName = "repo1";
    final String persistentFile = "target/data/leveldb.dat";
    // assigned before super.setUp(); presumably the parent set-up builds
    // routes that reference this repository — TODO confirm
    repo = new LevelDBAggregationRepository(repositoryName, persistentFile);
    super.setUp();
}