Java 类io.vertx.core.CompositeFuture 实例源码

项目:vxrifa    文件:ReceiverGenerator.java   
ReceiverGenerator generateUnregisterMethod() {

    MethodSpec.Builder unregisterMB = MethodSpec.methodBuilder("unregisterReceiver");

    // Generates cosumers waiting Future for success handler unregistration
    unregisterMB.addAnnotation(Override.class)
            .addModifiers(Modifier.PUBLIC)
            .addStatement("return $T.all($N.stream().map((consumer) -> {"
                    + "$T future = $T.future();"
                    + "consumer.unregister(future);"
                    + "return future;"                       
                    + "}).collect($T.toList()))",
                    CompositeFuture.class,
                    consumersField,
                    ParameterizedTypeName.get(ClassName.get(Future.class), TypeName.get(Void.class)),
                    TypeName.get(Future.class),
                    TypeName.get(Collectors.class)
            )
            .returns(ParameterizedTypeName.get(ClassName.get(Future.class), WildcardTypeName.subtypeOf(Object.class)));

    tsb.addMethod(unregisterMB.build());

    return this;

}
项目:ethereum-ingest    文件:Importing.java   
private void setupCompletionListeners() {
    List<Future> futures = new ArrayList<>();
    if (config.isTxImport()) {
        futures.add(txImport);
    }
    if (config.isBlockImport()) {
        futures.add(blockImport);
    }
    CompositeFuture.all(futures).setHandler(done -> {
        if (done.succeeded()) {
            finishWithSuccess();
        } else {
            cancelImport(null);
            Form.showAlertFromError(done.cause());
            Async.setScene(SETTINGS_FXML);
        }
    });
}
项目:introduction-to-vert.x    文件:MyFirstVerticle.java   
private Future<SQLConnection> createSomeDataIfNone(SQLConnection connection) {
    Future<SQLConnection> future = Future.future();
    connection.query("SELECT * FROM Articles", select -> {
        if (select.failed()) {
            future.fail(select.cause());
        } else {
            if (select.result().getResults().isEmpty()) {
                Article article1 = new Article("Fallacies of distributed computing",
                    "https://en.wikipedia.org/wiki/Fallacies_of_distributed_computing");
                Article article2 = new Article("Reactive Manifesto",
                    "https://www.reactivemanifesto.org/");
                Future<Article> insertion1 = insert(connection, article1, false);
                Future<Article> insertion2 = insert(connection, article2, false);
                CompositeFuture.all(insertion1, insertion2)
                    .setHandler(r -> future.handle(r.map(connection)));
            } else {
                future.complete(connection);
            }
        }
    });
    return future;
}
项目:vertx-jooq-async    文件:VertxSomethingDaoTest.java   
@Test
public void fetchOneByConditionWithMultipleMatchesShouldFail() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    Future<Integer> insertFuture1 = Future.future();
    Future<Integer> insertFuture2 = Future.future();

    Something someNewObject = createSomething();
    dao.insertReturningPrimaryAsync(someNewObject,insertFuture1);
    dao.insertReturningPrimaryAsync(createSomething().setSomehugenumber(someNewObject.getSomehugenumber()),insertFuture2);
    CompositeFuture.all(insertFuture1,insertFuture2).
            setHandler(consumeOrFailHandler(v->{
                dao.fetchOneAsync(Tables.SOMETHING.SOMEHUGENUMBER.eq(someNewObject.getSomehugenumber()),h->{
                    Assert.assertNotNull(h.cause());
                    //cursor fetched more than one row
                    Assert.assertEquals(TooManyRowsException.class, h.cause().getClass());
                    dao.deleteExecAsync(Tables.SOMETHING.SOMEHUGENUMBER.eq(someNewObject.getSomehugenumber()),countdownLatchHandler(latch));
                });
            }));
    await(latch);
}
项目:vertx-jooq-async    文件:VertxSomethingDaoTest.java   
@Test
public void fetchByConditionWithMultipleMatchesShouldSucceed() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    Future<Integer> insertFuture1 = Future.future();
    Future<Integer> insertFuture2 = Future.future();

    Something someNewObject = createSomething();
    dao.insertReturningPrimaryAsync(someNewObject,insertFuture1);
    dao.insertReturningPrimaryAsync(createSomething().setSomehugenumber(someNewObject.getSomehugenumber()),insertFuture2);
    CompositeFuture.all(insertFuture1, insertFuture2).
            setHandler(consumeOrFailHandler(v -> {
                dao.fetchAsync(Tables.SOMETHING.SOMEHUGENUMBER.eq(someNewObject.getSomehugenumber()), h -> {
                    Assert.assertNotNull(h.result());
                    //cursor fetched more than one row
                    Assert.assertEquals(2, h.result().size());
                    dao.deleteExecAsync(Tables.SOMETHING.SOMEHUGENUMBER.eq(someNewObject.getSomehugenumber()), countdownLatchHandler(latch));
                });
            }));
    await(latch);
}
项目:vertx-forge    文件:ProjectGeneratorService.java   
public void generate(Message<JsonObject> message) {
    JsonObject metadata = message.body();
    String build = metadata.getString("build", "maven");
    String language = metadata.getString("language", "java");
    //Act as a activation flags in .gitignore
    metadata.put(build, true);
    metadata.put(language, true);
    String baseDir = metadata.getString("baseDir");
    CompositeFuture.all(
        generateFile(metadata, baseDir, BUILD.get(build)),
        generateFile(metadata, baseDir, LANGUAGES.get(language)),
        generateFile(metadata, baseDir, ".gitignore"),
        generateFile(metadata, baseDir, ".editorconfig")
    ).setHandler(ar -> {
        if (ar.failed()) {
            log.error("Impossible to generate project {} : {}", metadata, ar.cause().getMessage());
            message.fail(500, ar.cause().getMessage());
        } else {
            message.reply(null);
        }
    });
}
项目:chili-core    文件:Service.java   
private void startInstances(Future<Void> future, RealmContext context) {
    List<Future> futures = new ArrayList<>();
    for (InstanceSettings instance : context.instances()) {
        Future deploy = Future.future();
        futures.add(deploy);

        context.handler(() -> new InstanceHandler(new InstanceContext(context, instance))).setHandler((done) -> {
            if (done.succeeded()) {
                deploy.complete();
            } else {
                context.onInstanceFailed(instance.getName(), done.cause());
                deploy.fail(done.cause());
            }
        });
    }
    CompositeFuture.all(futures).setHandler(done -> {
        if (done.succeeded()) {
            future.complete();
        } else {
            future.fail(done.cause());
        }
    });
}
项目:vertx-circuit-breaker    文件:CircuitBreakerMetricsTest.java   
@Test
public void testWithFailedCommands(TestContext tc) {
  breaker = CircuitBreaker.create("some-circuit-breaker", vertx);
  Async async = tc.async();

  Future<Void> command1 = breaker.execute(commandThatFails());
  Future<Void> command2 = breaker.execute(commandThatWorks());
  Future<Void> command3 = breaker.execute(commandThatWorks());
  Future<Void> command4 = breaker.execute(commandThatFails());

  CompositeFuture.join(command1, command2, command3, command4)
    .setHandler(ar -> {
      assertThat(metrics())
        .contains("name", "some-circuit-breaker")
        .contains("state", CircuitBreakerState.CLOSED.name())
        .contains("totalErrorCount", 2) // Failure + Timeout + Exception
        .contains("totalSuccessCount", 2)
        .contains("totalTimeoutCount", 0)
        .contains("totalExceptionCount", 0)
        .contains("totalFailureCount", 2)
        .contains("totalOperationCount", 4)
        .contains("totalSuccessPercentage", 50)
        .contains("totalErrorPercentage", 50);
      async.complete();
    });
}
项目:vertx-circuit-breaker    文件:CircuitBreakerMetricsTest.java   
@Test
public void testEviction(TestContext tc) {
  breaker = CircuitBreaker.create("some-circuit-breaker", vertx,
    new CircuitBreakerOptions().setMetricsRollingWindow(10));
  Async async = tc.async();


  int count = 1000;

  List<Future> list = new ArrayList<>();
  for (int i = 0; i < count; i++) {
    list.add(breaker.execute(commandThatWorks()));
  }

  CompositeFuture.all(list)
    .setHandler(ar -> {
      assertThat(ar).succeeded();
      assertThat(metrics().getInteger("totalOperationCount")).isEqualTo(1000);
      assertThat(metrics().getInteger("rollingOperationCount")).isLessThan(1000);
      async.complete();
    });
}
项目:direwolves    文件:ApiPluginCmd.java   
private void doSubCmd(ApiDiscovery discovery, List<ApiDefinition> definitions,
                          ApiSubCmd subCmd,
                          JsonObject jsonObject, Future<JsonObject> complete) {
  List<Future> futures = new ArrayList<>();
  for (ApiDefinition definition : definitions) {
    Future<Void> future = Future.future();
    futures.add(future);
    subCmd.handle(definition, jsonObject);
    discovery.publish(definition, ar -> {
      if (ar.succeeded()) {
        future.complete();
      } else {
        future.fail(ar.cause());
      }
    });
  }
  CompositeFuture.all(futures)
          .setHandler(ar -> {
            if (ar.succeeded()) {
              complete.complete(succeedResult());
            } else {
              complete.fail(ar.cause());
            }
          });
}
项目:direwolves    文件:DeleteApiCmd.java   
private void deleteByName(ApiDiscovery discovery, List<String> names,
                          Future<JsonObject> complete) {
  List<Future> futures = new ArrayList<>();
  for (String name : names) {
    Future<Void> future = Future.future();
    futures.add(future);
    discovery.unpublish(name, ar -> {
      if (ar.succeeded()) {
        future.complete();
      } else {
        future.fail(ar.cause());
      }
    });
  }
  CompositeFuture.all(futures)
          .setHandler(ar -> {
            if (ar.succeeded()) {
              complete.complete(succeedResult());
            } else {
              complete.fail(ar.cause());
            }
          });
}
项目:direwolves    文件:ZookeeperServiceImporter.java   
private synchronized void unregisterAllServices(Future<Void> completed) {
  List<Future> list = new ArrayList<>();

  imports.forEach(svc -> {
    Future<Void> unreg = Future.future();
    svc.unregister(publisher, unreg);
    list.add(unreg);
  });

  CompositeFuture.all(list).setHandler(x -> {
    if (x.failed()) {
      completed.fail(x.cause());
    } else {
      completed.complete();
    }
  });
}
项目:okapi    文件:DiscoveryManager.java   
public void restartModules(Handler<ExtendedAsyncResult<Void>> fut) {
  deploymentStore.getAll(res1 -> {
    if (res1.failed()) {
      fut.handle(new Failure<>(res1.getType(), res1.cause()));
    } else {
      List<Future> futures = new LinkedList<>();
      for (DeploymentDescriptor dd : res1.result()) {
        Future<DeploymentDescriptor> f = Future.future();
        addAndDeploy1(dd, f::handle);
        futures.add(f);
      }
      CompositeFuture.all(futures).setHandler(res2 -> {
        if (res2.failed()) {
          fut.handle(new Failure<>(INTERNAL, res2.cause()));
        } else {
          fut.handle(new Success<>());
        }
      });
    }
  });
}
项目:okapi    文件:LockedStringMap.java   
private void getKeys2(String val, Handler<ExtendedAsyncResult<Collection<String>>> fut) {
  Collection<String> result = new TreeSet<>();
  if (val == null || val.isEmpty()) {
    fut.handle(new Success<>(result));
  } else {
    KeyList keys = Json.decodeValue(val, KeyList.class);

    List<Future> futures = new LinkedList<>();
    for (String k : keys.keys) {
      Future<Void> f = Future.future();
      list.get(k, res -> {
        if (res.failed()) {
          f.handle(Future.failedFuture(res.cause()));
        } else {
          String v = res.result();
          if (v != null) {
            result.add(k);
          }
          f.handle(Future.succeededFuture());
        }
      });
      futures.add(f);
    }
    CompositeFuture.all(futures).setHandler(res -> {
      if (res.failed()) {
        fut.handle(new Failure<>(INTERNAL, res.cause()));
      } else {
        fut.handle(new Success<>(result));
      }
    });
  }
}
项目:vertx-service-discovery    文件:ZookeeperServiceImporter.java   
private synchronized void unregisterAllServices(Future<Void> done) {
  List<Future> list = new ArrayList<>();

  new HashSet<>(registrations).forEach(reg -> {
    Future<Void> unreg = Future.future();
    publisher.unpublish(reg.record().getRegistration(), unreg.completer());
  });
  registrations.clear();

  CompositeFuture.all(list).setHandler(x -> {
    if (x.failed()) {
      done.fail(x.cause());
    } else {
      done.complete();
    }
  });
}
项目:vertx-service-discovery    文件:DockerLinksServiceImporter.java   
@Override
public void close(Handler<Void> completionHandler) {
  List<Future> list = new ArrayList<>();
  for (Record record : records) {
    publisher.unpublish(record.getRegistration(),
        v -> list.add(v.succeeded() ? Future.succeededFuture() : Future.failedFuture(v.cause())));
  }

  CompositeFuture.all(list).setHandler(ar -> {
        if (ar.succeeded()) {
          LOGGER.info("Successfully closed the service importer " + this);
        } else {
          LOGGER.error("A failure has been caught while stopping " + this, ar.cause());
        }
        if (completionHandler != null) {
          completionHandler.handle(null);
        }
      }
  );
}
项目:scales    文件:LauncherVerticle.java   
@Override
public void start(Future<Void> startFuture) throws Exception {

    DeploymentOptionsParser.parseVerticleDeploymentOptionsJsonFile();

    // We can safely deploy Command, EventStore, and Query in parallel.
    // Facade must be the last one.

    Future<Void> startEventStoreFuture = startVerticle(this.vertx, Verticle.EventStore);
    Future<Void> startCommandFuture = startVerticle(this.vertx, Verticle.Command);
    Future<Void> startQueryFuture = startVerticle(this.vertx, Verticle.Query);

    CompositeFuture.all(startCommandFuture, startEventStoreFuture, startQueryFuture).setHandler(ar -> {
        if (ar.succeeded()) {
            startFuture.setHandler(startVerticle(this.vertx, Verticle.Facade).completer());
        } else {
            startFuture.fail(ar.cause());
        }}
    );
}
项目:DAVe    文件:MainVerticle.java   
private void closeAllDeployments() {
    LOG.info("Undeploying verticles");

    List<Future> futures = new LinkedList<>();
    this.verticleDeployments.forEach((verticleName, deploymentID) -> {
        if (deploymentID != null && vertx.deploymentIDs().contains(deploymentID)) {
            LOG.info("Undeploying {} with ID: {}", verticleName, deploymentID);
            Future<Void> future = Future.future();
            vertx.undeploy(deploymentID, future.completer());
            futures.add(future);
        }
    });

    CompositeFuture.all(futures).setHandler(ar -> {
        if (ar.succeeded()) {
            LOG.info("Undeployed all verticles");
        } else {
            LOG.error("Failed to undeploy some verticles", ar.cause());
        }
    });
}
项目:hono    文件:HonoSenderBase.java   
/**
 * Get both Hono clients and connect them to Hono's microservices.
 *
 * @return The result of the creation and connection of the Hono clients.
 */
