Java 类io.vertx.core.WorkerExecutor 实例源码

项目:vertx-dropwizard-metrics    文件:MetricsTest.java   
@Test
public void testThreadPoolMetricsOnClose() throws Exception {
  WorkerExecutor exec = vertx.createSharedWorkerExecutor("the-executor", 10);
  assertTrue(metricsService.getMetricsSnapshot(exec).size() > 0);
  assertTrue(metricsService.getMetricsSnapshot("vertx.pools.worker.vert.x-worker-thread").size() > 0);
  assertTrue(metricsService.getMetricsSnapshot("vertx.pools.worker.vert.x-internal-blocking").size() > 0);
  exec.close();
  assertTrue(metricsService.getMetricsSnapshot(exec).size() == 0);
  assertTrue(metricsService.getMetricsSnapshot("vertx.pools.worker.vert.x-worker-thread").size() > 0);
  assertTrue(metricsService.getMetricsSnapshot("vertx.pools.worker.vert.x-internal-blocking").size() > 0);
  CountDownLatch latch = new CountDownLatch(1);
  vertx.close(ar -> latch.countDown());
  awaitLatch(latch);
  assertEquals(new JsonObject(), metricsService.getMetricsSnapshot("vertx.pools.worker.vert.x-worker-thread"));
  assertEquals(new JsonObject(), metricsService.getMetricsSnapshot("vertx.pools.worker.vert.x-internal-blocking"));
}
项目:gauravbytes    文件:VertxWorkExecutorExample.java   
public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    CountDownLatch completion = new CountDownLatch(2);

    WorkerExecutor sharedWorker = vertx.createSharedWorkerExecutor("my-shared-pool", 20);
    sharedWorker.executeBlocking(successfulBlockingTask(), responseHandler(completion));
    sharedWorker.executeBlocking(failedBlockingTask(), responseHandler(completion));
    vertx.close();
}
项目:vertx-camel-bridge    文件:OutboundEndpointTest.java   
@Test
public void testWithBlockingWithWorker() throws Exception {
  AtomicBoolean calledSpy = new AtomicBoolean();
  AtomicBoolean startedSpy = new AtomicBoolean();
  vertx.createHttpServer().requestHandler(request -> {
    calledSpy.set(true);
    request.response().end("Alright");
  }).listen(8081, ar -> {
    startedSpy.set(ar.succeeded());
  });

  await().atMost(DEFAULT_TIMEOUT).untilAtomic(startedSpy, is(true));

  camel.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
      from("direct:my-route")
        .process(exchange -> Thread.sleep(3000))
        .to("http://localhost:8081");
    }
  });

  WorkerExecutor pool = vertx.createSharedWorkerExecutor("some-fancy-name");

  bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
    .addOutboundMapping(fromVertx("camel-route").toCamel("direct:my-route").setBlocking(true)
      .setWorkerExecutor(pool)));

  camel.start();
  BridgeHelper.startBlocking(bridge);

  vertx.eventBus().send("camel-route", "hello");

  await().atMost(DEFAULT_TIMEOUT).untilAtomic(calledSpy, is(true));
}
项目:gauravbytes    文件:VertxWorkExecutorExample.java   
public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    CountDownLatch completion = new CountDownLatch(2);

    WorkerExecutor sharedWorker = vertx.createSharedWorkerExecutor("my-shared-pool", 20);
    sharedWorker.executeBlocking(successfulBlockingTask(), responseHandler(completion));
    sharedWorker.executeBlocking(failedBlockingTask(), responseHandler(completion));
    vertx.close();
}
项目:vertx-rx    文件:ContextScheduler.java   
public ContextScheduler(WorkerExecutor workerExecutor, boolean ordered) {
  Objects.requireNonNull(workerExecutor, "workerExecutor is null");
  this.vertx = ((WorkerExecutorInternal) workerExecutor).vertx();
  this.context = null;
  this.workerExecutor = workerExecutor;
  this.blocking = true;
  this.ordered = ordered;
}
项目:vertx-rx    文件:ContextScheduler.java   
public ContextScheduler(WorkerExecutor workerExecutor, boolean ordered) {
  Objects.requireNonNull(workerExecutor, "workerExecutor is null");
  this.vertx = ((WorkerExecutorInternal) workerExecutor).vertx();
  this.context = null;
  this.workerExecutor = workerExecutor;
  this.blocking = true;
  this.ordered = ordered;
}
项目:sfs    文件:SfsVertxImpl.java   
@Override
public WorkerExecutor createSharedWorkerExecutor(String name) {
    return vertx.createSharedWorkerExecutor(name);
}
项目:sfs    文件:SfsVertxImpl.java   
@Override
public WorkerExecutor createSharedWorkerExecutor(String name, int poolSize) {
    return vertx.createSharedWorkerExecutor(name, poolSize);
}
项目:sfs    文件:SfsVertxImpl.java   
@Override
public WorkerExecutor createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime) {
    return vertx.createSharedWorkerExecutor(name, poolSize, maxExecuteTime);
}
项目:vertx-dropwizard-metrics    文件:MetricsTest.java   
@Test
public void testThreadPoolMetrics() throws Exception {

  int size = 5;
  CountDownLatch done = new CountDownLatch(6);

  WorkerExecutor exec = vertx.createSharedWorkerExecutor("the-executor", size);
  JsonObject metrics = metricsService.getMetricsSnapshot(exec);

  assertMetricType("counter", metrics.getJsonObject("queue-size"));
  assertMetricType("timer", metrics.getJsonObject("queue-delay"));
  assertMetricType("counter", metrics.getJsonObject("in-use"));
  assertMetricType("timer", metrics.getJsonObject("usage"));
  assertMetricType("gauge", metrics.getJsonObject("pool-ratio"));
  assertMetricType("gauge", metrics.getJsonObject("max-pool-size"));

  assertCount(metrics.getJsonObject("usage"), 0);
  assertCount(metrics.getJsonObject("queue-delay"), 0);
  assertCount(metrics.getJsonObject("queue-size"), 0);
  assertCount(metrics.getJsonObject("in-use"), 0);
  assertEquals(metrics.getJsonObject("pool-ratio").getDouble("value"), (Double)0D);
  assertEquals(metrics.getJsonObject("max-pool-size").getInteger("value"), (Integer)5);

  //
  CountDownLatch gate = new CountDownLatch(1);
  CountDownLatch latch = new CountDownLatch(5);
  for (int i = 0; i < size;i++) {
    exec.<Boolean>executeBlocking(fut -> {
      try {
        latch.countDown();
        fut.complete(gate.await(10, TimeUnit.SECONDS));
      } catch (InterruptedException e) {
        fut.fail(e);
      }
    }, false, ar -> {
      assertTrue(ar.succeeded());
      assertTrue(ar.result());
      vertx.runOnContext(v -> done.countDown());
    });
  }

  awaitLatch(latch);
  metrics = metricsService.getMetricsSnapshot(exec);
  assertCount(metrics.getJsonObject("usage"), 0);
  assertCount(metrics.getJsonObject("queue-delay"), 5);
  assertCount(metrics.getJsonObject("queue-size"), 0);
  assertCount(metrics.getJsonObject("in-use"), size);
  assertEquals(metrics.getJsonObject("pool-ratio").getDouble("value"), (Double)1D);

  exec.executeBlocking(Future::complete, false, ar -> vertx.runOnContext(v -> done.countDown()));
  metrics = metricsService.getMetricsSnapshot(exec);
  assertCount(metrics.getJsonObject("usage"), 0);
  assertCount(metrics.getJsonObject("queue-delay"), 5);
  assertCount(metrics.getJsonObject("queue-size"), 1);
  assertCount(metrics.getJsonObject("in-use"), size);
  assertEquals(metrics.getJsonObject("pool-ratio").getDouble("value"), (Double)1D);

  gate.countDown();
  awaitLatch(done);
  metrics = metricsService.getMetricsSnapshot(exec);
  assertCount(metrics.getJsonObject("usage"), 6);
  assertCount(metrics.getJsonObject("queue-delay"), 6);
  assertCount(metrics.getJsonObject("queue-size"), 0);
  assertCount(metrics.getJsonObject("in-use"), 0);
  assertEquals(metrics.getJsonObject("pool-ratio").getDouble("value"), (Double)0D);
}
项目:vertx-rx    文件:ContextScheduler.java   
public ContextScheduler(WorkerExecutor workerExecutor) {
  this(workerExecutor, true);
}
项目:vertx-rx    文件:ContextScheduler.java   
public ContextScheduler(WorkerExecutor workerExecutor) {
  this(workerExecutor, true);
}
项目:vertx-camel-bridge    文件:FromVertxToCamelProducer.java   
/**
 * Creates a new instance of producer.
 *
 * @param vertx    the vert.x instance
 * @param producer the underlying producer, must not be {@code null}
 * @param outbound the outbound configuration, must not be {@code null}
 * @param blocking whether or not the processing is blocking and so should not be run on the event
 *                 loop
 * @param pool     the pool on which the blocking code is going to be executed
 */
