Java 类org.apache.camel.support.SynchronizationAdapter 实例源码

项目:Camel    文件:DefaultProducerTemplateAsyncTest.java   
/**
 * Sends "Hello" to {@code direct:start} via {@code asyncCallbackSendBody} and
 * checks that the completion callback sees "Hello World" as the IN body.
 *
 * <p>{@code ORDER} is bumped by 1 (after submitting), 2 (inside the callback)
 * and 4 (after the latch is released); the final value of 7 shows that each
 * of the three steps ran exactly once.
 */
public void testAsyncCallbackBodyInOnly() throws Exception {
    ORDER.set(0);
    getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");

    // released by the callback once the exchange is done
    final CountDownLatch done = new CountDownLatch(1);

    SynchronizationAdapter callback = new SynchronizationAdapter() {
        @Override
        public void onDone(Exchange exchange) {
            Object body = exchange.getIn().getBody();
            assertEquals("Hello World", body);
            ORDER.addAndGet(2);
            done.countDown();
        }
    };
    template.asyncCallbackSendBody("direct:start", "Hello", callback);

    ORDER.addAndGet(1);
    boolean completed = done.await(10, TimeUnit.SECONDS);
    assertTrue(completed);
    ORDER.addAndGet(4);

    assertMockEndpointsSatisfied();

    int total = ORDER.get();
    assertEquals(7, total);
}
项目:Camel    文件:DefaultProducerTemplateAsyncTest.java   
/**
 * Requests "Hello" from {@code direct:echo} via {@code asyncCallbackRequestBody}
 * and checks that the completion callback sees "HelloHello" as the OUT body.
 *
 * <p>{@code ORDER} accumulates 1 (after submitting), 2 (inside the callback)
 * and 4 (after the latch is released), so the expected total is 7.
 */
public void testAsyncCallbackBodyInOut() throws Exception {
    ORDER.set(0);

    // released by the callback once the exchange is done
    final CountDownLatch done = new CountDownLatch(1);

    SynchronizationAdapter callback = new SynchronizationAdapter() {
        @Override
        public void onDone(Exchange exchange) {
            Object body = exchange.getOut().getBody();
            assertEquals("HelloHello", body);
            ORDER.addAndGet(2);
            done.countDown();
        }
    };
    template.asyncCallbackRequestBody("direct:echo", "Hello", callback);

    ORDER.addAndGet(1);
    boolean completed = done.await(10, TimeUnit.SECONDS);
    assertTrue(completed);
    ORDER.addAndGet(4);

    int total = ORDER.get();
    assertEquals(7, total);
}
项目:Camel    文件:DefaultProducerTemplateAsyncTest.java   
/**
 * Sends "Hello" to {@code direct:start} via {@code asyncCallbackSendBody} and
 * waits on the returned {@link Future} instead of a latch.
 *
 * <p>The callback asserts the IN body is "Hello World"; the future is expected
 * to yield {@code null}. {@code ORDER} accumulates 1, 2 and 4 for a total of 7.
 */
public void testAsyncCallbackBodyInOnlyGetResult() throws Exception {
    ORDER.set(0);
    getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");

    SynchronizationAdapter callback = new SynchronizationAdapter() {
        @Override
        public void onDone(Exchange exchange) {
            Object body = exchange.getIn().getBody();
            assertEquals("Hello World", body);
            ORDER.addAndGet(2);
        }
    };
    Future<Object> pending = template.asyncCallbackSendBody("direct:start", "Hello", callback);

    ORDER.addAndGet(1);
    Object reply = pending.get(10, TimeUnit.SECONDS);
    ORDER.addAndGet(4);

    assertMockEndpointsSatisfied();

    int total = ORDER.get();
    assertEquals(7, total);

    // an in-only send produces no reply
    assertEquals(null, reply);
}
项目:Camel    文件:DefaultProducerTemplateAsyncTest.java   
/**
 * Requests "Hello" from {@code direct:echo} via {@code asyncCallbackRequestBody}
 * and waits on the returned {@link Future} instead of a latch.
 *
 * <p>Both the callback (OUT body) and the future's result are expected to be
 * "HelloHello". {@code ORDER} accumulates 1, 2 and 4 for a total of 7.
 */
public void testAsyncCallbackBodyInOutGetResult() throws Exception {
    ORDER.set(0);

    SynchronizationAdapter callback = new SynchronizationAdapter() {
        @Override
        public void onDone(Exchange exchange) {
            Object body = exchange.getOut().getBody();
            assertEquals("HelloHello", body);
            ORDER.addAndGet(2);
        }
    };
    Future<Object> pending = template.asyncCallbackRequestBody("direct:echo", "Hello", callback);

    ORDER.addAndGet(1);
    Object reply = pending.get(10, TimeUnit.SECONDS);
    ORDER.addAndGet(4);

    int total = ORDER.get();
    assertEquals(7, total);
    assertEquals("HelloHello", reply);
}
项目:Camel    文件:RouteboxDispatcher.java   
/**
 * Dispatches the exchange asynchronously to the URI chosen by
 * {@code selectDispatchUri}.
 *
 * <p>For an {@code InOnly} exchange the send is submitted and the original
 * exchange is returned immediately. For any other pattern the call blocks on
 * the future for at most the endpoint's configured connection timeout
 * (interpreted as milliseconds) and returns the resulting exchange.
 *
 * @param endpoint the routebox endpoint supplying the URI and timeout configuration
 * @param exchange the exchange to dispatch
 * @return the original exchange for {@code InOnly}, otherwise the exchange
 *         obtained from the future
 * @throws Exception if URI selection fails or waiting on the future fails
 *         or times out
 */
