Java 类org.apache.camel.spi.Synchronization 实例源码

项目:Camel    文件:UnitOfWorkHelper.java   
/**
 * Invokes {@code onBeforeRoute} on every synchronization that is a
 * {@code SynchronizationRouteAware}.
 * <p>
 * The callbacks run against a reversed and then {@code OrderedComparator}-sorted copy of the
 * given list. Anything thrown by a callback is logged at WARN level and otherwise ignored.
 *
 * @param route            the route passed to each callback
 * @param exchange         the exchange passed to each callback
 * @param synchronizations the registered synchronizations, may be {@code null} or empty
 * @param log              the logger used for trace and warning output
 */
public static void beforeRouteSynchronizations(Route route, Exchange exchange, List<Synchronization> synchronizations, Logger log) {
    if (synchronizations == null || synchronizations.isEmpty()) {
        // nothing registered, nothing to invoke
        return;
    }

    // snapshot the list so a callback that modifies the original cannot cause a ConcurrentModificationException
    List<Synchronization> ordered = new ArrayList<Synchronization>(synchronizations);
    // last registered goes first (FILO instead of FIFO) ...
    Collections.reverse(ordered);
    // ... and any explicitly ordered synchronization is then sorted accordingly
    Collections.sort(ordered, new OrderedComparator());

    for (Synchronization candidate : ordered) {
        if (!(candidate instanceof SynchronizationRouteAware)) {
            continue;
        }
        try {
            log.trace("Invoking synchronization.onBeforeRoute: {} with {}", candidate, exchange);
            SynchronizationRouteAware routeAware = (SynchronizationRouteAware) candidate;
            routeAware.onBeforeRoute(route, exchange);
        } catch (Throwable e) {
            // swallow so the remaining synchronizations still get their turn
            log.warn("Exception occurred during onBeforeRoute. This exception will be ignored.", e);
        }
    }
}
项目:Camel    文件:UnitOfWorkHelper.java   
/**
 * Invokes {@code onAfterRoute} on every synchronization that is a
 * {@code SynchronizationRouteAware}.
 * <p>
 * The callbacks run against a reversed and then {@code OrderedComparator}-sorted copy of the
 * given list. Anything thrown by a callback is logged at WARN level and otherwise ignored.
 *
 * @param route            the route passed to each callback
 * @param exchange         the exchange passed to each callback
 * @param synchronizations the registered synchronizations, may be {@code null} or empty
 * @param log              the logger used for trace and warning output
 */
public static void afterRouteSynchronizations(Route route, Exchange exchange, List<Synchronization> synchronizations, Logger log) {
    if (synchronizations == null || synchronizations.isEmpty()) {
        // nothing registered, nothing to invoke
        return;
    }

    // snapshot the list so a callback that modifies the original cannot cause a ConcurrentModificationException
    List<Synchronization> ordered = new ArrayList<Synchronization>(synchronizations);
    // last registered goes first (FILO instead of FIFO) ...
    Collections.reverse(ordered);
    // ... and any explicitly ordered synchronization is then sorted accordingly
    Collections.sort(ordered, new OrderedComparator());

    for (Synchronization candidate : ordered) {
        if (!(candidate instanceof SynchronizationRouteAware)) {
            continue;
        }
        try {
            log.trace("Invoking synchronization.onAfterRoute: {} with {}", candidate, exchange);
            SynchronizationRouteAware routeAware = (SynchronizationRouteAware) candidate;
            routeAware.onAfterRoute(route, exchange);
        } catch (Throwable e) {
            // swallow so the remaining synchronizations still get their turn
            log.warn("Exception occurred during onAfterRoute. This exception will be ignored.", e);
        }
    }
}
项目:Camel    文件:DefaultConsumerTemplate.java   
/**
 * Completes the given exchange: through its unit of work when it has one, otherwise by
 * handing over its completions and running them via {@code UnitOfWorkHelper}.
 * <p>
 * Any {@code Throwable} raised while doing so is logged at WARN level and ignored.
 *
 * @param exchange the exchange to complete; {@code null} is accepted and is a no-op
 */
