Java 类org.apache.camel.Producer 实例源码

项目:beyondj    文件:JettyHttpEndpoint.java   
@Override
public Producer createProducer() throws Exception {
    JettyHttpProducer answer = new JettyHttpProducer(this);
    if (client != null) {
        // use shared client, and ensure its started so we can use it
        client.start();
        answer.setSharedClient(client);
        answer.setBinding(getJettyBinding(client));
    } else {
        HttpClient httpClient = createJettyHttpClient();
        answer.setClient(httpClient);
        answer.setBinding(getJettyBinding(httpClient));
    }

    if (isSynchronous()) {
        return new SynchronousDelegateProducer(answer);
    } else {
        return answer;
    }
}
项目:Camel    文件:FromFtpMoveFileTest.java   
private void prepareFtpServer() throws Exception {
    // prepares the FTP Server by creating a file on the server that we want to unit
    // test that we can pool and store as a local file
    Endpoint endpoint = context.getEndpoint(getFtpUrl());
    Exchange exchange = endpoint.createExchange();
    exchange.getIn().setBody("Hello World this file will be moved");
    exchange.getIn().setHeader(Exchange.FILE_NAME, "hello.txt");
    Producer producer = endpoint.createProducer();
    producer.start();
    producer.process(exchange);
    producer.stop();

    // assert file is created
    File file = new File(FTP_ROOT_DIR + "/movefile/hello.txt");
    assertTrue("The file should exists", file.exists());
}
项目:Camel    文件:DefaultProducerCacheTest.java   
public void testCacheProducerAcquireAndRelease() throws Exception {
    ProducerCache cache = new ProducerCache(this, context);
    cache.start();

    assertEquals("Size should be 0", 0, cache.size());

    // test that we cache at most 1000 producers to avoid it eating to much memory
    for (int i = 0; i < 1003; i++) {
        Endpoint e = context.getEndpoint("direct:queue:" + i);
        Producer p = cache.acquireProducer(e);
        cache.releaseProducer(e, p);
    }

    // the eviction is async so force cleanup
    cache.cleanUp();

    assertEquals("Size should be 1000", 1000, cache.size());
    cache.stop();
}
项目:Camel    文件:NettyTransferExchangeOptionTest.java   
private Exchange sendExchange(boolean setException) throws Exception {
    Endpoint endpoint = context.getEndpoint("netty:tcp://localhost:{{port}}?transferExchange=true");
    Exchange exchange = endpoint.createExchange();

    Message message = exchange.getIn();
    message.setBody("Hello!");
    message.setHeader("cheese", "feta");
    exchange.setProperty("ham", "old");
    exchange.setProperty("setException", setException);

    Producer producer = endpoint.createProducer();
    producer.start();

    // ensure to stop producer after usage
    try {
        producer.process(exchange);
    } finally {
        producer.stop();
    }

    return exchange;
}
项目:Camel    文件:ProducerCache.java   
public ProducerCache(Object source, CamelContext camelContext, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) {
    this.source = source;
    this.camelContext = camelContext;
    if (producerServicePool == null) {
        // use shared producer pool which lifecycle is managed by CamelContext
        this.pool = camelContext.getProducerServicePool();
        this.stopServicePool = false;
    } else {
        this.pool = producerServicePool;
        this.stopServicePool = true;
    }
    this.producers = cache;
    if (producers instanceof LRUCache) {
        maxCacheSize = ((LRUCache) producers).getMaxCacheSize();
    }

    // only if JMX is enabled
    if (camelContext.getManagementStrategy().getManagementAgent() != null) {
        this.extendedStatistics = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isExtended();
    } else {
        this.extendedStatistics = false;
    }
}
项目:Camel    文件:FromFtpPreMoveFilePostfixTest.java   
private void prepareFtpServer() throws Exception {
    // prepares the FTP Server by creating a file on the server that we want to unit
    // test that we can pool and store as a local file
    Endpoint endpoint = context.getEndpoint(getFtpUrl());
    Exchange exchange = endpoint.createExchange();
    exchange.getIn().setBody("Hello World this file will be moved");
    exchange.getIn().setHeader(Exchange.FILE_NAME, "hello.txt");
    Producer producer = endpoint.createProducer();
    producer.start();
    producer.process(exchange);
    producer.stop();

    // assert file is created
    File file = new File(FTP_ROOT_DIR + "/movefile/hello.txt");
    assertTrue("The file should exists", file.exists());
}
项目:Camel    文件:MinaVMTransferExchangeOptionTest.java   
private Exchange sendExchange(boolean setException) throws Exception {
    Endpoint endpoint = context.getEndpoint("mina:vm://localhost:{{port}}?sync=true&encoding=UTF-8&transferExchange=true");
    Exchange exchange = endpoint.createExchange();

    Message message = exchange.getIn();
    message.setBody("Hello!");
    message.setHeader("cheese", "feta");
    exchange.setProperty("ham", "old");
    exchange.setProperty("setException", setException);

    Producer producer = endpoint.createProducer();
    producer.start();
    producer.process(exchange);

    return exchange;
}
项目:Camel    文件:EmptyProducerCache.java   
@Override
public Producer acquireProducer(Endpoint endpoint) {
    // always create a new producer
    Producer answer;
    try {
        answer = endpoint.createProducer();
        if (getCamelContext().isStartingRoutes() && answer.isSingleton()) {
            // if we are currently starting a route, then add as service and enlist in JMX
            // - but do not enlist non-singletons in JMX
            // - note addService will also start the service
            getCamelContext().addService(answer);
        } else {
            // must then start service so producer is ready to be used
            ServiceHelper.startService(answer);
        }
    } catch (Exception e) {
        throw new FailedToCreateProducerException(endpoint, e);
    }
    return answer;
}
项目:Camel    文件:SedaEndpointTest.java   
public void testSedaEndpoint() throws Exception {
    SedaEndpoint seda = new SedaEndpoint("seda://foo", context.getComponent("seda"), queue);
    assertNotNull(seda);

    assertEquals(1000, seda.getSize());
    assertSame(queue, seda.getQueue());
    assertEquals(1, seda.getConcurrentConsumers());

    Producer prod = seda.createProducer();
    seda.onStarted((SedaProducer) prod);
    assertEquals(1, seda.getProducers().size());

    Consumer cons = seda.createConsumer(new Processor() {
        public void process(Exchange exchange) throws Exception {
            // do nothing
        }
    });
    seda.onStarted((SedaConsumer) cons);
    assertEquals(1, seda.getConsumers().size());

    assertEquals(0, seda.getExchanges().size());
}
项目:syndesis    文件:ComponentProxyEndpoint.java   
@Override
public Producer createProducer() throws Exception {
    final Producer producer = endpoint.createProducer();

    final Processor beforeProducer = getBeforeProducer();
    final Processor afterProducer = getAfterProducer();

    // use a pipeline to process before, producer, after in that order
    // create producer with the pipeline
    final Processor pipeline = Pipeline.newInstance(getCamelContext(), beforeProducer, producer, afterProducer);

    return new ComponentProxyProducer(endpoint, pipeline);
}
项目:Camel    文件:XmppEndpoint.java   
public Producer createProducer() throws Exception {
    if (room != null) {
        return createGroupChatProducer();
    } else {
        if (isPubsub()) {
            return createPubSubProducer();
        }
        if (getParticipant() == null) {
            throw new IllegalArgumentException("No room or participant configured on this endpoint: " + this);
        }
        return createPrivateChatProducer(getParticipant());
    }
}
项目:Camel    文件:Mina2ProducerShutdownTest.java   
private void sendMessage() throws Exception {
    Endpoint endpoint = context.getEndpoint(URI);
    Producer producer = endpoint.createProducer();

    Exchange exchange = endpoint.createExchange();
    exchange.getIn().setBody("Hello World");

    producer.start();
    producer.process(exchange);
    producer.stop();
}
项目:Camel    文件:SplunkEndpoint.java   
public Producer createProducer() throws Exception {
    String[] uriSplit = splitUri(getEndpointUri());
    if (uriSplit.length > 0) {
        ProducerType producerType = ProducerType.fromUri(uriSplit[0]);
        return new SplunkProducer(this, producerType);
    }
    throw new IllegalArgumentException("Cannot create any producer with uri " + getEndpointUri() + ". A producer type was not provided (or an incorrect pairing was used).");
}
项目:Camel    文件:SparkEndpoint.java   
@Override
public Producer createProducer() throws Exception {
    LOG.trace("Creating {} Spark producer.", endpointType);
    if (endpointType == EndpointType.rdd) {
        LOG.trace("About to create RDD producer.");
        return new RddSparkProducer(this);
    } else if (endpointType == EndpointType.dataframe) {
        LOG.trace("About to create DataFrame producer.");
        return new DataFrameSparkProducer(this);
    } else {
        LOG.trace("About to create Hive producer.");
        return new HiveSparkProducer(this);
    }
}
项目:Camel    文件:DirectRouteTest.java   
public void testSedaQueue() throws Exception {
    CamelContext container = new DefaultCamelContext();

    final AtomicBoolean invoked = new AtomicBoolean();

    // lets add some routes
    container.addRoutes(new RouteBuilder() {
        public void configure() {
            from("direct:test.a").to("direct:test.b");
            from("direct:test.b").process(new Processor() {
                public void process(Exchange e) {
                    invoked.set(true);
                }
            });
        }
    });

    container.start();

    // now lets fire in a message
    Endpoint endpoint = container.getEndpoint("direct:test.a");
    Exchange exchange = endpoint.createExchange();
    exchange.getIn().setHeader("cheese", 123);

    Producer producer = endpoint.createProducer();
    producer.process(exchange);

    // now lets sleep for a while
    assertTrue("Did not receive the message!", invoked.get());

    container.stop();
}
项目:Camel    文件:CustomProducerServicePoolTest.java   
public Producer acquire(Endpoint endpoint) {
    if (endpoint instanceof MyEndpoint) {
        Producer answer = producer;
        producer = null;
        return answer;
    } else {
        return null;
    }
}
项目:Camel    文件:Jt400Endpoint.java   
@Override
public Producer createProducer() throws Exception {
    if (Jt400Type.DTAQ == configuration.getType()) {
        return new Jt400DataQueueProducer(this);
    } else {
        return new Jt400PgmProducer(this);
    }
}
项目:Camel    文件:FromQueueThenConsumeFtpToMockTest.java   
private void prepareFtpServer() throws Exception {
    // prepares the FTP Server by creating a file on the server that we want to unit
    // test that we can pool once
    Endpoint endpoint = context.getEndpoint(getStoreUrl());
    Exchange exchange = endpoint.createExchange();
    exchange.getIn().setBody("Bye World");
    exchange.getIn().setHeader(Exchange.FILE_NAME, "hello.txt");
    Producer producer = endpoint.createProducer();
    producer.start();
    producer.process(exchange);
    producer.stop();
}
项目:Camel    文件:CamelClientEndpoint.java   
public static void main(final String[] args) throws Exception {
    System.out.println("Notice this client requires that the CamelServer is already running!");

    AbstractApplicationContext context = new ClassPathXmlApplicationContext("camel-client.xml");
    CamelContext camel = context.getBean("camel-client", CamelContext.class);

    // get the endpoint from the camel context
    Endpoint endpoint = camel.getEndpoint("jms:queue:numbers");

    // create the exchange used for the communication
    // we use the in out pattern for a synchronized exchange where we expect a response
    Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
    // set the input on the in body
    // must be correct type to match the expected type of an Integer object
    exchange.getIn().setBody(11);

    // to send the exchange we need an producer to do it for us
    Producer producer = endpoint.createProducer();
    // start the producer so it can operate
    producer.start();

    // let the producer process the exchange where it does all the work in this oneline of code
    System.out.println("Invoking the multiply with 11");
    producer.process(exchange);

    // get the response from the out body and cast it to an integer
    int response = exchange.getOut().getBody(Integer.class);
    System.out.println("... the result is: " + response);

    // stopping the JMS producer has the side effect of the "ReplyTo Queue" being properly
    // closed, making this client not to try any further reads for the replies from the server
    producer.stop();

    // we're done so let's properly close the application context
    IOHelper.close(context);
}
项目:Camel    文件:FromFtpNoopIdempotentFalseTest.java   
private void prepareFtpServer() throws Exception {
    // prepares the FTP Server by creating a file on the server that we want to unit
    // test that we can pool and store as a local file
    Endpoint endpoint = context.getEndpoint(getFtpUrl());
    Exchange exchange = endpoint.createExchange();
    exchange.getIn().setBody("Hello World");
    exchange.getIn().setHeader(Exchange.FILE_NAME, "hello.txt");
    Producer producer = endpoint.createProducer();
    producer.start();
    producer.process(exchange);
    producer.stop();
}
项目:Camel    文件:DefaultTraceEventHandler.java   
private synchronized Producer getTraceEventProducer(Exchange exchange) throws Exception {
    if (traceEventProducer == null) {
        // create producer when we have access the the camel context (we dont in doStart)
        Endpoint endpoint = tracer.getDestination() != null ? tracer.getDestination() : exchange.getContext().getEndpoint(tracer.getDestinationUri());
        traceEventProducer = endpoint.createProducer();
        ServiceHelper.startService(traceEventProducer);
    }
    return traceEventProducer;
}
项目:Camel    文件:JmsEndpoint.java   
public Producer createProducer() throws Exception {
    Producer answer = new JmsProducer(this);
    if (isSynchronous()) {
        return new SynchronousDelegateProducer(answer);
    } else {
        return answer;
    }
}
项目:Camel    文件:FtpLoginNoRetryTest.java   
private void uploadFile(String username, String password) throws Exception {
    Endpoint endpoint = context.getEndpoint("ftp://" + username + "@localhost:" + getPort() + "/login?password=" + password + "&maximumReconnectAttempts=0");

    Exchange exchange = endpoint.createExchange();
    exchange.getIn().setBody("Hello World from FTPServer");
    exchange.getIn().setHeader(Exchange.FILE_NAME, "report.txt");
    Producer producer = endpoint.createProducer();
    producer.start();
    producer.process(exchange);
    producer.stop();
}
项目:Camel    文件:BoxEndpoint.java   
public Producer createProducer() throws Exception {
    // validate producer APIs
    if (getApiName() == BoxApiName.POLL_EVENTS) {
        throw new IllegalArgumentException("Producer endpoints do not support endpoint prefix "
            + BoxApiName.POLL_EVENTS.getName());
    }
    return new BoxProducer(this);
}
项目:Camel    文件:FromFtpClientConfigRefTest.java   
private void prepareFtpServer() throws Exception {
    // prepares the FTP Server by creating a file on the server
    Endpoint endpoint = context.getEndpoint(getFtpUrl());
    Exchange exchange = endpoint.createExchange();
    exchange.getIn().setBody("Hello World");
    exchange.getIn().setHeader(Exchange.FILE_NAME, "hello.txt");
    Producer producer = endpoint.createProducer();
    producer.start();
    producer.process(exchange);
    producer.stop();
}
项目:Camel    文件:DefaultProducerCacheTest.java   
public void testReleaseProducerInvokesStopAndShutdownByNonSingletonProducers() throws Exception {
    ProducerCache cache = new ProducerCache(this, context, 1);
    cache.start();

    assertEquals("Size should be 0", 0, cache.size());

    for (int i = 0; i < 3; i++) {
        Endpoint e = new MyEndpoint(false, i);
        Producer p = cache.acquireProducer(e);
        cache.releaseProducer(e, p);
    }

    assertEquals("Size should be 0", 0, cache.size());

    // should have stopped all 3
    assertEquals(3, stopCounter.get());

    // should have shutdown all 3
    assertEquals(3, shutdownCounter.get());

    cache.stop();

    // no more stop after stopping the cache
    assertEquals(3, stopCounter.get());

    // no more shutdown after stopping the cache
    assertEquals(3, shutdownCounter.get());
}
项目:Camel    文件:MetricsEndpoint.java   
@Override
public Producer createProducer() throws Exception {
    if (metricsType == MetricsType.COUNTER) {
        return new CounterProducer(this);
    } else if (metricsType == MetricsType.HISTOGRAM) {
        return new HistogramProducer(this);
    } else if (metricsType == MetricsType.METER) {
        return new MeterProducer(this);
    } else if (metricsType == MetricsType.TIMER) {
        return new TimerProducer(this);
    } else {
        throw new IllegalArgumentException("Metrics type " + metricsType + " is not supported");
    }
}
项目:Camel    文件:SqlEndpoint.java   
public Producer createProducer() throws Exception {
    SqlPrepareStatementStrategy prepareStrategy = getPrepareStatementStrategy() != null ? getPrepareStatementStrategy() : new DefaultSqlPrepareStatementStrategy(getSeparator());
    SqlProducer result = new SqlProducer(this, query, getJdbcTemplate(), prepareStrategy, isBatch(),
            isAlwaysPopulateStatement(), isUseMessageBodyForSql());
    result.setParametersCount(getParametersCount());
    return result;
}
项目:Camel    文件:FromFtpToFileNoFileNameHeaderTest.java   
private void prepareFtpServer() throws Exception {
    // prepares the FTP Server by creating a file on the server that we want to unit
    // test that we can pool and store as a local file
    Endpoint endpoint = context.getEndpoint(getFtpUrl());
    Exchange exchange = endpoint.createExchange();
    exchange.getIn().setBody("Hello World from FTPServer");
    exchange.getIn().setHeader(Exchange.FILE_NAME, "hello.txt");
    Producer producer = endpoint.createProducer();
    producer.start();
    producer.process(exchange);
    producer.stop();
}
项目:Camel    文件:SedaRouteTest.java   
public void testSedaQueue() throws Exception {
    final CountDownLatch latch = new CountDownLatch(1);

    CamelContext context = new DefaultCamelContext();

    // lets add some routes
    context.addRoutes(new RouteBuilder() {
        public void configure() {
            from("seda:test.a").to("seda:test.b");
            from("seda:test.b").process(new Processor() {
                public void process(Exchange e) {
                    log.debug("Received exchange: " + e.getIn());
                    latch.countDown();
                }
            });
        }
    });

    context.start();

    // now lets fire in a message
    Endpoint endpoint = context.getEndpoint("seda:test.a");
    Exchange exchange = endpoint.createExchange();
    exchange.getIn().setHeader("cheese", 123);

    Producer producer = endpoint.createProducer();
    producer.process(exchange);

    // now lets sleep for a while
    assertTrue(latch.await(5, TimeUnit.SECONDS));

    context.stop();
}
项目:Camel    文件:NettyEndpoint.java   
public Producer createProducer() throws Exception {
    Producer answer = new NettyProducer(this, configuration);
    if (isSynchronous()) {
        return new SynchronousDelegateProducer(answer);
    } else {
        return answer;
    }
}
项目:Camel    文件:XmlRpcEndpoint.java   
public Producer createProducer() throws Exception {
    Producer answer = new XmlRpcProducer(this);
    if (isSynchronous()) {
        return new SynchronousDelegateProducer(answer);
    } else {
        return answer;
    }
}
项目:Camel    文件:CxfEndpoint.java   
public Producer createProducer() throws Exception {
    Producer answer = new CxfProducer(this);
    if (isSynchronous()) {
        return new SynchronousDelegateProducer(answer);
    } else {
        return answer;
    }
}
项目:Camel    文件:ProducerCache.java   
protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) {
    String key = endpoint.getEndpointUri();
    Producer answer = producers.get(key);
    if (pooled && answer == null) {
        // try acquire from connection pool
        answer = pool.acquire(endpoint);
    }

    if (answer == null) {
        // create a new producer
        try {
            answer = endpoint.createProducer();
            // add as service which will also start the service
            // (false => we and handling the lifecycle of the producer in this cache)
            getCamelContext().addService(answer, false);
        } catch (Exception e) {
            throw new FailedToCreateProducerException(endpoint, e);
        }

        // add producer to cache or pool if applicable
        if (pooled && answer instanceof ServicePoolAware) {
            LOG.debug("Adding to producer service pool with key: {} for producer: {}", endpoint, answer);
            answer = pool.addAndAcquire(endpoint, answer);
        } else if (answer.isSingleton()) {
            LOG.debug("Adding to producer cache with key: {} for producer: {}", endpoint, answer);
            producers.put(key, answer);
        }
    }

    if (answer != null) {
        // record statistics
        if (extendedStatistics) {
            statistics.onHit(key);
        }
    }

    return answer;
}
项目:Camel    文件:FtpConsumerTemplateTest.java   
private void prepareFtpServer() throws Exception {
    Endpoint endpoint = context.getEndpoint(getFtpUrl());
    Exchange exchange = endpoint.createExchange();
    exchange.getIn().setBody("Hello World");
    exchange.getIn().setHeader(Exchange.FILE_NAME, "hello.txt");
    Producer producer = endpoint.createProducer();
    producer.start();
    producer.process(exchange);
    producer.stop();

    // assert file is created
    File file = new File(FTP_ROOT_DIR + "/template/hello.txt");
    assertTrue("The file should exist: " + file, file.exists());
}
项目:Camel    文件:CMISQueryProducerTest.java   
@Test
public void getResultCountFromHeader() throws Exception {
    Endpoint endpoint = context.getEndpoint("cmis://" + getUrl() + "?queryMode=true");
    Producer producer = endpoint.createProducer();

    Exchange exchange = createExchangeWithInBody(
            "SELECT * FROM cmis:document WHERE CONTAINS('Camel test content.')");
    producer.process(exchange);

    @SuppressWarnings("unchecked")
    List<Map<String, Object>> documents = exchange.getOut().getBody(List.class);
    assertEquals(2, documents.size());
    assertEquals(2, exchange.getOut().getHeader("CamelCMISResultCount"));
}
项目:Camel    文件:DefaultExchangeFormatterTest.java   
public void testSendExchangeWithOut() throws Exception {
    Endpoint endpoint = resolveMandatoryEndpoint("log:org.apache.camel.TEST?showAll=true&multiline=true");
    Exchange exchange = endpoint.createExchange();
    exchange.getIn().setBody("Hello World");
    exchange.getOut().setBody(22);

    Producer producer = endpoint.createProducer();
    producer.start();
    producer.process(exchange);
    producer.stop();
}
项目:Camel    文件:ProducerCache.java   
/**
 * Gets the cache evicted statistic
 * <p/>
 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
 *
 * @return the evicted
 */