public Exchange dispatchAsync(RouteboxEndpoint endpoint, Exchange exchange) throws Exception {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Dispatching exchange {} to endpoint {}", exchange, endpoint.getEndpointUri());
    }

    final String target = selectDispatchUri(endpoint, exchange).toASCIIString();

    if (exchange.getPattern() == ExchangePattern.InOnly) {
        // fire-and-forget: hand back the exchange we were given
        producer.asyncSend(target, exchange);
        return exchange;
    }

    Future<Exchange> pending = producer.asyncCallback(target, exchange, new SynchronizationAdapter());
    long timeoutMillis = endpoint.getConfig().getConnectionTimeout();
    return pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
}
项目:camelinaction    文件:CamelCallbackTest.java   
/**
 * Submits three asynchronous request/reply messages to {@code seda:echo} that
 * share one callback, then verifies that all three replies were collected.
 *
 * <p>The callback may run on several threads, so replies are gathered in a
 * {@link CopyOnWriteArrayList} and a latch signals when all three have arrived.
 */
@Test
public void testCamelCallback() throws Exception {
    // echos is the list of replies which could be modified by multiple threads
    final List<String> echos = new CopyOnWriteArrayList<String>();
    final CountDownLatch latch = new CountDownLatch(3);

    // use this callback to gather the replies and add them to the echos list
    Synchronization callback = new SynchronizationAdapter() {
        @Override
        public void onDone(Exchange exchange) {
            // get the reply and add it to echos
            echos.add(exchange.getOut().getBody(String.class));
            // count down the latch when we receive a response
            latch.countDown();
        }
    };

    // now submit 3 async request/reply messages and use the same callback to
    // handle the replies
    template.asyncCallbackRequestBody("seda:echo", "Donkey", callback);
    template.asyncCallbackRequestBody("seda:echo", "Tiger", callback);
    template.asyncCallbackRequestBody("seda:echo", "Camel", callback);

    // wait until the messages are done, or time out after 6 seconds;
    // a timeout is caught by the size assertion below
    latch.await(6, TimeUnit.SECONDS);

    // assert we got 3 replies
    assertEquals(3, echos.size());

    // copy into a typed list (no raw types) and sort so we can assert by index
    List<String> result = new ArrayList<String>(echos);
    Collections.sort(result);
    assertEquals("CamelCamel", result.get(0));
    assertEquals("DonkeyDonkey", result.get(1));
    assertEquals("TigerTiger", result.get(2));
}
项目:wildfly-camel    文件:GRPCIntegrationTest.java   
/**
 * Sends a {@code PingRequest} through a {@code direct:start} route that
 * forwards to the gRPC {@code pingAsyncSync} method, and verifies the list of
 * {@code PongResponse} objects delivered to the completion callback.
 *
 * <p>The callback only captures the response body and then releases the
 * latch; every assertion runs on the test thread. Asserting inside the
 * callback would run on another thread, so a failure there would not be
 * reported by the test, and releasing the latch before the checks would let
 * the test pass (and stop the context) while they were still running.
 */
@Test
public void testGRPCAsynchronousProducer() throws Exception {

    int port = Integer.parseInt(AvailablePortFinder.readServerData("grpc-port"));

    DefaultCamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
            .toF("grpc://localhost:%d/%s?method=pingAsyncSync", port, "org.wildfly.camel.test.grpc.subA.PingPong");
        }
    });

    camelctx.start();
    try {
        final CountDownLatch latch = new CountDownLatch(1);
        // hands the response from the callback thread over to the test thread
        final java.util.concurrent.atomic.AtomicReference<List<PongResponse>> responseRef =
            new java.util.concurrent.atomic.AtomicReference<>();

        PingRequest pingRequest = PingRequest.newBuilder()
            .setPingName(GRPC_TEST_PING_VALUE)
            .setPingId(GRPC_TEST_PING_ID_1)
            .build();

        ProducerTemplate template = camelctx.createProducerTemplate();
        template.asyncCallbackSendBody("direct:start", pingRequest, new SynchronizationAdapter() {
            @Override
            public void onComplete(Exchange exchange) {
                @SuppressWarnings("unchecked")
                List<PongResponse> body = exchange.getOut().getBody(List.class);
                // publish the result first, then release the waiting test thread
                responseRef.set(body);
                latch.countDown();
            }
        });

        Assert.assertTrue("Gave up waiting for latch", latch.await(5, TimeUnit.SECONDS));

        // verify on the test thread so failures are actually reported
        List<PongResponse> response = responseRef.get();
        Assert.assertNotNull("No response body received", response);
        Assert.assertEquals(2, response.size());
        Assert.assertEquals(GRPC_TEST_PONG_ID_1, response.get(0).getPongId());
        Assert.assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, response.get(0).getPongName());
        Assert.assertEquals(GRPC_TEST_PONG_ID_2, response.get(1).getPongId());
    } finally {
        camelctx.stop();
    }
}