Java 类org.apache.hadoop.util.ZKUtil 实例源码

项目:hadoop    文件:RMZKUtils.java   
/**
 * Utility method to fetch ZK auth info from the configuration
 */
public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
    throws Exception {
  String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH);
  try {
    zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
    if (zkAuthConf != null) {
      return ZKUtil.parseAuth(zkAuthConf);
    } else {
      return Collections.emptyList();
    }
  } catch (Exception e) {
    LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH);
    throw e;
  }
}
项目:hadoop    文件:ZKRMStateStore.java   
/**
 * Given the {@link Configuration} and {@link ACL}s used (zkAcl) for
 * ZooKeeper access, construct the {@link ACL}s for the store's root node.
 * In the constructed {@link ACL}, all the users allowed by zkAcl are given
 * rwa access, while the current RM has exclude create-delete access.
 *
 * To be called only when HA is enabled and the configuration doesn't set ACL
 * for the root node.
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
  List<ACL> zkRootNodeAcl = new ArrayList<ACL>();
  for (ACL acl : sourceACLs) {
    zkRootNodeAcl.add(new ACL(
        ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
        acl.getId()));
  }

  zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
  Id rmId = new Id(zkRootNodeAuthScheme,
      DigestAuthenticationProvider.generateDigest(
          zkRootNodeUsername + ":" + zkRootNodePassword));
  zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId));
  return zkRootNodeAcl;
}
项目:hadoop    文件:TestZKRMStateStoreZKClientConnections.java   
@Test(timeout = 20000)
public void testInvalidZKAclConfiguration() {
  TestZKClient zkClientTester = new TestZKClient();
  YarnConfiguration conf = new YarnConfiguration();
  conf.set(YarnConfiguration.RM_ZK_ACL, "randomstring&*");
  try {
    zkClientTester.getRMStateStore(conf);
    fail("ZKRMStateStore created with bad ACL");
  } catch (ZKUtil.BadAclFormatException bafe) {
    // expected behavior
  } catch (Exception e) {
    String error = "Incorrect exception on BadAclFormat";
    LOG.error(error, e);
    fail(error);
  }
}
项目:aliyun-oss-hadoop-fs    文件:RMZKUtils.java   
/**
 * Utility method to fetch ZK auth info from the configuration
 */
public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
    throws Exception {
  String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH);
  try {
    zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
    if (zkAuthConf != null) {
      return ZKUtil.parseAuth(zkAuthConf);
    } else {
      return Collections.emptyList();
    }
  } catch (Exception e) {
    LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH);
    throw e;
  }
}
项目:aliyun-oss-hadoop-fs    文件:ZKRMStateStore.java   
/**
 * Given the {@link Configuration} and {@link ACL}s used (zkAcl) for
 * ZooKeeper access, construct the {@link ACL}s for the store's root node.
 * In the constructed {@link ACL}, all the users allowed by zkAcl are given
 * rwa access, while the current RM has exclude create-delete access.
 *
 * To be called only when HA is enabled and the configuration doesn't set ACL
 * for the root node.
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
  List<ACL> zkRootNodeAcl = new ArrayList<>();
  for (ACL acl : sourceACLs) {
    zkRootNodeAcl.add(new ACL(
        ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
        acl.getId()));
  }

  zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
  Id rmId = new Id(zkRootNodeAuthScheme,
      DigestAuthenticationProvider.generateDigest(
          zkRootNodeUsername + ":" + zkRootNodePassword));
  zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId));
  return zkRootNodeAcl;
}
项目:aliyun-oss-hadoop-fs    文件:ZKRMStateStore.java   
private void createConnection() throws Exception {
  // Curator connection
  CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
  builder = builder.connectString(zkHostPort)
      .connectionTimeoutMs(zkSessionTimeout)
      .retryPolicy(new RetryNTimes(numRetries, zkRetryInterval));

  // Set up authorization based on fencing scheme
  List<AuthInfo> authInfos = new ArrayList<>();
  for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
    authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
  }
  if (useDefaultFencingScheme) {
    byte[] defaultFencingAuth =
        (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(
            Charset.forName("UTF-8"));
    authInfos.add(new AuthInfo(zkRootNodeAuthScheme, defaultFencingAuth));
  }
  builder = builder.authorization(authInfos);

  // Connect to ZK
  curatorFramework = builder.build();
  curatorFramework.start();
}
项目:aliyun-oss-hadoop-fs    文件:TestZKRMStateStoreZKClientConnections.java   
@Test(timeout = 20000)
public void testInvalidZKAclConfiguration() {
  TestZKClient zkClientTester = new TestZKClient();
  YarnConfiguration conf = new YarnConfiguration();
  conf.set(YarnConfiguration.RM_ZK_ACL, "randomstring&*");
  try {
    zkClientTester.getRMStateStore(conf);
    fail("ZKRMStateStore created with bad ACL");
  } catch (ZKUtil.BadAclFormatException bafe) {
    // expected behavior
  } catch (Exception e) {
    String error = "Incorrect exception on BadAclFormat";
    LOG.error(error, e);
    fail(error);
  }
}
项目:big-c    文件:RMZKUtils.java   
/**
 * Utility method to fetch ZK auth info from the configuration
 */
