Java 类org.apache.camel.util.MessageHelper 实例源码

项目:Camel    文件:CamelInternalProcessor.java   
/**
 * Records backlog tracer events for the exchange before it is processed at this node.
 * <p>
 * Nothing is recorded unless the backlog tracer accepts the exchange for this node.
 * When the <tt>first</tt> flag is set, an additional pseudo event without a target node
 * is recorded ahead of the regular event, so there is a starting message (eg from the route).
 *
 * @param exchange the exchange about to be processed
 * @return always <tt>null</tt>
 */
@Override
public Object before(Exchange exchange) throws Exception {
    if (!backlogTracer.shouldTrace(processorDefinition, exchange)) {
        // tracing does not apply to this node/exchange
        return null;
    }

    final Date now = new Date();
    final String targetNodeId = processorDefinition.getId();
    final String id = exchange.getExchangeId();
    final String xmlDump = MessageHelper.dumpAsXml(exchange.getIn(), true, 4,
            backlogTracer.isBodyIncludeStreams(), backlogTracer.isBodyIncludeFiles(), backlogTracer.getBodyMaxChars());

    final String owningRouteId;
    if (routeDefinition != null) {
        owningRouteId = routeDefinition.getId();
    } else {
        owningRouteId = null;
    }

    if (first) {
        // the pseudo event uses the exchange created timestamp, falling back to the current time
        final Date createdAt = exchange.getProperty(Exchange.CREATED_TIMESTAMP, now, Date.class);
        backlogTracer.traceEvent(new DefaultBacklogTracerEventMessage(
                backlogTracer.incrementTraceCounter(), createdAt, owningRouteId, null, id, xmlDump));
    }

    backlogTracer.traceEvent(new DefaultBacklogTracerEventMessage(
            backlogTracer.incrementTraceCounter(), now, owningRouteId, targetNodeId, id, xmlDump));

    return null;
}
项目:Camel    文件:RedeliveryErrorHandler.java   
/**
 * Prepares the exchange so routing can continue after a failure was handled.
 * <p>
 * The exception and rollback flag are cleared, the stream cache of the IN message is reset,
 * the redelivery headers and the failure handled property are removed, and the outcome is logged.
 * The {@link Exchange#EXCEPTION_CAUGHT} property is left untouched so the end user can still
 * see the caused exception.
 *
 * @param exchange            the exchange to prepare
 * @param data                the redelivery data, whose redelivery counter is included in the log message
 * @param isDeadLetterChannel passed on to the failed delivery logging
 */
protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) {
    // remember the failure before it is wiped from the exchange
    final Exception cause = exchange.getException();

    // continuing means no exception and no rollback marker may be left behind
    exchange.setException(null);
    exchange.setProperty(Exchange.ROLLBACK_ONLY, null);

    // allow cached streams to be read once more
    MessageHelper.resetStreamCache(exchange.getIn());

    // wipe the traces of the attempted redelivery
    exchange.getIn().removeHeader(Exchange.REDELIVERED);
    exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
    exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
    exchange.removeProperty(Exchange.FAILURE_HANDLED);

    final String logMessage = "Failed delivery for " + ExchangeHelper.logIds(exchange)
            + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + cause
            + ". Handled and continue routing.";

    // report the failure, flagged as one we continue from
    logFailedDelivery(false, false, false, true, isDeadLetterChannel, exchange, logMessage, data, null);
}
项目:Camel    文件:JMXNotificationTraceEventHandler.java   
/**
 * Sends a JMX trace notification for the exchange.
 * <p>
 * Does nothing when there is no notification sender or JMX trace notifications are disabled
 * on the tracer. The notification message is the logged body cut to at most
 * <tt>MAX_MESSAGE_LENGTH</tt> characters; the trace message map is attached as user data.
 *
 * @param node             the node being traced
 * @param target           the target processor (not read by this method)
 * @param traceInterceptor the trace interceptor (not read by this method)
 * @param exchange         the exchange being traced
 */
public void traceExchange(ProcessorDefinition<?> node, Processor target, TraceInterceptor traceInterceptor, Exchange exchange) throws Exception {
    if (notificationSender == null || !tracer.isJmxTraceNotifications()) {
        return;
    }

    String payload = MessageHelper.extractBodyForLogging(exchange.getIn(), "", false, true, tracer.getTraceBodySize());
    if (payload == null) {
        payload = "";
    }

    final int limit = Math.min(payload.length(), MAX_MESSAGE_LENGTH);
    final String shortened = payload.substring(0, limit);
    final Map<String, Object> traceData = createTraceMessage(node, exchange, payload);

    final Notification jmxNotification = new Notification(
            "TraceNotification", exchange.toString(), num.getAndIncrement(), System.currentTimeMillis(), shortened);
    jmxNotification.setUserData(traceData);

    notificationSender.sendNotification(jmxNotification);
}
项目:Camel    文件:ManagedBrowsableEndpoint.java   
/**
 * Dumps the message at the given position of the browsable endpoint as XML.
 * <p>
 * The OUT message is dumped when the exchange has one, otherwise the IN message.
 *
 * @param index       the position of the exchange in the list of browsable exchanges
 * @param includeBody whether the message body should be included in the dump
 * @return the message as XML, or <tt>null</tt> if the index is missing, negative or beyond the
 *         last exchange, or if there is no exchange at that position
 */
@Override
public String browseMessageAsXml(Integer index, Boolean includeBody) {
    // a missing or negative index can never point at a message, so treat it
    // like any other out of range index instead of failing on unboxing/list access
    if (index == null || index < 0) {
        return null;
    }

    List<Exchange> exchanges = getEndpoint().getExchanges();
    if (index >= exchanges.size()) {
        return null;
    }

    Exchange exchange = exchanges.get(index);
    if (exchange == null) {
        return null;
    }

    Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
    return MessageHelper.dumpAsXml(msg, includeBody);
}
项目:Camel    文件:ZipkinClientRequestAdapter.java   
/**
 * Builds the annotations describing the client request.
 * <p>
 * The endpoint url, exchange id and exchange pattern are always included. The message body
 * (OUT message if present, otherwise IN) is added only when the event notifier is configured
 * to include message bodies or message body streams.
 *
 * @return the annotations for the client request
 */
@Override
public Collection<KeyValueAnnotation> requestAnnotations() {
    List<KeyValueAnnotation> annotations = new ArrayList<>();
    annotations.add(KeyValueAnnotation.create("camel.client.endpoint.url", url));
    annotations.add(KeyValueAnnotation.create("camel.client.exchange.id", exchange.getExchangeId()));
    annotations.add(KeyValueAnnotation.create("camel.client.exchange.pattern", exchange.getPattern().name()));

    if (eventNotifier.isIncludeMessageBody() || eventNotifier.isIncludeMessageBodyStreams()) {
        boolean includeStreams = eventNotifier.isIncludeMessageBodyStreams();
        StreamCache streamCache = prepareBodyForLogging(exchange, includeStreams);
        String payload = MessageHelper.extractBodyForLogging(exchange.hasOut() ? exchange.getOut() : exchange.getIn(), "", includeStreams, includeStreams);
        annotations.add(KeyValueAnnotation.create("camel.client.exchange.message.request.body", payload));
        // reset the stream cache returned by the prepare step, if there was one
        if (streamCache != null) {
            streamCache.reset();
        }
    }

    return annotations;
}
项目:Camel    文件:ZipkinServerRequestAdapter.java   
/**
 * Builds the annotations describing the server request.
 * <p>
 * The endpoint url, exchange id and exchange pattern are always included. The message body
 * (OUT message if present, otherwise IN) is added only when the event notifier is configured
 * to include message bodies or message body streams.
 *
 * @return the annotations for the server request
 */
