Java 类org.apache.zookeeper.AsyncCallback 实例源码

项目:https-github.com-apache-zookeeper    文件:CreateTTLTest.java   
/**
 * Creates a {@code PERSISTENT_WITH_TTL} node through the asynchronous create
 * overload, then checks that it is still present after a container check with
 * the fake elapsed value at 0 and gone after a container check with the fake
 * elapsed value at 1000.
 */
@Test
public void testCreateAsync()
        throws IOException, KeeperException, InterruptedException {
    final String ttlPath = "/foo";
    // The create outcome is not inspected; the callback is intentionally a no-op.
    AsyncCallback.Create2Callback ignoreResult = new AsyncCallback.Create2Callback() {
        @Override
        public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
            // NOP
        }
    };
    zk.create(ttlPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT_WITH_TTL, ignoreResult, null, 100);

    final AtomicLong fakeElapsed = new AtomicLong(0);
    ContainerManager ttlChecker = newContainerManager(fakeElapsed);

    // First pass: fake elapsed value is still 0.
    ttlChecker.checkContainers();
    Assert.assertNotNull("Ttl node should not have been deleted yet", zk.exists(ttlPath, false));

    // Second pass: fake elapsed value advanced to 1000.
    fakeElapsed.set(1000);
    ttlChecker.checkContainers();
    Assert.assertNull("Ttl node should have been deleted", zk.exists(ttlPath, false));
}
项目:ditb    文件:AssignmentManager.java   
/**
 * Asynchronously requests creation of the OFFLINE node for a region in ZooKeeper.
 *
 * <p>The region must currently be closed or offline; otherwise the server is
 * asked to abort. The tracked region state is switched to OFFLINE before the
 * asynchronous node creation is issued.
 *
 * @param state current state of the region; also handed to the node creation call
 * @param cb callback passed to the asynchronous node creation
 * @param destination server name passed to the node creation
 * @return {@code true} if the creation request was issued; {@code false} if the
 *         state was unexpected or a {@code KeeperException} was raised
 */
private boolean asyncSetOfflineInZooKeeper(final RegionState state,
    final AsyncCallback.StringCallback cb, final ServerName destination) {
  final boolean canGoOffline = state.isClosed() || state.isOffline();
  if (!canGoOffline) {
    final String reason = "Unexpected state trying to OFFLINE; " + state;
    this.server.abort(reason, new IllegalStateException());
    return false;
  }
  regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
  try {
    ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(), destination, cb, state);
    return true;
  } catch (KeeperException e) {
    // An already existing node is only logged; anything else aborts the server.
    if (!(e instanceof NodeExistsException)) {
      server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
    } else {
      LOG.warn("Node for " + state.getRegion() + " already exists");
    }
  }
  return false;
}
项目:distributedlog    文件:ZKSubscriptionsStore.java   
/**
 * Lists the children of this store's ZooKeeper path and resolves the returned
 * future from the outcome: an empty map when the path does not exist, a
 * {@code KeeperException} for any other non-OK code, and otherwise whatever
 * {@code getLastCommitPositions(result, children)} produces.
 *
 * @return future completed with the last commit positions, or failed if the
 *         ZooKeeper client cannot be obtained or the call is interrupted
 */
@Override
public Future<Map<String, DLSN>> getLastCommitPositions() {
    final Promise<Map<String, DLSN>> positions = new Promise<Map<String, DLSN>>();
    final AsyncCallback.Children2Callback onChildren = new AsyncCallback.Children2Callback() {
        @Override
        public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
            if (KeeperException.Code.OK.intValue() == rc) {
                getLastCommitPositions(positions, children);
                return;
            }
            if (KeeperException.Code.NONODE.intValue() == rc) {
                // Missing path: report an empty map rather than an error.
                positions.setValue(new HashMap<String, DLSN>());
                return;
            }
            positions.setException(KeeperException.create(KeeperException.Code.get(rc), path));
        }
    };
    try {
        this.zkc.get().getChildren(this.zkPath, false, onChildren, null);
    } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
        positions.setException(zkce);
    } catch (InterruptedException ie) {
        positions.setException(new DLInterruptedException("getLastCommitPositions was interrupted", ie));
    }
    return positions;
}
项目:distributedlog    文件:Utils.java   
/**
 * Asynchronously create a zookeeper path recursively and optimistically.
 *
 * <p>Delegates to {@code zkAsyncCreateFullPathOptimisticRecursive} and resolves
 * the returned future from the result code via {@code handleKeeperExceptionCode}.
 *
 * @param zkc Zookeeper client
 * @param pathToCreate Zookeeper full path
 * @param parentPathShouldNotCreate zookeeper parent path that should not be created
 * @param data Zookeeper data
 * @param acl Acl of the zk path
 * @param createMode Create mode of zk path
 * @return future resolved once the result code of the creation has been handled
 */
public static Future<BoxedUnit> zkAsyncCreateFullPathOptimistic(
    final ZooKeeperClient zkc,
    final String pathToCreate,
    final Optional<String> parentPathShouldNotCreate,
    final byte[] data,
    final List<ACL> acl,
    final CreateMode createMode) {
    final Promise<BoxedUnit> completion = new Promise<BoxedUnit>();
    final AsyncCallback.StringCallback onCreated = new AsyncCallback.StringCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            handleKeeperExceptionCode(rc, path, completion);
        }
    };
    // The promise is also passed as the trailing argument of the recursive helper.
    zkAsyncCreateFullPathOptimisticRecursive(
            zkc, pathToCreate, parentPathShouldNotCreate, data, acl, createMode, onCreated, completion);
    return completion;
}
项目:LCIndex-HBase-0.94.16    文件:AssignmentManager.java   
/**
 * Set region as OFFLINED up in zookeeper asynchronously.
 *
 * <p>The region must currently be closed or offline; otherwise the master is
 * asked to abort. The state object is updated to OFFLINE before the
 * asynchronous node creation is issued.
 *
 * @param state region state to take offline; updated to OFFLINE by this call
 * @param cb callback passed to the asynchronous node creation
 * @param ctx context object passed to the asynchronous node creation
 * @return True if we succeeded, false otherwise (State was incorrect or failed
 * updating zk).
 */