public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
    throws Exception {
  String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH);
  try {
    zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
    if (zkAuthConf != null) {
      return ZKUtil.parseAuth(zkAuthConf);
    } else {
      return Collections.emptyList();
    }
  } catch (Exception e) {
    LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH);
    throw e;
  }
}
项目:big-c    文件:ZKRMStateStore.java   
/**
 * Given the {@link Configuration} and {@link ACL}s used (zkAcl) for
 * ZooKeeper access, construct the {@link ACL}s for the store's root node.
 * In the constructed {@link ACL}, all the users allowed by zkAcl are given
 * rwa access, while the current RM has exclude create-delete access.
 *
 * To be called only when HA is enabled and the configuration doesn't set ACL
 * for the root node.
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
  List<ACL> zkRootNodeAcl = new ArrayList<ACL>();
  for (ACL acl : sourceACLs) {
    zkRootNodeAcl.add(new ACL(
        ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
        acl.getId()));
  }

  zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
  Id rmId = new Id(zkRootNodeAuthScheme,
      DigestAuthenticationProvider.generateDigest(
          zkRootNodeUsername + ":" + zkRootNodePassword));
  zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId));
  return zkRootNodeAcl;
}
项目:big-c    文件:TestZKRMStateStoreZKClientConnections.java   
@Test(timeout = 20000)
public void testInvalidZKAclConfiguration() {
  TestZKClient zkClientTester = new TestZKClient();
  YarnConfiguration conf = new YarnConfiguration();
  conf.set(YarnConfiguration.RM_ZK_ACL, "randomstring&*");
  try {
    zkClientTester.getRMStateStore(conf);
    fail("ZKRMStateStore created with bad ACL");
  } catch (ZKUtil.BadAclFormatException bafe) {
    // expected behavior
  } catch (Exception e) {
    String error = "Incorrect exception on BadAclFormat";
    LOG.error(error, e);
    fail(error);
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:RMZKUtils.java   
/**
 * Utility method to fetch ZK auth info from the configuration
 */
