Java 类org.apache.camel.util.AsyncProcessorConverterHelper 实例源码

项目:sponge    文件:DefaultCamelConsumer.java   
/**
 * Creates a consumer for the given Sponge endpoint.
 *
 * @param spongeEndpoint the endpoint this consumer is attached to; checked with
 *                       {@code ObjectHelper.notNull} before use
 * @param processor the processor to deliver to; checked with {@code ObjectHelper.notNull}
 *                  and stored in the form returned by
 *                  {@code AsyncProcessorConverterHelper.convert}
 */
public DefaultCamelConsumer(SpongeEndpoint spongeEndpoint, Processor processor) {
    // validate the arguments first (endpoint, then processor)
    ObjectHelper.notNull(spongeEndpoint, "spongeEndpoint");
    ObjectHelper.notNull(processor, "processor");

    // keep the converted processor rather than the raw one
    this.processor = AsyncProcessorConverterHelper.convert(processor);
    this.spongeEndpoint = spongeEndpoint;
}
项目:Camel    文件:TryProcessor.java   
/**
 * Routes the exchange through the processors returned by {@code next()}, one after another,
 * for as long as {@code continueRouting} allows it and each step completes synchronously.
 *
 * @param exchange the exchange being routed
 * @param callback invoked with {@code true} when all steps finished synchronously
 * @return {@code true} if everything completed synchronously, {@code false} as soon as one
 *         step reports that it continues asynchronously
 */
public boolean process(Exchange exchange, AsyncCallback callback) {
    Iterator<Processor> remaining = next().iterator();

    // remember the current EXCEPTION_HANDLED value and clear it while this block runs
    Object previouslyHandled = exchange.getProperty(Exchange.EXCEPTION_HANDLED);
    exchange.setProperty(Exchange.EXCEPTION_HANDLED, null);

    while (continueRouting(remaining, exchange)) {
        exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
        ExchangeHelper.prepareOutToIn(exchange);

        // hand the exchange to the next step
        AsyncProcessor step = AsyncProcessorConverterHelper.convert(remaining.next());
        boolean completedSync = process(exchange, callback, remaining, step, previouslyHandled);

        if (completedSync) {
            LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
            continue;
        }

        // the rest of the try .. catch .. finally is finished by the callback once it fires,
        // so stop here and report asynchronous completion
        LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
        return false;
    }

    // all steps ran synchronously: clean up the markers and restore the saved value
    ExchangeHelper.prepareOutToIn(exchange);
    exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
    exchange.setProperty(Exchange.EXCEPTION_HANDLED, previouslyHandled);
    LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
    callback.done(true);
    return true;
}
项目:Camel    文件:IdempotentConsumer.java   
/**
 * Creates an idempotent consumer from the supplied settings.
 * <p>
 * All arguments are stored as given, except {@code processor}, which is stored in the form
 * returned by {@code AsyncProcessorConverterHelper.convert}.
 *
 * @param messageIdExpression the expression stored as the message id expression
 * @param idempotentRepository the repository stored for later use
 * @param eager value of the {@code eager} flag
 * @param completionEager value of the {@code completionEager} flag
 * @param skipDuplicate value of the {@code skipDuplicate} flag
 * @param removeOnFailure value of the {@code removeOnFailure} flag
 * @param processor the processor to convert and store
 */
public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository<String> idempotentRepository,
                          boolean eager, boolean completionEager, boolean skipDuplicate, boolean removeOnFailure, Processor processor) {
    // the processor is the only argument that is transformed before being kept
    this.processor = AsyncProcessorConverterHelper.convert(processor);

    // collaborators
    this.idempotentRepository = idempotentRepository;
    this.messageIdExpression = messageIdExpression;

    // behaviour flags
    this.removeOnFailure = removeOnFailure;
    this.skipDuplicate = skipDuplicate;
    this.completionEager = completionEager;
    this.eager = eager;
}
项目:Camel    文件:FailOverLoadBalancer.java   
/**
 * Sends the copied exchange to the chosen processor, wrapping the caller's callback in a
 * {@code FailOverAsyncCallback}.
 *
 * @param processor the processor chosen for this attempt
 * @param exchange the original exchange, passed on to the failover callback
 * @param copy the exchange that is actually processed
 * @param attempts attempt counter, passed on to the failover callback
 * @param index processor index, passed on to the failover callback
 * @param callback the caller's callback, passed on to the failover callback
 * @param processors the available processors, passed on to the failover callback
 * @return the value returned by the processor's asynchronous {@code process} call
 * @throws IllegalStateException if {@code processor} is {@code null}
 */