boolean asyncSetOfflineInZooKeeper(final RegionState state,
    final AsyncCallback.StringCallback cb, final Object ctx) {
  if (!state.isClosed() && !state.isOffline()) {
    this.master.abort("Unexpected state trying to OFFLINE; " + state,
      new IllegalStateException());
    return false;
  }
  state.update(RegionState.State.OFFLINE);
  try {
    ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(),
      this.master.getServerName(), cb, ctx);
  } catch (KeeperException e) {
    // TODO: this error handling will never execute, as the callback is async.
    if (e instanceof NodeExistsException) {
      LOG.warn("Node for " + state.getRegion() + " already exists");
    } else {
      master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
    }
    return false;
  }
  return true;
}
项目:pbase    文件:AssignmentManager.java   
/**
 * Asynchronously requests creation of the OFFLINE node for a region in ZooKeeper.
 *
 * <p>The region must currently be closed or offline; otherwise the server is
 * asked to abort. The tracked region state is switched to OFFLINE before the
 * asynchronous node creation is issued.
 *
 * @param state current state of the region; also handed to the node creation call
 * @param cb callback passed to the asynchronous node creation
 * @param destination server name passed to the node creation
 * @return {@code true} if the creation request was issued; {@code false} if the
 *         state was unexpected or a {@code KeeperException} was raised
 */
private boolean asyncSetOfflineInZooKeeper(final RegionState state,
    final AsyncCallback.StringCallback cb, final ServerName destination) {
  final boolean canGoOffline = state.isClosed() || state.isOffline();
  if (!canGoOffline) {
    final String reason = "Unexpected state trying to OFFLINE; " + state;
    this.server.abort(reason, new IllegalStateException());
    return false;
  }
  regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
  try {
    ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(), destination, cb, state);
    return true;
  } catch (KeeperException e) {
    // An already existing node is only logged; anything else aborts the server.
    if (!(e instanceof NodeExistsException)) {
      server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
    } else {
      LOG.warn("Node for " + state.getRegion() + " already exists");
    }
  }
  return false;
}
项目:HIndex    文件:AssignmentManager.java   
/**
 * Asynchronously requests creation of the OFFLINE node for a region in ZooKeeper.
 *
 * <p>The region must currently be closed or offline; otherwise the server is
 * asked to abort. The tracked region state is switched to OFFLINE before the
 * asynchronous node creation is issued.
 *
 * @param state current state of the region; also handed to the node creation call
 * @param cb callback passed to the asynchronous node creation
 * @param destination server name passed to the node creation
 * @return {@code true} if the creation request was issued; {@code false} if the
 *         state was unexpected or a {@code KeeperException} was raised
 */
private boolean asyncSetOfflineInZooKeeper(final RegionState state,
    final AsyncCallback.StringCallback cb, final ServerName destination) {
  final boolean canGoOffline = state.isClosed() || state.isOffline();
  if (!canGoOffline) {
    final String reason = "Unexpected state trying to OFFLINE; " + state;
    this.server.abort(reason, new IllegalStateException());
    return false;
  }
  regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
  try {
    ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(), destination, cb, state);
    return true;
  } catch (KeeperException e) {
    // An already existing node is only logged; anything else aborts the server.
    if (!(e instanceof NodeExistsException)) {
      server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
    } else {
      LOG.warn("Node for " + state.getRegion() + " already exists");
    }
  }
  return false;
}
项目:uimaster    文件:ZookeeperHelper.java   
/**
 * Handles a {@link KeeperException} raised by a ZooKeeper operation.
 *
 * <p>For {@code BADVERSION} in asynchronous mode the node at the exception's
 * path is re-read asynchronously and, if that read succeeds, the fresh data
 * and version are passed to {@code updateData}. In synchronous mode nothing is
 * done for {@code BADVERSION}. Every other code is logged as a warning.
 *
 * @param e the exception to handle
 * @param watcher watcher registered on the re-read and passed on to {@code updateData}
 * @param dataAction action passed on to {@code updateData}
 * @param asynchFlag whether the asynchronous re-read should be attempted
 */
protected void processKeeperException(final KeeperException e, final Watcher watcher, final DataAction dataAction,
        boolean asynchFlag) {
    switch (e.code()) {
    case BADVERSION:
        if (asynchFlag) {
            zooKeeper.getData(e.getPath(), watcher, new AsyncCallback.DataCallback() {
                @Override
                public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                    // A failed read carries no stat; dereferencing it would throw
                    // on the ZooKeeper event thread, so bail out with a warning.
                    if (rc != KeeperException.Code.OK.intValue() || stat == null) {
                        logger.warn("failed to re-read data of " + path + " after BADVERSION, rc=" + rc);
                        return;
                    }
                    ZData zData = ZDataImpl.newInstance(path, data, stat.getVersion());
                    updateData(zData, dataAction, watcher);
                }
            }, null);
        }
        // Synchronous mode: no recovery is attempted for BADVERSION.
        break;
    default:
        logger.warn("encounter exception", e);
    }
}
项目:IRIndex    文件:AssignmentManager.java   
/**
 * Set region as OFFLINED up in zookeeper asynchronously.
 *
 * <p>The region must currently be closed or offline; otherwise the master is
 * asked to abort. The state object is updated to OFFLINE before the
 * asynchronous node creation is issued.
 *
 * @param state region state to take offline; updated to OFFLINE by this call
 * @param cb callback passed to the asynchronous node creation
 * @param ctx context object passed to the asynchronous node creation
 * @return True if we succeeded, false otherwise (State was incorrect or failed
 * updating zk).
 */
boolean asyncSetOfflineInZooKeeper(final RegionState state,
    final AsyncCallback.StringCallback cb, final Object ctx) {
  if (!state.isClosed() && !state.isOffline()) {
    this.master.abort("Unexpected state trying to OFFLINE; " + state,
      new IllegalStateException());
    return false;
  }
  state.update(RegionState.State.OFFLINE);
  try {
    ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(),
      this.master.getServerName(), cb, ctx);
  } catch (KeeperException e) {
    // TODO: this error handling will never execute, as the callback is async.
    if (e instanceof NodeExistsException) {
      LOG.warn("Node for " + state.getRegion() + " already exists");
    } else {
      master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
    }
    return false;
  }
  return true;
}
项目:hbase    文件:TestReadOnlyZKClient.java   
/**
 * Checks that the ZooKeeper connection is not closed while a read is still
 * pending, and that it is closed once the pending read has completed and the
 * connection has gone idle.
 */