public void doneUoW(Exchange exchange) {
    try {
        // The receiveBody method will get a null exchange
        if (exchange == null) {
            return;
        }

        if (exchange.getUnitOfWork() != null) {
            // the unit of work takes care of its own synchronizations
            exchange.getUnitOfWork().done(exchange);
            return;
        }

        // no unit of work: take over the completions and run them manually so they are executed
        UnitOfWorkHelper.doneSynchronizations(exchange, exchange.handoverCompletions(), LOG);
    } catch (Throwable e) {
        LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange
                + ". This exception will be ignored.", e);
    }
}
项目:Camel    文件:DefaultProducerTemplate.java   
/**
 * Submits a task to the executor service that sends {@code body} to {@code endpoint} with the
 * given pattern, notifies {@code onCompletion}, and yields the extracted result body.
 *
 * @param endpoint     the endpoint to send to
 * @param pattern      the exchange pattern used for the send
 * @param body         the payload set on the exchange
 * @param onCompletion receives {@code onFailure} when the resulting exchange is failed,
 *                     {@code onComplete} otherwise
 * @return a future holding the extracted result body when the pattern is OUT capable,
 *         otherwise {@code null}
 */
private Future<Object> asyncCallback(final Endpoint endpoint, final ExchangePattern pattern, final Object body, final Synchronization onCompletion) {
    Callable<Object> job = new Callable<Object>() {
        public Object call() throws Exception {
            Exchange reply = send(endpoint, pattern, createSetBodyProcessor(body));

            // The callback is fired here, before the result is returned, so it also works
            // straight from a producer template where no unit of work (as injected in
            // routes) would invoke it.
            if (!reply.isFailed()) {
                onCompletion.onComplete(reply);
            } else {
                onCompletion.onFailure(reply);
            }

            // always extract, but only hand the value back when the pattern is OUT capable
            Object extracted = extractResultBody(reply, pattern);
            return pattern.isOutCapable() ? extracted : null;
        }
    };
    return getExecutorService().submit(job);
}
项目:Camel    文件:DefaultProducerTemplate.java   
/**
 * Submits a task to the executor service that sends {@code exchange} to {@code endpoint} and
 * then notifies {@code onCompletion} with the same exchange.
 *
 * @param endpoint     the endpoint to send to
 * @param exchange     the exchange to send; it is also the value produced by the future
 * @param onCompletion receives {@code onFailure} when the exchange is failed after the send,
 *                     {@code onComplete} otherwise
 * @return a future holding the given exchange
 */
public Future<Exchange> asyncCallback(final Endpoint endpoint, final Exchange exchange, final Synchronization onCompletion) {
    Callable<Exchange> job = new Callable<Exchange>() {
        public Exchange call() throws Exception {
            // process the exchange, any exception occurring will be caught and set on the exchange
            send(endpoint, exchange);

            // The callback is fired here, before the exchange is returned, so it also works
            // straight from a producer template where no unit of work (as injected in
            // routes) would invoke it.
            if (!exchange.isFailed()) {
                onCompletion.onComplete(exchange);
            } else {
                onCompletion.onFailure(exchange);
            }
            return exchange;
        }
    };
    return getExecutorService().submit(job);
}
项目:Camel    文件:DefaultProducerTemplate.java   
/**
 * Submits a task to the executor service that sends to {@code endpoint} using
 * {@code processor} and then notifies {@code onCompletion} with the resulting exchange.
 *
 * @param endpoint     the endpoint to send to
 * @param processor    the processor handed to {@code send}
 * @param onCompletion receives {@code onFailure} when the resulting exchange is failed,
 *                     {@code onComplete} otherwise
 * @return a future holding the exchange returned by {@code send}
 */
public Future<Exchange> asyncCallback(final Endpoint endpoint, final Processor processor, final Synchronization onCompletion) {
    Callable<Exchange> job = new Callable<Exchange>() {
        public Exchange call() throws Exception {
            // process the exchange, any exception occurring will be caught and set on the exchange
            Exchange result = send(endpoint, processor);

            // The callback is fired here, before the exchange is returned, so it also works
            // straight from a producer template where no unit of work (as injected in
            // routes) would invoke it.
            if (!result.isFailed()) {
                onCompletion.onComplete(result);
            } else {
                onCompletion.onFailure(result);
            }
            return result;
        }
    };
    return getExecutorService().submit(job);
}
项目:Camel    文件:DefaultUnitOfWork.java   
/**
 * Moves the synchronizations held here onto {@code target}.
 * <p>
 * A synchronization that is a {@code SynchronizationVetoable} and reports
 * {@code allowHandover() == false} stays where it is; every other one is added to the
 * target via {@code addOnCompletion} and removed from this instance.
 *
 * @param target the exchange that receives the handed-over synchronizations
 */