@Override
public Collection<KeyValueAnnotation> requestAnnotations() {
    List<KeyValueAnnotation> annotations = new ArrayList<>();
    annotations.add(KeyValueAnnotation.create("camel.server.endpoint.url", url));
    annotations.add(KeyValueAnnotation.create("camel.server.exchange.id", exchange.getExchangeId()));
    annotations.add(KeyValueAnnotation.create("camel.server.exchange.pattern", exchange.getPattern().name()));

    if (eventNotifier.isIncludeMessageBody() || eventNotifier.isIncludeMessageBodyStreams()) {
        boolean includeStreams = eventNotifier.isIncludeMessageBodyStreams();
        StreamCache streamCache = prepareBodyForLogging(exchange, includeStreams);
        String payload = MessageHelper.extractBodyForLogging(exchange.hasOut() ? exchange.getOut() : exchange.getIn(), "", includeStreams, includeStreams);
        annotations.add(KeyValueAnnotation.create("camel.server.exchange.message.request.body", payload));
        // reset the stream cache returned by the prepare step, if there was one
        if (streamCache != null) {
            streamCache.reset();
        }
    }

    return annotations;
}
项目:Camel    文件:JettyContentTypeTest.java   
/**
 * Sends an XML order to the test service and verifies the reply body and content type.
 *
 * @param charset   the charset to append to the <tt>text/xml</tt> Content-Type header,
 *                  or <tt>null</tt> to send the header without a charset
 * @param usingGZip whether to set the content encoding header to <tt>gzip</tt>
 */
protected void sendMessageWithContentType(String charset, boolean usingGZip) {
    Endpoint endpoint = context.getEndpoint("http://localhost:{{port}}/myapp/myservice");
    Exchange exchange = endpoint.createExchange();
    exchange.getIn().setBody("<order>123</order>");
    exchange.getIn().setHeader("User", "Claus");
    exchange.getIn().setHeader("SOAPAction", "test");
    if (charset == null) {
        exchange.getIn().setHeader("Content-Type", "text/xml");
    } else {
        exchange.getIn().setHeader("Content-Type", "text/xml; charset=" + charset);
    }
    if (usingGZip) {
        exchange.getIn().setHeader(Exchange.CONTENT_ENCODING, "gzip");
    }
    template.send(endpoint, exchange);

    String body = exchange.getOut().getBody(String.class);
    assertEquals("<order>OK</order>", body);
    // expected value goes first so a failure reports expected/actual the right way around
    assertEquals("Get a wrong content-type ", "text/xml", MessageHelper.getContentType(exchange.getOut()));
}
项目:Camel    文件:IgniteComputeProducer.java   
/**
 * Completes the exchange once the Ignite future is available.
 * <p>
 * The IN headers are copied onto the OUT message. If obtaining the future result fails, the
 * exception is set on the exchange; otherwise the result becomes the OUT body. In both cases
 * the callback is signalled with <tt>false</tt>.
 *
 * @param future the future holding the result of the compute operation
 */
@Override
public void apply(IgniteFuture<Object> future) {
    MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), true);

    final Object outcome;
    try {
        outcome = future.get();
    } catch (Exception e) {
        exchange.setException(e);
        callback.done(false);
        return;
    }

    exchange.getOut().setBody(outcome);
    callback.done(false);
}
项目:Camel    文件:IgniteMessagingProducer.java   
/**
 * Sends the IN message body to the Ignite topic resolved for the exchange.
 * <p>
 * In <tt>UNORDERED</tt> send mode a {@link Collection} body is sent as a collection of messages
 * unless the endpoint treats collections as cache objects; any other body is sent as a single
 * message. In every other send mode the body is sent ordered using the endpoint timeout.
 *
 * @param exchange the exchange holding the message to send
 * @param callback the async callback (not invoked by this method)
 * @return always <tt>true</tt>
 */
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
    final Message request = exchange.getIn();
    final Message response = exchange.getOut();
    MessageHelper.copyHeaders(request, response, true);

    final Object payload = request.getBody();

    if (endpoint.getSendMode() != IgniteMessagingSendMode.UNORDERED) {
        messaging.sendOrdered(topicFor(exchange), payload, endpoint.getTimeout());
    } else if (payload instanceof Collection<?> && !endpoint.isTreatCollectionsAsCacheObjects()) {
        messaging.send(topicFor(exchange), (Collection<?>) payload);
    } else {
        messaging.send(topicFor(exchange), payload);
    }

    IgniteHelper.maybePropagateIncomingBody(endpoint, request, response);

    return true;
}
项目:beyondj    文件:DefaultJettyHttpBinding.java   
/**
 * Populates the OUT message of the exchange from the Jetty response.
 * <p>
 * The response code is set as a header, the response headers which are not filtered by the
 * given strategy are appended, IN headers are copied without overriding existing ones, and
 * finally the response body is extracted.
 *
 * @param exchange     the exchange to populate
 * @param httpExchange the Jetty exchange holding the response
 * @param in           the IN message (not read by this method)
 * @param strategy     the header filter strategy; when <tt>null</tt> no response header is appended
 * @param responseCode the HTTP response code
 * @throws IOException declared for the response body extraction
 */
protected void populateResponse(Exchange exchange, JettyContentExchange httpExchange,
                                Message in, HeaderFilterStrategy strategy, int responseCode) throws IOException {
    final Message reply = exchange.getOut();
    reply.setHeader(Exchange.HTTP_RESPONSE_CODE, responseCode);

    // the response fields are used for the http headers, as
    // httpExchange.getHeaders() does not work well with multi valued headers
    final Map<String, Collection<String>> responseHeaders = httpExchange.getResponseHeaders();
    for (Map.Entry<String, Collection<String>> header : responseHeaders.entrySet()) {
        String headerName = header.getKey();
        for (String headerValue : header.getValue()) {
            if ("content-type".equals(headerName.toLowerCase())) {
                // normalize the header name and remember the charset from the content type
                headerName = Exchange.CONTENT_TYPE;
                exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.getCharsetNameFromContentType(headerValue));
            }
            final boolean skip = strategy == null || strategy.applyFilterToExternalHeaders(headerName, headerValue, exchange);
            if (!skip) {
                HttpHelper.appendHeader(reply.getHeaders(), headerName, headerValue);
            }
        }
    }

    // keep the headers from in, but only add those not already present
    // so existing headers are not overridden with old values
    MessageHelper.copyHeaders(exchange.getIn(), reply, false);

    // the body is extracted last, so the content-type from the Jetty HttpExchange
    // has been populated before
    reply.setBody(extractResponseBody(exchange, httpExchange));
}
项目:Camel    文件:DefaultTraceEventMessage.java   
/**
 * Creates a {@link DefaultTraceEventMessage} capturing the state of the given {@link Exchange}
 * at the point it was traced.
 * <p>
 * Values are captured as strings/copies so that later changes to the exchange do not alter
 * the trace event. Note that the <tt>toNode</tt> argument is not read here; the target node is
 * extracted from the exchange instead.
 *
 * @param timestamp the timestamp of the trace event
 * @param toNode the node where this trace is intercepted
 * @param exchange the current {@link Exchange}
 */