@Test
public void testNotCloseZkWhenPending() throws Exception {
  ZooKeeper zkMock = mock(ZooKeeper.class);
  Exchanger<AsyncCallback.DataCallback> handOff = new Exchanger<>();
  // Hand the callback given to getData over to the test thread instead of completing it.
  doAnswer(invocation -> {
    handOff.exchange(invocation.getArgument(2));
    return null;
  }).when(zkMock).getData(anyString(), anyBoolean(),
    any(AsyncCallback.DataCallback.class), any());
  doAnswer(invocation -> null).when(zkMock).close();
  when(zkMock.getState()).thenReturn(ZooKeeper.States.CONNECTED);
  RO_ZK.zookeeper = zkMock;

  CompletableFuture<byte[]> pendingRead = RO_ZK.get(PATH);
  AsyncCallback.DataCallback pendingCallback = handOff.exchange(null);
  // 2 * keep alive time to ensure that we will not close the zk when there are pending requests
  Thread.sleep(6000);
  assertNotNull(RO_ZK.zookeeper);
  verify(zkMock, never()).close();

  // Complete the read by hand; the future must then yield the data.
  pendingCallback.processResult(Code.OK.intValue(), PATH, null, DATA, null);
  assertArrayEquals(DATA, pendingRead.get());

  // now we will close the idle connection.
  waitForIdleConnectionClosed();
  verify(zkMock, times(1)).close();
}
项目:RStore    文件:AssignmentManager.java   
/**
 * Set region as OFFLINED up in zookeeper asynchronously.
 *
 * <p>The region must currently be closed or offline; otherwise the master is
 * asked to abort. The state object is updated to OFFLINE before the
 * asynchronous node creation is issued.
 *
 * @param state region state to take offline; updated to OFFLINE by this call
 * @param cb callback passed to the asynchronous node creation
 * @param ctx context object passed to the asynchronous node creation
 * @return True if we succeeded, false otherwise (State was incorrect or failed
 * updating zk).
 */
boolean asyncSetOfflineInZooKeeper(final RegionState state,
    final AsyncCallback.StringCallback cb, final Object ctx) {
  if (!state.isClosed() && !state.isOffline()) {
    this.master.abort("Unexpected state trying to OFFLINE; " + state,
      new IllegalStateException());
    return false;
  }
  state.update(RegionState.State.OFFLINE);
  try {
    ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(),
      this.master.getServerName(), cb, ctx);
  } catch (KeeperException e) {
    master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
    return false;
  }
  return true;
}
项目:curator    文件:CuratorMultiTransactionImpl.java   
/**
 * Issues the multi-operation transaction asynchronously. When the result
 * arrives, the trace is committed, the operation results (if any) are wrapped
 * into Curator results, and a TRANSACTION event is handed to the client.
 * Any throwable raised while issuing the call is passed to the backgrounding
 * error check.
 */
@Override
public void performBackgroundOperation(final OperationAndData<CuratorMultiTransactionRecord> operationAndData) throws Exception
{
    try
    {
        final TimeTrace tracer = client.getZookeeperClient().startTracer("CuratorMultiTransactionImpl-Background");
        final AsyncCallback.MultiCallback onMultiDone = new AsyncCallback.MultiCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, List<OpResult> opResults)
            {
                tracer.commit();
                // No op results means no Curator results either.
                List<CuratorTransactionResult> wrapped = null;
                if ( opResults != null )
                {
                    wrapped = CuratorTransactionImpl.wrapResults(client, opResults, operationAndData.getData());
                }
                CuratorEvent txEvent = new CuratorEventImpl(client, CuratorEventType.TRANSACTION, rc, path, null, ctx, null, null, null, null, null, wrapped);
                client.processBackgroundOperation(operationAndData, txEvent);
            }
        };
        client.getZooKeeper().multi(operationAndData.getData(), onMultiDone, backgrounding.getContext());
    }
    catch ( Throwable e )
    {
        backgrounding.checkError(e, null);
    }
}
项目:curator    文件:BackgroundSyncImpl.java   
/**
 * Issues an asynchronous sync for the operation's data value. When the result
 * arrives, the return code and request length are recorded on the trace, the
 * trace is committed, and a SYNC event is handed to the client.
 */
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    final OperationTrace opTrace = client.getZookeeperClient().startAdvancedTracer("BackgroundSyncImpl");
    final String target = operationAndData.getData();
    final AsyncCallback.VoidCallback onSynced = new AsyncCallback.VoidCallback()
    {
        @Override
        public void processResult(int rc, String path, Object ctx)
        {
            opTrace.setReturnCode(rc).setRequestBytesLength(target).commit();
            client.processBackgroundOperation(
                operationAndData,
                new CuratorEventImpl(client, CuratorEventType.SYNC, rc, path, null, ctx, null, null, null, null, null, null));
        }
    };
    client.getZooKeeper().sync(target, onSynced, context);
}
项目:curator    文件:GetACLBuilderImpl.java   
/**
 * Issues an asynchronous getACL for the operation's data value. When the
 * result arrives, the return code, path and stat are recorded on the trace,
 * the trace is committed, and a GET_ACL event carrying the stat and ACL list
 * is handed to the client. Any throwable raised while issuing the call is
 * passed to the backgrounding error check.
 */
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    try
    {
        final OperationTrace opTrace = client.getZookeeperClient().startAdvancedTracer("GetACLBuilderImpl-Background");
        final AsyncCallback.ACLCallback onAcl = new AsyncCallback.ACLCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat)
            {
                // Record the outcome on the trace before publishing the event.
                opTrace.setReturnCode(rc).setPath(path).setStat(stat).commit();
                client.processBackgroundOperation(
                    operationAndData,
                    new CuratorEventImpl(client, CuratorEventType.GET_ACL, rc, path, null, ctx, stat, null, null, null, acl, null));
            }
        };
        client.getZooKeeper().getACL(operationAndData.getData(), responseStat, onAcl, backgrounding.getContext());
    }
    catch ( Throwable e )
    {
        backgrounding.checkError(e, null);
    }
}
项目:PyroDB    文件:AssignmentManager.java   
/**
 * Asynchronously requests creation of the OFFLINE node for a region in ZooKeeper.
 *
 * <p>The region must currently be closed or offline; otherwise the server is
 * asked to abort. The tracked region state is switched to OFFLINE before the
 * asynchronous node creation is issued.
 *
 * @param state current state of the region; also handed to the node creation call
 * @param cb callback passed to the asynchronous node creation
 * @param destination server name passed to the node creation
 * @return {@code true} if the creation request was issued; {@code false} if the
 *         state was unexpected or a {@code KeeperException} was raised
 */