public void handoverSynchronization(Exchange target) {
    if (synchronizations == null || synchronizations.isEmpty()) {
        return;
    }

    // an explicit iterator is needed because entries are removed while walking the list
    for (Iterator<Synchronization> cursor = synchronizations.iterator(); cursor.hasNext();) {
        Synchronization current = cursor.next();

        // only vetoable synchronizations get a say; everything else is always handed over
        boolean vetoed = current instanceof SynchronizationVetoable
                && !((SynchronizationVetoable) current).allowHandover();
        if (vetoed) {
            log.trace("Handover not allow for synchronization {}", current);
            continue;
        }

        log.trace("Handover synchronization {} to: {}", current, target);
        target.addOnCompletion(current);
        // remove it if its handed over
        cursor.remove();
    }
}
项目:Camel    文件:XsltBuilderTest.java   
/**
 * Verifies that an {@code XsltBuilder} configured with {@code outputFile().deleteOutputFile()}
 * produces the output file during processing and that the file no longer exists once the
 * exchange's completions have been run.
 */
public void testXsltOutputFileDelete() throws Exception {
    final String outputPath = "target/xslt/xsltout.xml";

    URL xsl = getClass().getResource("example.xsl");
    XsltBuilder xslt = XsltBuilder.xslt(xsl).outputFile().deleteOutputFile();

    Exchange exchange = new DefaultExchange(context);
    exchange.getIn().setBody("<hello>world!</hello>");
    exchange.getIn().setHeader(Exchange.XSLT_FILE_NAME, outputPath);

    xslt.process(exchange);
    assertIsInstanceOf(File.class, exchange.getOut().getBody());

    // the transformation result must be on disk at this point
    File produced = new File(outputPath);
    assertTrue("Output file should exist", produced.exists());

    String transformed = exchange.getOut().getBody(String.class);
    assertTrue(transformed.endsWith("<goodbye>world!</goodbye>"));

    // finish the exchange by running its on-completion callbacks manually
    UnitOfWorkHelper.doneSynchronizations(exchange, exchange.handoverCompletions(), log);

    // after completion the file must be gone
    assertFalse("Output file should be deleted", produced.exists());
}
项目:Camel    文件:UnitOfWorkHelper.java   
/**
 * Invokes the completion callback of every given synchronization: {@code onFailure} when the
 * exchange is failed, {@code onComplete} otherwise.
 * <p>
 * The failed state is read once, before any callback runs. Callbacks run against a reversed
 * and then {@code OrderedComparator}-sorted copy of the list, and anything a callback throws
 * is logged at WARN level and ignored.
 *
 * @param exchange         the exchange that is done; passed to each callback
 * @param synchronizations the synchronizations to invoke, may be {@code null} or empty
 * @param log              the logger used for trace and warning output
 */
public static void doneSynchronizations(Exchange exchange, List<Synchronization> synchronizations, Logger log) {
    // decide the outcome up front so every callback is dispatched on the same decision
    final boolean failed = exchange.isFailed();

    if (synchronizations == null || synchronizations.isEmpty()) {
        return;
    }

    // snapshot the list so a callback that modifies the original cannot cause a ConcurrentModificationException
    List<Synchronization> ordered = new ArrayList<Synchronization>(synchronizations);
    // last registered goes first (FILO instead of FIFO) ...
    Collections.reverse(ordered);
    // ... and any explicitly ordered synchronization is then sorted accordingly
    Collections.sort(ordered, new OrderedComparator());

    for (Synchronization callback : ordered) {
        try {
            if (!failed) {
                log.trace("Invoking synchronization.onComplete: {} with {}", callback, exchange);
                callback.onComplete(exchange);
            } else {
                log.trace("Invoking synchronization.onFailure: {} with {}", callback, exchange);
                callback.onFailure(exchange);
            }
        } catch (Throwable e) {
            // swallow so the remaining synchronizations still get their turn
            log.warn("Exception occurred during onCompletion. This exception will be ignored.", e);
        }
    }
}
项目:Camel    文件:DefaultUnitOfWork.java   
/**
 * Registers a synchronization, creating the backing list on first use.
 *
 * @param synchronization the synchronization to append
 */