public DefaultTraceEventMessage(final Date timestamp, final ProcessorDefinition<?> toNode, final Exchange exchange) {
    this.tracedExchange = exchange;
    final Message inMessage = exchange.getIn();

    // defensive copies, so the Exchange cannot alter the event after the point of interception
    this.timestamp = timestamp;
    if (exchange.getFromEndpoint() != null) {
        this.fromEndpointUri = exchange.getFromEndpoint().getEndpointUri();
    } else {
        this.fromEndpointUri = null;
    }
    this.previousNode = extractFromNode(exchange);
    this.toNode = extractToNode(exchange);
    this.exchangeId = exchange.getExchangeId();
    this.routeId = exchange.getFromRouteId();
    this.shortExchangeId = extractShortExchangeId(exchange);
    this.exchangePattern = exchange.getPattern().toString();

    if (exchange.getProperties().isEmpty()) {
        this.properties = null;
    } else {
        this.properties = exchange.getProperties().toString();
    }
    if (inMessage.getHeaders().isEmpty()) {
        this.headers = null;
    } else {
        this.headers = inMessage.getHeaders().toString();
    }

    // the body is extracted via the logging helper rather than converting the message body to String
    this.body = MessageHelper.extractBodyForLogging(inMessage, "");
    this.bodyType = MessageHelper.getBodyTypeName(inMessage);

    if (exchange.hasOut()) {
        final Message outMessage = exchange.getOut();
        if (outMessage.getHeaders().isEmpty()) {
            this.outHeaders = null;
        } else {
            this.outHeaders = outMessage.getHeaders().toString();
        }
        this.outBody = MessageHelper.extractBodyAsString(outMessage);
        this.outBodyType = MessageHelper.getBodyTypeName(outMessage);
    }

    this.causedByException = extractCausedByException(exchange);
}
项目:Camel    文件:BacklogDebugger.java   
/**
 * Captures the message at a node breakpoint and, if the node is currently suspended,
 * blocks until the breakpoint is continued or the fallback timeout (in seconds) elapses.
 * <p>
 * If the waiting thread is interrupted, the wait ends and the interrupt status of the
 * thread is restored so that callers can still observe the interruption.
 *
 * @param exchange   the exchange being processed
 * @param processor  the processor about to be invoked (not read by this method)
 * @param definition the definition of the node, whose id is used in the event and log messages
 */
@Override
public void beforeProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) {
    // store a copy of the message so we can see that from the debugger
    Date timestamp = new Date();
    String toNode = definition.getId();
    String routeId = ProcessorDefinitionHelper.getRouteId(definition);
    String exchangeId = exchange.getExchangeId();
    String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 2, isBodyIncludeStreams(), isBodyIncludeFiles(), getBodyMaxChars());
    long uid = debugCounter.incrementAndGet();

    BacklogTracerEventMessage msg = new DefaultBacklogTracerEventMessage(uid, timestamp, routeId, toNode, exchangeId, messageAsXml);
    suspendedBreakpointMessages.put(nodeId, msg);

    // suspend at this breakpoint
    final SuspendedExchange se = suspendedBreakpoints.get(nodeId);
    if (se != null) {
        // now wait until we should continue
        logger.log("NodeBreakpoint at node " + toNode + " is waiting to continue for exchangeId: " + exchangeId);
        try {
            boolean hit = se.getLatch().await(fallbackTimeout, TimeUnit.SECONDS);
            if (!hit) {
                logger.log("NodeBreakpoint at node " + toNode + " timed out and is continued exchangeId: " + exchangeId, LoggingLevel.WARN);
            } else {
                logger.log("NodeBreakpoint at node " + toNode + " is continued exchangeId: " + exchangeId);
            }
        } catch (InterruptedException e) {
            // stop waiting, but keep the interrupt status so it is not silently lost
            Thread.currentThread().interrupt();
        }
    }
}
项目:Camel    文件:BacklogDebugger.java   
/**
 * Captures the message at a step breakpoint, suspends the exchange at the node and blocks
 * until the breakpoint is continued or the fallback timeout (in seconds) elapses.
 * <p>
 * If the waiting thread is interrupted, the wait ends and the interrupt status of the
 * thread is restored so that callers can still observe the interruption.
 *
 * @param exchange   the exchange being processed
 * @param processor  the processor about to be invoked (not read by this method)
 * @param definition the definition of the node, whose id keys the suspended message and exchange
 */
@Override
public void beforeProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) {
    // store a copy of the message so we can see that from the debugger
    Date timestamp = new Date();
    String toNode = definition.getId();
    String routeId = ProcessorDefinitionHelper.getRouteId(definition);
    String exchangeId = exchange.getExchangeId();
    String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 2, isBodyIncludeStreams(), isBodyIncludeFiles(), getBodyMaxChars());
    long uid = debugCounter.incrementAndGet();

    BacklogTracerEventMessage msg = new DefaultBacklogTracerEventMessage(uid, timestamp, routeId, toNode, exchangeId, messageAsXml);
    suspendedBreakpointMessages.put(toNode, msg);

    // suspend at this breakpoint
    SuspendedExchange se = new SuspendedExchange(exchange, new CountDownLatch(1));
    suspendedBreakpoints.put(toNode, se);

    // now wait until we should continue
    logger.log("StepBreakpoint at node " + toNode + " is waiting to continue for exchangeId: " + exchangeId);
    try {
        boolean hit = se.getLatch().await(fallbackTimeout, TimeUnit.SECONDS);
        if (!hit) {
            logger.log("StepBreakpoint at node " + toNode + " timed out and is continued exchangeId: " + exchangeId, LoggingLevel.WARN);
        } else {
            logger.log("StepBreakpoint at node " + toNode + " is continued exchangeId: " + exchangeId);
        }
    } catch (InterruptedException e) {
        // stop waiting, but keep the interrupt status so it is not silently lost
        Thread.currentThread().interrupt();
    }
}
项目:Camel    文件:StreamCachingInterceptor.java   
/**
 * Replaces the IN body with its {@link StreamCache} representation when the body can be
 * obtained as one, resets the stream cache and then delegates to the wrapped processor.
 *
 * @param exchange the exchange to process
 * @param callback the async callback handed on to the wrapped processor
 * @return the result of the wrapped processor
 */
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
    final StreamCache cachedBody = exchange.getIn().getBody(StreamCache.class);
    if (cachedBody != null) {
        // use the cached variant as the message body from now on
        exchange.getIn().setBody(cachedBody);
    }

    MessageHelper.resetStreamCache(exchange.getIn());
    return processor.process(exchange, callback);
}
项目:Camel    文件:RoutingSlip.java   
/**
 * Creates the exchange to use for the next step of the routing slip.
 * <p>
 * The new exchange is built from the current one, is given the same exchange id, has
 * <tt>copyOutToIn</tt> applied and its IN message stream cache reset.
 *
 * @param current  the current exchange
 * @param endpoint the next endpoint (not read by this method)
 * @return the prepared exchange
 */
protected Exchange prepareExchangeForRoutingSlip(Exchange current, Endpoint endpoint) {
    final Exchange snapshot = new DefaultExchange(current);

    // The id must stay the same: this is a snapshot strategy, where Camel takes a snapshot of
    // the exchange just before processing the next step in the pipeline. That snapshot is what
    // gets used if Camel should do redeliveries (re try) using DeadLetterChannel, so it has to
    // be identified as the *same* exchange being routed.
    snapshot.setExchangeId(current.getExchangeId());
    copyOutToIn(snapshot, current);

    // make sure stream caching starts from a reset state
    MessageHelper.resetStreamCache(snapshot.getIn());

    return snapshot;
}
项目:Camel    文件:RecipientListProcessor.java   
/**
 * Prepares the exchange before it is sent to the recipient.
 * <p>
 * Stores the recipient endpoint uri as an exchange property, resets the stream cache of the
 * IN message and, when a pattern is configured, remembers the current exchange pattern and
 * switches the exchange to the configured one.
 */
public void begin() {
    // the producer has already been acquired and prepared at this point
    LOG.trace("RecipientProcessorExchangePair #{} begin: {}", index, exchange);
    exchange.setProperty(Exchange.RECIPIENT_LIST_ENDPOINT, endpoint.getEndpointUri());

    // make sure stream caching starts from a reset state
    MessageHelper.resetStreamCache(exchange.getIn());

    if (pattern == null) {
        // no specific MEP configured, keep the exchange as is
        return;
    }

    originalPattern = exchange.getPattern();
    LOG.trace("Using exchangePattern: {} on exchange: {}", pattern, exchange);
    exchange.setPattern(pattern);
}
项目:Camel    文件:DefaultExchangeFormatter.java   
/**
 * Gets the message body as a string for formatting.
 * <p>
 * A {@link Future} body is rendered with its <tt>toString</tt> unless showing futures is
 * enabled; every other case is delegated to the body extraction for logging.
 *
 * @param message the message
 * @return the body as a string
 */