private boolean asyncSetOfflineInZooKeeper(final RegionState state,
    final AsyncCallback.StringCallback cb, final ServerName destination) {
  final boolean canGoOffline = state.isClosed() || state.isOffline();
  if (!canGoOffline) {
    final String reason = "Unexpected state trying to OFFLINE; " + state;
    this.server.abort(reason, new IllegalStateException());
    return false;
  }
  regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
  try {
    ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(), destination, cb, state);
    return true;
  } catch (KeeperException e) {
    // An already existing node is only logged; anything else aborts the server.
    if (!(e instanceof NodeExistsException)) {
      server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
    } else {
      LOG.warn("Node for " + state.getRegion() + " already exists");
    }
  }
  return false;
}
项目:c5    文件:AssignmentManager.java   
/**
 * Asynchronously requests creation of the OFFLINE node for a region in ZooKeeper.
 *
 * <p>The region must currently be closed or offline; otherwise the server is
 * asked to abort. The tracked region state is switched to OFFLINE before the
 * asynchronous node creation is issued.
 *
 * @param state current state of the region; also handed to the node creation call
 * @param cb callback passed to the asynchronous node creation
 * @param destination server name passed to the node creation
 * @return {@code true} if the creation request was issued; {@code false} if the
 *         state was unexpected or a {@code KeeperException} was raised
 */
private boolean asyncSetOfflineInZooKeeper(final RegionState state,
    final AsyncCallback.StringCallback cb, final ServerName destination) {
  final boolean canGoOffline = state.isClosed() || state.isOffline();
  if (!canGoOffline) {
    final String reason = "Unexpected state trying to OFFLINE; " + state;
    this.server.abort(reason, new IllegalStateException());
    return false;
  }
  regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
  try {
    ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(), destination, cb, state);
    return true;
  } catch (KeeperException e) {
    // An already existing node is only logged; anything else aborts the server.
    if (!(e instanceof NodeExistsException)) {
      server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
    } else {
      LOG.warn("Node for " + state.getRegion() + " already exists");
    }
  }
  return false;
}
项目:HBase-Research    文件:AssignmentManager.java   
/**
 * Set region as OFFLINED up in zookeeper asynchronously.
 *
 * <p>The region must currently be closed or offline; otherwise the master is
 * asked to abort. The state object is updated to OFFLINE before the
 * asynchronous node creation is issued.
 *
 * @param state region state to take offline; updated to OFFLINE by this call
 * @param cb callback passed to the asynchronous node creation
 * @param ctx context object passed to the asynchronous node creation
 * @return True if we succeeded, false otherwise (State was incorrect or failed
 * updating zk).
 */
boolean asyncSetOfflineInZooKeeper(final RegionState state,
    final AsyncCallback.StringCallback cb, final Object ctx) {
  if (!state.isClosed() && !state.isOffline()) {
    this.master.abort("Unexpected state trying to OFFLINE; " + state,
      new IllegalStateException());
    return false;
  }
  state.update(RegionState.State.OFFLINE);
  try {
    ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(),
      this.master.getServerName(), cb, ctx);
  } catch (KeeperException e) {
    // TODO: this error handling will never execute, as the callback is async.
    if (e instanceof NodeExistsException) {
      LOG.warn("Node for " + state.getRegion() + " already exists");
    } else {
      master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
    }
    return false;
  }
  return true;
}
项目:hbase-0.94.8-qod    文件:AssignmentManager.java   
/**
 * Set region as OFFLINED up in zookeeper asynchronously.
 *
 * <p>The region must currently be closed or offline; otherwise the master is
 * asked to abort. The state object is updated to OFFLINE before the
 * asynchronous node creation is issued.
 *
 * @param state region state to take offline; updated to OFFLINE by this call
 * @param cb callback passed to the asynchronous node creation
 * @param ctx context object passed to the asynchronous node creation
 * @return True if we succeeded, false otherwise (State was incorrect or failed
 * updating zk).
 */
boolean asyncSetOfflineInZooKeeper(final RegionState state,
    final AsyncCallback.StringCallback cb, final Object ctx) {
  if (!state.isClosed() && !state.isOffline()) {
    this.master.abort("Unexpected state trying to OFFLINE; " + state,
      new IllegalStateException());
    return false;
  }
  state.update(RegionState.State.OFFLINE);
  try {
    ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(),
      this.master.getServerName(), cb, ctx);
  } catch (KeeperException e) {
    // TODO: this error handling will never execute, as the callback is async.
    if (e instanceof NodeExistsException) {
      LOG.warn("Node for " + state.getRegion() + " already exists");
    } else {
      master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
    }
    return false;
  }
  return true;
}
项目:hbase-0.94.8-qod    文件:AssignmentManager.java   
/**
 * Set region as OFFLINED up in zookeeper asynchronously.
 *
 * <p>The region must currently be closed or offline; otherwise the master is
 * asked to abort. The state object is updated to OFFLINE before the
 * asynchronous node creation is issued.
 *
 * @param state region state to take offline; updated to OFFLINE by this call
 * @param cb callback passed to the asynchronous node creation
 * @param ctx context object passed to the asynchronous node creation
 * @return True if we succeeded, false otherwise (State was incorrect or failed
 * updating zk).
 */
boolean asyncSetOfflineInZooKeeper(final RegionState state,
    final AsyncCallback.StringCallback cb, final Object ctx) {
  if (!state.isClosed() && !state.isOffline()) {
    this.master.abort("Unexpected state trying to OFFLINE; " + state,
      new IllegalStateException());
    return false;
  }
  state.update(RegionState.State.OFFLINE);
  try {
    ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(),
      this.master.getServerName(), cb, ctx);
  } catch (KeeperException e) {
    // TODO: this error handling will never execute, as the callback is async.
    if (e instanceof NodeExistsException) {
      LOG.warn("Node for " + state.getRegion() + " already exists");
    } else {
      master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
    }
    return false;
  }
  return true;
}
项目:DominoHBase    文件:AssignmentManager.java   
/**
 * Asynchronously requests creation of the OFFLINE node for a region in ZooKeeper.
 *
 * <p>The region must currently be closed or offline; otherwise the server is
 * asked to abort. The tracked region state is switched to OFFLINE before the
 * asynchronous node creation is issued.
 *
 * @param state current state of the region; also handed to the node creation call
 * @param cb callback passed to the asynchronous node creation
 * @param destination server name passed to the node creation
 * @return {@code true} if the creation request was issued; {@code false} if the
 *         state was unexpected or a {@code KeeperException} was raised
 */
