Java 类org.apache.camel.processor.idempotent.MemoryIdempotentRepository 实例源码

项目:Camel    文件:IdempotentConsumerEagerTest.java   
public void testDuplicateMessagesAreFilteredOut() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start").idempotentConsumer(
                    header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
            ).eager(false).to("mock:result");
        }
    });
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "two", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerEagerTest.java   
public void testNotEager() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            final IdempotentRepository<String> repo = MemoryIdempotentRepository.memoryIdempotentRepository(200);

            from("direct:start").idempotentConsumer(header("messageId"), repo).eager(false).
                    process(new Processor() {
                        public void process(Exchange exchange) throws Exception {
                            String id = exchange.getIn().getHeader("messageId", String.class);
                            // should not contain
                            assertFalse("Should not eager add to repo", repo.contains(id));
                        }
                    }).to("mock:result");
        }
    });
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "two", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerEagerTest.java   
public void testEager() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            final IdempotentRepository<String> repo = MemoryIdempotentRepository.memoryIdempotentRepository(200);

            from("direct:start").idempotentConsumer(header("messageId"), repo).eager(true).
                    process(new Processor() {
                        public void process(Exchange exchange) throws Exception {
                            String id = exchange.getIn().getHeader("messageId", String.class);
                            // should contain
                            assertTrue("Should eager add to repo", repo.contains(id));
                        }
                    }).to("mock:result");
        }
    });
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "two", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerTest.java   
public void testDuplicateMessagesAreFilteredOut() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start").idempotentConsumer(
                    header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
            ).to("mock:result");
        }
    });
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "two", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerRemoveOnFailureTest.java   
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start").idempotentConsumer(header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200))
                    // in case of a failure we still want the message to be regarded as a duplicate, so we set the option to false
                    .removeOnFailure(false)
                    .process(new Processor() {
                        public void process(Exchange exchange) throws Exception {
                            String id = exchange.getIn().getHeader("messageId", String.class);
                            if (id.equals("2")) {
                                throw new IllegalArgumentException("Damn I cannot handle id 2");
                            }
                        }
                    }).to("mock:result");
        }
    };
}
项目:Camel    文件:IdempotentConsumerAsyncTest.java   
public void testDuplicateMessagesAreFilteredOut() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start").idempotentConsumer(
                    header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
            ).threads().to("mock:result");
        }
    });
    context.start();

    resultEndpoint.expectedBodiesReceivedInAnyOrder("one", "two", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerConcurrentTest.java   
public void testDuplicateMessagesAreFilteredOut() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start").idempotentConsumer(header("messageId"),
                    MemoryIdempotentRepository.memoryIdempotentRepository(200)).to("mock:result");
        }
    });
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "two", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerNoMessageIdTest.java   
public void testNoMessageId() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            errorHandler(deadLetterChannel("mock:dead"));

            from("direct:start").idempotentConsumer(
                    header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
            ).to("mock:result");
        }
    });
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "two");

    getMockEndpoint("mock:dead").expectedBodiesReceived("Hello World");

    sendMessage("1", "one");
    template.sendBody("direct:start", "Hello World");
    sendMessage("2", "two");
    sendMessage("1", "one");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:RouteBuilderTest.java   
public void testIdempotentConsumer() throws Exception {

        List<Route> routes = buildIdempotentConsumer();

        log.debug("Created routes: " + routes);

        assertEquals("Number routes created", 1, routes.size());
        for (Route route : routes) {
            Endpoint key = route.getEndpoint();
            assertEquals("From endpoint", "direct://a", key.getEndpointUri());

            EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
            Channel channel = unwrapChannel(consumer.getProcessor());

            IdempotentConsumer idempotentConsumer = assertIsInstanceOf(IdempotentConsumer.class, channel.getNextProcessor());
            assertEquals("messageIdExpression", "header(myMessageId)", idempotentConsumer.getMessageIdExpression().toString());

            assertIsInstanceOf(MemoryIdempotentRepository.class, idempotentConsumer.getIdempotentRepository());
            SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class, unwrapChannel(idempotentConsumer.getProcessor()).getNextProcessor());
            assertEquals("Endpoint URI", "direct://b", sendProcessor.getDestination().getEndpointUri());
        }
    }
