Java 类org.apache.camel.spi.UnitOfWork 实例源码

项目:Camel    文件:ExchangeHelper.java   
/**
 * Creates a new instance and copies from the current message exchange so that it can be
 * forwarded to another destination as a new instance. Unlike regular copy this operation
 * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used
 * for async messaging, where the original and copied exchange are independent.
 *
 * @param exchange original copy of the exchange
 * @param handover whether the on completion callbacks should be handed over to the new copy.
 * @param useSameMessageId whether to use same message id on the copy message.
 */
public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId) {
    String id = exchange.getExchangeId();

    // make sure to do a safe copy as the correlated copy can be routed independently of the source.
    Exchange copy = exchange.copy(true);
    // do not reuse message id on copy
    if (!useSameMessageId) {
        if (copy.hasOut()) {
            copy.getOut().setMessageId(null);
        }
        copy.getIn().setMessageId(null);
    }
    // do not share the unit of work
    copy.setUnitOfWork(null);
    // do not reuse the message id
    // hand over on completion to the copy if we got any
    UnitOfWork uow = exchange.getUnitOfWork();
    if (handover && uow != null) {
        uow.handoverSynchronization(copy);
    }
    // set a correlation id so we can track back the original exchange
    copy.setProperty(Exchange.CORRELATION_ID, id);
    return copy;
}
项目:Camel    文件:ExchangeHelper.java   
/**
 * Gets the original IN {@link Message} this Unit of Work was started with.
 * <p/>
 * The original message is only returned if the option {@link org.apache.camel.RuntimeConfiguration#isAllowUseOriginalMessage()}
 * is enabled. If its disabled, then <tt>null</tt> is returned.
 *
 * @return the original IN {@link Message}, or <tt>null</tt> if using original message is disabled.
 */
public static Message getOriginalInMessage(Exchange exchange) {
    Message answer = null;

    // try parent first
    UnitOfWork uow = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
    if (uow != null) {
        answer = uow.getOriginalInMessage();
    }
    // fallback to the current exchange
    if (answer == null) {
        uow = exchange.getUnitOfWork();
        if (uow != null) {
            answer = uow.getOriginalInMessage();
        }
    }
    return answer;
}
项目:Camel    文件:BridgeExceptionHandlerToErrorHandler.java   
@Override
public void handleException(String message, Exchange exchange, Throwable exception) {
    if (exchange == null) {
        exchange = consumer.getEndpoint().createExchange();
    }

    // set the caused exception
    exchange.setException(exception);
    // and the message
    exchange.getIn().setBody(message);
    // and mark as redelivery exhausted as we cannot do redeliveries
    exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);

    // wrap in UoW
    UnitOfWork uow = null;
    try {
        uow = consumer.createUoW(exchange);
        bridge.process(exchange);
    } catch (Exception e) {
        fallback.handleException("Error handling exception " + exception.getMessage(), exchange, e);
    } finally {
        UnitOfWorkHelper.doneUow(uow, exchange);
    }
}
项目:Camel    文件:ExpressionBuilder.java   
/**
 * Returns an Expression for the route id
 */
public static Expression routeIdExpression() {
    return new ExpressionAdapter() {
        public Object evaluate(Exchange exchange) {
            String answer = null;
            UnitOfWork uow = exchange.getUnitOfWork();
            RouteContext rc = uow != null ? uow.getRouteContext() : null;
            if (rc != null) {
                answer = rc.getRoute().getId();
            }
            if (answer == null) {
                // fallback and get from route id on the exchange
                answer = exchange.getFromRouteId();
            }
            return answer;
        }

        @Override
        public String toString() {
            return "routeId";
        }
    };
}
项目:gumtree-spoon-ast-diff    文件:patched.java   
/**
 * Strategy to create the unit of work to be used for the sub route
 *
 * @param routeContext the route context
 * @param processor    the processor
 * @param exchange     the exchange
 * @return the unit of work processor
 */