private boolean asyncSetOfflineInZooKeeper(final RegionState state,
    final AsyncCallback.StringCallback cb, final ServerName destination) {
  final boolean canGoOffline = state.isClosed() || state.isOffline();
  if (!canGoOffline) {
    final String reason = "Unexpected state trying to OFFLINE; " + state;
    this.server.abort(reason, new IllegalStateException());
    return false;
  }
  regionStates.updateRegionState(state.getRegion(), RegionState.State.OFFLINE);
  try {
    ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(), destination, cb, state);
    return true;
  } catch (KeeperException e) {
    // An already existing node is only logged; anything else aborts the server.
    if (!(e instanceof NodeExistsException)) {
      server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
    } else {
      LOG.warn("Node for " + state.getRegion() + " already exists");
    }
  }
  return false;
}
项目:hindex    文件:AssignmentManager.java   
/**
 * Set region as OFFLINED up in zookeeper asynchronously.
 *
 * <p>The region must currently be closed or offline; otherwise the master is
 * asked to abort. The state object is updated to OFFLINE before the
 * asynchronous node creation is issued.
 *
 * @param state region state to take offline; updated to OFFLINE by this call
 * @param cb callback passed to the asynchronous node creation
 * @param ctx context object passed to the asynchronous node creation
 * @return True if we succeeded, false otherwise (State was incorrect or failed
 * updating zk).
 */
boolean asyncSetOfflineInZooKeeper(final RegionState state,
    final AsyncCallback.StringCallback cb, final Object ctx) {
  if (!state.isClosed() && !state.isOffline()) {
    this.master.abort("Unexpected state trying to OFFLINE; " + state,
      new IllegalStateException());
    return false;
  }
  state.update(RegionState.State.OFFLINE);
  try {
    ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(),
      this.master.getServerName(), cb, ctx);
  } catch (KeeperException e) {
    // TODO: this error handling will never execute, as the callback is async.
    if (e instanceof NodeExistsException) {
      LOG.warn("Node for " + state.getRegion() + " already exists");
    } else {
      master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
    }
    return false;
  }
  return true;
}
项目:bigstreams    文件:BookKeeperTools.java   
/**
 * Stores the expected count, the wrapped callback and its context. When the
 * expected count is zero the wrapped callback is invoked immediately with an
 * OK result code.
 *
 * @param expected presumably the number of results to wait for -- confirm against the class
 * @param cb callback invoked with the overall outcome
 * @param context context object passed through to {@code cb}
 */
MultiCallback(int expected, AsyncCallback.VoidCallback cb, Object context) {
    this.context = context;
    this.cb = cb;
    this.expected = expected;
    // Nothing to wait for: report success right away.
    if (0 == expected) {
        cb.processResult(Code.OK.intValue(), null, context);
    }
}
项目:bigstreams    文件:BookKeeperTools.java   
/**
 * This method asynchronously gets the set of available Bookies that the
 * dead input bookie's data will be copied over into. If the user passed in
 * a specific destination bookie, then just use that one. Otherwise the
 * children of the bookies path in ZooKeeper are listed and each child name
 * is parsed into a host and a port.
 *
 * <p>Any failure while listing or parsing the bookie nodes (ZooKeeper error,
 * a name without a separator, a non-numeric or out-of-range port) is reported
 * through {@code cb} with {@code BKException.Code.ZKException}; no exception
 * escapes from the ZooKeeper callback.
 *
 * @param bookieSrc
 *            Source bookie that had a failure. We want to replicate the
 *            ledger fragments that were stored there.
 * @param bookieDest
 *            Optional destination bookie that if passed, we will copy all
 *            of the ledger fragments from the source bookie over to it.
 * @param cb
 *            RecoverCallback to invoke once all of the data on the dead
 *            bookie has been recovered and replicated.
 * @param context
 *            Context for the RecoverCallback to call.
 */
private void getAvailableBookies(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest,
        final RecoverCallback cb, final Object context) {
    final List<InetSocketAddress> availableBookies = new LinkedList<InetSocketAddress>();
    if (bookieDest != null) {
        availableBookies.add(bookieDest);
        // Now poll ZK to get the active ledgers
        getActiveLedgers(bookieSrc, bookieDest, cb, context, availableBookies);
    } else {
        zk.getChildren(BOOKIES_PATH, null, new AsyncCallback.ChildrenCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, List<String> children) {
                if (rc != Code.OK.intValue()) {
                    LOG.error("ZK error getting bookie nodes: ", KeeperException.create(KeeperException.Code
                            .get(rc), path));
                    cb.recoverComplete(BKException.Code.ZKException, context);
                    return;
                }
                for (String bookieNode : children) {
                    String[] parts = bookieNode.split(COLON);
                    if (parts.length < 2) {
                        LOG.error("Bookie Node retrieved from ZK has invalid name format: " + bookieNode);
                        cb.recoverComplete(BKException.Code.ZKException, context);
                        return;
                    }
                    try {
                        availableBookies.add(new InetSocketAddress(parts[0], Integer.parseInt(parts[1])));
                    } catch (IllegalArgumentException badPort) {
                        // Covers NumberFormatException (non-numeric port) and an
                        // out-of-range port; without this the exception would
                        // escape the callback and cb would never be invoked.
                        LOG.error("Bookie Node retrieved from ZK has invalid name format: " + bookieNode, badPort);
                        cb.recoverComplete(BKException.Code.ZKException, context);
                        return;
                    }
                }
                // Now poll ZK to get the active ledgers
                getActiveLedgers(bookieSrc, bookieDest, cb, context, availableBookies);
            }
        }, null);
    }
}
项目:zookeeper    文件:ACLTest.java   
/**
 * Reads the ACL of "/node" asynchronously; the callback checks that the ACL
 * equals {@code OPEN_ACL_UNSAFE} and validates the result via {@code assertResult}.
 */
@Test
@ZooConfig(initNodes = {"/node"}, async = true)
public void testGetACLAsync() throws Exception {
    final String target = "/node";
    AsyncCallback.ACLCallback verifier = new AsyncCallback.ACLCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
            boolean isOpenAcl = acl.equals(ZooDefs.Ids.OPEN_ACL_UNSAFE);
            assertTrue(isOpenAcl);
            assertResult(rc, target, path, ctx, stat);
        }
    };
    zooKeeper.getACL(target, null, verifier, null);
}
项目:zookeeper    文件:ACLTest.java   
/**
 * Sets the ACL of "/node" asynchronously (any version) to world:anyone with
 * all permissions; the callback validates the result via {@code assertResult}
 * without checking the stat.
 */