public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
    throws Exception {
  String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH);
  try {
    zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
    if (zkAuthConf != null) {
      return ZKUtil.parseAuth(zkAuthConf);
    } else {
      return Collections.emptyList();
    }
  } catch (Exception e) {
    LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH);
    throw e;
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:ZKRMStateStore.java   
/**
 * Given the {@link Configuration} and {@link ACL}s used (zkAcl) for
 * ZooKeeper access, construct the {@link ACL}s for the store's root node.
 * In the constructed {@link ACL}, all the users allowed by zkAcl are given
 * rwa access, while the current RM has exclude create-delete access.
 *
 * To be called only when HA is enabled and the configuration doesn't set ACL
 * for the root node.
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
  List<ACL> zkRootNodeAcl = new ArrayList<ACL>();
  for (ACL acl : sourceACLs) {
    zkRootNodeAcl.add(new ACL(
        ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
        acl.getId()));
  }

  zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
  Id rmId = new Id(zkRootNodeAuthScheme,
      DigestAuthenticationProvider.generateDigest(
          zkRootNodeUsername + ":" + zkRootNodePassword));
  zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId));
  return zkRootNodeAcl;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestZKRMStateStoreZKClientConnections.java   
@Test(timeout = 20000)
public void testInvalidZKAclConfiguration() {
  TestZKClient zkClientTester = new TestZKClient();
  YarnConfiguration conf = new YarnConfiguration();
  conf.set(YarnConfiguration.RM_ZK_ACL, "randomstring&*");
  try {
    zkClientTester.getRMStateStore(conf);
    fail("ZKRMStateStore created with bad ACL");
  } catch (ZKUtil.BadAclFormatException bafe) {
    // expected behavior
  } catch (Exception e) {
    String error = "Incorrect exception on BadAclFormat";
    LOG.error(error, e);
    fail(error);
  }
}
项目:hops    文件:RMZKUtils.java   
/**
 * Utility method to fetch ZK auth info from the configuration
 */
public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
    throws Exception {
  String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH);
  try {
    zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
    if (zkAuthConf != null) {
      return ZKUtil.parseAuth(zkAuthConf);
    } else {
      return Collections.emptyList();
    }
  } catch (Exception e) {
    LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH);
    throw e;
  }
}
项目:hops    文件:ZKRMStateStore.java   
/**
 * Given the {@link Configuration} and {@link ACL}s used (zkAcl) for
 * ZooKeeper access, construct the {@link ACL}s for the store's root node.
 * In the constructed {@link ACL}, all the users allowed by zkAcl are given
 * rwa access, while the current RM has exclude create-delete access.
 *
 * To be called only when HA is enabled and the configuration doesn't set ACL
 * for the root node.
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
  List<ACL> zkRootNodeAcl = new ArrayList<>();
  for (ACL acl : sourceACLs) {
    zkRootNodeAcl.add(new ACL(
        ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
        acl.getId()));
  }

  zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
  Id rmId = new Id(zkRootNodeAuthScheme,
      DigestAuthenticationProvider.generateDigest(
          zkRootNodeUsername + ":" + resourceManager.getZkRootNodePassword()));
  zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId));
  return zkRootNodeAcl;
}
项目:hops    文件:TestZKRMStateStoreZKClientConnections.java   
@Test(timeout = 20000)
public void testInvalidZKAclConfiguration() {
  TestZKClient zkClientTester = new TestZKClient();
  YarnConfiguration conf = new YarnConfiguration();
  conf.set(YarnConfiguration.RM_ZK_ACL, "randomstring&*");
  try {
    zkClientTester.getRMStateStore(conf);
    fail("ZKRMStateStore created with bad ACL");
  } catch (ZKUtil.BadAclFormatException bafe) {
    // expected behavior
  } catch (Exception e) {
    String error = "Incorrect exception on BadAclFormat";
    LOG.error(error, e);
    fail(error);
  }
}
项目:hadoop-TCP    文件:TestZKUtil.java   
@Test
public void testConfIndirection() throws IOException {
  assertNull(ZKUtil.resolveConfIndirection(null));
  assertEquals("x", ZKUtil.resolveConfIndirection("x"));

  TEST_FILE.getParentFile().mkdirs();
  Files.write("hello world", TEST_FILE, Charsets.UTF_8);
  assertEquals("hello world", ZKUtil.resolveConfIndirection(
      "@" + TEST_FILE.getAbsolutePath()));

  try {
    ZKUtil.resolveConfIndirection("@" + BOGUS_FILE);
    fail("Did not throw for non-existent file reference");
  } catch (FileNotFoundException fnfe) {
    assertTrue(fnfe.getMessage().startsWith(BOGUS_FILE));
  }
}
项目:hardfs    文件:TestZKUtil.java   
@Test
public void testConfIndirection() throws IOException {
  assertNull(ZKUtil.resolveConfIndirection(null));
  assertEquals("x", ZKUtil.resolveConfIndirection("x"));

  TEST_FILE.getParentFile().mkdirs();
  Files.write("hello world", TEST_FILE, Charsets.UTF_8);
  assertEquals("hello world", ZKUtil.resolveConfIndirection(
      "@" + TEST_FILE.getAbsolutePath()));

  try {
    ZKUtil.resolveConfIndirection("@" + BOGUS_FILE);
    fail("Did not throw for non-existent file reference");
  } catch (FileNotFoundException fnfe) {
    assertTrue(fnfe.getMessage().startsWith(BOGUS_FILE));
  }
}
项目:hadoop-on-lustre2    文件:RMZKUtils.java   
/**
 * Utility method to fetch ZK auth info from the configuration
 */