public synchronized void addSynchronization(Synchronization synchronization) {
    final boolean firstRegistration = synchronizations == null;
    if (firstRegistration) {
        // the list is created lazily
        synchronizations = new ArrayList<Synchronization>();
    }

    log.trace("Adding synchronization {}", synchronization);
    synchronizations.add(synchronization);
}
项目:Camel    文件:DefaultExchange.java   
/**
 * Assigns the unit of work and, when it is non-null, moves any on-completions that were
 * stored temporarily on this exchange over to it.
 *
 * @param unitOfWork the unit of work to assign, may be {@code null}
 */
public void setUnitOfWork(UnitOfWork unitOfWork) {
    this.unitOfWork = unitOfWork;

    if (unitOfWork == null || onCompletions == null) {
        // either nothing to register on, or nothing waiting to be registered
        return;
    }

    // a unit of work is now available, so register the on-completions collected so far
    for (Synchronization pending : onCompletions) {
        unitOfWork.addSynchronization(pending);
    }

    // the temporary list is no longer needed once everything lives on the unit of work
    onCompletions.clear();
    onCompletions = null;
}
项目:Camel    文件:DefaultExchange.java   
/**
 * Registers an on-completion callback: directly on the unit of work when one is assigned,
 * otherwise in a temporary list kept on this exchange.
 *
 * @param onCompletion the synchronization to register
 */
public void addOnCompletion(Synchronization onCompletion) {
    if (unitOfWork != null) {
        getUnitOfWork().addSynchronization(onCompletion);
        return;
    }

    // no unit of work yet: park the callback until one is assigned to this exchange
    if (onCompletions == null) {
        onCompletions = new ArrayList<Synchronization>();
    }
    onCompletions.add(onCompletion);
}
项目:Camel    文件:DefaultExchange.java   
/**
 * Tells whether the given on-completion is registered, asking the unit of work when one is
 * assigned and the temporary list otherwise.
 *
 * @param onCompletion the synchronization to look for
 * @return {@code true} if it is registered, {@code false} otherwise
 */
public boolean containsOnCompletion(Synchronization onCompletion) {
    if (unitOfWork == null) {
        // no unit of work yet, so only the temporary completions can hold it
        if (onCompletions == null) {
            return false;
        }
        return onCompletions.contains(onCompletion);
    }

    // with a unit of work assigned, the completions have been moved there
    return unitOfWork.containsSynchronization(onCompletion);
}
项目:Camel    文件:DefaultExchange.java   
/**
 * Hands the on-completions of this exchange over to {@code target}.
 * <p>
 * When a temporary on-completion list exists, its entries are added to the target and the
 * list is discarded. Otherwise, if a unit of work is assigned, the handover is delegated to it.
 *
 * @param target the exchange receiving the on-completions
 */
public void handoverCompletions(Exchange target) {
    if (onCompletions == null) {
        if (unitOfWork != null) {
            // let unit of work handover
            unitOfWork.handoverSynchronization(target);
        }
        return;
    }

    for (Synchronization pending : onCompletions) {
        target.addOnCompletion(pending);
    }

    // the temporary list has been handed over, so drop it
    onCompletions.clear();
    onCompletions = null;
}
项目:Camel    文件:DefaultExchange.java   
/**
 * Detaches the temporary on-completions from this exchange and returns them.
 *
 * @return a new list with the detached on-completions, or {@code null} when no temporary
 *         list exists
 */