protected String getBodyAsString(Message message) {
    if (message.getBody() instanceof Future && !isShowFuture()) {
        // futures are not resolved, so only the future object itself is printed
        return message.getBody().toString();
    }

    return MessageHelper.extractBodyForLogging(message, "", isShowStreams(), isShowFiles(), getMaxChars(message));
}
项目:Camel    文件:RedeliveryErrorHandler.java   
/**
 * Prepares the exchange for another delivery attempt.
 * <p>
 * The exception and rollback flag are cleared, the IN message is restored from the original
 * (defensive copy) exchange, the OUT message is removed and the stream cache is reset. The
 * redelivery headers which were present before the restore are put back afterwards.
 *
 * @param exchange the exchange to prepare
 * @param data     the redelivery data holding the original exchange
 * @throws IllegalStateException if redelivery is not enabled on this error handler
 */
protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) {
    if (!redeliveryEnabled) {
        throw new IllegalStateException("Redelivery is not enabled on " + this + ". Make sure you have configured the error handler properly.");
    }
    // without a defensive copy of the exchange there is nothing to restore from
    ObjectHelper.notNull(data.original, "Defensive copy of Exchange is null", this);

    // a new attempt starts without the previous exception and rollback marker
    exchange.setException(null);
    exchange.setProperty(Exchange.ROLLBACK_ONLY, null);

    // TODO: We may want to store these as state on RedeliveryData so we keep them in case end user messes with Exchange
    // and then put these on the exchange when doing a redelivery / fault processor

    // remember the redelivery headers, as restoring the message below replaces them
    final Integer counter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
    final Integer maxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
    final Boolean wasRedelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class);

    // restore the message from the original and drop any out message
    exchange.getIn().copyFrom(data.original.getIn());
    exchange.setOut(null);
    // allow cached streams to be read once more
    MessageHelper.resetStreamCache(exchange.getIn());

    // re-apply the remembered headers
    if (counter != null) {
        exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, counter);
    }
    if (maxCounter != null) {
        exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, maxCounter);
    }
    if (wasRedelivered != null) {
        exchange.getIn().setHeader(Exchange.REDELIVERED, wasRedelivered);
    }
}
项目:Camel    文件:ManagedBrowsableEndpoint.java   
/**
 * Dumps a range of the browsable messages as XML, wrapped in a <tt>messages</tt> element.
 * <p>
 * For each exchange the OUT message is dumped when present, otherwise the IN message.
 *
 * @param fromIndex   the first position (inclusive); <tt>null</tt> means 0
 * @param toIndex     the last position (inclusive); <tt>null</tt> means {@link Integer#MAX_VALUE}
 * @param includeBody whether the message bodies should be included in the dump
 * @return the messages as XML, or <tt>null</tt> if there are no exchanges to browse
 * @throws IllegalArgumentException if the from index is larger than the to index
 */
@Override
public String browseRangeMessagesAsXml(Integer fromIndex, Integer toIndex, Boolean includeBody) {
    final int first = fromIndex == null ? 0 : fromIndex;
    final int last = toIndex == null ? Integer.MAX_VALUE : toIndex;
    if (first > last) {
        throw new IllegalArgumentException("From index cannot be larger than to index, was: " + first + " > " + last);
    }

    List<Exchange> exchanges = getEndpoint().getExchanges();
    if (exchanges.isEmpty()) {
        return null;
    }

    StringBuilder xml = new StringBuilder("<messages>");
    int position = first;
    while (position < exchanges.size() && position <= last) {
        Exchange exchange = exchanges.get(position);
        Message msg;
        if (exchange.hasOut()) {
            msg = exchange.getOut();
        } else {
            msg = exchange.getIn();
        }
        xml.append("\n").append(MessageHelper.dumpAsXml(msg, includeBody));
        position++;
    }
    xml.append("\n</messages>");
    return xml.toString();
}
项目:Camel    文件:BaseTypeConverterRegistry.java   
/**
 * Creates the exception to report a failed type conversion.
 * <p>
 * When an exchange is available the value is passed through the logging extraction first,
 * which allows to limit the message body in the exception/stacktrace and can also be used
 * to turn off logging sensitive message data. Without an exchange the value is used as is.
 *
 * @param exchange the exchange, may be <tt>null</tt>
 * @param type     the type the value should have been converted to
 * @param value    the value which could not be converted
 * @param cause    the cause of the failure
 * @return the exception
 */
protected TypeConversionException createTypeConversionException(Exchange exchange, Class<?> type, Object value, Throwable cause) {
    final Object reported = exchange != null
            ? MessageHelper.extractValueForLogging(value, exchange.getIn())
            : value;
    return new TypeConversionException(reported, type, cause);
}
项目:Camel    文件:DefaultAsyncProcessorAwaitManager.java   
/**
 * Interrupts the thread which is blocked waiting for the given exchange to complete.
 * <p>
 * Does nothing if the exchange is not inflight. Otherwise a warning with the blocked thread
 * and the message history of the exchange is logged. Regardless of whether that logging
 * succeeds, the interrupted counter is updated (when statistics are enabled), a
 * {@link RejectedExecutionException} is set on the exchange and the latch of the waiting
 * entry is counted down.
 *
 * @param exchange the exchange to interrupt
 */
@Override
public void interrupt(Exchange exchange) {
    final AwaitThreadEntry waiting = (AwaitThreadEntry) inflight.get(exchange);
    if (waiting == null) {
        return;
    }

    try {
        final StringBuilder report = new StringBuilder()
                .append("Interrupted while waiting for asynchronous callback, will release the following blocked thread which was waiting for exchange to finish processing with exchangeId: ")
                .append(exchange.getExchangeId())
                .append("\n")
                .append(dumpBlockedThread(waiting));

        // include the route stack trace of the exchange, when there is one
        final String history = MessageHelper.dumpMessageHistoryStacktrace(exchange, exchangeFormatter, false);
        if (history != null) {
            report.append(history);
        }
        LOG.warn(report.toString());
    } catch (Exception e) {
        throw ObjectHelper.wrapRuntimeCamelException(e);
    } finally {
        if (statistics.isStatisticsEnabled()) {
            interruptedCounter.incrementAndGet();
        }
        exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId()));
        waiting.getLatch().countDown();
    }
}
项目:Camel    文件:XPathBuilder.java   
/**
 * Strategy method to extract the document from the exchange.
 * <p>
 * Any exception from the extraction is rethrown wrapped as a runtime Camel exception. The
 * stream cache of the IN message is always reset afterwards.
 *
 * @param exchange the exchange
 * @param body     the body to extract the document from
 * @return the extracted document
 */
protected Object getDocument(Exchange exchange, Object body) {
    final Object document;
    try {
        document = doGetDocument(exchange, body);
    } catch (Exception e) {
        throw ObjectHelper.wrapRuntimeCamelException(e);
    } finally {
        // takes care of the reset in case the in message body is a StreamCache
        MessageHelper.resetStreamCache(exchange.getIn());
    }
    return document;
}
项目:Camel    文件:DefaultHttpBinding.java   
/**
 * Writes the given message to the servlet response.
 * <p>
 * The status code is only set when the message carries a response code header, and the content
 * type only when the message has one. Message headers not filtered by the header filter
 * strategy are added to the response, and a non-null body is written either gzipped or directly.
 *
 * @param message  the message to write
 * @param response the servlet response to write to
 * @param exchange the exchange the message belongs to
 * @throws IOException declared for writing the response body
 */