public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
    throws Exception {
  String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH);
  try {
    zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
    if (zkAuthConf != null) {
      return ZKUtil.parseAuth(zkAuthConf);
    } else {
      return Collections.emptyList();
    }
  } catch (Exception e) {
    LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH);
    throw e;
  }
}
项目:hadoop-on-lustre2    文件:ZKRMStateStore.java   
/**
 * Given the {@link Configuration} and {@link ACL}s used (zkAcl) for
 * ZooKeeper access, construct the {@link ACL}s for the store's root node.
 * In the constructed {@link ACL}, all the users allowed by zkAcl are given
 * rwa access, while the current RM has exclude create-delete access.
 *
 * To be called only when HA is enabled and the configuration doesn't set ACL
 * for the root node.
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
  List<ACL> zkRootNodeAcl = new ArrayList<ACL>();
  for (ACL acl : sourceACLs) {
    zkRootNodeAcl.add(new ACL(
        ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
        acl.getId()));
  }

  zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
  Id rmId = new Id(zkRootNodeAuthScheme,
      DigestAuthenticationProvider.generateDigest(
          zkRootNodeUsername + ":" + zkRootNodePassword));
  zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId));
  return zkRootNodeAcl;
}
项目:hadoop-on-lustre2    文件:TestZKRMStateStoreZKClientConnections.java   
@Test(timeout = 20000)
public void testInvalidZKAclConfiguration() {
  TestZKClient zkClientTester = new TestZKClient();
  YarnConfiguration conf = new YarnConfiguration();
  conf.set(YarnConfiguration.RM_ZK_ACL, "randomstring&*");
  try {
    zkClientTester.getRMStateStore(conf);
    fail("ZKRMStateStore created with bad ACL");
  } catch (ZKUtil.BadAclFormatException bafe) {
    // expected behavior
  } catch (Exception e) {
    String error = "Incorrect exception on BadAclFormat";
    LOG.error(error, e);
    fail(error);
  }
}
项目:hadoop-oss    文件:ZKFailoverController.java   
private void initZK() throws HadoopIllegalArgumentException, IOException,
    KeeperException {
  zkQuorum = conf.get(ZK_QUORUM_KEY);
  int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
      ZK_SESSION_TIMEOUT_DEFAULT);
  // Parse ACLs from configuration.
  String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT);
  zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
  List<ACL> zkAcls = ZKUtil.parseACLs(zkAclConf);
  if (zkAcls.isEmpty()) {
    zkAcls = Ids.CREATOR_ALL_ACL;
  }

  // Parse authentication from configuration.
  String zkAuthConf = conf.get(ZK_AUTH_KEY);
  zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
  List<ZKAuthInfo> zkAuths;
  if (zkAuthConf != null) {
    zkAuths = ZKUtil.parseAuth(zkAuthConf);
  } else {
    zkAuths = Collections.emptyList();
  }

  // Sanity check configuration.
  Preconditions.checkArgument(zkQuorum != null,
      "Missing required configuration '%s' for ZooKeeper quorum",
      ZK_QUORUM_KEY);
  Preconditions.checkArgument(zkTimeout > 0,
      "Invalid ZK session timeout %s", zkTimeout);

  int maxRetryNum = conf.getInt(
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
  elector = new ActiveStandbyElector(zkQuorum,
      zkTimeout, getParentZnode(), zkAcls, zkAuths,
      new ElectorCallbacks(), maxRetryNum);
}
项目:hadoop    文件:EmbeddedElectorService.java   
@Override
protected void serviceInit(Configuration conf)
    throws Exception {
  conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf);

  String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
  if (zkQuorum == null) {
   throw new YarnRuntimeException("Embedded automatic failover " +
        "is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS +
        " is not set");
  }

  String rmId = HAUtil.getRMHAId(conf);
  String clusterId = YarnConfiguration.getClusterId(conf);
  localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);

  String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
      YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
  String electionZNode = zkBasePath + "/" + clusterId;

  long zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
      YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);

  List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
  List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);

  int maxRetryNum = conf.getInt(
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
  elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
      electionZNode, zkAcls, zkAuths, this, maxRetryNum);

  elector.ensureParentZNode();
  if (!isParentZnodeSafe(clusterId)) {
    notifyFatalError(electionZNode + " znode has invalid data! "+
        "Might need formatting!");
  }

  super.serviceInit(conf);
}
项目:hadoop    文件:RMZKUtils.java   
/**
 * Utility method to fetch the ZK ACLs from the configuration
 */