private boolean processExchange(Processor processor, Exchange exchange, Exchange copy,
                                AtomicInteger attempts, AtomicInteger index,
                                AsyncCallback callback, List<Processor> processors) {
    if (processor == null) {
        throw new IllegalStateException("No processors could be chosen to process " + copy);
    }
    log.debug("Processing failover at attempt {} for {}", attempts, copy);

    // convert first, then build the failover callback and dispatch in a single expression
    return AsyncProcessorConverterHelper.convert(processor)
            .process(copy, new FailOverAsyncCallback(exchange, copy, attempts, index, callback, processors));
}
项目:Camel    文件:CircuitBreakerLoadBalancer.java   
/**
 * Runs the exchange through the first configured processor and, when that processor
 * completes synchronously, updates the failure bookkeeping and signals the callback.
 *
 * @param exchange the exchange to process; the current state is stored on it as the
 *                 {@code Exchange.CIRCUIT_BREAKER_STATE} property
 * @param callback invoked with {@code true} after synchronous completion; in the asynchronous
 *                 case it is only handed to the {@code CircuitBreakerCallback}
 * @return {@code true} if the processor completed synchronously, otherwise {@code false}
 * @throws IllegalStateException if there is no processor to use (none configured, or the
 *                               first entry is {@code null})
 */
private boolean executeProcessor(final Exchange exchange, final AsyncCallback callback) {
    // Take the first configured processor. Iterating instead of calling get(0) means an
    // empty processor list reaches the IllegalStateException below rather than failing
    // with an IndexOutOfBoundsException.
    Processor processor = null;
    for (Processor candidate : getProcessors()) {
        processor = candidate;
        break;
    }
    if (processor == null) {
        throw new IllegalStateException("No processors could be chosen to process CircuitBreaker");
    }

    // store state as exchange property
    exchange.setProperty(Exchange.CIRCUIT_BREAKER_STATE, stateAsString(state.get()));

    AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor);
    // the wrapping callback is handed the exchange and the caller's callback
    boolean sync = albp.process(exchange, new CircuitBreakerCallback(exchange, callback));

    if (!sync) {
        // no failure bookkeeping here; the original code comments leave the failure check
        // to CircuitBreakerCallback in the asynchronous case
        log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
        return false;
    }

    // synchronous completion: the outcome is known now, so update the failure counters here
    if (hasFailed(exchange)) {
        failures.incrementAndGet();
        lastFailure = System.currentTimeMillis();
    } else {
        failures.set(0);
    }

    log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
    callback.done(true);
    return true;
}
项目:Camel    文件:SendProcessor.java   
/**
 * Starts this processor: ensures the producer cache exists and is started, resolves an
 * intercepted destination if there is one, starts the destination, and decides whether the
 * producer can be used directly or only through the producer cache.
 *
 * @throws Exception if starting any of the involved services fails
 */
protected void doStart() throws Exception {
    if (producerCache == null) {
        // A single-entry cache is enough because only one destination is referenced. A plain
        // HashMap is used on purpose: a soft-reference store could be reclaimed when memory is
        // low, and the producer must stay around so its lifecycle is fully managed (for
        // example stopping the producer when this processor stops).
        // The cache is deliberately not added as a service, as it should not be managed that way.
        producerCache = new ProducerCache(this, camelContext, new HashMap<String, Producer>(1));
    }
    ServiceHelper.startService(producerCache);

    // the destination may meanwhile have been intercepted by an interceptSendToEndpoint,
    // so look it up again before using it
    Endpoint intercepted = camelContext.hasEndpoint(destination.getEndpointKey());
    if (intercepted instanceof InterceptSendToEndpoint) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Intercepted sending to {} -> {}",
                    URISupport.sanitizeUri(destination.getEndpointUri()), URISupport.sanitizeUri(intercepted.getEndpointUri()));
        }
        destination = intercepted;
    }

    // the endpoint has to be started before the producer is warmed up, which lets us fail
    // fast if there is a problem
    ServiceHelper.startService(destination);

    // SendProcessor is used very often (every .to in the route DSL), so regular producers are
    // used directly instead of going through the ProducerCache. Pooled and non-singleton
    // producers (expected to be rare) still need the cache, which supports them better.
    Producer candidate = producerCache.acquireProducer(destination);
    boolean usableDirectly = !(candidate instanceof ServicePoolAware) && candidate.isSingleton();
    if (usableDirectly) {
        // optimized path: send with the producer itself
        this.producer = AsyncProcessorConverterHelper.convert(candidate);
    } else {
        // cannot optimize: give the producer back and let the cache do the sending
        producerCache.releaseProducer(destination, candidate);
    }
}
项目:Camel    文件:AsyncProcessorTypeConverter.java   
/**
 * Converts a {@code Processor} value to an {@code AsyncProcessor} when exactly that target
 * type is requested.
 *
 * @param type the requested target type
 * @param exchange the current exchange (not used by this conversion)
 * @param value the value to convert
 * @return the converted processor, or {@code null} when the target type is not
 *         {@code AsyncProcessor} or the value is not a {@code Processor}
 */