public List<Synchronization> handoverCompletions() {
    if (onCompletions == null) {
        return null;
    }

    // copy first, then release the internal list
    List<Synchronization> handedOver = new ArrayList<Synchronization>(onCompletions);
    onCompletions.clear();
    onCompletions = null;
    return handedOver;
}
项目:Camel    文件:AbstractSynchronizedExchange.java   
/**
 * Adds every synchronization held by this instance to the wrapped exchange as an
 * on-completion and returns that exchange.
 *
 * @return the wrapped exchange
 */
@Override
public Exchange cancelAndGetOriginalExchange() {
    if (synchronizations == null) {
        // nothing to transfer
        return exchange;
    }

    for (Synchronization held : synchronizations) {
        exchange.addOnCompletion(held);
    }
    return exchange;
}
项目:camelinaction    文件:CamelCallbackTest.java   
/**
 * Sends three asynchronous request/reply messages to {@code seda:echo} with one shared
 * callback and verifies that all three replies are collected.
 */
@Test
public void testCamelCallback() throws Exception {
    // echos is the list of replies which could be modified by multiple threads
    final List<String> echos = new CopyOnWriteArrayList<String>();
    final CountDownLatch latch = new CountDownLatch(3);

    // use this callback to gather the replies and add them to the echos list
    Synchronization callback = new SynchronizationAdapter() {
        @Override
        public void onDone(Exchange exchange) {
            // get the reply and add it to echos
            echos.add(exchange.getOut().getBody(String.class));
            // count down latch when we receive a response
            latch.countDown();
        }
    };

    // now submit 3 async request/reply messages and use the same callback to
    // handle the replies
    template.asyncCallbackRequestBody("seda:echo", "Donkey", callback);
    template.asyncCallbackRequestBody("seda:echo", "Tiger", callback);
    template.asyncCallbackRequestBody("seda:echo", "Camel", callback);

    // wait until the messages are done, or time out after 6 seconds;
    // a timeout surfaces through the size assertion below
    latch.await(6, TimeUnit.SECONDS);

    // assert we got 3 replies
    assertEquals(3, echos.size());
    // copy into a typed list (instead of a raw one) and sort it so we can assert by index
    List<String> result = new ArrayList<String>(echos);
    Collections.sort(result);
    assertEquals("CamelCamel", result.get(0));
    assertEquals("DonkeyDonkey", result.get(1));
    assertEquals("TigerTiger", result.get(2));
}
项目:t4f-data    文件:CamelCallbackTest.java   
/**
 * Sends three asynchronous request/reply messages to {@code seda:echo} with one shared
 * callback and verifies that all three replies are collected.
 */
@Test
public void testCamelCallback() throws Exception {
    // echos is the list of replies; the callback is invoked asynchronously, so the list
    // must tolerate being modified from threads other than the test thread
    final List<String> echos = Collections.synchronizedList(new ArrayList<String>());
    // one count per expected reply, so the test waits only as long as actually needed
    // (fully qualified because this snippet's imports are not visible here)
    final java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(3);

    // use this callback to gather the replies and add them to the echos list
    Synchronization callback = new SynchronizationAdapter() {
        @Override
        public void onDone(Exchange exchange) {
            // get the reply and add it to echos
            echos.add(exchange.getOut().getBody(String.class));
            // signal that one more reply has arrived
            latch.countDown();
        }
    };

    // now submit 3 async request/reply messages and use the same callback to
    // handle the replies
    template.asyncCallbackRequestBody("seda:echo", "Donkey", callback);
    template.asyncCallbackRequestBody("seda:echo", "Tiger", callback);
    template.asyncCallbackRequestBody("seda:echo", "Camel", callback);

    // wait until the messages are done, or give up after 5 seconds;
    // a timeout surfaces through the size assertion below
    latch.await(5, java.util.concurrent.TimeUnit.SECONDS);

    // assert we got 3 replies
    assertEquals(3, echos.size());
    // sort a private copy so we can assert by index
    List<String> sorted = new ArrayList<String>(echos);
    Collections.sort(sorted);
    assertEquals("CamelCamel", sorted.get(0));
    assertEquals("DonkeyDonkey", sorted.get(1));
    assertEquals("TigerTiger", sorted.get(2));
}
项目:Camel    文件:IdempotentConsumer.java   
/**
 * Creates the callback by storing the four given collaborators in the matching fields.
 *
 * @param exchange        stored in {@code this.exchange}
 * @param onCompletion    stored in {@code this.onCompletion}
 * @param callback        stored in {@code this.callback}
 * @param completionEager stored in {@code this.completionEager}
 */