public static List<ACL> getZKAcls(Configuration conf) throws Exception {
  // Parse authentication from configuration.
  String zkAclConf =
      conf.get(YarnConfiguration.RM_ZK_ACL,
          YarnConfiguration.DEFAULT_RM_ZK_ACL);
  try {
    zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
    return ZKUtil.parseACLs(zkAclConf);
  } catch (Exception e) {
    LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL);
    throw e;
  }
}
项目:hadoop    文件:ZKRMStateStore.java   
private synchronized void createConnection()
    throws IOException, InterruptedException {
  closeZkClients();
  for (int retries = 0; retries < numRetries && zkClient == null;
      retries++) {
    try {
      activeZkClient = getNewZooKeeper();
      zkClient = activeZkClient;
      for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
        zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth());
      }
      if (useDefaultFencingScheme) {
        zkClient.addAuthInfo(zkRootNodeAuthScheme,
            (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(Charset.forName("UTF-8")));
      }
    } catch (IOException ioe) {
      // Retry in case of network failures
      LOG.info("Failed to connect to the ZooKeeper on attempt - " +
          (retries + 1));
      ioe.printStackTrace();
    }
  }
  if (zkClient == null) {
    LOG.error("Unable to connect to Zookeeper");
    throw new YarnRuntimeException("Unable to connect to Zookeeper");
  }
  ZKRMStateStore.this.notifyAll();
  LOG.info("Created new ZK connection");
}
项目:hadoop    文件:RegistrySecurity.java   
/**
 * Parse an ACL list. This includes configuration indirection
 * {@link ZKUtil#resolveConfIndirection(String)}
 * @param zkAclConf configuration string
 * @return an ACL list
 * @throws IOException on a bad ACL parse
 */
public List<ACL> parseACLs(String zkAclConf) throws IOException {
  try {
    return ZKUtil.parseACLs(ZKUtil.resolveConfIndirection(zkAclConf));
  } catch (ZKUtil.BadAclFormatException e) {
    throw new IOException("Parsing " + zkAclConf + " :" + e, e);
  }
}
项目:hadoop    文件:ZKFailoverController.java   
private void initZK() throws HadoopIllegalArgumentException, IOException,
    KeeperException {
  zkQuorum = conf.get(ZK_QUORUM_KEY);
  int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
      ZK_SESSION_TIMEOUT_DEFAULT);
  // Parse ACLs from configuration.
  String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT);
  zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
  List<ACL> zkAcls = ZKUtil.parseACLs(zkAclConf);
  if (zkAcls.isEmpty()) {
    zkAcls = Ids.CREATOR_ALL_ACL;
  }

  // Parse authentication from configuration.
  String zkAuthConf = conf.get(ZK_AUTH_KEY);
  zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
  List<ZKAuthInfo> zkAuths;
  if (zkAuthConf != null) {
    zkAuths = ZKUtil.parseAuth(zkAuthConf);
  } else {
    zkAuths = Collections.emptyList();
  }

  // Sanity check configuration.
  Preconditions.checkArgument(zkQuorum != null,
      "Missing required configuration '%s' for ZooKeeper quorum",
      ZK_QUORUM_KEY);
  Preconditions.checkArgument(zkTimeout > 0,
      "Invalid ZK session timeout %s", zkTimeout);

  int maxRetryNum = conf.getInt(
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
  elector = new ActiveStandbyElector(zkQuorum,
      zkTimeout, getParentZnode(), zkAcls, zkAuths,
      new ElectorCallbacks(), maxRetryNum);
}
项目:aliyun-oss-hadoop-fs    文件:RMZKUtils.java   
/**
 * Utility method to fetch the ZK ACLs from the configuration
 */