public FromVertxToCamelProducer(Vertx vertx, Producer producer, OutboundMapping outbound, boolean blocking,
                                WorkerExecutor  pool) {
  this.endpoint = producer.getEndpoint();
  this.producer = AsyncProcessorConverterHelper.convert(producer);
  this.outbound = outbound;
  this.blocking = blocking;
  this.vertx = vertx;
  this.pool = pool;
}
项目:vertx-camel-bridge    文件:OutboundMapping.java   
/**
 * @return the worker thread worker to use to execute the processing. This option is only used if blocking is set to
 * {@code true}. If not set, it uses the the default worker worker.
 */
public WorkerExecutor getWorkerExecutor() {
  return worker;
}
项目:vertx-camel-bridge    文件:OutboundMapping.java   
/**
 * Sets the worker thread worker used to execute the blocking processing. This option is only used if blocking is set to
 * {@code true}. If not set, it uses the the default worker worker.
 *
 * @param pool the worker worker on which the code is executed
 * @return the current instance of {@link OutboundMapping}
 */
public OutboundMapping setWorkerExecutor(WorkerExecutor pool) {
  this.worker = pool;
  return this;
}
项目:vertx-rx    文件:RxHelper.java   
/**
 * Create a scheduler for a {@link io.vertx.core.WorkerExecutor} object, actions are executed on the threads of this executor.
 *
 * @param executor the worker executor object
 * @return the scheduler
 */
public static Scheduler blockingScheduler(WorkerExecutor executor) {
  return new ContextScheduler(executor, false);
}