protected Processor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) {
    String routeId = routeContext != null ? routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory()) : null;
    CamelInternalProcessor internal = new CamelInternalProcessor(processor);

    // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
    UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
    if (parent != null) {
        internal.addAdvice(new CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(routeId, parent));
    } else {
        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId));
    }

    // and then in route context so we can keep track which route this is at runtime
    if (routeContext != null) {
        internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(routeContext));
    }
    return internal;
}
项目:gumtree-spoon-ast-diff    文件:original.java   
/**
 * Strategy to create the unit of work to be used for the sub route
 *
 * @param routeContext the route context
 * @param processor    the processor
 * @param exchange     the exchange
 * @return the unit of work processor
 */
protected Processor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) {
    String routeId = routeContext != null ? routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory()) : null;
    CamelInternalProcessor internal = new CamelInternalProcessor(processor);

    // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
    UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
    if (parent != null) {
        internal.addAdvice(new CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(routeId, parent));
    } else {
        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId));
    }

    // and then in route context so we can keep track which route this is at runtime
    if (routeContext != null) {
        internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(routeContext));
    }
    return internal;
}
项目:camel-springboot    文件:Application.java   
/**
 * Enable custom unit of work. UnitOfWorkFactory bean is automatically picked up by Camel context
 */
@Bean
UnitOfWorkFactory customUnitOfWorkFactory() {
    return new UnitOfWorkFactory() {
        @Override
        public UnitOfWork createUnitOfWork(Exchange exchange) {
            return new CustomMDCBreadCrumbIdUnitOfWork(exchange);
        }
    };
}
项目:Camel    文件:CamelInternalProcessor.java   
@Override
public Object before(Exchange exchange) throws Exception {
    UnitOfWork uow = exchange.getUnitOfWork();
    if (uow != null) {
        uow.beforeRoute(exchange, route);
    }
    return null;
}
项目:Camel    文件:CamelInternalProcessor.java   
@Override
public void after(Exchange exchange, Object object) throws Exception {
    UnitOfWork uow = exchange.getUnitOfWork();
    if (uow != null) {
        uow.afterRoute(exchange, route);
    }
}
项目:Camel    文件:CamelInternalProcessor.java   
@Override
public UnitOfWork before(Exchange exchange) throws Exception {
    // push the current route context
    final UnitOfWork unitOfWork = exchange.getUnitOfWork();
    if (unitOfWork != null) {
        unitOfWork.pushRouteContext(routeContext);
    }
    return unitOfWork;
}
项目:Camel    文件:CamelInternalProcessor.java   
@Override
public UnitOfWork before(Exchange exchange) throws Exception {
    // if the exchange doesn't have from route id set, then set it if it originated
    // from this unit of work
    if (routeContext != null && exchange.getFromRouteId() == null) {
        String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
        exchange.setFromRouteId(routeId);
    }

    // only return UnitOfWork if we created a new as then its us that handle the lifecycle to done the created UoW
    UnitOfWork created = null;

    if (exchange.getUnitOfWork() == null) {
        // If there is no existing UoW, then we should start one and
        // terminate it once processing is completed for the exchange.
        created = createUnitOfWork(exchange);
        exchange.setUnitOfWork(created);
        created.start();
    }

    // for any exchange we should push/pop route context so we can keep track of which route we are routing
    if (routeContext != null) {
        UnitOfWork existing = exchange.getUnitOfWork();
        if (existing != null) {
            existing.pushRouteContext(routeContext);
        }
    }

    return created;
}
项目:Camel    文件:CamelInternalProcessor.java   
@Override
public void after(Exchange exchange, UnitOfWork uow) throws Exception {
    UnitOfWork existing = exchange.getUnitOfWork();

    // execute done on uow if we created it, and the consumer is not doing it
    if (uow != null) {
        UnitOfWorkHelper.doneUow(uow, exchange);
    }

    // after UoW is done lets pop the route context which must be done on every existing UoW
    if (routeContext != null && existing != null) {
        existing.popRouteContext();
    }
}
项目:Camel    文件:MulticastProcessor.java   
/**
 * Strategy to create the unit of work to be used for the sub route
 *
 * @param routeContext the route context
 * @param processor    the processor
 * @param exchange     the exchange
 * @return the unit of work processor
 */