private Future<Void> getHonoClients() {
    // we need two clients to get it working, define futures for them
    final Future<RegistrationClient> registrationClientTracker = getRegistrationClient();
    final Future<MessageSender> messageSenderTracker = getMessageSender();

    final Future<Void> result = Future.future();

    CompositeFuture.all(registrationClientTracker, messageSenderTracker).setHandler(s -> {
        if (result.failed()) {
            System.err.println(
                    "hono clients could not be created : " + s.cause().getMessage());
            result.fail(s.cause());
        } else {
            registrationClient = registrationClientTracker.result();
            messageSender = messageSenderTracker.result();
            result.complete();
        }
    });

    return result;
}
项目:hono    文件:Application.java   
@Override
protected final Future<Void> deployRequiredVerticles(int maxInstances) {

    Future<Void> result = Future.future();
    CompositeFuture.all(
            deployAuthenticationService(), // we only need 1 authentication service
            deployRegistrationService(),
            deployCredentialsService()).setHandler(ar -> {
        if (ar.succeeded()) {
            result.complete();
        } else {
            result.fail(ar.cause());
        }
    });
    return result;
}
项目:hono    文件:CredentialsRestServerTest.java   
/**
 * Deploys the server to vert.x.
 * 
 * @param context The vert.x test context.
 */