public void doWriteResponse(Message message, HttpServletResponse response, Exchange exchange) throws IOException {
    // the status code is only set when given as header
    if (message.getHeader(Exchange.HTTP_RESPONSE_CODE) != null) {
        int statusCode = message.getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class);
        response.setStatus(statusCode);
    }

    // the content type of the response
    final String type = MessageHelper.getContentType(message);
    if (type != null) {
        response.setContentType(type);
    }

    // the headers: entrySet is used to ensure case of keys is preserved
    for (Map.Entry<String, Object> header : message.getHeaders().entrySet()) {
        final String headerName = header.getKey();
        // a header can have multiple values, so iterate them (must not use a delimiter)
        for (Iterator<?> values = ObjectHelper.createIterator(header.getValue(), null); values.hasNext();) {
            final String text = convertHeaderValueToString(exchange, values.next());
            if (text == null || headerFilterStrategy == null) {
                continue;
            }
            if (!headerFilterStrategy.applyFilterToCamelHeaders(headerName, text, exchange)) {
                response.addHeader(headerName, text);
            }
        }
    }

    // the body, if any
    if (message.getBody() == null) {
        return;
    }
    if (GZIPHelper.isGzip(message)) {
        doWriteGZIPResponse(message, response, exchange);
    } else {
        doWriteDirectResponse(message, response, exchange);
    }
}
项目:Camel    文件:HttpProducer.java   
/**
 * Populates the OUT message of the exchange from the HTTP response.
 * <p>
 * The response body is extracted before the OUT message is obtained. Then the response code,
 * the reason phrase (when a status line is present) and the body are set, the response headers
 * not filtered by the given strategy are appended, and IN headers are copied when the endpoint
 * is configured to copy headers.
 *
 * @param exchange     the exchange to populate
 * @param httpRequest  the HTTP request that was sent
 * @param httpResponse the HTTP response received
 * @param in           the IN message (not read by this method)
 * @param strategy     the header filter strategy; when <tt>null</tt> no response header is appended
 * @param responseCode the HTTP response code
 */
protected void populateResponse(Exchange exchange, HttpRequestBase httpRequest, HttpResponse httpResponse,
                                Message in, HeaderFilterStrategy strategy, int responseCode) throws IOException, ClassNotFoundException {
    // extract first, so the out message is not created when extractResponseBody throws an exception
    final Object payload = extractResponseBody(httpRequest, httpResponse, exchange, getEndpoint().isIgnoreResponseBody());
    final Message reply = exchange.getOut();

    reply.setHeader(Exchange.HTTP_RESPONSE_CODE, responseCode);
    if (httpResponse.getStatusLine() != null) {
        reply.setHeader(Exchange.HTTP_RESPONSE_TEXT, httpResponse.getStatusLine().getReasonPhrase());
    }
    reply.setBody(payload);

    // hand the HTTP response headers on to the out message
    for (Header responseHeader : httpResponse.getAllHeaders()) {
        String headerName = responseHeader.getName();
        final String rawValue = responseHeader.getValue();
        if ("content-type".equals(headerName.toLowerCase())) {
            // normalize the header name and remember the charset from the content type
            headerName = Exchange.CONTENT_TYPE;
            exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.getCharsetNameFromContentType(rawValue));
        }
        // the value may contain multiple values, so let the http helper extract it
        final Object headerValue = HttpHelper.extractHttpParameterValue(rawValue);
        final boolean skip = strategy == null || strategy.applyFilterToExternalHeaders(headerName, headerValue, exchange);
        if (!skip) {
            HttpHelper.appendHeader(reply.getHeaders(), headerName, headerValue);
        }
    }

    // the endpoint might be configured to copy headers from in to out;
    // existing headers are not overridden with old values and
    // the http protocol headers are filtered
    if (getEndpoint().isCopyHeaders()) {
        MessageHelper.copyHeaders(exchange.getIn(), reply, httpProtocolHeaderFilterStrategy, false);
    }
}
项目:Camel    文件:ZipkinClientResponseAdaptor.java   
/**
 * Builds the annotations describing the client response.
 * <p>
 * The endpoint url, exchange id and exchange pattern are always included. The message body
 * (OUT message if present, otherwise IN) is added only when the event notifier is configured
 * to include message bodies or message body streams, and the HTTP response code only when the
 * corresponding header is present on that message.
 *
 * @return the annotations for the client response
 */
@Override
public Collection<KeyValueAnnotation> responseAnnotations() {
    List<KeyValueAnnotation> annotations = new ArrayList<>();
    annotations.add(KeyValueAnnotation.create("camel.client.endpoint.url", url));
    annotations.add(KeyValueAnnotation.create("camel.client.exchange.id", exchange.getExchangeId()));
    annotations.add(KeyValueAnnotation.create("camel.client.exchange.pattern", exchange.getPattern().name()));

    if (eventNotifier.isIncludeMessageBody() || eventNotifier.isIncludeMessageBodyStreams()) {
        boolean includeStreams = eventNotifier.isIncludeMessageBodyStreams();
        StreamCache streamCache = prepareBodyForLogging(exchange, includeStreams);
        String payload = MessageHelper.extractBodyForLogging(exchange.hasOut() ? exchange.getOut() : exchange.getIn(), "", includeStreams, includeStreams);
        annotations.add(KeyValueAnnotation.create("camel.client.exchange.message.response.body", payload));
        // reset the stream cache returned by the prepare step, if there was one
        if (streamCache != null) {
            streamCache.reset();
        }
    }

    // capture the http response code for http based components
    String statusCode;
    if (exchange.hasOut()) {
        statusCode = exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE, String.class);
    } else {
        statusCode = exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE, String.class);
    }
    if (statusCode != null) {
        annotations.add(KeyValueAnnotation.create("camel.client.exchange.message.response.code", statusCode));
    }

    return annotations;
}
项目:Camel    文件:DefaultJettyHttpBinding.java   
/**
 * Populates the OUT message of the exchange from the Jetty response.
 * <p>
 * The response code is set as a header, the response headers which are not filtered by the
 * given strategy are appended, IN headers are copied without overriding existing ones while
 * filtering with the http protocol header filter strategy, and finally the response body is
 * extracted.
 *
 * @param exchange     the exchange to populate
 * @param httpExchange the Jetty exchange holding the response
 * @param in           the IN message (not read by this method)
 * @param strategy     the header filter strategy; when <tt>null</tt> no response header is appended
 * @param responseCode the HTTP response code
 * @throws IOException declared for the response body extraction
 */
protected void populateResponse(Exchange exchange, JettyContentExchange httpExchange,
                                Message in, HeaderFilterStrategy strategy, int responseCode) throws IOException {
    final Message reply = exchange.getOut();
    reply.setHeader(Exchange.HTTP_RESPONSE_CODE, responseCode);

    // the response fields are used for the http headers, as
    // httpExchange.getHeaders() does not work well with multi valued headers
    final Map<String, Collection<String>> responseHeaders = httpExchange.getResponseHeaders();
    for (Map.Entry<String, Collection<String>> header : responseHeaders.entrySet()) {
        String headerName = header.getKey();
        for (String headerValue : header.getValue()) {
            if ("content-type".equals(headerName.toLowerCase())) {
                // normalize the header name and remember the charset from the content type
                headerName = Exchange.CONTENT_TYPE;
                exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.getCharsetNameFromContentType(headerValue));
            }
            final boolean skip = strategy == null || strategy.applyFilterToExternalHeaders(headerName, headerValue, exchange);
            if (!skip) {
                HttpHelper.appendHeader(reply.getHeaders(), headerName, headerValue);
            }
        }
    }

    // keep the headers from in, but only add those not already present so existing headers
    // are not overridden with old values; the httpProtocolHeaderFilterStrategy is applied
    // as well to filter the http protocol headers
    MessageHelper.copyHeaders(exchange.getIn(), reply, httpProtocolHeaderFilterStrategy, false);

    // the body is extracted last, so the content-type from the Jetty HttpExchange
    // has been populated before
    reply.setBody(extractResponseBody(exchange, httpExchange));
}
项目:Camel    文件:DefaultUndertowHttpBinding.java   
/**
 * Transfers the content type and the headers of the message onto the client request.
 * <p>
 * Header values are converted to strings with the type converter of the Camel context;
 * values which convert to <tt>null</tt> or are filtered by the header filter strategy are
 * not added to the request.
 *
 * @param clientRequest the request to populate
 * @param message       the message to take the content type, headers and body from
 * @return the body of the message
 */