@Override
public <T> T convertTo(Class<T> type, Exchange exchange, Object value) {
    boolean convertible = type.equals(AsyncProcessor.class) && value instanceof Processor;
    if (!convertible) {
        return null;
    }
    return type.cast(AsyncProcessorConverterHelper.convert((Processor) value));
}
项目:Camel    文件:DefaultConsumer.java   
/**
 * Returns the consumer's configured processor as an {@link org.apache.camel.AsyncProcessor}.
 * <p>
 * The converted instance is created on the first call and reused afterwards. A processor
 * that does not implement the interface is adapted so that it does.
 *
 * @return the cached asynchronous view of the processor
 */
public synchronized AsyncProcessor getAsyncProcessor() {
    if (asyncProcessor != null) {
        // already converted on an earlier call
        return asyncProcessor;
    }
    asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
    return asyncProcessor;
}
项目:Camel    文件:CamelJGroupsReceiver.java   
/**
 * Creates a receiver for the given JGroups endpoint.
 *
 * @param endpoint the endpoint this receiver belongs to; checked with
 *                 {@code ObjectHelper.notNull}
 * @param processor the processor to deliver to; checked with {@code ObjectHelper.notNull} and
 *                  stored in the form returned by {@code AsyncProcessorConverterHelper.convert}
 */
public CamelJGroupsReceiver(JGroupsEndpoint endpoint, Processor processor) {
    // validate the arguments first (endpoint, then processor)
    ObjectHelper.notNull(endpoint, "endpoint");
    ObjectHelper.notNull(processor, "processor");

    // keep the converted processor rather than the raw one
    this.processor = AsyncProcessorConverterHelper.convert(processor);
    this.endpoint = endpoint;
}
项目:Camel    文件:RouteboxDirectConsumer.java   
/**
 * Exposes the configured processor of this consumer through the
 * {@link org.apache.camel.AsyncProcessor} interface.
 * <p>
 * Conversion happens once, on first access; later calls return the same instance. If the
 * processor does not implement the interface, it is adapted so that it does.
 *
 * @return the lazily created asynchronous processor
 */
public synchronized AsyncProcessor getAsyncProcessor() {
    boolean needsConversion = asyncProcessor == null;
    if (needsConversion) {
        asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
    }
    return asyncProcessor;
}
项目:Camel    文件:CamelEventHandler.java   
/**
 * Creates an event handler for the given Guava EventBus endpoint.
 *
 * @param eventBusEndpoint the endpoint this handler belongs to; checked with
 *                         {@code ObjectHelper.notNull}
 * @param processor the processor to deliver to; checked with {@code ObjectHelper.notNull} and
 *                  stored in the form returned by {@code AsyncProcessorConverterHelper.convert}
 */
public CamelEventHandler(GuavaEventBusEndpoint eventBusEndpoint, Processor processor) {
    // validate the arguments first (endpoint, then processor)
    ObjectHelper.notNull(eventBusEndpoint, "eventBusEndpoint");
    ObjectHelper.notNull(processor, "processor");

    // keep the converted processor rather than the raw one
    this.processor = AsyncProcessorConverterHelper.convert(processor);
    this.eventBusEndpoint = eventBusEndpoint;
}
项目:syndesis    文件:ComponentProxyProducer.java   
/**
 * Creates a producer for the given endpoint that delegates to the given processor.
 *
 * @param endpoint the endpoint passed on to the superclass constructor
 * @param processor the processor to delegate to; stored in the form returned by
 *                  {@code AsyncProcessorConverterHelper.convert}
 */