@BeforeClass
public static void setUp(final TestContext context) {

    final Future<String> restServerDeploymentTracker = Future.future();
    final Future<String> credentialsServiceDeploymentTracker = Future.future();

    credentialsService = new FileBasedCredentialsService();
    credentialsService.setConfig(new FileBasedCredentialsConfigProperties());
    vertx.deployVerticle(credentialsService, credentialsServiceDeploymentTracker.completer());

    final ServiceConfigProperties restServerProps = new ServiceConfigProperties();
    restServerProps.setInsecurePortEnabled(true);
    restServerProps.setInsecurePort(0);
    restServerProps.setInsecurePortBindAddress(HOST);

    deviceRegistryRestServer = new DeviceRegistryRestServer();
    deviceRegistryRestServer.addEndpoint(new CredentialsHttpEndpoint(vertx));
    deviceRegistryRestServer.setConfig(restServerProps);
    vertx.deployVerticle(deviceRegistryRestServer, restServerDeploymentTracker.completer());

    CompositeFuture.all(restServerDeploymentTracker, credentialsServiceDeploymentTracker)
            .setHandler(context.asyncAssertSuccess());

}
项目:hono    文件:CredentialsRestServerTest.java   
private static Future<Integer> addMultipleCredentials(final List<JsonObject> credentialsList) {

        final Future<Integer> result = Future.future();
        @SuppressWarnings("rawtypes")
        final List<Future> addTrackers = new ArrayList<>();
        for (JsonObject creds : credentialsList) {
            addTrackers.add(addCredentials(creds));
        }

        CompositeFuture.all(addTrackers).setHandler(r -> {
            if (r.succeeded()) {
                result.complete(HttpURLConnection.HTTP_CREATED);
            } else {
                result.fail(r.cause());
            }
        });
        return result;
    }