@Override
public Object toHttpRequest(ClientRequest clientRequest, Message message) {

    final Object payload = message.getBody();

    // the content type of the request
    final String type = MessageHelper.getContentType(message);
    if (type != null) {
        clientRequest.getRequestHeaders().put(Headers.CONTENT_TYPE, type);
        LOG.trace("Content-Type: {}", type);
    }

    final TypeConverter converter = message.getExchange().getContext().getTypeConverter();

    // transfer the headers from the Message to the Request
    for (Map.Entry<String, Object> header : message.getHeaders().entrySet()) {
        final String headerName = header.getKey();
        // a header can have multiple values, so iterate them (must not use a delimiter)
        for (Iterator<?> values = ObjectHelper.createIterator(header.getValue(), null); values.hasNext();) {
            final String text = converter.convertTo(String.class, values.next());
            if (text == null || headerFilterStrategy == null) {
                continue;
            }
            if (!headerFilterStrategy.applyFilterToCamelHeaders(headerName, text, message.getExchange())) {
                LOG.trace("HTTP-Header: {}={}", headerName, text);
                clientRequest.getRequestHeaders().add(new HttpString(headerName), text);
            }
        }
    }

    return payload;
}
项目:Camel    文件:JettyImageFileTest.java   
/**
 * Requests the image from the test service and verifies that a body with the
 * <tt>image/jpeg</tt> content type is returned.
 *
 * @param usingGZip whether to set the content encoding header to <tt>gzip</tt>
 */
private void sendImageContent(boolean usingGZip) throws Exception {
    Endpoint endpoint = context.getEndpoint("http://localhost:{{port}}/myapp/myservice");
    Exchange exchange = endpoint.createExchange();
    if (usingGZip) {
        exchange.getIn().setHeader(Exchange.CONTENT_ENCODING, "gzip");
    }
    template.send(endpoint, exchange);

    assertNotNull(exchange.getOut().getBody());
    // expected value goes first so a failure reports expected/actual the right way around
    assertEquals("Get a wrong content-type ", "image/jpeg", MessageHelper.getContentType(exchange.getOut()));
}
项目:Camel    文件:JettyContentTypeTest.java   
/**
 * Sends an XML order with a <tt>text/xml</tt> content type and no charset, and verifies that
 * the service replies with <tt>FAIL</tt> as <tt>text/plain</tt>.
 */
@Test
public void testMixedContentType() throws Exception {
    Endpoint endpoint = context.getEndpoint("http://localhost:{{port}}/myapp/myservice");
    Exchange exchange = endpoint.createExchange();
    exchange.getIn().setBody("<order>123</order>");
    exchange.getIn().setHeader("Content-Type", "text/xml");
    template.send(endpoint, exchange);

    String body = exchange.getOut().getBody(String.class);
    assertEquals("FAIL", body);
    // expected value goes first so a failure reports expected/actual the right way around
    assertEquals("Get a wrong content-type ", "text/plain", MessageHelper.getContentType(exchange.getOut()));
}
项目:Camel    文件:IgniteIdGenProducer.java   
/**
 * Runs the ID generator operation selected for the exchange and stores its result
 * as the body of the out message.
 * <p/>
 * The in headers are copied to the out message first. The in body is read as a {@code Long}
 * and is only passed to the two "add" operations. An unrecognised operation sets an
 * {@link UnsupportedOperationException} on the exchange. The callback is not invoked here.
 *
 * @param exchange the exchange being processed
 * @param callback the async callback (not used by this method)
 * @return always {@code true}
 */
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
    final Message request = exchange.getIn();
    final Message reply = exchange.getOut();
    MessageHelper.copyHeaders(request, reply, true);

    // amount to add; only consumed by ADD_AND_GET and GET_AND_ADD
    final Long delta = request.getBody(Long.class);

    switch (idGenOperationFor(exchange)) {

    case ADD_AND_GET:
        reply.setBody(atomicSeq.addAndGet(delta));
        break;

    case GET:
        reply.setBody(atomicSeq.get());
        break;

    case GET_AND_ADD:
        reply.setBody(atomicSeq.getAndAdd(delta));
        break;

    case GET_AND_INCREMENT:
        reply.setBody(atomicSeq.getAndIncrement());
        break;

    case INCREMENT_AND_GET:
        reply.setBody(atomicSeq.incrementAndGet());
        break;

    default:
        // unknown operation: report it through the exchange and fall out to the common return
        exchange.setException(new UnsupportedOperationException("Operation not supported by Ignite ID Generator producer."));
        break;
    }

    return true;
}
项目:Camel    文件:VertxProducer.java   
/**
 * Handles the async reply: a failure cause is set as the exchange exception, otherwise
 * the reply body becomes the out body. The callback is always completed with {@code false}.
 *
 * @param event the async result carrying either a failure cause or the reply message
 */
@Override
public void handle(AsyncResult<Message<Object>> event) {
    try {
        // keep the in headers on the out message
        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), false);
        final Throwable failure = event.cause();
        if (failure == null) {
            exchange.getOut().setBody(event.result().body());
        } else {
            exchange.setException(failure);
        }
    } finally {
        // signal completion whatever happened above
        callback.done(false);
    }
}
项目:Camel    文件:HttpProducer.java   
/**
 * Populates the out message of the exchange from the HTTP response.
 * <p/>
 * Sets the response code and status text headers and the extracted body, then propagates the
 * HTTP response headers that pass the given filter strategy. A content-type header is stored
 * under {@code Exchange.CONTENT_TYPE} and its charset is recorded as the
 * {@code Exchange.CHARSET_NAME} exchange property. When the endpoint is configured to copy
 * headers, the in headers are copied to the out message without overriding existing ones.
 *
 * @param exchange     the exchange whose out message is populated
 * @param method       the executed HTTP method providing status text and response headers
 * @param in           the in message (not read here; the in message is taken from the exchange)
 * @param strategy     filter for the response headers; when {@code null} no response header is propagated
 * @param responseCode the HTTP status code to set on the out message
 * @throws IOException            declared by this method; presumably raised while extracting the response body
 * @throws ClassNotFoundException declared by this method; presumably raised while extracting the response body
 */