protected Processor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) {
    CamelInternalProcessor internal = new CamelInternalProcessor(processor);

    // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
    UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
    if (parent != null) {
        internal.addAdvice(new CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(routeContext, parent));
    } else {
        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
    }

    return internal;
}
项目:Camel    文件:RedeliveryErrorHandler.java   
private void prepareExchangeAfterFailureNotHandled(Exchange exchange) {
    log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange);
    // exception not handled, put exception back in the exchange
    exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
    exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
    // and put failure endpoint back as well
    exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
    // and store the route id so we know in which route we failed
    UnitOfWork uow = exchange.getUnitOfWork();
    if (uow != null && uow.getRouteContext() != null) {
        exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
    }
}
项目:Camel    文件:AsyncProcessorHelper.java   
/**
 * Calls the async version of the processor's process method.
 * <p/>
 * This implementation supports transacted {@link Exchange}s which ensure those are run in a synchronous fashion.
 * See more details at {@link org.apache.camel.AsyncProcessor}.
 *
 * @param processor the processor
 * @param exchange  the exchange
 * @param callback  the callback
 * @return <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously
 * @deprecated should no longer be needed, instead invoke the process method on the {@link AsyncProcessor} directly,
 * instead of using this method.
 */
@Deprecated
public static boolean process(final AsyncProcessor processor, final Exchange exchange, final AsyncCallback callback) {
    boolean sync;

    if (exchange.isTransacted()) {
        // must be synchronized for transacted exchanges
        LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
        try {
            process(processor, exchange);
        } catch (Throwable e) {
            exchange.setException(e);
        }
        callback.done(true);
        sync = true;
    } else {
        final UnitOfWork uow = exchange.getUnitOfWork();

        // allow unit of work to wrap callback in case it need to do some special work
        // for example the MDCUnitOfWork
        AsyncCallback async = callback;
        if (uow != null) {
            async = uow.beforeProcess(processor, exchange, callback);
        }

        // we support asynchronous routing so invoke it
        sync = processor.process(exchange, async);

        // execute any after processor work (in current thread, not in the callback)
        if (uow != null) {
            uow.afterProcess(processor, exchange, callback, sync);
        }
    }

    if (LOG.isTraceEnabled()) {
        LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
                new Object[]{sync ? "synchronously" : "asynchronously", exchange.getExchangeId(), exchange});
    }
    return sync;
}
项目:Camel    文件:DefaultConsumer.java   
/**
 * If the consumer needs to defer done the {@link org.apache.camel.spi.UnitOfWork} on
 * the processed {@link Exchange} then this method should be use to create and start
 * the {@link UnitOfWork} on the exchange.
 *
 * @param exchange the exchange
 * @return the created and started unit of work
 * @throws Exception is thrown if error starting the unit of work
 *
 * @see #doneUoW(org.apache.camel.Exchange)
 */