项目:hono    文件:AbstractVertxBasedMqttProtocolAdapter.java   
@Override
public void doStop(final Future<Void> stopFuture) {

    Future<Void> serverTracker = Future.future();
    if (this.server != null) {
        this.server.close(serverTracker.completer());
    } else {
        serverTracker.complete();
    }

    Future<Void> insecureServerTracker = Future.future();
    if (this.insecureServer != null) {
        this.insecureServer.close(insecureServerTracker.completer());
    } else {
        insecureServerTracker.complete();
    }

    CompositeFuture.all(serverTracker, insecureServerTracker)
        .compose(d -> stopFuture.complete(), stopFuture);
}
项目:hono    文件:AbstractVertxBasedHttpProtocolAdapter.java   
@Override
public final void doStart(final Future<Void> startFuture) {

    checkPortConfiguration()
        .compose(s -> preStartup())
        .compose(s -> {
            Router router = createRouter();
            if (router == null) {
                return Future.failedFuture("no router configured");
            } else {
                addRoutes(router);
                return CompositeFuture.all(bindSecureHttpServer(router), bindInsecureHttpServer(router));
            }
        })
        .compose(s -> {
            try {
                onStartupSuccess();
                startFuture.complete();
            } catch (Exception e) {
                LOG.error("error in onStartupSuccess", e);
                startFuture.fail(e);
            }
        }, startFuture);
}
项目:hono    文件:HttpServiceBase.java   
private Future<Router> startEndpoints() {

        final Future<Router> startFuture = Future.future();
        final Router router = createRouter();
        if (router == null) {
            startFuture.fail("no router configured");
        } else {
            addEndpointRoutes(router);
            addCustomRoutes(router);
            @SuppressWarnings("rawtypes")
            List<Future> endpointFutures = new ArrayList<>(endpoints.size());
            for (HttpEndpoint ep : endpoints) {
                LOG.info("starting endpoint [name: {}, class: {}]", ep.getName(), ep.getClass().getName());
                endpointFutures.add(ep.start());
            }
            CompositeFuture.all(endpointFutures).setHandler(startup -> {
                if (startup.succeeded()) {
                    startFuture.complete(router);
                } else {
                    startFuture.fail(startup.cause());
                }
            });
        }
        return startFuture;
    }