protected void populateResponse(Exchange exchange, HttpMethod method, Message in, HeaderFilterStrategy strategy, int responseCode) throws IOException, ClassNotFoundException {
    // extract the body before touching the out message, so that no out message
    // is created when extractResponseBody throws an exception
    Object response = extractResponseBody(method, exchange, getEndpoint().isIgnoreResponseBody());
    Message answer = exchange.getOut();

    answer.setHeader(Exchange.HTTP_RESPONSE_CODE, responseCode);
    answer.setHeader(Exchange.HTTP_RESPONSE_TEXT, method.getStatusText());
    answer.setBody(response);

    // propagate HTTP response headers
    Header[] headers = method.getResponseHeaders();
    for (Header header : headers) {
        String name = header.getName();
        String value = header.getValue();
        // compare case-insensitively without going through the locale-dependent toLowerCase()
        if (name.equalsIgnoreCase("content-type")) {
            name = Exchange.CONTENT_TYPE;
            exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.getCharsetNameFromContentType(value));
        }

        // use http helper to extract parameter value as it may contain multiple values
        Object extracted = HttpHelper.extractHttpParameterValue(value);
        if (strategy != null && !strategy.applyFilterToExternalHeaders(name, extracted, exchange)) {
            HttpHelper.appendHeader(answer.getHeaders(), name, extracted);
        }
    }

    // endpoint might be configured to copy headers from in to out
    // to avoid overriding existing headers with old values just
    // filter the http protocol headers
    if (getEndpoint().isCopyHeaders()) {
        MessageHelper.copyHeaders(exchange.getIn(), answer, httpProtocolHeaderFilterStrategy, false);
    }
}
项目:Camel    文件:DefaultAhcBinding.java   
/**
 * Copies the in headers to the out message and records the HTTP status code and
 * status text of the response as out headers.
 *
 * @param endpoint       the endpoint (not used by this method)
 * @param exchange       the exchange whose out message receives the headers
 * @param responseStatus the received HTTP response status
 * @throws Exception declared by the binding contract
 */
@Override
public void onStatusReceived(AhcEndpoint endpoint, Exchange exchange, HttpResponseStatus responseStatus) throws Exception {
    // keep the in headers by copying those not yet present on the out message, so existing
    // headers are not overridden with old values; http protocol headers are filtered out
    MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), httpProtocolHeaderFilterStrategy, false);

    final int statusCode = responseStatus.getStatusCode();
    exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, statusCode);

    final String statusText = responseStatus.getStatusText();
    exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_TEXT, statusText);
}
项目:Camel    文件:CMISProducer.java   
/**
 * Resolves the mime type of the message: the {@code PropertyIds.CONTENT_STREAM_MIME_TYPE}
 * header wins, otherwise the content type of the message is used.
 *
 * @param message the message to inspect
 * @return the header value when present, else the result of {@code MessageHelper.getContentType(message)}
 * @throws NoSuchHeaderException declared by this method
 */
private String getMimeType(Message message) throws NoSuchHeaderException {
    final String fromHeader = message.getHeader(PropertyIds.CONTENT_STREAM_MIME_TYPE, String.class);
    return fromHeader != null ? fromHeader : MessageHelper.getContentType(message);
}
项目:Camel    文件:DefaultTraceFormatter.java   
/**
 * Builds the trace line for the exchange at the given node.
 * <p/>
 * The line starts with the bread crumb and is followed by the sections enabled through the
 * {@code show*} flags. When {@code maxChars} is positive, a longer line is cut to that length
 * and suffixed with {@code ...}.
 *
 * @param interceptor the trace interceptor, passed on to the bread crumb extraction
 * @param node        the node being traced, passed on to the bread crumb extraction
 * @param exchange    the exchange to describe
 * @return the formatted trace line
 */
public Object format(final TraceInterceptor interceptor, final ProcessorDefinition<?> node, final Exchange exchange) {
    final Message in = exchange.getIn();
    // the out message is only looked at when the exchange already has one
    final Message out = exchange.hasOut() ? exchange.getOut() : null;

    final StringBuilder trace = new StringBuilder();
    trace.append(extractBreadCrumb(interceptor, node, exchange));

    if (showExchangePattern) {
        trace.append(", Pattern:").append(exchange.getPattern());
    }
    // properties and headers are skipped when there are none
    if (showProperties && !exchange.getProperties().isEmpty()) {
        trace.append(", Properties:").append(exchange.getProperties());
    }
    if (showHeaders && !in.getHeaders().isEmpty()) {
        trace.append(", Headers:").append(in.getHeaders());
    }
    if (showBodyType) {
        trace.append(", BodyType:").append(MessageHelper.getBodyTypeName(in));
    }
    if (showBody) {
        trace.append(", Body:").append(MessageHelper.extractBodyForLogging(in, ""));
    }

    // all out related sections require an out message
    if (out != null) {
        if (showOutHeaders) {
            trace.append(", OutHeaders:").append(out.getHeaders());
        }
        if (showOutBodyType) {
            trace.append(", OutBodyType:").append(MessageHelper.getBodyTypeName(out));
        }
        if (showOutBody) {
            trace.append(", OutBody:").append(MessageHelper.extractBodyForLogging(out, ""));
        }
    }

    if (showException && exchange.getException() != null) {
        trace.append(", Exception:").append(exchange.getException());
    }

    // drop the comma that directly follows the first "<<<"
    final String line = trace.toString().replaceFirst("<<<,", "<<<");

    // truncate only when a positive limit is set and exceeded
    if (maxChars > 0 && line.length() > maxChars) {
        return line.substring(0, maxChars) + "...";
    }
    return line;
}
项目:Camel    文件:ExpressionBuilder.java   
/**
 * Returns an expression that evaluates to the message history dump of the exchange.
 *
 * @param detailed whether exchange details are included; when {@code true} an
 *                 {@link ExchangeFormatter} is resolved and passed to the dump
 * @return the message history expression
 */
public static Expression messageHistoryExpression(final boolean detailed) {
    return new ExpressionAdapter() {

        // resolved on the first detailed evaluation and reused afterwards
        private ExchangeFormatter formatter;

        public Object evaluate(Exchange exchange) {
            // a formatter is only needed when exchange details are to be logged
            final ExchangeFormatter exchangeFormatter = detailed ? getOrCreateExchangeFormatter(exchange.getContext()) : null;
            return MessageHelper.dumpMessageHistoryStacktrace(exchange, exchangeFormatter, false);
        }

        private ExchangeFormatter getOrCreateExchangeFormatter(CamelContext camelContext) {
            if (formatter != null) {
                return formatter;
            }

            // use the formatter from the registry when exactly one is registered
            final Set<ExchangeFormatter> registered = camelContext.getRegistry().findByType(ExchangeFormatter.class);
            if (registered != null && registered.size() == 1) {
                formatter = registered.iterator().next();
                return formatter;
            }

            // otherwise set up a default formatter for the message history dump
            final DefaultExchangeFormatter fallback = new DefaultExchangeFormatter();
            fallback.setShowExchangeId(true);
            fallback.setMultiline(true);
            fallback.setShowHeaders(true);
            fallback.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed);
            try {
                final Integer limit = CamelContextHelper.parseInteger(camelContext, camelContext.getProperty(Exchange.LOG_DEBUG_BODY_MAX_CHARS));
                if (limit != null) {
                    fallback.setMaxChars(limit);
                }
            } catch (Exception e) {
                throw ObjectHelper.wrapRuntimeCamelException(e);
            }
            formatter = fallback;
            return formatter;
        }

        @Override
        public String toString() {
            return "messageHistory(" + detailed + ")";
        }
    };
}
项目:Camel    文件:ZipkinServerResponseAdapter.java   
/**
 * Builds the annotations describing the server response of the exchange.
 * <p/>
 * The endpoint url, exchange id and exchange pattern are always included. A failure message
 * is added when the exchange holds an exception; otherwise the response body is added when
 * the event notifier is configured to include message bodies. The HTTP response code is added
 * when the response message carries one.
 *
 * @return the annotations, in the order described above
 */