@Test
@ZooConfig(initNodes = {"/node"}, async = true)
public void testSetACLAsync() throws Exception {
    final String target = "/node";
    List<ACL> worldAll = Arrays.asList(new ACL(ZooDefs.Perms.ALL, new Id("world", "anyone")));
    AsyncCallback.StatCallback verifier = new AsyncCallback.StatCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            assertResult(rc, target, path, ctx, null);
        }
    };
    zooKeeper.setACL(target, worldAll, -1, verifier, null);
}
项目:zookeeper    文件:NodeTest.java   
/**
 * Creates the persistent node "/node/child" asynchronously; the callback
 * checks the created name and validates the result via {@code assertResult}.
 */
@Test
@ZooConfig(initNodes = {"/node"}, async = true)
public void testCreateNodeAsync() {
    final String childPath = "/node/child";
    AsyncCallback.StringCallback verifier = new AsyncCallback.StringCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            assertEquals(childPath, name);
            assertResult(rc, childPath, path, ctx, null);
        }
    };
    zooKeeper.create(childPath, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT, verifier, null);
}
项目:zookeeper    文件:NodeTest.java   
/**
 * Deletes "/node" asynchronously (any version); the callback validates the
 * result via {@code assertResult}.
 */
@Test
@ZooConfig(initNodes = {"/node"}, async = true)
public void testDeleteNodeAsync() throws Exception {
    final String target = "/node";
    AsyncCallback.VoidCallback verifier = new AsyncCallback.VoidCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx) {
            assertResult(rc, target, path, ctx, null);
        }
    };
    zooKeeper.delete(target, -1, verifier, null);
}
项目:zookeeper    文件:NodeTest.java   
/**
 * Checks existence of "/node" asynchronously without a watch; the callback
 * validates the result, including the stat, via {@code assertResult}.
 */
@Test
@ZooConfig(initNodes = {"/node"}, async = true)
public void testNodeExistsAsync() throws Exception {
    final String target = "/node";
    AsyncCallback.StatCallback verifier = new AsyncCallback.StatCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            assertResult(rc, target, path, ctx, stat);
        }
    };
    zooKeeper.exists(target, false, verifier, null);
}
项目:zookeeper    文件:NodeTest.java   
/**
 * Writes "data" to "/node" synchronously, then reads it back asynchronously;
 * the callback checks the payload and validates the result via
 * {@code assertResult}.
 */
@Test
@ZooConfig(initNodes = {"/node"}, async = true)
public void testGetNodeDataAsync() throws Exception {
    final String target = "/node";
    zooKeeper.setData(target, "data".getBytes(), -1);

    AsyncCallback.DataCallback verifier = new AsyncCallback.DataCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            String payload = new String(data);
            assertEquals("data", payload);
            assertResult(rc, target, path, ctx, stat);
        }
    };
    zooKeeper.getData(target, false, verifier, null);
}
项目:zookeeper    文件:NodeTest.java   
/**
 * Writes "data" to "/node" asynchronously (any version); the callback
 * validates the result, including the stat, via {@code assertResult}.
 */
@Test
@ZooConfig(initNodes = {"/node"}, async = true)
public void testSetNodeDataAsync() throws Exception {
    final String target = "/node";
    AsyncCallback.StatCallback verifier = new AsyncCallback.StatCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            assertResult(rc, target, path, ctx, stat);
        }
    };
    zooKeeper.setData(target, "data".getBytes(), -1, verifier, null);
}
项目:zookeeper    文件:NodeTest.java   
/**
 * Lists the children of "/node" asynchronously without a watch; the callback
 * checks that the only child is "child" and validates the result via
 * {@code assertResult}.
 */
@Test
@ZooConfig(initNodes = {"/node/child"}, async = true)
public void testGetChildrenAsync() throws Exception {
    final String parent = "/node";
    AsyncCallback.ChildrenCallback verifier = new AsyncCallback.ChildrenCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, List<String> children) {
            boolean onlyChild = children.equals(Arrays.asList("child"));
            assertTrue(onlyChild);
            // NOTE(review): `stat` is not a parameter of this callback; it must
            // resolve to something declared outside this method -- confirm that
            // is intended (the sibling tests without a stat pass null here).
            assertResult(rc, parent, path, ctx, stat);
        }
    };
    zooKeeper.getChildren(parent, false, verifier, null);
}
项目:distributedlog    文件:ZKSubscriptionStateStore.java   
/**
 * Reads the last commit position stored at this store's ZooKeeper path.
 *
 * <p>The returned future yields {@code DLSN.NonInclusiveLowerBound} when the
 * node does not exist or its content cannot be deserialized, fails with a
 * {@code KeeperException} for any other non-OK result code, and otherwise
 * yields the deserialized position.
 *
 * @return future for the last commit position; failed if the ZooKeeper client
 *         cannot be obtained or the call is interrupted
 */
Future<DLSN> getLastCommitPositionFromZK() {
    final Promise<DLSN> position = new Promise<DLSN>();
    final AsyncCallback.DataCallback onData = new AsyncCallback.DataCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            logger.debug("Read last commit position from path {}: rc = {}", zkPath, rc);
            if (KeeperException.Code.NONODE.intValue() == rc) {
                position.setValue(DLSN.NonInclusiveLowerBound);
                return;
            }
            if (KeeperException.Code.OK.intValue() != rc) {
                position.setException(KeeperException.create(KeeperException.Code.get(rc), path));
                return;
            }
            try {
                position.setValue(DLSN.deserialize(new String(data, Charsets.UTF_8)));
            } catch (Exception t) {
                logger.warn("Invalid last commit position found from path {}", zkPath, t);
                // invalid dlsn recorded in subscription state store
                position.setValue(DLSN.NonInclusiveLowerBound);
            }
        }
    };
    try {
        logger.debug("Reading last commit position from path {}", zkPath);
        zooKeeperClient.get().getData(zkPath, false, onData, null);
    } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
        position.setException(zkce);
    } catch (InterruptedException ie) {
        position.setException(new DLInterruptedException("getLastCommitPosition was interrupted", ie));
    }
    return position;
}
项目:distributedlog    文件:LedgerAllocatorPool.java   
/**
 * Creates {@code numAllocators} persistent sequential nodes under the pool
 * path and blocks until all creations succeeded or the first one failed.
 *
 * <p>A non-positive count returns immediately: no create request would be
 * issued, so no callback would ever release the latch.
 *
 * @param numAllocators number of allocator nodes to create
 * @throws InterruptedException if interrupted while waiting for the creations
 * @throws IOException if at least one creation reported a non-OK result code
 */