项目:hono    文件:HttpServiceBase.java   
private Future<Void> stopEndpoints() {

        final Future<Void> stopFuture = Future.future();
        @SuppressWarnings("rawtypes")
        List<Future> endpointFutures = new ArrayList<>(endpoints.size());
        for (HttpEndpoint ep : endpoints) {
            LOG.info("stopping endpoint [name: {}, class: {}]", ep.getName(), ep.getClass().getName());
            endpointFutures.add(ep.stop());
        }
        CompositeFuture.all(endpointFutures).setHandler(shutdown -> {
            if (shutdown.succeeded()) {
                stopFuture.complete();
            } else {
                stopFuture.fail(shutdown.cause());
            }
        });
        return stopFuture;

    }
项目:hono    文件:AbstractProtocolAdapterBase.java   
private CompositeFuture closeServiceClients() {

        Future<Void> messagingTracker = Future.future();
        if (messaging == null) {
            messagingTracker.complete();
        } else {
            messaging.shutdown(messagingTracker.completer());
        }

        Future<Void> registrationTracker = Future.future();
        if (registration == null) {
            registrationTracker.complete();
        } else {
            registration.shutdown(registrationTracker.completer());
        }

        Future<Void> credentialsTracker = Future.future();
        if (credentialsAuthProvider == null) {
            credentialsTracker.complete();
        } else {
            credentialsTracker = credentialsAuthProvider.stop();
        }
        return CompositeFuture.all(messagingTracker, registrationTracker, credentialsTracker);
    }