public ComponentProxyProducer(final Endpoint endpoint, final Processor processor) {
    super(endpoint);
    // keep the converted processor rather than the raw one
    this.processor = AsyncProcessorConverterHelper.convert(processor);
}
项目:Camel    文件:ChoiceProcessor.java   
/**
 * Picks at most one processor from {@code next()} and routes the exchange to it: the first
 * {@code FilterProcessor} whose predicate matches, or the first processor that is not a
 * {@code FilterProcessor} (the "otherwise").
 * <p>
 * Whatever happens, the {@code Exchange.FILTER_MATCHED} property is put back to the value it
 * had on entry (or removed if it had none) before the caller's callback is invoked.
 *
 * @param exchange the exchange to route
 * @param callback the caller's callback, invoked after the property has been restored
 * @return the value returned by the selected processor, or {@code true} when no processor
 *         was selected
 */
public boolean process(final Exchange exchange, final AsyncCallback callback) {
    Iterator<Processor> candidates = next().iterator();

    // wrap the caller's callback so the pre-existing FILTER_MATCHED value is restored first
    final Object previousMatched = exchange.getProperty(Exchange.FILTER_MATCHED);
    final AsyncCallback restoringCallback = new AsyncCallback() {
        @Override
        public void done(boolean doneSync) {
            if (previousMatched == null) {
                exchange.removeProperty(Exchange.FILTER_MATCHED);
            } else {
                exchange.setProperty(Exchange.FILTER_MATCHED, previousMatched);
            }
            callback.done(doneSync);
        }
    };

    // Only one processor is ever picked, so there is no need for an async callback that
    // loops as well: the first match is dispatched and its result returned directly.
    while (candidates.hasNext()) {
        Processor selected = candidates.next();

        // Evaluate filter predicates up front. This is faster and avoids issues with nested
        // choices, since only one processor may be picked.
        boolean matched = false;
        if (selected instanceof FilterProcessor) {
            FilterProcessor filter = (FilterProcessor) selected;
            try {
                matched = filter.matches(exchange);
                // the predicate has been evaluated already, so route to the filter's own processor
                selected = filter.getProcessor();
            } catch (Throwable e) {
                exchange.setException(e);
            }
        } else {
            // not a filter, so this is the otherwise branch, which always matches
            notFiltered++;
            matched = true;
        }

        // stop looking if the exchange is no longer allowed to continue
        if (!continueProcessing(exchange, "so breaking out of choice", LOG)) {
            break;
        }

        if (matched) {
            // found the filter (or the otherwise) to process with
            return AsyncProcessorConverterHelper.convert(selected).process(exchange, restoringCallback);
        }
    }

    // nothing matched and there was no otherwise: just continue
    restoringCallback.done(true);
    return true;
}
项目:Camel    文件:DelegateAsyncProcessor.java   
/**
 * Creates a delegate around the given processor.
 * <p>
 * The processor is first passed through {@code AsyncProcessorConverterHelper.convert} and the
 * result is handed to the other constructor of this class.
 *
 * @param processor the processor to delegate to
 */
public DelegateAsyncProcessor(Processor processor) {
    this(AsyncProcessorConverterHelper.convert(processor));
}
项目:Camel    文件:DelegateAsyncProcessor.java   
/**
 * Replaces the processor this instance delegates to.
 *
 * @param processor the new processor; stored in the form returned by
 *                  {@code AsyncProcessorConverterHelper.convert}
 */
public void setProcessor(Processor processor) {
    this.processor = AsyncProcessorConverterHelper.convert(processor);
}
项目:Camel    文件:InterceptorToAsyncProcessorBridge.java   
/**
 * Sets the target of this bridge.
 *
 * @param target the new target; stored in the form returned by
 *               {@code AsyncProcessorConverterHelper.convert}
 */