private void createAllocators(int numAllocators) throws InterruptedException, IOException {
    if (numAllocators <= 0) {
        return;
    }
    final AtomicInteger numPendings = new AtomicInteger(numAllocators);
    final AtomicInteger numFailures = new AtomicInteger(0);
    final CountDownLatch latch = new CountDownLatch(1);
    AsyncCallback.StringCallback createCallback = new AsyncCallback.StringCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            if (KeeperException.Code.OK.intValue() != rc) {
                // First failure releases the waiter; it will see numFailures > 0.
                numFailures.incrementAndGet();
                latch.countDown();
                return;
            }
            if (numPendings.decrementAndGet() == 0 && numFailures.get() == 0) {
                latch.countDown();
            }
        }
    };
    for (int i = 0; i < numAllocators; i++) {
        zkc.get().create(poolPath + "/A", new byte[0],
                         zkc.getDefaultACL(),
                         CreateMode.PERSISTENT_SEQUENTIAL,
                         createCallback, null);
    }
    latch.await();
    if (numFailures.get() > 0) {
        throw new IOException("Failed to create " + numAllocators + " allocators.");
    }
}
项目:distributedlog    文件:ZKSessionLock.java   
/**
 * Get client id and its ephemeral owner.
 *
 * <p>The pair is parsed from the node name when it has the form
 * {@code member_<clientid>_s<owner_session>_...} (four "_"-separated parts).
 * If the name does not have that form, or the session id or client id in it
 * cannot be parsed, the pair is read from ZooKeeper instead: the client id
 * from the node data and the owner from the node's stat.
 *
 * @param zkClient
 *          zookeeper client
 * @param lockPath
 *          lock path
 * @param nodeName
 *          node name
 * @return client id and its ephemeral owner.
 */
static Future<Pair<String, Long>> asyncParseClientID(ZooKeeper zkClient, String lockPath, String nodeName) {
    String[] parts = nodeName.split("_");
    // member_<clientid>_s<owner_session>_
    if (4 == parts.length && parts[2].startsWith("s")) {
        try {
            long sessionOwner = Long.parseLong(parts[2].substring(1));
            String clientId = URLDecoder.decode(parts[1], UTF_8.name());
            return Future.value(Pair.of(clientId, sessionOwner));
        } catch (IllegalArgumentException ignored) {
            // Malformed session id (NumberFormatException) or malformed URL-encoding:
            // fall through and get client id by zookeeper#getData instead of
            // throwing synchronously from a Future-returning method.
        } catch (UnsupportedEncodingException ignored) {
            // if failed to parse client id, we have to get client id by zookeeper#getData.
        }
    }
    final Promise<Pair<String, Long>> promise = new Promise<Pair<String, Long>>();
    zkClient.getData(lockPath + "/" + nodeName, false, new AsyncCallback.DataCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            if (KeeperException.Code.OK.intValue() != rc) {
                promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
            } else {
                promise.setValue(Pair.of(deserializeClientId(data), stat.getEphemeralOwner()));
            }
        }
    }, null);
    return promise;
}
项目:distributedlog    文件:ZKSessionLock.java   
/**
 * Deletes the current lock node (any version) and then satisfies {@code promise}.
 *
 * <p>When there is no current node the promise is satisfied immediately. Otherwise the
 * delete result is handled on {@code lockStateExecutor}: the outcome is logged, the
 * unlock-cleanup failpoint is checked, and the promise is set to {@code BoxedUnit.UNIT}
 * whatever the result code was.
 *
 * @param promise
 *          promise to satisfy once the delete attempt has been processed
 */
private void deleteLockNode(final Promise<BoxedUnit> promise) {
    if (null != currentNode) {
        final AsyncCallback.VoidCallback onDeleted = new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(final int rc, final String path, Object ctx) {
                // Hand the result over to the lock state executor, keyed by lock path.
                lockStateExecutor.submit(lockPath, new SafeRunnable() {
                    @Override
                    public void safeRun() {
                        if (KeeperException.Code.OK.intValue() != rc) {
                            // A missing node or an expired session is logged at info level only.
                            final boolean nodeGone =
                                    KeeperException.Code.NONODE.intValue() == rc
                                    || KeeperException.Code.SESSIONEXPIRED.intValue() == rc;
                            if (nodeGone) {
                                LOG.info("Delete node failed. Node already gone for node {} id {}, rc = {}",
                                        new Object[] { path, lockId, KeeperException.Code.get(rc) });
                            } else {
                                LOG.error("Failed on deleting lock node {} for {} : {}",
                                        new Object[] { path, lockId, KeeperException.Code.get(rc) });
                            }
                        } else {
                            LOG.info("Deleted lock node {} for {} successfully.", path, lockId);
                        }

                        FailpointUtils.checkFailPointNoThrow(FailpointUtils.FailPointName.FP_LockUnlockCleanup);
                        promise.setValue(BoxedUnit.UNIT);
                    }
                });
            }
        };
        zk.delete(currentNode, -1, onDeleted, null);
    } else {
        // No lock node was created, so there is nothing to clean up.
        promise.setValue(BoxedUnit.UNIT);
    }
}
项目:distributedlog    文件:ZKSessionLock.java   
/**
 * Check Lock Owner Phase 1 : Get all lock waiters.
 *
 * <p>Lists the children of {@code lockPath} without setting a watch and forwards the
 * result code and child list to {@code processLockWaiters}.
 *
 * @param lockWatcher
 *          lock watcher.
 * @param wait
 *          whether to wait for ownership.
 * @param promise
 *          promise to satisfy with current lock owner
 */