项目:hono    文件:AmqpServiceBase.java   
private Future<Void> startEndpoints() {

        @SuppressWarnings("rawtypes")
        List<Future> endpointFutures = new ArrayList<>(endpoints.size());
        for (AmqpEndpoint ep : endpoints.values()) {
            LOG.info("starting endpoint [name: {}, class: {}]", ep.getName(), ep.getClass().getName());
            endpointFutures.add(ep.start());
        }
        final Future<Void> startFuture = Future.future();
        CompositeFuture.all(endpointFutures).setHandler(startup -> {
            if (startup.succeeded()) {
                startFuture.complete();
            } else {
                startFuture.fail(startup.cause());
            }
        });
        return startFuture;
    }
项目:hono    文件:AmqpServiceBase.java   
private Future<Void> stopEndpoints() {

        @SuppressWarnings("rawtypes")
        List<Future> endpointFutures = new ArrayList<>(endpoints.size());
        for (AmqpEndpoint ep : endpoints.values()) {
            LOG.info("stopping endpoint [name: {}, class: {}]", ep.getName(), ep.getClass().getName());
            endpointFutures.add(ep.stop());
        }
        final Future<Void> stopFuture = Future.future();
        CompositeFuture.all(endpointFutures).setHandler(shutdown -> {
            if (shutdown.succeeded()) {
                stopFuture.complete();
            } else {
                stopFuture.fail(shutdown.cause());
            }
        });
        return stopFuture;
    }
项目:vertx-dataloader    文件:DataLoader.java   
/**
 * Requests to load the data with the specified key asynchronously, and returns a future of the resulting value.
 * <p>
 * If batching is enabled (the default), you'll have to call {@link DataLoader#dispatch()} at a later stage to
 * start batch execution. If you forget this call the future will never be completed (unless already completed,
 * and returned from cache).
 *
 * @param key the key to load
 * @return the future of the value
 */
public Future<V> load(K key) {
    Objects.requireNonNull(key, "Key cannot be null");
    Object cacheKey = getCacheKey(key);
    if (loaderOptions.cachingEnabled() && futureCache.containsKey(cacheKey)) {
        return futureCache.get(cacheKey);
    }

    Future<V> future = Future.future();
    if (loaderOptions.batchingEnabled()) {
        loaderQueue.put(key, future);
    } else {
        CompositeFuture compositeFuture = batchLoadFunction.load(Collections.singleton(key));
        if (compositeFuture.succeeded()) {
            future.complete(compositeFuture.result().resultAt(0));
        } else {
            future.fail(compositeFuture.cause());
        }
    }
    if (loaderOptions.cachingEnabled()) {
        futureCache.set(cacheKey, future);
    }
    return future;
}
项目:vertx-dataloader    文件:DataLoader.java   
/**
 * Dispatches the queued load requests to the batch execution function and returns a composite future of the result.
 * <p>
 * If batching is disabled, or there are no queued requests, then a succeeded composite future is returned.
 *
 * @return the composite future of the queued load requests
 */