项目:Camel    文件:FileBrowsableEndpointTest.java   
public void testBrowsableOneFile() throws Exception {
    template.sendBodyAndHeader("file:target/browse", "A", Exchange.FILE_NAME, "a.txt");

    FileEndpoint endpoint = context.getEndpoint("file:target/browse", FileEndpoint.class);
    assertNotNull(endpoint);

    MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
    assertEquals(0, repo.getCacheSize());

    List<Exchange> list = endpoint.getExchanges();
    assertNotNull(list);
    assertEquals(1, list.size());

    assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));

    // the in progress repo should not leak
    assertEquals(0, repo.getCacheSize());

    // and the file is still there
    File file = new File("target/browse/a.txt");
    assertTrue("File should exist " + file, file.exists());
}
项目:Camel    文件:FileBrowsableEndpointTest.java   
public void testBrowsableTwoFiles() throws Exception {
    template.sendBodyAndHeader("file:target/browse", "A", Exchange.FILE_NAME, "a.txt");
    template.sendBodyAndHeader("file:target/browse", "B", Exchange.FILE_NAME, "b.txt");

    FileEndpoint endpoint = context.getEndpoint("file:target/browse?sortBy=file:name", FileEndpoint.class);
    assertNotNull(endpoint);

    MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
    assertEquals(0, repo.getCacheSize());

    List<Exchange> list = endpoint.getExchanges();
    assertNotNull(list);
    assertEquals(2, list.size());

    assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));
    assertEquals("b.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME));

    // the in progress repo should not leak
    assertEquals(0, repo.getCacheSize());

    // and the files is still there
    File fileA = new File("target/browse/a.txt");
    assertTrue("File should exist " + fileA, fileA.exists());
    File fileB = new File("target/browse/b.txt");
    assertTrue("File should exist " + fileB, fileB.exists());
}
项目:Camel    文件:FtpBrowsableEndpointTest.java   
@Test
public void testBrowsableOneFile() throws Exception {
    template.sendBodyAndHeader(getFtpUrl(), "A", Exchange.FILE_NAME, "a.txt");

    FtpEndpoint<?> endpoint = context.getEndpoint(getFtpUrl(), FtpEndpoint.class);
    assertNotNull(endpoint);

    MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
    assertEquals(0, repo.getCacheSize());

    List<Exchange> list = endpoint.getExchanges();
    assertNotNull(list);
    assertEquals(1, list.size());

    assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));

    // the in progress repo should not leak
    assertEquals(0, repo.getCacheSize());

    // and the file is still there
    File file = new File(FTP_ROOT_DIR + "/browse/a.txt");
    assertTrue("File should exist " + file, file.exists());
}
项目:camel-cookbook-examples    文件:IdempotentConsumerSkipDuplicateRoute.java   
@Override
public void configure() throws Exception {
    from("direct:in")
        .log("Received message ${header[messageId]}")
        .idempotentConsumer(header("messageId"), new MemoryIdempotentRepository()).skipDuplicate(false)
            .choice()
                .when(exchangeProperty(Exchange.DUPLICATE_MESSAGE))
                    .log("Duplicate")
                    .to("mock:duplicate")
                .otherwise()
                    .to("mock:ws")
            .endChoice()
        .end()
        .log("Completing")
        .to("mock:out");
}
项目:sponge    文件:SpringConfiguration.java   
/**
 * Camel routes for reading RSS feeds. Routes could be also defined in XML, Groovy or scripting knowledge bases.
 *
 * @return route builder.
 */