IdempotentConsumerCallback(Exchange exchange, Synchronization onCompletion, AsyncCallback callback, boolean completionEager) {
    // plain field capture; the assignments are independent of each other
    this.completionEager = completionEager;
    this.callback = callback;
    this.onCompletion = onCompletion;
    this.exchange = exchange;
}
项目:Camel    文件:DefaultProducerTemplate.java   
/**
 * Resolves {@code uri} through {@code resolveMandatoryEndpoint} and delegates to the
 * {@code asyncCallbackSendBody} overload that accepts the resolved endpoint.
 *
 * @param uri          the endpoint URI to resolve
 * @param body         the payload, passed through unchanged
 * @param onCompletion the completion callback, passed through unchanged
 * @return the future returned by the delegate
 */
public Future<Object> asyncCallbackSendBody(String uri, Object body, Synchronization onCompletion) {
    return asyncCallbackSendBody(resolveMandatoryEndpoint(uri), body, onCompletion);
}
项目:Camel    文件:DefaultProducerTemplate.java   
/**
 * Delegates to {@code asyncCallback} with the exchange pattern fixed to
 * {@code ExchangePattern.InOnly}.
 *
 * @param endpoint     the endpoint to send to
 * @param body         the payload, passed through unchanged
 * @param onCompletion the completion callback, passed through unchanged
 * @return the future returned by {@code asyncCallback}
 */
public Future<Object> asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
    return asyncCallback(endpoint, ExchangePattern.InOnly, body, onCompletion);
}
项目:Camel    文件:DefaultProducerTemplate.java   
/**
 * Resolves {@code uri} through {@code resolveMandatoryEndpoint} and delegates to the
 * {@code asyncCallbackRequestBody} overload that accepts the resolved endpoint.
 *
 * @param uri          the endpoint URI to resolve
 * @param body         the payload, passed through unchanged
 * @param onCompletion the completion callback, passed through unchanged
 * @return the future returned by the delegate
 */
public Future<Object> asyncCallbackRequestBody(String uri, Object body, Synchronization onCompletion) {
    return asyncCallbackRequestBody(resolveMandatoryEndpoint(uri), body, onCompletion);
}
项目:Camel    文件:DefaultProducerTemplate.java   
/**
 * Delegates to {@code asyncCallback} with the exchange pattern fixed to
 * {@code ExchangePattern.InOut}.
 *
 * @param endpoint     the endpoint to send to
 * @param body         the payload, passed through unchanged
 * @param onCompletion the completion callback, passed through unchanged
 * @return the future returned by {@code asyncCallback}
 */
public Future<Object> asyncCallbackRequestBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
    return asyncCallback(endpoint, ExchangePattern.InOut, body, onCompletion);
}
项目:Camel    文件:DefaultProducerTemplate.java   
/**
 * Resolves {@code uri} through {@code resolveMandatoryEndpoint} and delegates to the
 * {@code asyncCallback} overload that accepts the resolved endpoint and an exchange.
 *
 * @param uri          the endpoint URI to resolve
 * @param exchange     the exchange, passed through unchanged
 * @param onCompletion the completion callback, passed through unchanged
 * @return the future returned by the delegate
 */
public Future<Exchange> asyncCallback(String uri, Exchange exchange, Synchronization onCompletion) {
    return asyncCallback(resolveMandatoryEndpoint(uri), exchange, onCompletion);
}
项目:Camel    文件:DefaultProducerTemplate.java   
/**
 * Resolves {@code uri} through {@code resolveMandatoryEndpoint} and delegates to the
 * {@code asyncCallback} overload that accepts the resolved endpoint and a processor.
 *
 * @param uri          the endpoint URI to resolve
 * @param processor    the processor, passed through unchanged
 * @param onCompletion the completion callback, passed through unchanged
 * @return the future returned by the delegate
 */