public CompositeFuture dispatch() {
    if (!loaderOptions.batchingEnabled() || loaderQueue.size() == 0) {
        return CompositeFuture.join(Collections.emptyList());
    }
    CompositeFuture batch = batchLoadFunction.load(loaderQueue.keySet());
    dispatchedQueues.put(batch, new LinkedHashMap<>(loaderQueue));
    batch.setHandler(rh -> {
        AtomicInteger index = new AtomicInteger(0);
        dispatchedQueues.get(batch).forEach((key, future) -> {
            if (batch.succeeded(index.get())) {
                future.complete(batch.resultAt(index.get()));
            } else {
                future.fail(batch.cause(index.get()));
            }
            index.incrementAndGet();
        });
        dispatchedQueues.remove(batch);
    });
    loaderQueue.clear();
    return batch;
}
项目:vertx-dataloader    文件:DataLoaderTest.java   
@Test
public void should_Build_a_really_really_simple_data_loader() {
    AtomicBoolean success = new AtomicBoolean();
    DataLoader<Integer, Integer> identityLoader = new DataLoader<>(keys ->
        CompositeFuture.join(keys.stream()
                .map(Future::succeededFuture)
                .collect(Collectors.toCollection(ArrayList::new))));

    Future<Integer> future1 = identityLoader.load(1);
    future1.setHandler(rh -> {
        assertThat(rh.result(), equalTo(1));
        success.set(rh.succeeded());
    });
    identityLoader.dispatch();
    await().untilAtomic(success, is(true));
}
项目:vertx-dataloader    文件:DataLoaderTest.java   
@Test
public void should_Support_loading_multiple_keys_in_one_call() {
    AtomicBoolean success = new AtomicBoolean();
    DataLoader<Integer, Integer> identityLoader = new DataLoader<>(keys ->
            CompositeFuture.join(keys.stream()
                    .map(Future::succeededFuture)
                    .collect(Collectors.toCollection(ArrayList::new))));

    CompositeFuture futureAll = identityLoader.loadMany(asList(1, 2));
    futureAll.setHandler(rh -> {
        assertThat(rh.result().size(), is(2));
        success.set(rh.succeeded());
    });
    identityLoader.dispatch();
    await().untilAtomic(success, is(true));
    assertThat(futureAll.list(), equalTo(asList(1, 2)));
}
项目:vertx-dataloader    文件:DataLoaderTest.java   
@Test
public void should_Cache_on_redispatch() {
  ArrayList<Collection> loadCalls = new ArrayList<>();
  DataLoader<String, String> identityLoader = idLoader(new DataLoaderOptions(), loadCalls);

  Future<String> future1 = identityLoader.load("A");
  identityLoader.dispatch();

  CompositeFuture future2 = identityLoader.loadMany(asList("A", "B"));
  identityLoader.dispatch();

  await().until(() -> future1.isComplete() && future2.isComplete());
  assertThat(future1.result(), equalTo("A"));
  assertThat(future2.list(), equalTo(asList("A", "B")));
  assertThat(loadCalls, equalTo(asList(asList("A"), asList("B"))));
}
项目:vertx-dataloader    文件:DataLoaderTest.java   
@Test
public void should_Represent_failures_and_successes_simultaneously() {
    AtomicBoolean success = new AtomicBoolean();
    ArrayList<Collection> loadCalls = new ArrayList<>();
    DataLoader<Integer, Integer> evenLoader = idLoaderWithErrors(new DataLoaderOptions(), loadCalls);

    Future<Integer> future1 = evenLoader.load(1);
    Future<Integer> future2 = evenLoader.load(2);
    Future<Integer> future3 = evenLoader.load(3);
    Future<Integer> future4 = evenLoader.load(4);
    CompositeFuture result = evenLoader.dispatch();
    result.setHandler(rh -> success.set(true));

    await().untilAtomic(success, is(true));
    assertThat(future1.failed(), is(true));
    assertThat(future1.cause(), instanceOf(IllegalStateException.class));
    assertThat(future2.result(), equalTo(2));
    assertThat(future3.failed(), is(true));
    assertThat(future4.result(), equalTo(4));

    assertThat(loadCalls, equalTo(Collections.singletonList(asList(1, 2, 3, 4))));
}
项目:vertx-dataloader    文件:DataLoaderTest.java   
@Test
public void should_Batch_loads_occurring_within_futures() {
    ArrayList<Collection> loadCalls = new ArrayList<>();
    DataLoader<String, String> identityLoader = idLoader(DataLoaderOptions.create(), loadCalls);

    Future.<String>future().setHandler(rh -> {
        identityLoader.load("a");
        Future.future().setHandler(rh2 -> {
            identityLoader.load("b");
            Future.future().setHandler(rh3 -> {
                identityLoader.load("c");
                Future.future().setHandler(rh4 ->
                        identityLoader.load("d")).complete();
            }).complete();
        }).complete();
    }).complete();
    CompositeFuture composite = identityLoader.dispatch();

    await().until((Callable<Boolean>) composite::isComplete);
    assertThat(loadCalls, equalTo(
            Collections.singletonList(asList("a", "b", "c", "d"))));
}
项目:vertx-blueprint-microservice    文件:BaseMicroserviceVerticle.java   
@Override
public void stop(Future<Void> future) throws Exception {
  // In current design, the publisher is responsible for removing the service
  List<Future> futures = new ArrayList<>();
  registeredRecords.forEach(record -> {
    Future<Void> cleanupFuture = Future.future();
    futures.add(cleanupFuture);
    discovery.unpublish(record.getRegistration(), cleanupFuture.completer());
  });

  if (futures.isEmpty()) {
    discovery.close();
    future.complete();
  } else {
    CompositeFuture.all(futures)
      .setHandler(ar -> {
        discovery.close();
        if (ar.failed()) {
          future.fail(ar.cause());
        } else {
          future.complete();
        }
      });
  }
}
项目:enmasse    文件:Main.java   
private void deployVerticles(Future<Void> startPromise, Deployment ... deployments) {
    List<Future> futures = new ArrayList<>();
    for (Deployment deployment : deployments) {
        Future<Void> promise = Future.future();
        futures.add(promise);
        vertx.deployVerticle(deployment.verticle, deployment.options, result -> {
            if (result.succeeded()) {
                promise.complete();
            } else {
                promise.fail(result.cause());
            }
        });
    }

    CompositeFuture.all(futures).setHandler(result -> {
        if (result.succeeded()) {
            startPromise.complete();
        } else {
            startPromise.fail(result.cause());
        }
    });
}
项目:vertx-jooq    文件:VertxSomethingDaoTest.java   
@Test
public void fetchOneByConditionWithMultipleMatchesShouldFail() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    Future<Integer> insertFuture1 = Future.future();
    Future<Integer> insertFuture2 = Future.future();
    dao.insertReturningPrimaryAsync(createSomething().setSomehugenumber(1L),insertFuture1);
    dao.insertReturningPrimaryAsync(createSomething().setSomehugenumber(1L),insertFuture2);
    CompositeFuture.all(insertFuture1,insertFuture2).
            setHandler(consumeOrFailHandler(v->{
                dao.fetchOneAsync(Tables.SOMETHING.SOMEHUGENUMBER.eq(1L),h->{
                    Assert.assertNotNull(h.cause());
                    //cursor fetched more than one row
                    Assert.assertEquals(TooManyRowsException.class, h.cause().getClass());
                    dao.deleteExecAsync(Tables.SOMETHING.SOMEHUGENUMBER.eq(1L),countdownLatchHandler(latch));
                });
            }));
    await(latch);
}
项目:vertx-jooq    文件:VertxSomethingDaoTest.java   
@Test
public void fetchByConditionWithMultipleMatchesShouldSucceed() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    Future<Integer> insertFuture1 = Future.future();
    Future<Integer> insertFuture2 = Future.future();
    dao.insertReturningPrimaryAsync(createSomething().setSomehugenumber(1L),insertFuture1);
    dao.insertReturningPrimaryAsync(createSomething().setSomehugenumber(1L),insertFuture2);
    CompositeFuture.all(insertFuture1, insertFuture2).
            setHandler(consumeOrFailHandler(v -> {
                dao.fetchAsync(Tables.SOMETHING.SOMEHUGENUMBER.eq(1L), h -> {
                    Assert.assertNotNull(h.result());
                    //cursor fetched more than one row
                    Assert.assertEquals(2, h.result().size());
                    dao.deleteExecAsync(Tables.SOMETHING.SOMEHUGENUMBER.eq(1L), countdownLatchHandler(latch));
                });
            }));
    await(latch);
}