@Bean
public RouteBuilder rssInputRoute() {
    return new RouteBuilder() {

        // @formatter:off
        @SuppressWarnings("unchecked")
        @Override
        public void configure() throws Exception {
            EngineOperations operations = camelRssEngine().getOperations();
            Map<String, String> rssSources = operations.getVariable(Map.class, CamelRssConstants.VAR_RSS_SOURCES);

            // Read RSS feeds from all configured sources.
            rssSources.forEach((source, url) ->
                    from("rss:" + url + operations.getVariable(CamelRssConstants.VAR_RSS_ENDPOINT_PARAMETERS, "")).routeId(source)
                        .setHeader(HEADER_SOURCE).constant(source)
                        .to("direct:rss"));

            // Gathers RSS from different sources and sends to Sponge engine as a normalized event.
            from("direct:rss").routeId("rss")
                    .marshal().rss()
                    // Deduplicate by title.
                    .idempotentConsumer(xpath("/rss/channel/item/title/text()"),
                            MemoryIdempotentRepository.memoryIdempotentRepository())
                    // Conversion from RSS XML to Sponge event with attributes.
                    .process((exchange) -> exchange.getIn().setBody(operations.event("news")
                            .set("source", exchange.getIn().getHeader(HEADER_SOURCE))
                            .set("channel", CamelUtils.xpath(exchange, "/rss/channel/title/text()"))
                            .set("title", CamelUtils.xpath(exchange, "/rss/channel/item/title/text()"))
                            .set("link", CamelUtils.xpath(exchange, "/rss/channel/item/link/text()"))
                            .set("description", CamelUtils.xpath(exchange, "/rss/channel/item/description/text()"))
                            .make()))
                    //.filter((exchange) -> false)
                    .to("sponge:camelRssEngine");
        }
        // @formatter:on
    };
}
项目:Camel    文件:IdempotentConsumerEagerTest.java   
public void testFailedExchangesNotAddedDeadLetterChannel() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).redeliveryDelay(0).logStackTrace(false));

            from("direct:start").idempotentConsumer(
                    header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
            ).eager(false).process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    String id = exchange.getIn().getHeader("messageId", String.class);
                    if (id.equals("2")) {
                        throw new IllegalArgumentException("Damm I cannot handle id 2");
                    }
                }
            }).to("mock:result");
        }
    });
    context.start();

    // we send in 2 messages with id 2 that fails
    getMockEndpoint("mock:error").expectedMessageCount(2);
    resultEndpoint.expectedBodiesReceived("one", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerEagerTest.java   
public void testFailedExchangesNotAdded() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start").idempotentConsumer(
                    header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
            ).eager(false).process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    String id = exchange.getIn().getHeader("messageId", String.class);
                    if (id.equals("2")) {
                        throw new IllegalArgumentException("Damm I cannot handle id 2");
                    }
                }
            }).to("mock:result");
        }
    });
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerTest.java   
public void testNotSkiDuplicate() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            IdempotentRepository<String> repo = MemoryIdempotentRepository.memoryIdempotentRepository(200);

            from("direct:start")
                .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false)
                .to("mock:result");
        }
    });
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "two", "one", "two", "one", "three");
    resultEndpoint.message(0).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isNull();
    resultEndpoint.message(1).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isNull();
    resultEndpoint.message(2).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
    resultEndpoint.message(3).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
    resultEndpoint.message(4).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
    resultEndpoint.message(5).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isNull();

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerTest.java   
public void testNotSkiDuplicateWithFilter() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            IdempotentRepository<String> repo = MemoryIdempotentRepository.memoryIdempotentRepository(200);

            // START SNIPPET: e1
            from("direct:start")
                // instruct idempotent consumer to not skip duplicates as we will filter then our self
                .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false)
                .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
                    // filter out duplicate messages by sending them to someplace else and then stop
                    .to("mock:duplicate")
                    .stop()
                .end()
                // and here we process only new messages (no duplicates)
                .to("mock:result");
            // END SNIPPET: e1
        }
    });
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "two", "three");

    getMockEndpoint("mock:duplicate").expectedBodiesReceived("one", "two", "one");
    getMockEndpoint("mock:duplicate").allMessages().exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerTest.java   