public static List<ACL> getZKAcls(Configuration conf) throws Exception {
  // Parse authentication from configuration.
  String zkAclConf =
      conf.get(YarnConfiguration.RM_ZK_ACL,
          YarnConfiguration.DEFAULT_RM_ZK_ACL);
  try {
    zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
    return ZKUtil.parseACLs(zkAclConf);
  } catch (Exception e) {
    LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL);
    throw e;
  }
}
项目:aliyun-oss-hadoop-fs    文件:RegistrySecurity.java   
/**
 * Parse an ACL list. This includes configuration indirection
 * {@link ZKUtil#resolveConfIndirection(String)}
 * @param zkAclConf configuration string
 * @return an ACL list
 * @throws IOException on a bad ACL parse
 */
public List<ACL> parseACLs(String zkAclConf) throws IOException {
  try {
    return ZKUtil.parseACLs(ZKUtil.resolveConfIndirection(zkAclConf));
  } catch (ZKUtil.BadAclFormatException e) {
    throw new IOException("Parsing " + zkAclConf + " :" + e, e);
  }
}
项目:aliyun-oss-hadoop-fs    文件:ZKFailoverController.java   
private void initZK() throws HadoopIllegalArgumentException, IOException,
    KeeperException {
  zkQuorum = conf.get(ZK_QUORUM_KEY);
  int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
      ZK_SESSION_TIMEOUT_DEFAULT);
  // Parse ACLs from configuration.
  String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT);
  zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
  List<ACL> zkAcls = ZKUtil.parseACLs(zkAclConf);
  if (zkAcls.isEmpty()) {
    zkAcls = Ids.CREATOR_ALL_ACL;
  }

  // Parse authentication from configuration.
  String zkAuthConf = conf.get(ZK_AUTH_KEY);
  zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
  List<ZKAuthInfo> zkAuths;
  if (zkAuthConf != null) {
    zkAuths = ZKUtil.parseAuth(zkAuthConf);
  } else {
    zkAuths = Collections.emptyList();
  }

  // Sanity check configuration.
  Preconditions.checkArgument(zkQuorum != null,
      "Missing required configuration '%s' for ZooKeeper quorum",
      ZK_QUORUM_KEY);
  Preconditions.checkArgument(zkTimeout > 0,
      "Invalid ZK session timeout %s", zkTimeout);

  int maxRetryNum = conf.getInt(
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
  elector = new ActiveStandbyElector(zkQuorum,
      zkTimeout, getParentZnode(), zkAcls, zkAuths,
      new ElectorCallbacks(), maxRetryNum);
}
项目:big-c    文件:EmbeddedElectorService.java   
@Override
protected void serviceInit(Configuration conf)
    throws Exception {
  conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf);

  String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
  if (zkQuorum == null) {
   throw new YarnRuntimeException("Embedded automatic failover " +
        "is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS +
        " is not set");
  }

  String rmId = HAUtil.getRMHAId(conf);
  String clusterId = YarnConfiguration.getClusterId(conf);
  localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);

  String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
      YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
  String electionZNode = zkBasePath + "/" + clusterId;

  long zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
      YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);

  List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
  List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);

  int maxRetryNum = conf.getInt(
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
  elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
      electionZNode, zkAcls, zkAuths, this, maxRetryNum);

  elector.ensureParentZNode();
  if (!isParentZnodeSafe(clusterId)) {
    notifyFatalError(electionZNode + " znode has invalid data! "+
        "Might need formatting!");
  }

  super.serviceInit(conf);
}
项目:big-c    文件:RMZKUtils.java   
/**
 * Utility method to fetch the ZK ACLs from the configuration
 */