@Override
public Collection<KeyValueAnnotation> responseAnnotations() {
    final String exchangeId = exchange.getExchangeId();
    final String pattern = exchange.getPattern().name();

    final List<KeyValueAnnotation> annotations = new ArrayList<>();
    annotations.add(KeyValueAnnotation.create("camel.server.endpoint.url", url));
    annotations.add(KeyValueAnnotation.create("camel.server.exchange.id", exchangeId));
    annotations.add(KeyValueAnnotation.create("camel.server.exchange.pattern", pattern));

    // either the failure or (when enabled) the response body, never both
    KeyValueAnnotation outcome = null;
    if (exchange.getException() != null) {
        final String failure = exchange.getException().getMessage();
        outcome = KeyValueAnnotation.create("camel.server.exchange.failure", failure);
    } else if (eventNotifier.isIncludeMessageBody() || eventNotifier.isIncludeMessageBodyStreams()) {
        final boolean includeStreams = eventNotifier.isIncludeMessageBodyStreams();
        final StreamCache cache = prepareBodyForLogging(exchange, includeStreams);
        final String body = MessageHelper.extractBodyForLogging(exchange.hasOut() ? exchange.getOut() : exchange.getIn(), "", includeStreams, includeStreams);
        outcome = KeyValueAnnotation.create("camel.server.exchange.message.response.body", body);
        // reset the stream cache after the body has been extracted for logging
        if (cache != null) {
            cache.reset();
        }
    }
    if (outcome != null) {
        annotations.add(outcome);
    }

    // capture the http response code for http based components
    final String responseCode = (exchange.hasOut() ? exchange.getOut() : exchange.getIn()).getHeader(Exchange.HTTP_RESPONSE_CODE, String.class);
    if (responseCode != null) {
        final KeyValueAnnotation codeAnnotation = KeyValueAnnotation.create("camel.server.exchange.message.response.code", responseCode);
        if (codeAnnotation != null) {
            annotations.add(codeAnnotation);
        }
    }

    return annotations;
}
项目:Camel    文件:MongoDbProducer.java   
/**
 * Copies the headers of the in message to the out message of the exchange.
 *
 * @param exchange the exchange whose in headers are copied to its out message
 */
private void copyHeaders(Exchange exchange) {
    // passed as the last argument of MessageHelper.copyHeaders; presumably means
    // "do not override headers already on the out message" - confirm against MessageHelper
    final boolean override = false;
    MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), override);
}
项目:Camel    文件:DefaultUndertowHttpBinding.java   
/**
 * Writes the status code and headers of the Camel message to the Undertow response and
 * returns the payload to send.
 * <p/>
 * When the exchange holds an exception, the response content type is set to the "txt" mime
 * mapping, the failure is marked as handled and the stack trace text is returned instead of
 * the message body.
 *
 * @param httpExchange the Undertow server exchange receiving status code and headers
 * @param message      the Camel message supplying response code, headers and body
 * @return the exception stack trace when the exchange failed with an exception, otherwise the message body
 */
@Override
public Object toHttpResponse(HttpServerExchange httpExchange, Message message) {
    final Exchange camelExchange = message.getExchange();

    // a failed exchange defaults to 500 and any other to 200, unless the header says otherwise
    final int fallbackCode = camelExchange.isFailed() ? 500 : 200;
    final int status = message.getHeader(Exchange.HTTP_RESPONSE_CODE, fallbackCode, int.class);
    httpExchange.setResponseCode(status);

    final TypeConverter converter = camelExchange.getContext().getTypeConverter();

    // transfer every Camel header value that passes the header filter onto the response
    for (Map.Entry<String, Object> header : message.getHeaders().entrySet()) {
        final String name = header.getKey();
        // a header may carry several values, so walk them one by one (a delimiter must not be used)
        final Iterator<?> values = ObjectHelper.createIterator(header.getValue(), null);
        while (values.hasNext()) {
            final String text = converter.convertTo(String.class, values.next());
            // nothing is copied for unconvertible values or when no filter strategy is configured
            if (text == null || headerFilterStrategy == null) {
                continue;
            }
            if (headerFilterStrategy.applyFilterToCamelHeaders(name, text, camelExchange)) {
                continue;
            }
            LOG.trace("HTTP-Header: {}={}", name, text);
            httpExchange.getResponseHeaders().add(new HttpString(name), text);
        }
    }

    final Exception failure = camelExchange.getException();
    if (failure != null) {
        // answer with the stack trace as plain text and mark the failure as handled
        httpExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, MimeMappings.DEFAULT_MIME_MAPPINGS.get("txt"));

        final StringWriter buffer = new StringWriter();
        failure.printStackTrace(new PrintWriter(buffer));
        final String stackTrace = buffer.toString();

        ExchangeHelper.setFailureHandled(camelExchange);
        return stackTrace;
    }

    // propagate the content type of the message onto the response, when one is known
    final String type = MessageHelper.getContentType(message);
    if (type != null) {
        httpExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, type);
        LOG.trace("Content-Type: {}", type);
    }

    return message.getBody();
}
项目:Camel    文件:XQueryBuilder.java   
/**
 * Creates a dynamic context for the given exchange.
 * <p/>
 * The context item is read from the header named by {@code getHeaderName()} when that name is
 * not empty, otherwise from the body of the in message. When the value can be obtained as an
 * {@code Item} it is used directly; otherwise it is turned into a {@code Source} via
 * {@code getSource}, built into a document with the configuration, and that document is used
 * as the context item. Finally {@code configureQuery} is applied and the stream cache of the
 * in message is reset.
 *
 * @param exchange the exchange whose in message supplies the context item
 * @return the dynamic query context, after {@code configureQuery} has been applied to it
 * @throws Exception declared by this method; a {@code NoTypeConversionAvailableException} is
 *                   thrown here when no {@code Source} could be obtained
 */
protected DynamicQueryContext createDynamicContext(Exchange exchange) throws Exception {
    Configuration config = getConfiguration();
    DynamicQueryContext dynamicQueryContext = new DynamicQueryContext(config);

    Message in = exchange.getIn();
    // first try to get the value as an Item: from the configured header if any, else from the body
    Item item = null;
    if (ObjectHelper.isNotEmpty(getHeaderName())) {
        item = in.getHeader(getHeaderName(), Item.class);
    } else {
        item = in.getBody(Item.class);
    }
    if (item != null) {
        dynamicQueryContext.setContextItem(item);
    } else {
        // no Item available: fall back to the raw header/body value and convert it to a Source
        Object body = null;
        if (ObjectHelper.isNotEmpty(getHeaderName())) {
            body = in.getHeader(getHeaderName());
        } else {
            body = in.getBody();
        }

        // the underlying input stream, which we need to close to avoid locking files or other resources
        InputStream is = null;
        try {
            Source source;
            // only convert to input stream if really needed
            if (isInputStreamNeeded(exchange)) {
                if (ObjectHelper.isNotEmpty(getHeaderName())) {
                    is = exchange.getIn().getHeader(getHeaderName(), InputStream.class);
                } else {
                    is = exchange.getIn().getBody(InputStream.class);
                }
                source = getSource(exchange, is);
            } else {
                source = getSource(exchange, body);
            }

            // special for bean invocation
            // NOTE(review): source is already null inside this branch, so the assignment below
            // changes nothing and the exception further down is still thrown - confirm the intent
            if (source == null) {
                if (body instanceof BeanInvocation) {
                    // if its a null bean invocation then handle that
                    BeanInvocation bi = exchange.getContext().getTypeConverter().convertTo(BeanInvocation.class, body);
                    if (bi.getArgs() != null && bi.getArgs().length == 1 && bi.getArgs()[0] == null) {
                        // its a null argument from the bean invocation so use null as answer
                        source = null;
                    }
                }
            }

            if (source == null) {
                // indicate it was not possible to convert to a Source type
                throw new NoTypeConversionAvailableException(body, Source.class);
            }

            // build the document from the source and use it as the context item
            DocumentInfo doc = config.buildDocument(source);
            dynamicQueryContext.setContextItem(doc);
        } finally {
            // closes the stream in every case; IOHelper.close can deal with a null stream
            IOHelper.close(is);
        }
    }

    configureQuery(dynamicQueryContext, exchange);
    // call the reset if the in message body is StreamCache
    MessageHelper.resetStreamCache(exchange.getIn());
    return dynamicQueryContext;
}