public void testFailedExchangesNotAddedDeadLetterChannel() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).redeliveryDelay(0).logStackTrace(false));

            from("direct:start").idempotentConsumer(
                    header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
            ).process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    String id = exchange.getIn().getHeader("messageId", String.class);
                    if (id.equals("2")) {
                        throw new IllegalArgumentException("Damm I cannot handle id 2");
                    }
                }
            }).to("mock:result");
        }
    });
    context.start();

    // we send in 2 messages with id 2 that fails
    getMockEndpoint("mock:error").expectedMessageCount(2);
    resultEndpoint.expectedBodiesReceived("one", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerTest.java   
public void testFailedExchangesNotAddedDeadLetterChannelNotHandled() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).redeliveryDelay(0).logStackTrace(false));

            from("direct:start").idempotentConsumer(
                header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
            ).process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    String id = exchange.getIn().getHeader("messageId", String.class);
                    if (id.equals("2")) {
                        throw new IllegalArgumentException("Damm I cannot handle id 2");
                    }
                }
            }).to("mock:result");
        }
    });
    context.start();

    // we send in 2 messages with id 2 that fails
    getMockEndpoint("mock:error").expectedMessageCount(2);
    resultEndpoint.expectedBodiesReceived("one", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerTest.java   
public void testFailedExchangesNotAdded() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // use default error handler
            errorHandler(defaultErrorHandler());

            from("direct:start").idempotentConsumer(
                    header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
            ).process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    String id = exchange.getIn().getHeader("messageId", String.class);
                    if (id.equals("2")) {
                        throw new IllegalArgumentException("Damm I cannot handle id 2");
                    }
                }
            }).to("mock:result");
        }
    });
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerCompletionEagerTest.java   
public void testCompletionEager() throws Exception {
    repo = MemoryIdempotentRepository.memoryIdempotentRepository(200);
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            errorHandler(deadLetterChannel("mock:dead"));

            from("direct:start")
                .idempotentConsumer(header("messageId"), repo).completionEager(true)
                    .to("log:a", "mock:a")
                    .to("log:b", "mock:b")
                .end()
                .filter(simple("${header.messageId} == '2'"))
                    .throwException(new IllegalArgumentException("Forced"))
                .end()
                .to("log:result", "mock:result");
        }
    });
    context.start();

    // we are on block only scope as "two" was success in the block, and then "two" failed afterwards does not matter
    // the idempotent consumer will not receive "two" again
    a.expectedBodiesReceived("one", "two", "three");
    b.expectedBodiesReceived("one", "two", "three");
    dead.expectedBodiesReceived("two", "two");
    resultEndpoint.expectedBodiesReceived("one", "one", "one", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerCompletionEagerTest.java   
public void testNotCompletionEager() throws Exception {
    repo = MemoryIdempotentRepository.memoryIdempotentRepository(200);
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            errorHandler(deadLetterChannel("mock:dead"));

            from("direct:start")
                .idempotentConsumer(header("messageId"), repo).completionEager(false)
                    .to("log:a", "mock:a")
                    .to("log:b", "mock:b")
                .end()
                .filter(simple("${header.messageId} == '2'"))
                    .throwException(new IllegalArgumentException("Forced"))
                .end()
                .to("log:result", "mock:result");
        }
    });
    context.start();

    // we are on completion scope so the "two" will rollback and therefore the idempotent consumer receives those again
    a.expectedBodiesReceived("one", "two", "two", "three");
    b.expectedBodiesReceived("one", "two", "two", "three");
    dead.expectedBodiesReceived("two", "two");
    resultEndpoint.expectedBodiesReceived("one", "one", "one", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerAsyncTest.java   
public void testFailedExchangesNotAddedDeadLetterChannel() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).redeliveryDelay(0).logStackTrace(false));

            from("direct:start").idempotentConsumer(
                    header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
            ).threads().process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    String id = exchange.getIn().getHeader("messageId", String.class);
                    if (id.equals("2")) {
                        throw new IllegalArgumentException("Damn I cannot handle id 2");
                    }
                }
            }).to("mock:result");
        }
    });
    context.start();

    // we send in 2 messages with id 2 that fails
    getMockEndpoint("mock:error").expectedMessageCount(2);
    resultEndpoint.expectedBodiesReceivedInAnyOrder("one", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerAsyncTest.java   
public void testFailedExchangesNotAdded() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start").idempotentConsumer(
                    header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
            ).threads().process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    String id = exchange.getIn().getHeader("messageId", String.class);
                    if (id.equals("2")) {
                        throw new IllegalArgumentException("Damn I cannot handle id 2");
                    }
                }
            }).to("mock:result");
        }
    });
    context.start();

    resultEndpoint.expectedBodiesReceivedInAnyOrder("one", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerConcurrentTest.java   
public void testFailedExchangesNotAddedDeadLetterChannel() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).redeliveryDelay(0).logStackTrace(false));

            from("direct:start").idempotentConsumer(header("messageId"),
                    MemoryIdempotentRepository.memoryIdempotentRepository(200))
                    .process(new Processor() {
                        public void process(Exchange exchange) throws Exception {
                            String id = exchange.getIn().getHeader("messageId", String.class);
                            if (id.equals("2")) {
                                throw new IllegalArgumentException("Damm I cannot handle id 2");
                            }
                        }
                    }).to("mock:result");
        }
    });
    context.start();

    // we send in 2 messages with id 2 that fails
    getMockEndpoint("mock:error").expectedMessageCount(2);
    resultEndpoint.expectedBodiesReceived("one", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:IdempotentConsumerConcurrentTest.java   
public void testFailedExchangesNotAdded() throws Exception {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start").idempotentConsumer(header("messageId"),
                    MemoryIdempotentRepository.memoryIdempotentRepository(200))
                    .process(new Processor() {
                        public void process(Exchange exchange) throws Exception {
                            String id = exchange.getIn().getHeader("messageId", String.class);
                            if (id.equals("2")) {
                                throw new IllegalArgumentException("Damm I cannot handle id 2");
                            }
                        }
                    }).to("mock:result");
        }
    });
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "three");

    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("2", "two");
    sendMessage("1", "one");
    sendMessage("3", "three");

    assertMockEndpointsSatisfied();
}
项目:Camel    文件:ManagedMemoryIdempotentClearTest.java   
@Override
protected void setUp() throws Exception {
    repo = MemoryIdempotentRepository.memoryIdempotentRepository();
    // lets start with 4
    repo.add("4");

    super.setUp();
    startEndpoint = resolveMandatoryEndpoint("direct:start");
    resultEndpoint = getMockEndpoint("mock:result");
}
项目:Camel    文件:ManagedMemoryIdempotentConsumerTest.java   
@Override
protected void setUp() throws Exception {
    repo = MemoryIdempotentRepository.memoryIdempotentRepository();
    // lets start with 4
    repo.add("4");

    super.setUp();
    startEndpoint = resolveMandatoryEndpoint("direct:start");
    resultEndpoint = getMockEndpoint("mock:result");
}
项目:Camel    文件:RouteBuilderTest.java   
protected List<Route> buildIdempotentConsumer() throws Exception {
    // START SNIPPET: idempotent
    RouteBuilder builder = new RouteBuilder() {
        public void configure() {
            errorHandler(deadLetterChannel("mock:error"));

            from("direct:a")
                .idempotentConsumer(header("myMessageId"),
                        MemoryIdempotentRepository.memoryIdempotentRepository(200))
                .to("direct:b");
        }
    };
    // END SNIPPET: idempotent
    return getRouteList(builder);
}
项目:Camel    文件:FileBrowsableEndpointTest.java   
public void testBrowsableThreeFilesRecursive() throws Exception {
    template.sendBodyAndHeader("file:target/browse", "A", Exchange.FILE_NAME, "a.txt");
    template.sendBodyAndHeader("file:target/browse", "B", Exchange.FILE_NAME, "foo/b.txt");
    template.sendBodyAndHeader("file:target/browse", "C", Exchange.FILE_NAME, "bar/c.txt");

    FileEndpoint endpoint = context.getEndpoint("file:target/browse?recursive=true&sortBy=file:name", FileEndpoint.class);
    assertNotNull(endpoint);

    MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
    assertEquals(0, repo.getCacheSize());

    List<Exchange> list = endpoint.getExchanges();
    assertNotNull(list);
    assertEquals(3, list.size());

    assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));
    assertEquals("c.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME_ONLY));
    assertEquals("b.txt", list.get(2).getIn().getHeader(Exchange.FILE_NAME_ONLY));

    // the in progress repo should not leak
    assertEquals(0, repo.getCacheSize());

    // and the files is still there
    File fileA = new File("target/browse/a.txt");
    assertTrue("File should exist " + fileA, fileA.exists());
    File fileB = new File("target/browse/foo/b.txt");
    assertTrue("File should exist " + fileB, fileB.exists());
    File fileC = new File("target/browse/bar/c.txt");
    assertTrue("File should exist " + fileC, fileC.exists());
}
项目:Camel    文件:FileConsumerIdempotentTest.java   
public void testIdempotent() throws Exception {
    // consume the file the first time
    MockEndpoint mock = getMockEndpoint("mock:result");
    mock.expectedBodiesReceived("Hello World");

    assertMockEndpointsSatisfied();

    oneExchangeDone.matchesMockWaitTime();

    // reset mock and set new expectations
    mock.reset();
    mock.expectedMessageCount(0);

    // move file back
    File file = new File("target/idempotent/done/report.txt");
    File renamed = new File("target/idempotent/report.txt");
    file.renameTo(renamed);

    // should NOT consume the file again, let a bit time pass to let the consumer try to consume it but it should not
    Thread.sleep(100);
    assertMockEndpointsSatisfied();

    FileEndpoint fe = context.getEndpoint(uri, FileEndpoint.class);
    assertNotNull(fe);

    MemoryIdempotentRepository repo = (MemoryIdempotentRepository) fe.getInProgressRepository();
    assertEquals("Should be no in-progress files", 0, repo.getCacheSize());
}
项目:Camel    文件:RemoteFileEndpoint.java   
@Override
public RemoteFileConsumer<T> createConsumer(Processor processor) throws Exception {
    afterPropertiesSet();
    RemoteFileConsumer<T> consumer = buildConsumer(processor);

    if (isDelete() && getMove() != null) {
        throw new IllegalArgumentException("You cannot both set delete=true and move options");
    }

    // if noop=true then idempotent should also be configured
    if (isNoop() && !isIdempotentSet()) {
        log.info("Endpoint is configured with noop=true so forcing endpoint to be idempotent as well");
        setIdempotent(true);
    }

    // if idempotent and no repository set then create a default one
    if (isIdempotentSet() && isIdempotent() && idempotentRepository == null) {
        log.info("Using default memory based idempotent repository with cache max size: " + DEFAULT_IDEMPOTENT_CACHE_SIZE);
        idempotentRepository = MemoryIdempotentRepository.memoryIdempotentRepository(DEFAULT_IDEMPOTENT_CACHE_SIZE);
    }

    if (!getConfiguration().isUseList() && getFileName() == null) {
        throw new IllegalArgumentException("Endpoint is configured with useList=false, then fileName must be configured also");
    }

    // set max messages per poll
    consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
    consumer.setEagerLimitMaxMessagesPerPoll(isEagerMaxMessagesPerPoll());

    configureConsumer(consumer);
    return consumer;
}
项目:Camel    文件:FtpBrowsableEndpointTest.java   
@Test
public void testBrowsableTwoFiles() throws Exception {
    template.sendBodyAndHeader(getFtpUrl(), "A", Exchange.FILE_NAME, "a.txt");
    template.sendBodyAndHeader(getFtpUrl(), "B", Exchange.FILE_NAME, "b.txt");

    FtpEndpoint<?> endpoint = context.getEndpoint(getFtpUrl() + "&sortBy=file:name", FtpEndpoint.class);
    assertNotNull(endpoint);

    MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
    assertEquals(0, repo.getCacheSize());

    List<Exchange> list = endpoint.getExchanges();
    assertNotNull(list);
    assertEquals(2, list.size());

    assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));
    assertEquals("b.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME));

    // the in progress repo should not leak
    assertEquals(0, repo.getCacheSize());

    // and the files is still there
    File fileA = new File(FTP_ROOT_DIR + "/browse/a.txt");
    assertTrue("File should exist " + fileA, fileA.exists());
    File fileB = new File(FTP_ROOT_DIR + "/browse/b.txt");
    assertTrue("File should exist " + fileB, fileB.exists());
}
项目:Camel    文件:FtpBrowsableEndpointTest.java   
@Test
public void testBrowsableThreeFilesRecursive() throws Exception {
    template.sendBodyAndHeader(getFtpUrl(), "A", Exchange.FILE_NAME, "a.txt");
    template.sendBodyAndHeader(getFtpUrl(), "B", Exchange.FILE_NAME, "foo/b.txt");
    template.sendBodyAndHeader(getFtpUrl(), "C", Exchange.FILE_NAME, "bar/c.txt");

    FtpEndpoint<?> endpoint = context.getEndpoint(getFtpUrl() + "&recursive=true&sortBy=file:name", FtpEndpoint.class);
    assertNotNull(endpoint);

    MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
    assertEquals(0, repo.getCacheSize());

    List<Exchange> list = endpoint.getExchanges();
    assertNotNull(list);
    assertEquals(3, list.size());

    assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));
    assertEquals("c.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME_ONLY));
    assertEquals("b.txt", list.get(2).getIn().getHeader(Exchange.FILE_NAME_ONLY));

    // the in progress repo should not leak
    assertEquals(0, repo.getCacheSize());

    // and the files is still there
    File fileA = new File(FTP_ROOT_DIR + "/browse/a.txt");
    assertTrue("File should exist " + fileA, fileA.exists());
    File fileB = new File(FTP_ROOT_DIR + "/browse/foo/b.txt");
    assertTrue("File should exist " + fileB, fileB.exists());
    File fileC = new File(FTP_ROOT_DIR + "/browse/bar/c.txt");
    assertTrue("File should exist " + fileC, fileC.exists());
}
项目:Camel    文件:FtpConsumerIdempotentMemoryRefTest.java   
@Override
protected JndiRegistry createRegistry() throws Exception {
    JndiRegistry jndi = super.createRegistry();
    repo = new MemoryIdempotentRepository();
    repo.setCacheSize(5);
    jndi.bind("myRepo", repo);
    return jndi;
}
项目:camel-cookbook-examples    文件:IdempotentConsumerRoute.java   
@Override
public void configure() throws Exception {
    from("direct:in")
        .log("Received message ${header[messageId]}")
        .idempotentConsumer(header("messageId"), new MemoryIdempotentRepository())
            .log("Invoking WS")
            .to("mock:ws")
        .end()
        .log("Completing")
        .to("mock:out");
}
项目:camel-cookbook-examples    文件:IdempotentConsumerMultipleEndpointsRoute.java   
@Override
public void configure() throws Exception {
    from("direct:in")
        .log("Received message ${header[messageId]}")
        .enrich("direct:invokeWs")
        .log("Completing")
        .to("mock:out");

    from("direct:invokeWs")
        .idempotentConsumer(header("messageId"), new MemoryIdempotentRepository())
            .log("Invoking WS")
            .to("mock:ws");
}
项目:sponge    文件:CamelRssTest.java   
@Bean
@Override
public RouteBuilder route() {
    return new RouteBuilder() {

        @Override
        public void configure() throws Exception {
            // @formatter:off
            // RSS from CNN, body set to the title, deduplicated by body (title), put into Sponge as a camel event
            // (containing exchange).
            from("rss:http://rss.cnn.com/rss/edition.rss?sortEntries=false&consumer.delay=1000").routeId("rss")
                    .marshal().rss()
                    .setBody(xpath("/rss/channel/item/title/text()"))
                    .idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository())
                    .transform(body().prepend("CNN news: "))
                    .to("sponge:spongeEngine");

            // RSS from BBC, deduplicated by title, put into Sponge as an event with attributes containing decomposed feed.
            from("rss:http://feeds.bbci.co.uk/news/world/rss.xml?consumer.delay=1000").routeId("rssDecomposed")
                    .marshal().rss()
                    .idempotentConsumer(xpath("/rss/channel/item/title/text()"),
                            MemoryIdempotentRepository.memoryIdempotentRepository())
                    // Conversion from RSS XML to Sponge event with attributes.
                    .process((exchange) -> {
                        exchange.getIn().setBody(spongeEngine().getOperations().makeEvent("rssDecomposed")
                            .set("source", "BBC")
                            .set("title", CamelUtils.xpath(exchange, "/rss/channel/item/title/text()"))
                            .set("link", CamelUtils.xpath(exchange, "/rss/channel/item/link/text()"))
                            .set("description", CamelUtils.xpath(exchange, "/rss/channel/item/description/text()")));
                    })
                    .to("sponge:spongeEngine");

            from("sponge:spongeEngine").routeId("spongeConsumer")
                    .process(exchange -> spongeEngine().getOperations().getVariable(AtomicInteger.class, "receivedCamelMessages")
                            .incrementAndGet())
                    .log("${body}");

            from("direct:log").routeId("directLog")
                    .process(exchange -> spongeEngine().getOperations().getVariable(AtomicInteger.class, "receivedCamelMessages")
                            .incrementAndGet())
                    .log("${body}");
            // @formatter:on
        }
    };
}