private void checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher,
                                             final boolean wait,
                                             final Promise<String> promise) {
    final AsyncCallback.Children2Callback onWaitersListed = new AsyncCallback.Children2Callback() {
        @Override
        public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
            // Only the result code and the child names are needed for the next phase.
            processLockWaiters(lockWatcher, wait, rc, children, promise);
        }
    };
    zk.getChildren(lockPath, false, onWaitersListed, null);
}
项目:curator    文件:RemoveWatchesBuilderImpl.java   
/**
 * Issues the asynchronous remove-watches request for the path carried by
 * {@code operationAndData}.
 *
 * <p>If a namespace watcher can be built for the path, only that watcher is removed;
 * otherwise all watches of the configured type on the path are removed. The result is
 * delivered as a {@code REMOVE_WATCHES} event to the client's background processing.
 * Anything thrown while issuing the request is passed to {@code backgrounding.checkError}.
 *
 * @param operationAndData the operation whose data is the path to remove watches from
 * @throws Exception declared by the overridden method
 */
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData)
        throws Exception
{
    try
    {
        // NOTE(review): tracer name reads "RemoteWatches", probably meant "RemoveWatches" - confirm before renaming.
        final TimeTrace timer = client.getZookeeperClient().startTracer("RemoteWatches-Background");

        final AsyncCallback.VoidCallback onRemoved = new AsyncCallback.VoidCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx)
            {
                timer.commit();
                CuratorEvent removedEvent = new CuratorEventImpl(client, CuratorEventType.REMOVE_WATCHES, rc, path, null, ctx, null, null, null, null, null, null);
                client.processBackgroundOperation(operationAndData, removedEvent);
            }
        };

        final ZooKeeper zooKeeper = client.getZooKeeper();
        final NamespaceWatcher specificWatcher = makeNamespaceWatcher(operationAndData.getData());
        if ( specificWatcher != null )
        {
            // A concrete watcher was resolved: remove just that one.
            zooKeeper.removeWatches(operationAndData.getData(), specificWatcher, watcherType, local, onRemoved, operationAndData.getContext());
        }
        else
        {
            // No specific watcher: remove every watch of the requested type on the path.
            zooKeeper.removeAllWatches(operationAndData.getData(), watcherType, local, onRemoved, operationAndData.getContext());
        }
    }
    catch ( Throwable failure )
    {
        backgrounding.checkError(failure, null);
    }
}
项目:curator    文件:GetConfigBuilderImpl.java   
/**
 * Issues the asynchronous get-config request and routes its result to the client's
 * background processing as a {@code GET_CONFIG} event.
 *
 * <p>When {@code watching.isWatched()} is true the request is made with the boolean
 * watch flag set; otherwise the watcher obtained from {@code watching} for the config
 * node is passed. Anything thrown while issuing the request is handed to
 * {@code backgrounding.checkError} together with {@code watching}.
 *
 * @param operationAndData the background operation being performed
 * @throws Exception declared by the overridden method
 */
@Override
public void performBackgroundOperation(final OperationAndData<Void> operationAndData) throws Exception
{
    try
    {
        // NOTE(review): tracer is labelled "GetDataBuilderImpl" although this is the get-config path - confirm intent.
        final TimeTrace timer = client.getZookeeperClient().startTracer("GetDataBuilderImpl-Background");
        final AsyncCallback.DataCallback onConfig = new AsyncCallback.DataCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat)
            {
                watching.commitWatcher(rc, false);
                timer.commit();
                CuratorEvent configEvent = new CuratorEventImpl(client, CuratorEventType.GET_CONFIG, rc, path, null, ctx, stat, data, null, null, null, null);
                client.processBackgroundOperation(operationAndData, configEvent);
            }
        };
        if ( !watching.isWatched() )
        {
            client.getZooKeeper().getConfig(watching.getWatcher(ZooDefs.CONFIG_NODE), onConfig, backgrounding.getContext());
        }
        else
        {
            client.getZooKeeper().getConfig(true, onConfig, backgrounding.getContext());
        }
    }
    catch ( Throwable failure )
    {
        backgrounding.checkError(failure, watching);
    }
}
项目:curator    文件:SetDataBuilderImpl.java   
/**
 * Issues the asynchronous set-data request for the path and bytes carried by
 * {@code operationAndData}, using the builder's configured {@code version}.
 *
 * <p>On completion the trace is committed with the return code, request payload, path
 * and stat, and a {@code SET_DATA} event is handed to the client's background
 * processing. Anything thrown while issuing the request is passed to
 * {@code backgrounding.checkError}.
 *
 * @param operationAndData the operation whose data holds the target path and payload
 * @throws Exception declared by the overridden method
 */
@Override
public void performBackgroundOperation(final OperationAndData<PathAndBytes> operationAndData) throws Exception
{
    try
    {
        final OperationTrace tracer = client.getZookeeperClient().startAdvancedTracer("SetDataBuilderImpl-Background");
        final byte[] payload = operationAndData.getData().getData();
        final AsyncCallback.StatCallback onDataSet = new AsyncCallback.StatCallback()
        {
            @SuppressWarnings({"unchecked"})
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat)
            {
                tracer.setReturnCode(rc)
                    .setRequestBytesLength(payload)
                    .setPath(path)
                    .setStat(stat)
                    .commit();
                CuratorEvent setDataEvent = new CuratorEventImpl(client, CuratorEventType.SET_DATA, rc, path, null, ctx, stat, null, null, null, null, null);
                client.processBackgroundOperation(operationAndData, setDataEvent);
            }
        };
        client.getZooKeeper().setData(operationAndData.getData().getPath(), payload, version, onDataSet, backgrounding.getContext());
    }
    catch ( Throwable failure )
    {
        backgrounding.checkError(failure, null);
    }
}
项目:curator    文件:ExistsBuilderImpl.java   
/**
 * Issues the asynchronous exists request for the path carried by
 * {@code operationAndData} and routes its result to the client's background processing
 * as an {@code EXISTS} event.
 *
 * <p>When {@code watching.isWatched()} is true the request is made with the boolean
 * watch flag set; otherwise the watcher obtained from {@code watching} for the path is
 * passed. Anything thrown while issuing the request is handed to
 * {@code backgrounding.checkError} together with {@code watching}.
 *
 * @param operationAndData the operation whose data is the path to check
 * @throws Exception declared by the overridden method
 */
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    try
    {
        final OperationTrace tracer = client.getZookeeperClient().startAdvancedTracer("ExistsBuilderImpl-Background");
        final AsyncCallback.StatCallback onExists = new AsyncCallback.StatCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat)
            {
                watching.commitWatcher(rc, true);
                tracer.setReturnCode(rc)
                    .setPath(path)
                    .setWithWatcher(watching.hasWatcher())
                    .setStat(stat)
                    .commit();
                CuratorEvent existsEvent = new CuratorEventImpl(client, CuratorEventType.EXISTS, rc, path, null, ctx, stat, null, null, null, null, null);
                client.processBackgroundOperation(operationAndData, existsEvent);
            }
        };
        if ( !watching.isWatched() )
        {
            client.getZooKeeper().exists(operationAndData.getData(), watching.getWatcher(operationAndData.getData()), onExists, backgrounding.getContext());
        }
        else
        {
            client.getZooKeeper().exists(operationAndData.getData(), true, onExists, backgrounding.getContext());
        }
    }
    catch ( Throwable failure )
    {
        backgrounding.checkError(failure, watching);
    }
}