public Future<Exchange> asyncCallback(String uri, Processor processor, Synchronization onCompletion) {
    return asyncCallback(resolveMandatoryEndpoint(uri), processor, onCompletion);
}
项目:Camel    文件:DefaultUnitOfWork.java   
/**
 * Removes the given synchronization from the backing list, if a list exists.
 *
 * @param synchronization the synchronization to remove
 */
public synchronized void removeSynchronization(Synchronization synchronization) {
    if (synchronizations == null) {
        // nothing has ever been registered
        return;
    }
    synchronizations.remove(synchronization);
}
项目:Camel    文件:DefaultUnitOfWork.java   
/**
 * Tells whether the given synchronization is currently in the backing list.
 *
 * @param synchronization the synchronization to look for
 * @return {@code true} if the list exists and contains it, {@code false} otherwise
 */
public synchronized boolean containsSynchronization(Synchronization synchronization) {
    if (synchronizations == null) {
        return false;
    }
    return synchronizations.contains(synchronization);
}
项目:Camel    文件:InOutMessageHandler.java   
/**
 * Creates the handler by passing all three arguments unchanged to the superclass constructor.
 *
 * @param endpoint        forwarded to the superclass
 * @param executor        forwarded to the superclass
 * @param synchronization forwarded to the superclass
 */
public InOutMessageHandler(SjmsEndpoint endpoint, ExecutorService executor, Synchronization synchronization) {
    super(endpoint, executor, synchronization);
}
项目:Camel    文件:AbstractMessageHandler.java   
/**
 * Creates the handler by storing the three given collaborators in the matching fields.
 *
 * @param endpoint        stored in {@code this.endpoint}
 * @param executor        stored in {@code this.executor}
 * @param synchronization stored in {@code this.synchronization}
 */
public AbstractMessageHandler(SjmsEndpoint endpoint, ExecutorService executor, Synchronization synchronization) {
    // plain field capture; the assignments are independent of each other
    this.endpoint = endpoint;
    this.executor = executor;
    this.synchronization = synchronization;
}
项目:Camel    文件:SjmsConsumer.java   
/**
 * Helper factory method that creates the {@code MessageListener} for this consumer, picking
 * the handler type from the endpoint's exchange pattern (MEP).
 * <p>
 * The synchronization built here is passed to the handler only when the consumer is
 * transacted.
 *
 * @param session a session is only required if we are a transacted consumer
 * @return the configured message handler
 */
protected MessageListener createMessageHandler(Session session) {

    // commit strategy: an explicitly configured one wins, then batching, then the default
    final TransactionCommitStrategy strategy;
    if (getTransactionCommitStrategy() != null) {
        strategy = getTransactionCommitStrategy();
    } else if (getTransactionBatchCount() > 0) {
        strategy = new BatchTransactionCommitStrategy(getTransactionBatchCount());
    } else {
        strategy = new DefaultTransactionCommitStrategy();
    }

    // the batch strategy gets a batch-aware synchronization built with the component's timed task manager
    final Synchronization sync;
    if (strategy instanceof BatchTransactionCommitStrategy) {
        sync = new SessionBatchTransactionSynchronization(
                getEndpoint().getComponent().getTimedTaskManager(), session, strategy, getTransactionBatchTimeout());
    } else {
        sync = new SessionTransactionSynchronization(session, strategy);
    }

    // handler type follows the exchange pattern; the synchronization is only passed when transacted
    final boolean inOnly = getEndpoint().getExchangePattern().equals(ExchangePattern.InOnly);
    final AbstractMessageHandler handler;
    if (isTransacted()) {
        if (inOnly) {
            handler = new InOnlyMessageHandler(getEndpoint(), executor, sync);
        } else {
            handler = new InOutMessageHandler(getEndpoint(), executor, sync);
        }
    } else {
        if (inOnly) {
            handler = new InOnlyMessageHandler(getEndpoint(), executor);
        } else {
            handler = new InOutMessageHandler(getEndpoint(), executor);
        }
    }

    handler.setSession(session);
    handler.setProcessor(getAsyncProcessor());
    handler.setSynchronous(isSynchronous());
    handler.setTransacted(isTransacted());
    handler.setTopic(isTopic());
    return handler;
}
项目:microservice-bundle    文件:ManagedProducerTemplate.java   
/**
 * Forwards the call unchanged to {@code template.asyncCallback(endpointUri, exchange, onCompletion)}.
 *
 * @return the future returned by {@code template}
 */