public static List<ACL> getZKAcls(Configuration conf) throws Exception {
  // Parse authentication from configuration.
  String zkAclConf =
      conf.get(YarnConfiguration.RM_ZK_ACL,
          YarnConfiguration.DEFAULT_RM_ZK_ACL);
  try {
    zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
    return ZKUtil.parseACLs(zkAclConf);
  } catch (Exception e) {
    LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL);
    throw e;
  }
}
项目:big-c    文件:ZKRMStateStore.java   
private synchronized void createConnection()
    throws IOException, InterruptedException {
  closeZkClients();
  for (int retries = 0; retries < numRetries && zkClient == null;
      retries++) {
    try {
      activeZkClient = getNewZooKeeper();
      zkClient = activeZkClient;
      for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
        zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth());
      }
      if (useDefaultFencingScheme) {
        zkClient.addAuthInfo(zkRootNodeAuthScheme,
            (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(Charset.forName("UTF-8")));
      }
    } catch (IOException ioe) {
      // Retry in case of network failures
      LOG.info("Failed to connect to the ZooKeeper on attempt - " +
          (retries + 1));
      ioe.printStackTrace();
    }
  }
  if (zkClient == null) {
    LOG.error("Unable to connect to Zookeeper");
    throw new YarnRuntimeException("Unable to connect to Zookeeper");
  }
  ZKRMStateStore.this.notifyAll();
  LOG.info("Created new ZK connection");
}
项目:big-c    文件:RegistrySecurity.java   
/**
 * Parse an ACL list. This includes configuration indirection
 * {@link ZKUtil#resolveConfIndirection(String)}
 * @param zkAclConf configuration string
 * @return an ACL list
 * @throws IOException on a bad ACL parse
 */
public List<ACL> parseACLs(String zkAclConf) throws IOException {
  try {
    return ZKUtil.parseACLs(ZKUtil.resolveConfIndirection(zkAclConf));
  } catch (ZKUtil.BadAclFormatException e) {
    throw new IOException("Parsing " + zkAclConf + " :" + e, e);
  }
}
项目:big-c    文件:ZKFailoverController.java   
private void initZK() throws HadoopIllegalArgumentException, IOException,
    KeeperException {
  zkQuorum = conf.get(ZK_QUORUM_KEY);
  int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
      ZK_SESSION_TIMEOUT_DEFAULT);
  // Parse ACLs from configuration.
  String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT);
  zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
  List<ACL> zkAcls = ZKUtil.parseACLs(zkAclConf);
  if (zkAcls.isEmpty()) {
    zkAcls = Ids.CREATOR_ALL_ACL;
  }

  // Parse authentication from configuration.
  String zkAuthConf = conf.get(ZK_AUTH_KEY);
  zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
  List<ZKAuthInfo> zkAuths;
  if (zkAuthConf != null) {
    zkAuths = ZKUtil.parseAuth(zkAuthConf);
  } else {
    zkAuths = Collections.emptyList();
  }

  // Sanity check configuration.
  Preconditions.checkArgument(zkQuorum != null,
      "Missing required configuration '%s' for ZooKeeper quorum",
      ZK_QUORUM_KEY);
  Preconditions.checkArgument(zkTimeout > 0,
      "Invalid ZK session timeout %s", zkTimeout);

  int maxRetryNum = conf.getInt(
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
  elector = new ActiveStandbyElector(zkQuorum,
      zkTimeout, getParentZnode(), zkAcls, zkAuths,
      new ElectorCallbacks(), maxRetryNum);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:EmbeddedElectorService.java   
@Override
protected void serviceInit(Configuration conf)
    throws Exception {
  conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf);

  String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
  if (zkQuorum == null) {
   throw new YarnRuntimeException("Embedded automatic failover " +
        "is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS +
        " is not set");
  }

  String rmId = HAUtil.getRMHAId(conf);
  String clusterId = YarnConfiguration.getClusterId(conf);
  localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);

  String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
      YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
  String electionZNode = zkBasePath + "/" + clusterId;

  long zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
      YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);

  List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
  List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);

  elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
      electionZNode, zkAcls, zkAuths, this);

  elector.ensureParentZNode();
  if (!isParentZnodeSafe(clusterId)) {
    notifyFatalError(electionZNode + " znode has invalid data! "+
        "Might need formatting!");
  }

  super.serviceInit(conf);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:RMZKUtils.java   