public UnitOfWork createUoW(Exchange exchange) throws Exception {
    // if the exchange doesn't have from route id set, then set it if it originated
    // from this unit of work
    if (route != null && exchange.getFromRouteId() == null) {
        exchange.setFromRouteId(route.getId());
    }

    UnitOfWork uow = endpoint.getCamelContext().getUnitOfWorkFactory().createUnitOfWork(exchange);
    exchange.setUnitOfWork(uow);
    uow.start();
    return uow;
}
项目:Camel    文件:DefaultRuntimeEndpointRegistry.java   
private String getRouteId(Exchange exchange) {
    String answer = null;
    UnitOfWork uow = exchange.getUnitOfWork();
    RouteContext rc = uow != null ? uow.getRouteContext() : null;
    if (rc != null) {
        answer = rc.getRoute().getId();
    }
    if (answer == null) {
        // fallback and get from route id on the exchange
        answer = exchange.getFromRouteId();
    }
    return answer;
}
项目:Camel    文件:DefaultUnitOfWorkFactory.java   
@Override
public UnitOfWork createUnitOfWork(Exchange exchange) {
    UnitOfWork answer;
    if (exchange.getContext().isUseMDCLogging()) {
        answer = new MDCUnitOfWork(exchange);
    } else {
        answer = new DefaultUnitOfWork(exchange);
    }
    return answer;
}
项目:Camel    文件:DefaultExchange.java   
public boolean isTransacted() {
    UnitOfWork uow = getUnitOfWork();
    if (uow != null) {
        return uow.isTransacted();
    } else {
        return false;
    }
}
项目:Camel    文件:DefaultExchange.java   
public void setUnitOfWork(UnitOfWork unitOfWork) {
    this.unitOfWork = unitOfWork;
    if (unitOfWork != null && onCompletions != null) {
        // now an unit of work has been assigned so add the on completions
        // we might have registered already
        for (Synchronization onCompletion : onCompletions) {
            unitOfWork.addSynchronization(onCompletion);
        }
        // cleanup the temporary on completion list as they now have been registered
        // on the unit of work
        onCompletions.clear();
        onCompletions = null;
    }
}
项目:Camel    文件:CachedOutputStreamTest.java   
protected void setUp() throws Exception {
    super.setUp();

    deleteDirectory("target/cachedir");
    createDirectory("target/cachedir");

    exchange = new DefaultExchange(context);
    UnitOfWork uow = new DefaultUnitOfWork(exchange);
    exchange.setUnitOfWork(uow);
}
项目:switchyard    文件:CamelTransformer.java   
/**
 * {@inheritDoc}
 */
