Java 类io.vertx.core.spi.cluster.AsyncMultiMap 实例源码

项目:vertx-zero    文件:FakeClusterManager.java   
@Override
public <K, V> void getAsyncMultiMap(final String name, final Handler<AsyncResult<AsyncMultiMap<K, V>>> resultHandler) {
    ConcurrentMap map = asyncMultiMaps.get(name);
    if (map == null) {
        map = new ConcurrentHashMap<>();
        final ConcurrentMap prevMap = asyncMultiMaps.putIfAbsent(name, map);
        if (prevMap != null) {
            map = prevMap;
        }
    }
    @SuppressWarnings("unchecked") final ConcurrentMap<K, ChoosableSet<V>> theMap = map;
    this.vertx.runOnContext(v -> resultHandler.handle(Future.succeededFuture(new FakeAsyncMultiMap<>(theMap))));
}
项目:vertx-infinispan    文件:InfinispanClusterManager.java   
@Override
public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> resultHandler) {
  vertx.executeBlocking(future -> {
    Cache<MultiMapKey, Object> cache = cacheManager.getCache(name);
    InfinispanAsyncMultiMap<K, V> asyncMultiMap = new InfinispanAsyncMultiMap<>(vertx, cache);
    synchronized (this) {
      multimaps.add(asyncMultiMap);
    }
    future.complete(asyncMultiMap);
  }, false, resultHandler);
}
项目:atomix-vertx    文件:AtomixClusterManager.java   
@Override
public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> handler) {
  atomix.<K, V>consistentMultimapBuilder(name)
      .withSerializer(createSerializer())
      .withConsistency(Consistency.LINEARIZABLE)
      .withPersistence(Persistence.PERSISTENT)
      .withReplication(Replication.SYNCHRONOUS)
      .withRecovery(Recovery.RECOVER)
      .withMaxRetries(5)
      .buildAsync()
      .whenComplete(VertxFutures.convertHandler(
          handler, map -> new AtomixAsyncMultiMap<>(vertx, map.async()), vertx.getOrCreateContext()));
}
项目:vertx-jgroups    文件:JGroupsClusterManager.java   
@Override
public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> handler) {
  logTrace(() -> String.format("Create new AsyncMultiMap [%s] on address [%s]", name, address));
  vertx.executeBlocking((future) -> {
    checkCluster();
    AsyncMultiMap<K, V> map = cacheManager.<K, V>createAsyncMultiMap(name);
    future.complete(map);
  }, handler);
}
项目:vertx-hazelcast    文件:HazelcastClusterManager.java   
/**
 * Every eventbus handler has an ID. SubsMap (subscriber map) is a MultiMap which
 * maps handler-IDs with server-IDs and thus allows the eventbus to determine where
 * to send messages.
 *
 * @param name          A unique name by which the the MultiMap can be identified within the cluster.
 *                      See the cluster config file (e.g. cluster.xml in case of HazelcastClusterManager) for
 *                      additional MultiMap config parameters.
 * @param resultHandler handler receiving the multimap
 */
@Override
public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> resultHandler) {
  vertx.executeBlocking(fut -> {
    com.hazelcast.core.MultiMap<K, V> multiMap = hazelcast.getMultiMap(name);
    HazelcastAsyncMultiMap<K, V> asyncMultiMap = new HazelcastAsyncMultiMap<>(vertx, multiMap);
    synchronized (this) {
      multimaps.add(asyncMultiMap);
    }
    fut.complete(asyncMultiMap);
  }, resultHandler);
}
项目:vertx-ignite    文件:IgniteClusterManager.java   
@Override
public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> handler) {
  vertx.executeBlocking(
    fut -> fut.complete(new AsyncMultiMapImpl<>(this.<K, Set<V>>getCache(name), vertx)), handler
  );
}
项目:vertx-jgroups    文件:CacheManager.java   
public <K, V> AsyncMultiMap<K, V> createAsyncMultiMap(String name) {
    logDebug(() -> String.format("method createAsyncMultiMap address[%s] name[%s]", channel.getAddressAsString(), name));
    MultiMap<K, V> map = multiMapService.<K, V>multiMapCreate(name);
    return new AsyncMultiMapWrapper<>(name, map, executorService);
}
项目:vert.3x-gateway    文件:ZookeeperClusterManager.java   
/**
 * Every eventbus handler has an ID. SubsMap (subscriber map) is a MultiMap which
 * maps handler-IDs with server-IDs and thus allows the eventbus to determine where
 * to send messages.
 *
 * @param name A unique name by which the the MultiMap can be identified within the cluster.
 *             See the cluster config file (e.g. io.vertx.spi.cluster.impl.zookeeper.zookeeper.properties in case of ZookeeperClusterManager) for
 *             additional MultiMap config parameters.
 * @return subscription map
 */
@Override
public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> handler) {
  vertx.runOnContext(event -> handler.handle(Future.succeededFuture(new ZKAsyncMultiMap<>(vertx, curator, name))));
}