public void setTarget(Processor target) {
    this.target = AsyncProcessorConverterHelper.convert(target);
}
项目:Camel    文件:Pipeline.java   
/**
 * Routes the exchange through the processors of this pipeline, one after another, for as
 * long as {@code continueRouting} allows it and each step completes synchronously.
 * <p>
 * The first step works on the given exchange; every later step works on an exchange obtained
 * from {@code createNextExchange}. On synchronous completion the results of the last exchange
 * are copied back to the original one.
 *
 * @param exchange the original exchange
 * @param callback invoked with {@code true} when the pipeline finished synchronously
 * @return {@code true} if the whole pipeline completed synchronously, {@code false} as soon
 *         as one step reports that it continues asynchronously
 */
public boolean process(Exchange exchange, AsyncCallback callback) {
    Iterator<Processor> remaining = getProcessors().iterator();
    Exchange current = exchange;
    boolean firstStep = true;

    while (continueRouting(remaining, current)) {
        // every step after the first one gets a freshly prepared exchange
        if (!firstStep) {
            current = createNextExchange(current);
        }
        firstStep = false;

        AsyncProcessor step = AsyncProcessorConverterHelper.convert(remaining.next());
        boolean completedSync = process(exchange, current, callback, remaining, step);

        if (!completedSync) {
            LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
            // the callback finishes the remainder of the pipeline and continues routing from
            // this point, so leave now
            return false;
        }

        LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());

        // stop early if the exchange is no longer allowed to continue
        if (!continueProcessing(current, "so breaking out of pipeline", LOG)) {
            break;
        }
    }

    // Log the latest exchange rather than the original: its payload may have changed, and
    // logging the original on completion would be confusing. (Logging both would give
    // before and after snapshots.)
    LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), current);

    // copy results back to the original exchange
    ExchangeHelper.copyResults(exchange, current);

    callback.done(true);
    return true;
}
项目:Camel    文件:Pipeline.java   
/**
 * Processes one pipeline step and installs a callback that continues with the remaining
 * steps if this step completes asynchronously.
 *
 * @param original the exchange the pipeline was started with; receives the copied results
 *                 when the asynchronous continuation finishes
 * @param exchange the exchange handed to this step
 * @param callback the caller's callback, invoked with {@code false} once the asynchronous
 *                 continuation has finished the pipeline
 * @param processors the steps that have not been run yet
 * @param asyncProcessor the step to run now
 * @return the value returned by {@code asyncProcessor.process}
 */
private boolean process(final Exchange original, final Exchange exchange, final AsyncCallback callback,
                        final Iterator<Processor> processors, final AsyncProcessor asyncProcessor) {
    // this does the actual processing so log at trace level
    LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);

    // the asynchronous routing logic lives in the callback, so that routing continues from
    // where it left off once the callback is triggered
    return asyncProcessor.process(exchange, new AsyncCallback() {
        public void done(boolean doneSync) {
            // synchronous completion is handled by the caller's loop; nothing to do here
            if (doneSync) {
                return;
            }

            // continue processing the pipeline asynchronously
            Exchange current = exchange;
            while (continueRouting(processors, current)) {
                AsyncProcessor following = AsyncProcessorConverterHelper.convert(processors.next());

                // stop if the exchange is no longer allowed to continue
                if (!continueProcessing(current, "so breaking out of pipeline", LOG)) {
                    break;
                }

                current = createNextExchange(current);
                boolean completedSync = process(original, current, callback, processors, following);
                if (!completedSync) {
                    // that step installed its own continuation, so this one is finished
                    LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
                    return;
                }
            }

            ExchangeHelper.copyResults(original, current);
            LOG.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), original);
            callback.done(false);
        }
    });
}
项目:Camel    文件:AsyncProcessorTypeConverter.java   
/**
 * Converts the given processor by delegating to
 * {@link AsyncProcessorConverterHelper#convert(org.apache.camel.Processor)}.
 *
 * @param value the processor to convert
 * @return the result of the helper's conversion
 * @deprecated use {@link AsyncProcessorConverterHelper#convert(org.apache.camel.Processor)} instead
 */
@Deprecated
public static AsyncProcessor convert(Processor value) {
    return AsyncProcessorConverterHelper.convert(value);
}
项目:Camel    文件:SedaConsumer.java   
/**
 * Creates a consumer for the given SEDA endpoint.
 * <p>
 * The poll timeout is read from the endpoint, and a {@code LoggingExceptionHandler} built
 * from the endpoint's Camel context is installed as the exception handler.
 *
 * @param endpoint the endpoint this consumer belongs to
 * @param processor the processor to deliver to; stored in the form returned by
 *                  {@code AsyncProcessorConverterHelper.convert}
 */