public long getEvicted() {
    long evicted = -1;
    if (producers instanceof LRUCache) {
        LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers;
        evicted = cache.getEvicted();
    }
    return evicted;
}
项目:Camel    文件:CxfEndpointTest.java   
@Test
public void testCxfEndpointConfigurer() throws Exception {
    SimpleRegistry registry = new SimpleRegistry();
    CxfEndpointConfigurer configurer = EasyMock.createMock(CxfEndpointConfigurer.class);
    Processor processor = EasyMock.createMock(Processor.class);
    registry.put("myConfigurer", configurer);
    CamelContext camelContext = new DefaultCamelContext(registry);
    CxfComponent cxfComponent = new CxfComponent(camelContext);
    CxfEndpoint endpoint = (CxfEndpoint)cxfComponent.createEndpoint(routerEndpointURI + "&cxfEndpointConfigurer=#myConfigurer");

    configurer.configure(EasyMock.isA(AbstractWSDLBasedEndpointFactory.class));
    EasyMock.expectLastCall();
    configurer.configureServer(EasyMock.isA(Server.class));
    EasyMock.expectLastCall();
    EasyMock.replay(configurer);
    endpoint.createConsumer(processor);
    EasyMock.verify(configurer);

    EasyMock.reset(configurer);
    configurer.configure(EasyMock.isA(AbstractWSDLBasedEndpointFactory.class));
    EasyMock.expectLastCall();
    configurer.configureClient(EasyMock.isA(Client.class));
    EasyMock.expectLastCall();
    EasyMock.replay(configurer);
    Producer producer = endpoint.createProducer();
    producer.start();
    EasyMock.verify(configurer);

}