/**
 * Utility method to fetch the ZK ACLs from the configuration
 */
public static List<ACL> getZKAcls(Configuration conf) throws Exception {
  // Parse authentication from configuration.
  String zkAclConf =
      conf.get(YarnConfiguration.RM_ZK_ACL,
          YarnConfiguration.DEFAULT_RM_ZK_ACL);
  try {
    zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
    return ZKUtil.parseACLs(zkAclConf);
  } catch (Exception e) {
    LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL);
    throw e;
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:ZKRMStateStore.java   
private synchronized void createConnection()
    throws IOException, InterruptedException {
  closeZkClients();
  for (int retries = 0; retries < numRetries && zkClient == null;
      retries++) {
    try {
      activeZkClient = getNewZooKeeper();
      zkClient = activeZkClient;
      for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
        zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth());
      }
      if (useDefaultFencingScheme) {
        zkClient.addAuthInfo(zkRootNodeAuthScheme,
            (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes());
      }
    } catch (IOException ioe) {
      // Retry in case of network failures
      LOG.info("Failed to connect to the ZooKeeper on attempt - " +
          (retries + 1));
      ioe.printStackTrace();
    }
  }
  if (zkClient == null) {
    LOG.error("Unable to connect to Zookeeper");
    throw new YarnRuntimeException("Unable to connect to Zookeeper");
  }
  ZKRMStateStore.this.notifyAll();
  LOG.info("Created new ZK connection");
}
项目:hadoop-2.6.0-cdh5.4.3    文件:RegistrySecurity.java   
/**
 * Parse an ACL list. This includes configuration indirection
 * {@link ZKUtil#resolveConfIndirection(String)}
 * @param zkAclConf configuration string
 * @return an ACL list
 * @throws IOException on a bad ACL parse
 */
public List<ACL> parseACLs(String zkAclConf) throws IOException {
  try {
    return ZKUtil.parseACLs(ZKUtil.resolveConfIndirection(zkAclConf));
  } catch (ZKUtil.BadAclFormatException e) {
    throw new IOException("Parsing " + zkAclConf + " :" + e, e);
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:ZKFailoverController.java   
private void initZK() throws HadoopIllegalArgumentException, IOException,
    KeeperException {
  zkQuorum = conf.get(ZK_QUORUM_KEY);
  int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
      ZK_SESSION_TIMEOUT_DEFAULT);
  // Parse ACLs from configuration.
  String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT);
  zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
  List<ACL> zkAcls = ZKUtil.parseACLs(zkAclConf);
  if (zkAcls.isEmpty()) {
    zkAcls = Ids.CREATOR_ALL_ACL;
  }

  // Parse authentication from configuration.
  String zkAuthConf = conf.get(ZK_AUTH_KEY);
  zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
  List<ZKAuthInfo> zkAuths;
  if (zkAuthConf != null) {
    zkAuths = ZKUtil.parseAuth(zkAuthConf);
  } else {
    zkAuths = Collections.emptyList();
  }

  // Sanity check configuration.
  Preconditions.checkArgument(zkQuorum != null,
      "Missing required configuration '%s' for ZooKeeper quorum",
      ZK_QUORUM_KEY);
  Preconditions.checkArgument(zkTimeout > 0,
      "Invalid ZK session timeout %s", zkTimeout);


  elector = new ActiveStandbyElector(zkQuorum,
      zkTimeout, getParentZnode(), zkAcls, zkAuths,
      new ElectorCallbacks());
}