@Override
public Message transform(Message message) {
    try {
        Exchange exchange = _endpoint.createExchange();
        UnitOfWork uow = exchange.getContext().getUnitOfWorkFactory().createUnitOfWork(exchange);
        RouteContext rc = new DefaultRouteContext(exchange.getContext());
        uow.pushRouteContext(rc);
        exchange.setUnitOfWork(uow);
        uow.start();
        exchange.getIn().setBody(message.getContent());
        copyProperties(message.getContext(), exchange);

        Producer producer = _endpoint.createProducer();
        producer.process(exchange);
        if (exchange.isFailed()) {
            if (exchange.getException() != null) {
                throw TransformMessages.MESSAGES.failedToTransformViaCamelEndpoint(_endpoint.getEndpointUri(), exchange.getException());
            } else {
                throw TransformMessages.MESSAGES.failedToTransformViaCamelEndpoint(_endpoint.getEndpointUri(), exchange.getIn().getBody(String.class));
            }
        }
        if (QNameUtil.isJavaMessageType(getTo())) {
            message.setContent(exchange.getIn().getBody(QNameUtil.toJavaMessageType(getTo())));
        } else {
            message.setContent(exchange.getIn().getBody());
        }
        return message;
    } catch (Exception e) {
        throw TransformMessages.MESSAGES.failedToTransformViaCamelEndpoint(_endpoint.getEndpointUri(), e);
    }
}
项目:Camel    文件:CamelInternalProcessor.java   
@Override
public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception {
    if (unitOfWork != null) {
        unitOfWork.popRouteContext();
    }
}
项目:Camel    文件:CamelInternalProcessor.java   
protected UnitOfWork createUnitOfWork(Exchange exchange) {
    return exchange.getContext().getUnitOfWorkFactory().createUnitOfWork(exchange);
}
项目:Camel    文件:CamelInternalProcessor.java   
public ChildUnitOfWorkProcessorAdvice(RouteContext routeContext, UnitOfWork parent) {
    super(routeContext);
    this.parent = parent;
}
项目:Camel    文件:CamelInternalProcessor.java   
@Override
protected UnitOfWork createUnitOfWork(Exchange exchange) {
    // let the parent create a child unit of work to be used
    return parent.createChildUnitOfWork(exchange);
}
项目:Camel    文件:CamelInternalProcessor.java   
@Override
public UnitOfWork before(Exchange exchange) throws Exception {
    // begin savepoint
    exchange.getUnitOfWork().beginSubUnitOfWork(exchange);
    return exchange.getUnitOfWork();
}
项目:Camel    文件:CamelInternalProcessor.java   
@Override
public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception {
    // end sub unit of work
    unitOfWork.endSubUnitOfWork(exchange);
}
项目:Camel    文件:MulticastProcessor.java   
protected Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) {
    Processor answer;

    boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class);

    // do not wrap in error handler if we are inside a try block
    if (!tryBlock && routeContext != null) {
        // wrap the producer in error handler so we have fine grained error handling on
        // the output side instead of the input side
        // this is needed to support redelivery on that output alone and not doing redelivery
        // for the entire multicast block again which will start from scratch again

        // create key for cache
        final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor);

        // lookup cached first to reuse and preserve memory
        answer = errorHandlers.get(key);
        if (answer != null) {
            LOG.trace("Using existing error handler for: {}", processor);
            return answer;
        }

        LOG.trace("Creating error handler for: {}", processor);
        ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
        // create error handler (create error handler directly to keep it light weight,
        // instead of using ProcessorDefinition.wrapInErrorHandler)
        try {
            processor = builder.createErrorHandler(routeContext, processor);

            // and wrap in unit of work processor so the copy exchange also can run under UoW
            answer = createUnitOfWorkProcessor(routeContext, processor, exchange);

            boolean child = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class) != null;

            // must start the error handler
            ServiceHelper.startServices(answer);

            // here we don't cache the child unit of work
            if (!child) {
                // add to cache
                errorHandlers.putIfAbsent(key, answer);
            }

        } catch (Exception e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        }
    } else {
        // and wrap in unit of work processor so the copy exchange also can run under UoW
        answer = createUnitOfWorkProcessor(routeContext, processor, exchange);
    }

    return answer;
}
项目:Camel    文件:RedeliveryErrorHandler.java   
protected void handleException(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) {
    Exception e = exchange.getException();

    // store the original caused exception in a property, so we can restore it later
    exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);

    // find the error handler to use (if any)
    OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
    if (exceptionPolicy != null) {
        data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
        data.handledPredicate = exceptionPolicy.getHandledPolicy();
        data.continuedPredicate = exceptionPolicy.getContinuedPolicy();
        data.retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy();
        data.useOriginalInMessage = exceptionPolicy.getUseOriginalMessagePolicy() != null && exceptionPolicy.getUseOriginalMessagePolicy();

        // route specific failure handler?
        Processor processor = null;
        UnitOfWork uow = exchange.getUnitOfWork();
        if (uow != null && uow.getRouteContext() != null) {
            String routeId = uow.getRouteContext().getRoute().getId();
            processor = exceptionPolicy.getErrorHandler(routeId);
        } else if (!exceptionPolicy.getErrorHandlers().isEmpty()) {
            // note this should really not happen, but we have this code as a fail safe
            // to be backwards compatible with the old behavior
            log.warn("Cannot determine current route from Exchange with id: {}, will fallback and use first error handler.", exchange.getExchangeId());
            processor = exceptionPolicy.getErrorHandlers().iterator().next();
        }
        if (processor != null) {
            data.failureProcessor = processor;
        }

        // route specific on redelivery?
        processor = exceptionPolicy.getOnRedelivery();
        if (processor != null) {
            data.onRedeliveryProcessor = processor;
        }
        // route specific on exception occurred?
        processor = exceptionPolicy.getOnExceptionOccurred();
        if (processor != null) {
            data.onExceptionProcessor = processor;
        }
    }

    // only log if not failure handled or not an exhausted unit of work
    if (!ExchangeHelper.isFailureHandled(exchange) && !ExchangeHelper.isUnitOfWorkExhausted(exchange)) {
        String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange)
                + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
        logFailedDelivery(true, false, false, false, isDeadLetterChannel, exchange, msg, data, e);
    }

    data.redeliveryCounter = incrementRedeliveryCounter(exchange, e, data);
}
项目:Camel    文件:MDCUnitOfWork.java   
@Override
public UnitOfWork newInstance(Exchange exchange) {
    return new MDCUnitOfWork(exchange);
}
项目:Camel    文件:DefaultUnitOfWork.java   
UnitOfWork newInstance(Exchange exchange) {
    return new DefaultUnitOfWork(exchange);
}
项目:Camel    文件:DefaultUnitOfWork.java   
@Override
public void setParentUnitOfWork(UnitOfWork parentUnitOfWork) {
    this.parent = parentUnitOfWork;
}
项目:Camel    文件:DefaultUnitOfWork.java   
public UnitOfWork createChildUnitOfWork(Exchange childExchange) {
    // create a new child unit of work, and mark me as its parent
    UnitOfWork answer = newInstance(childExchange);
    answer.setParentUnitOfWork(this);
    return answer;
}
项目:Camel    文件:DefaultExchange.java   
public UnitOfWork getUnitOfWork() {
    return unitOfWork;
}
项目:Camel    文件:CustomUnitOfWorkFactoryTest.java   
@Override
public UnitOfWork createUnitOfWork(Exchange exchange) {
    return new MyUnitOfWork(exchange);
}
项目:gumtree-spoon-ast-diff    文件:patched.java   
protected Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) {
    Processor answer;

    boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class);

    // do not wrap in error handler if we are inside a try block
    if (!tryBlock && routeContext != null) {
        // wrap the producer in error handler so we have fine grained error handling on
        // the output side instead of the input side
        // this is needed to support redelivery on that output alone and not doing redelivery
        // for the entire multicast block again which will start from scratch again

        // create key for cache
        final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor);

        // lookup cached first to reuse and preserve memory
        answer = errorHandlers.get(key);
        if (answer != null) {
            LOG.trace("Using existing error handler for: {}", processor);
            return answer;
        }

        LOG.trace("Creating error handler for: {}", processor);
        ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
        // create error handler (create error handler directly to keep it light weight,
        // instead of using ProcessorDefinition.wrapInErrorHandler)
        try {
            processor = builder.createErrorHandler(routeContext, processor);

            // and wrap in unit of work processor so the copy exchange also can run under UoW
            answer = createUnitOfWorkProcessor(routeContext, processor, exchange);

            boolean child = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class) != null;

            // must start the error handler
            ServiceHelper.startServices(answer);

            // here we don't cache the child unit of work
            if (!child) {
                // add to cache
                errorHandlers.putIfAbsent(key, answer);
            }

        } catch (Exception e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        }
    } else {
        // and wrap in unit of work processor so the copy exchange also can run under UoW
        answer = createUnitOfWorkProcessor(routeContext, processor, exchange);
    }

    return answer;
}
项目:gumtree-spoon-ast-diff    文件:original.java   
protected Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) {
    Processor answer;

    boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class);

    // do not wrap in error handler if we are inside a try block
    if (!tryBlock && routeContext != null) {
        // wrap the producer in error handler so we have fine grained error handling on
        // the output side instead of the input side
        // this is needed to support redelivery on that output alone and not doing redelivery
        // for the entire multicast block again which will start from scratch again

        // create key for cache
        final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor);

        // lookup cached first to reuse and preserve memory
        answer = errorHandlers.get(key);
        if (answer != null) {
            LOG.trace("Using existing error handler for: {}", processor);
            return answer;
        }

        LOG.trace("Creating error handler for: {}", processor);
        ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
        // create error handler (create error handler directly to keep it light weight,
        // instead of using ProcessorDefinition.wrapInErrorHandler)
        try {
            processor = builder.createErrorHandler(routeContext, processor);

            // and wrap in unit of work processor so the copy exchange also can run under UoW
            answer = createUnitOfWorkProcessor(routeContext, processor, exchange);

            boolean child = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class) != null;

            // must start the error handler
            ServiceHelper.startServices(answer);

            // here we don't cache the child unit of work
            if (!child) {
                // add to cache
                errorHandlers.putIfAbsent(key, answer);
            }

        } catch (Exception e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        }
    } else {
        // and wrap in unit of work processor so the copy exchange also can run under UoW
        answer = createUnitOfWorkProcessor(routeContext, processor, exchange);
    }

    return answer;
}
项目:rassyeyanie    文件:TestExchange.java   
@Override
   public UnitOfWork getUnitOfWork() {
return null;
   }
项目:rassyeyanie    文件:TestExchange.java   
@Override
public void setUnitOfWork(UnitOfWork unitOfWork) {
}