public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
    // keep the converted processor rather than the raw one
    this.processor = AsyncProcessorConverterHelper.convert(processor);
    this.endpoint = endpoint;

    // settings derived from the endpoint
    this.pollTimeout = endpoint.getPollTimeout();
    this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
}
项目:Camel    文件:EndpointMessageListener.java   
/**
 * Creates a message listener for the given JMS endpoint.
 *
 * @param endpoint the endpoint this listener belongs to
 * @param processor the processor to deliver to; stored in the form returned by
 *                  {@code AsyncProcessorConverterHelper.convert}
 */
public EndpointMessageListener(JmsEndpoint endpoint, Processor processor) {
    // keep the converted processor rather than the raw one
    this.processor = AsyncProcessorConverterHelper.convert(processor);
    this.endpoint = endpoint;
}
项目:Camel    文件:RouteboxSedaConsumer.java   
/**
 * Creates a consumer for the given Routebox SEDA endpoint.
 *
 * @param endpoint the endpoint passed on to the superclass constructor; its configuration
 *                 also supplies the inner producer template kept by this consumer
 * @param processor the processor to deliver to; handed to {@code setProcessor} in the form
 *                  returned by {@code AsyncProcessorConverterHelper.convert}
 */
public RouteboxSedaConsumer(RouteboxSedaEndpoint endpoint, Processor processor) {
    super(endpoint);
    // install the converted processor through the setter
    this.setProcessor(AsyncProcessorConverterHelper.convert(processor));
    // the producer template comes from the endpoint's configuration
    this.producer = endpoint.getConfig().getInnerProducerTemplate();
}
项目:Camel    文件:HazelcastSedaConsumer.java   
/**
 * Creates a consumer for the given endpoint.
 *
 * @param endpoint the endpoint passed on to the superclass constructor; it is also cast to
 *                 {@code HazelcastSedaEndpoint} and kept, so it must be of that type
 * @param processor the processor passed on to the superclass constructor and additionally
 *                  stored here in the form returned by
 *                  {@code AsyncProcessorConverterHelper.convert}
 */
public HazelcastSedaConsumer(final Endpoint endpoint, final Processor processor) {
    super(endpoint, processor);
    // keep a typed reference to the endpoint
    this.endpoint = (HazelcastSedaEndpoint) endpoint;
    // keep the converted processor rather than the raw one
    this.processor = AsyncProcessorConverterHelper.convert(processor);
}
项目:Camel    文件:DisruptorConsumer.java   
/**
 * Creates a consumer for the given Disruptor endpoint.
 *
 * @param endpoint the endpoint this consumer belongs to
 * @param processor the processor to deliver to; stored in the form returned by
 *                  {@code AsyncProcessorConverterHelper.convert}
 */
public DisruptorConsumer(final DisruptorEndpoint endpoint, final Processor processor) {
    // keep the converted processor rather than the raw one
    this.processor = AsyncProcessorConverterHelper.convert(processor);
    this.endpoint = endpoint;
}
项目:gumtree-spoon-ast-diff    文件:patched.java   
/**
 * Processes a single exchange/processor pair synchronously, with tracing and send
 * notifications around it.
 * <p>
 * When the pair has a producer, an "exchange sending" event is emitted before processing and
 * an "exchange sent" event, including the time taken, afterwards. The pair is always marked
 * as done and the tracing block popped, even if processing throws.
 *
 * @param pair the exchange together with its processor and optional producer
 * @throws Exception if processing the exchange fails
 */