@Override
public Future<Exchange> asyncCallback(String endpointUri, Exchange exchange, Synchronization onCompletion) {
  return template.asyncCallback(endpointUri, exchange, onCompletion);
}
项目:microservice-bundle    文件:ManagedProducerTemplate.java   
/**
 * Forwards the call unchanged to {@code template.asyncCallback(endpoint, exchange, onCompletion)}.
 *
 * @return the future returned by {@code template}
 */
@Override
public Future<Exchange> asyncCallback(Endpoint endpoint, Exchange exchange, Synchronization onCompletion) {
  return template.asyncCallback(endpoint, exchange, onCompletion);
}
项目:microservice-bundle    文件:ManagedProducerTemplate.java   
/**
 * Forwards the call unchanged to {@code template.asyncCallback(endpointUri, processor, onCompletion)}.
 *
 * @return the future returned by {@code template}
 */
@Override
public Future<Exchange> asyncCallback(String endpointUri, Processor processor, Synchronization onCompletion) {
  return template.asyncCallback(endpointUri, processor, onCompletion);
}
项目:microservice-bundle    文件:ManagedProducerTemplate.java   
/**
 * Forwards the call unchanged to {@code template.asyncCallback(endpoint, processor, onCompletion)}.
 *
 * @return the future returned by {@code template}
 */
@Override
public Future<Exchange> asyncCallback(Endpoint endpoint, Processor processor, Synchronization onCompletion) {
  return template.asyncCallback(endpoint, processor, onCompletion);
}
项目:microservice-bundle    文件:ManagedProducerTemplate.java   
/**
 * Forwards the call unchanged to {@code template.asyncCallbackSendBody(endpointUri, body, onCompletion)}.
 *
 * @return the future returned by {@code template}
 */
@Override
public Future<Object> asyncCallbackSendBody(String endpointUri, Object body, Synchronization onCompletion) {
  return template.asyncCallbackSendBody(endpointUri, body, onCompletion);
}
项目:microservice-bundle    文件:ManagedProducerTemplate.java   
/**
 * Forwards the call unchanged to {@code template.asyncCallbackSendBody(endpoint, body, onCompletion)}.
 *
 * @return the future returned by {@code template}
 */
@Override
public Future<Object> asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
  return template.asyncCallbackSendBody(endpoint, body, onCompletion);
}
项目:microservice-bundle    文件:ManagedProducerTemplate.java   
/**
 * Forwards the call unchanged to {@code template.asyncCallbackRequestBody(endpointUri, body, onCompletion)}.
 *
 * @return the future returned by {@code template}
 */
@Override
public Future<Object> asyncCallbackRequestBody(String endpointUri, Object body, Synchronization onCompletion) {
  return template.asyncCallbackRequestBody(endpointUri, body, onCompletion);
}
项目:microservice-bundle    文件:ManagedProducerTemplate.java   
/**
 * Forwards the call unchanged to {@code template.asyncCallbackRequestBody(endpoint, body, onCompletion)}.
 *
 * @return the future returned by {@code template}
 */
@Override
public Future<Object> asyncCallbackRequestBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
  return template.asyncCallbackRequestBody(endpoint, body, onCompletion);
}
项目:dropwizard-camel    文件:ManagedProducerTemplate.java   
/**
 * Forwards the call unchanged to {@code template.asyncCallback(endpointUri, exchange, onCompletion)}.
 *
 * @return the future returned by {@code template}
 */
@Override
public Future<Exchange> asyncCallback(String endpointUri, Exchange exchange, Synchronization onCompletion) {
    return template.asyncCallback(endpointUri, exchange, onCompletion);
}
项目:dropwizard-camel    文件:ManagedProducerTemplate.java   
/**
 * Forwards the call unchanged to {@code template.asyncCallback(endpoint, exchange, onCompletion)}.
 *
 * @return the future returned by {@code template}
 */
@Override
public Future<Exchange> asyncCallback(Endpoint endpoint, Exchange exchange, Synchronization onCompletion) {
    return template.asyncCallback(endpoint, exchange, onCompletion);
}