private void doProcessParallel(final ProcessorExchangePair pair) throws Exception {
    final Exchange exchange = pair.getExchange();
    Processor processor = pair.getProcessor();
    Producer producer = pair.getProducer();

    // traced route nodes are only available when the exchange has a unit of work
    TracedRouteNodes traced = null;
    if (exchange.getUnitOfWork() != null) {
        traced = exchange.getUnitOfWork().getTracedRouteNodes();
    }

    // time is only measured when sending to another endpoint
    StopWatch watch = producer != null ? new StopWatch() : null;

    try {
        // start tracing from a new block
        if (traced != null) {
            traced.pushBlock();
        }

        if (producer != null) {
            EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint());
        }

        // convert the prepared processor, then begin the exchange pair before processing
        AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
        pair.begin();
        // invoked synchronously, as parallel async routing is too hard
        AsyncProcessorHelper.process(async, exchange);
    } finally {
        pair.done();
        // pop the block so the next round has the same starting point and tracing looks accurate
        if (traced != null) {
            traced.popBlock();
        }
        if (producer != null) {
            long timeTaken = watch.stop();
            Endpoint target = producer.getEndpoint();
            // Emitting the sent event from the finally block is fine because processing does
            // not use the async routing engine (it is invoked synchronously above).
            EventHelper.notifyExchangeSent(exchange.getContext(), exchange, target, timeTaken);
        }
    }
}
项目:gumtree-spoon-ast-diff    文件:original.java   
/**
 * Runs one exchange/processor pair to completion on the calling thread.
 * <p>
 * Tracing is wrapped in a push/pop block when traced route nodes are available. If the pair
 * carries a producer, sending and sent events are emitted around the processing, the latter
 * with the elapsed time. {@code pair.done()} runs in the {@code finally} block.
 *
 * @param pair holder of the exchange, the processor and the optional producer
 * @throws Exception if processing the exchange fails
 */
private void doProcessParallel(final ProcessorExchangePair pair) throws Exception {
    final Exchange exchange = pair.getExchange();
    Processor processor = pair.getProcessor();
    Producer producer = pair.getProducer();

    TracedRouteNodes traced;
    if (exchange.getUnitOfWork() == null) {
        // no unit of work, so nothing to trace
        traced = null;
    } else {
        traced = exchange.getUnitOfWork().getTracedRouteNodes();
    }

    // compute time taken only if sending to another endpoint
    StopWatch watch = null;
    if (producer != null) {
        watch = new StopWatch();
    }

    try {
        // prepare tracing starting from a new block
        if (traced != null) {
            traced.pushBlock();
        }

        if (producer != null) {
            EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint());
        }

        // let the prepared processor handle it; the exchange pair is begun just before
        AsyncProcessor prepared = AsyncProcessorConverterHelper.convert(processor);
        pair.begin();
        // synchronous invocation, as parallel async routing is too hard
        AsyncProcessorHelper.process(prepared, exchange);
    } finally {
        pair.done();
        // pop the block so the next round starts from the same point and tracing stays accurate
        if (traced != null) {
            traced.popBlock();
        }
        if (producer != null) {
            long elapsed = watch.stop();
            Endpoint sentTo = producer.getEndpoint();
            // The sent event can be emitted here in the finally block, since the processing
            // above does not go through the async routing engine.
            EventHelper.notifyExchangeSent(exchange.getContext(), exchange, sentTo, elapsed);
        }
    }
}
项目:vertx-camel-bridge    文件:FromVertxToCamelProducer.java   
/**
 * Creates a new producer instance.
 * <p>
 * The endpoint is taken from the given producer, and the producer itself is stored in the
 * form returned by {@code AsyncProcessorConverterHelper.convert}.
 *
 * @param vertx    the vert.x instance
 * @param producer the underlying producer, must not be {@code null}
 * @param outbound the outbound configuration, must not be {@code null}
 * @param blocking whether or not the processing is blocking and so should not be run on the
 *                 event loop
 * @param pool     the pool on which the blocking code is going to be executed
 */
public FromVertxToCamelProducer(Vertx vertx, Producer producer, OutboundMapping outbound, boolean blocking,
                                WorkerExecutor  pool) {
  // vert.x side
  this.vertx = vertx;
  this.pool = pool;
  this.blocking = blocking;

  // camel side: endpoint first, then the converted producer
  this.endpoint = producer.getEndpoint();
  this.producer = AsyncProcessorConverterHelper.convert(producer);
  this.outbound = outbound;
}
项目:Camel    文件:InterceptorToAsyncProcessorBridge.java   
/**
 * Constructs the bridge.
 *
 * @param interceptor the interceptor to bridge; stored in the form returned by
 *                    {@code AsyncProcessorConverterHelper.convert}
 * @param target the target, stored as given
 */
public InterceptorToAsyncProcessorBridge(Processor interceptor, AsyncProcessor target) {
    // the target is already asynchronous; only the interceptor needs converting
    this.target = target;
    this.interceptor = AsyncProcessorConverterHelper.convert(interceptor);
}