hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject [2/3] hbase git commit: HBASE-14866 VerifyReplication and ReplicationAdmin should use full peer configuration for peer connection
Date Mon, 14 Dec 2015 21:52:47 GMT
HBASE-14866 VerifyReplication and ReplicationAdmin should use full peer configuration for peer connection

Conflicts:
	hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
	hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
	hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
	hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
	hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
	hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/229c430f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/229c430f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/229c430f

Branch: refs/heads/0.98
Commit: 229c430fa144f1fc7a418a56e1ab180f5b4e3ff5
Parents: 40d7fa2
Author: Gary Helmling <garyh@apache.org>
Authored: Wed Dec 9 16:47:25 2015 -0800
Committer: Andrew Purtell <apurtell@apache.org>
Committed: Mon Dec 14 13:23:23 2015 -0800

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    |  14 +-
 .../replication/ReplicationPeersZKImpl.java     |   9 +-
 .../replication/ReplicationStateZKBase.java     |   3 +-
 .../apache/hadoop/hbase/zookeeper/ZKConfig.java | 261 -----------
 .../apache/hadoop/hbase/zookeeper/ZKUtil.java   |  65 ---
 .../hadoop/hbase/zookeeper/TestZKUtil.java      |   2 +-
 .../apache/hadoop/hbase/HBaseConfiguration.java | 102 ++++-
 .../apache/hadoop/hbase/zookeeper/ZKConfig.java | 450 +++++++++++++++++++
 .../hadoop/hbase/TestHBaseConfiguration.java    |  30 ++
 .../hadoop/hbase/zookeeper/TestZKConfig.java    | 126 ++++++
 .../hadoop/hbase/mapreduce/SyncTable.java       |  15 +-
 .../hbase/mapreduce/TableMapReduceUtil.java     |  64 ++-
 .../hbase/mapreduce/TableOutputFormat.java      |  40 +-
 .../replication/VerifyReplication.java          |  25 +-
 .../org/apache/hadoop/hbase/TestZooKeeper.java  |  27 --
 .../replication/TestReplicationAdmin.java       |  34 +-
 .../replication/TestReplicationEndpoint.java    |  15 +-
 .../replication/TestReplicationStateBasic.java  |   4 +-
 .../replication/TestReplicationStateZKImpl.java |   5 +-
 .../hadoop/hbase/zookeeper/TestZKConfig.java    |  56 ---
 20 files changed, 845 insertions(+), 502 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 8e25fd0..5fffe75 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -498,7 +497,8 @@ public class ReplicationAdmin implements Closeable {
     }
   }
 
-  private List<ReplicationPeer> listValidReplicationPeers() {
+  @VisibleForTesting
+  List<ReplicationPeer> listValidReplicationPeers() {
     Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
     if (peers == null || peers.size() <= 0) {
       return null;
@@ -506,18 +506,16 @@ public class ReplicationAdmin implements Closeable {
     List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size());
     for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
       String peerId = peerEntry.getKey();
-      String clusterKey = peerEntry.getValue().getClusterKey();
-      Configuration peerConf = new Configuration(this.connection.getConfiguration());
       Stat s = null;
       try {
-        ZKUtil.applyClusterKeyToConf(peerConf, clusterKey);
         Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
+        Configuration peerConf = pair.getSecond();
         ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
         s =
             zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
               null);
         if (null == s) {
-          LOG.info(peerId + ' ' + clusterKey + " is invalid now.");
+          LOG.info(peerId + ' ' + pair.getFirst().getClusterKey() + " is invalid now.");
           continue;
         }
         validPeers.add(peer);
@@ -536,10 +534,6 @@ public class ReplicationAdmin implements Closeable {
         LOG.debug("Failure details to get valid replication peers.", e);
         Thread.currentThread().interrupt();
         continue;
-      } catch (IOException e) {
-        LOG.warn("Failed to get valid replication peers due to IOException.");
-        LOG.debug("Failure details to get valid replication peers.", e);
-        continue;
       }
     }
     return validPeers;

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 885b2e1..4d977c0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -29,10 +29,11 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.CompoundConfiguration;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
@@ -291,11 +292,9 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       return null;
     }
 
-    Configuration otherConf = new Configuration(this.conf);
+    Configuration otherConf;
     try {
-      if (peerConfig.getClusterKey() != null && !peerConfig.getClusterKey().isEmpty()) {
-        ZKUtil.applyClusterKeyToConf(otherConf, peerConfig.getClusterKey());
-      }
+      otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
     } catch (IOException e) {
       LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
       return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
index 3941be5..7d8455b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -69,7 +70,7 @@ public abstract class ReplicationStateZKBase {
     String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
     String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
     this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
-    this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
+    this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
     this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
     this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
     this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName);

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
deleted file mode 100644
index 37b68b9..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.zookeeper;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * Utility methods for reading, and building the ZooKeeper configuration.
- */
-@InterfaceAudience.Private
-public class ZKConfig {
-  private static final Log LOG = LogFactory.getLog(ZKConfig.class);
-
-  private static final String VARIABLE_START = "${";
-  private static final int VARIABLE_START_LENGTH = VARIABLE_START.length();
-  private static final String VARIABLE_END = "}";
-  private static final int VARIABLE_END_LENGTH = VARIABLE_END.length();
-
-  /**
-   * Make a Properties object holding ZooKeeper config.
-   * Parses the corresponding config options from the HBase XML configs
-   * and generates the appropriate ZooKeeper properties.
-   * @param conf Configuration to read from.
-   * @return Properties holding mappings representing ZooKeeper config file.
-   */
-  public static Properties makeZKProps(Configuration conf) {
-    if (conf.getBoolean(HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG, false)) {
-      LOG.warn(
-          "Parsing ZooKeeper's " + HConstants.ZOOKEEPER_CONFIG_NAME +
-          " file for ZK properties " +
-          "has been deprecated. Please instead place all ZK related HBase " +
-          "configuration under the hbase-site.xml, using prefixes " +
-          "of the form '" + HConstants.ZK_CFG_PROPERTY_PREFIX + "', and " +
-          "set property '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG +
-          "' to false");
-      // First check if there is a zoo.cfg in the CLASSPATH. If so, simply read
-      // it and grab its configuration properties.
-      ClassLoader cl = HQuorumPeer.class.getClassLoader();
-      final InputStream inputStream =
-        cl.getResourceAsStream(HConstants.ZOOKEEPER_CONFIG_NAME);
-      if (inputStream != null) {
-        try {
-          return parseZooCfg(conf, inputStream);
-        } catch (IOException e) {
-          LOG.warn("Cannot read " + HConstants.ZOOKEEPER_CONFIG_NAME +
-                   ", loading from XML files", e);
-        }
-      }
-    } else {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Skipped reading ZK properties file '" + HConstants.ZOOKEEPER_CONFIG_NAME +
-          "' since '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG + "' was not set to true");
-      }
-    }
-
-    // Otherwise, use the configuration options from HBase's XML files.
-    Properties zkProperties = new Properties();
-
-    // Directly map all of the hbase.zookeeper.property.KEY properties.
-    for (Entry<String, String> entry : new Configuration(conf)) { // copy for mt safety
-      String key = entry.getKey();
-      if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
-        String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN);
-        String value = entry.getValue();
-        // If the value has variables substitutions, need to do a get.
-        if (value.contains(VARIABLE_START)) {
-          value = conf.get(key);
-        }
-        zkProperties.put(zkKey, value);
-      }
-    }
-
-    // If clientPort is not set, assign the default.
-    if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) {
-      zkProperties.put(HConstants.CLIENT_PORT_STR,
-          HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
-    }
-
-    // Create the server.X properties.
-    int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888);
-    int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888);
-
-    final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
-                                                 HConstants.LOCALHOST);
-    for (int i = 0; i < serverHosts.length; ++i) {
-      String serverHost = serverHosts[i];
-      String address = serverHost + ":" + peerPort + ":" + leaderPort;
-      String key = "server." + i;
-      zkProperties.put(key, address);
-    }
-
-    return zkProperties;
-  }
-
-  /**
-   * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in.
-   * This method is used for testing so we can pass our own InputStream.
-   * @param conf HBaseConfiguration to use for injecting variables.
-   * @param inputStream InputStream to read from.
-   * @return Properties parsed from config stream with variables substituted.
-   * @throws IOException if anything goes wrong parsing config
-   * @deprecated in 0.96 onwards. HBase will no longer rely on zoo.cfg
-   * availability.
-   */
-  @Deprecated
-  public static Properties parseZooCfg(Configuration conf,
-      InputStream inputStream) throws IOException {
-    Properties properties = new Properties();
-    try {
-      properties.load(inputStream);
-    } catch (IOException e) {
-      final String msg = "fail to read properties from "
-        + HConstants.ZOOKEEPER_CONFIG_NAME;
-      LOG.fatal(msg);
-      throw new IOException(msg, e);
-    }
-    for (Entry<Object, Object> entry : properties.entrySet()) {
-      String value = entry.getValue().toString().trim();
-      String key = entry.getKey().toString().trim();
-      StringBuilder newValue = new StringBuilder();
-      int varStart = value.indexOf(VARIABLE_START);
-      int varEnd = 0;
-      while (varStart != -1) {
-        varEnd = value.indexOf(VARIABLE_END, varStart);
-        if (varEnd == -1) {
-          String msg = "variable at " + varStart + " has no end marker";
-          LOG.fatal(msg);
-          throw new IOException(msg);
-        }
-        String variable = value.substring(varStart + VARIABLE_START_LENGTH, varEnd);
-
-        String substituteValue = System.getProperty(variable);
-        if (substituteValue == null) {
-          substituteValue = conf.get(variable);
-        }
-        if (substituteValue == null) {
-          String msg = "variable " + variable + " not set in system property "
-                     + "or hbase configs";
-          LOG.fatal(msg);
-          throw new IOException(msg);
-        }
-
-        newValue.append(substituteValue);
-
-        varEnd += VARIABLE_END_LENGTH;
-        varStart = value.indexOf(VARIABLE_START, varEnd);
-      }
-      // Special case for 'hbase.cluster.distributed' property being 'true'
-      if (key.startsWith("server.")) {
-        boolean mode = conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
-        if (mode == HConstants.CLUSTER_IS_DISTRIBUTED && value.startsWith(HConstants.LOCALHOST)) {
-          String msg = "The server in zoo.cfg cannot be set to localhost " +
-              "in a fully-distributed setup because it won't be reachable. " +
-              "See \"Getting Started\" for more information.";
-          LOG.fatal(msg);
-          throw new IOException(msg);
-        }
-      }
-      newValue.append(value.substring(varEnd));
-      properties.setProperty(key, newValue.toString());
-    }
-    return properties;
-  }
-
-  /**
-   * Return the ZK Quorum servers string given zk properties returned by
-   * makeZKProps
-   * @param properties
-   * @return Quorum servers String
-   */
-  public static String getZKQuorumServersString(Properties properties) {
-    String clientPort = null;
-    List<String> servers = new ArrayList<String>();
-
-    // The clientPort option may come after the server.X hosts, so we need to
-    // grab everything and then create the final host:port comma separated list.
-    boolean anyValid = false;
-    for (Entry<Object,Object> property : properties.entrySet()) {
-      String key = property.getKey().toString().trim();
-      String value = property.getValue().toString().trim();
-      if (key.equals("clientPort")) {
-        clientPort = value;
-      }
-      else if (key.startsWith("server.")) {
-        String host = value.substring(0, value.indexOf(':'));
-        servers.add(host);
-        anyValid = true;
-      }
-    }
-
-    if (!anyValid) {
-      LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
-      return null;
-    }
-
-    if (clientPort == null) {
-      LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
-      return null;
-    }
-
-    if (servers.isEmpty()) {
-      LOG.fatal("No servers were found in provided ZooKeeper configuration. " +
-          "HBase must have a ZooKeeper cluster configured for its " +
-          "operation. Ensure that you've configured '" +
-          HConstants.ZOOKEEPER_QUORUM + "' properly.");
-      return null;
-    }
-
-    StringBuilder hostPortBuilder = new StringBuilder();
-    for (int i = 0; i < servers.size(); ++i) {
-      String host = servers.get(i);
-      if (i > 0) {
-        hostPortBuilder.append(',');
-      }
-      hostPortBuilder.append(host);
-      hostPortBuilder.append(':');
-      hostPortBuilder.append(clientPort);
-    }
-
-    return hostPortBuilder.toString();
-  }
-
-  /**
-   * Return the ZK Quorum servers string given the specified configuration.
-   * @param conf
-   * @return Quorum servers
-   */
-  public static String getZKQuorumServersString(Configuration conf) {
-    return getZKQuorumServersString(makeZKProps(conf));
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index fd46762..c7bed68 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -343,71 +343,6 @@ public class ZKUtil {
     return path.substring(path.lastIndexOf("/")+1);
   }
 
-  /**
-   * Get the key to the ZK ensemble for this configuration without
-   * adding a name at the end
-   * @param conf Configuration to use to build the key
-   * @return ensemble key without a name
-   */
-  public static String getZooKeeperClusterKey(Configuration conf) {
-    return getZooKeeperClusterKey(conf, null);
-  }
-
-  /**
-   * Get the key to the ZK ensemble for this configuration and append
-   * a name at the end
-   * @param conf Configuration to use to build the key
-   * @param name Name that should be appended at the end if not empty or null
-   * @return ensemble key with a name (if any)
-   */
-  public static String getZooKeeperClusterKey(Configuration conf, String name) {
-    String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM).replaceAll(
-        "[\\t\\n\\x0B\\f\\r]", "");
-    StringBuilder builder = new StringBuilder(ensemble);
-    builder.append(":");
-    builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
-    builder.append(":");
-    builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
-    if (name != null && !name.isEmpty()) {
-      builder.append(",");
-      builder.append(name);
-    }
-    return builder.toString();
-  }
-
-  /**
-   * Apply the settings in the given key to the given configuration, this is
-   * used to communicate with distant clusters
-   * @param conf configuration object to configure
-   * @param key string that contains the 3 required configuratins
-   * @throws IOException
-   */
-  public static void applyClusterKeyToConf(Configuration conf, String key)
-      throws IOException{
-    String[] parts = transformClusterKey(key);
-    conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
-    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, parts[1]);
-    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]);
-  }
-
-  /**
-   * Separate the given key into the three configurations it should contain:
-   * hbase.zookeeper.quorum, hbase.zookeeper.client.port
-   * and zookeeper.znode.parent
-   * @param key
-   * @return the three configuration in the described order
-   * @throws IOException
-   */
-  public static String[] transformClusterKey(String key) throws IOException {
-    String[] parts = key.split(":");
-    if (parts.length != 3) {
-      throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" +
-          HConstants.ZOOKEEPER_QUORUM + ":hbase.zookeeper.client.port:"
-          + HConstants.ZOOKEEPER_ZNODE_PARENT);
-    }
-    return parts;
-  }
-
   //
   // Existence checks and watches
   //

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
index cad1afa..f875195 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
@@ -43,7 +43,7 @@ public class TestZKUtil {
     conf.set(HConstants.ZOOKEEPER_QUORUM, "\tlocalhost\n");
     conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "3333");
     conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "hbase");
-    String clusterKey = ZKUtil.getZooKeeperClusterKey(conf, "test");
+    String clusterKey = ZKConfig.getZooKeeperClusterKey(conf, "test");
     Assert.assertTrue(!clusterKey.contains("\t") && !clusterKey.contains("\n"));
     Assert.assertEquals("localhost:3333:hbase,test", clusterKey);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
index 6716492..543a7f1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
@@ -20,14 +20,15 @@ package org.apache.hadoop.hbase;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.Map.Entry;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 
 /**
  * Adds HBase configuration files to a Configuration
@@ -138,12 +139,45 @@ public class HBaseConfiguration extends Configuration {
    * @param srcConf the source configuration
    **/
   public static void merge(Configuration destConf, Configuration srcConf) {
-    for (Entry<String, String> e : srcConf) {
+    for (Map.Entry<String, String> e : srcConf) {
       destConf.set(e.getKey(), e.getValue());
     }
   }
 
   /**
+   * Returns a subset of the configuration properties, matching the given key prefix.
+   * The prefix is stripped from the return keys, i.e. when calling with a prefix of "myprefix",
+   * the entry "myprefix.key1 = value1" would be returned as "key1 = value1".  If an entry's
+   * key matches the prefix exactly ("myprefix = value2"), it will <strong>not</strong> be
+   * included in the results, since it would show up as an entry with an empty key.
+   */
+  public static Configuration subset(Configuration srcConf, String prefix) {
+    Configuration newConf = new Configuration(false);
+    for (Map.Entry<String, String> entry : srcConf) {
+      if (entry.getKey().startsWith(prefix)) {
+        String newKey = entry.getKey().substring(prefix.length());
+        // avoid entries that would produce an empty key
+        if (!newKey.isEmpty()) {
+          newConf.set(newKey, entry.getValue());
+        }
+      }
+    }
+    return newConf;
+  }
+
+  /**
+   * Sets all the entries in the provided {@code Map<String, String>} as properties in the
+   * given {@code Configuration}.  Each property will have the specified prefix prepended,
+   * so that the configuration entries are keyed by {@code prefix + entry.getKey()}.
+   */
+  public static void setWithPrefix(Configuration conf, String prefix,
+                                   Iterable<Map.Entry<String, String>> properties) {
+    for (Map.Entry<String, String> entry : properties) {
+      conf.set(prefix + entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
    * @return whether to show HBase Configuration in servlet
    */
   public static boolean isShowConfInServlet() {
@@ -237,7 +271,67 @@ public class HBaseConfiguration extends Configuration {
     return passwd;
   }
 
-  /** For debugging.  Dump configurations to system output as xml format.
+  /**
+   * Generates a {@link Configuration} instance by applying the ZooKeeper cluster key
+   * to the base Configuration.  Note that additional configuration properties may be needed
+   * for a remote cluster, so it is preferable to use
+   * {@link #createClusterConf(Configuration, String, String)}.
+   *
+   * @param baseConf the base configuration to use, containing prefixed override properties
+   * @param clusterKey the ZooKeeper quorum cluster key to apply, or {@code null} if none
+   *
+   * @return the merged configuration with override properties and cluster key applied
+   *
+   * @see #createClusterConf(Configuration, String, String)
+   */
+  public static Configuration createClusterConf(Configuration baseConf, String clusterKey)
+      throws IOException {
+    return createClusterConf(baseConf, clusterKey, null);
+  }
+
+  /**
+   * Generates a {@link Configuration} instance by applying property overrides prefixed by
+   * a cluster profile key to the base Configuration.  Override properties are extracted by
+   * the {@link #subset(Configuration, String)} method, then merged on top of the base
+   * Configuration and returned.
+   *
+   * @param baseConf the base configuration to use, containing prefixed override properties
+   * @param clusterKey the ZooKeeper quorum cluster key to apply, or {@code null} if none
+   * @param overridePrefix the property key prefix to match for override properties,
+   *     or {@code null} if none
+   * @return the merged configuration with override properties and cluster key applied
+   */
+  public static Configuration createClusterConf(Configuration baseConf, String clusterKey,
+                                                String overridePrefix) throws IOException {
+    Configuration clusterConf = HBaseConfiguration.create(baseConf);
+    if (clusterKey != null && !clusterKey.isEmpty()) {
+      applyClusterKeyToConf(clusterConf, clusterKey);
+    }
+
+    if (overridePrefix != null && !overridePrefix.isEmpty()) {
+      Configuration clusterSubset = HBaseConfiguration.subset(clusterConf, overridePrefix);
+      HBaseConfiguration.merge(clusterConf, clusterSubset);
+    }
+    return clusterConf;
+  }
+
+  /**
+   * Apply the settings in the given key to the given configuration, this is
+   * used to communicate with distant clusters
+   * @param conf configuration object to configure
+   * @param key string that contains the 3 required configurations
+   * @throws IOException
+   */
+  private static void applyClusterKeyToConf(Configuration conf, String key)
+      throws IOException{
+    ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key);
+    conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.getQuorumString());
+    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.getClientPort());
+    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.getZnodeParent());
+  }
+
+  /**
+   * For debugging.  Dump configurations to system output as xml format.
    * Master and RS configurations can also be dumped using
    * http services. e.g. "curl http://master:60010/dump"
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
new file mode 100644
index 0000000..d23a607
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
@@ -0,0 +1,450 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Utility methods for reading and building the ZooKeeper configuration.
+ */
+@InterfaceAudience.Private
+public final class ZKConfig {
+  private static final Log LOG = LogFactory.getLog(ZKConfig.class);
+
+  private static final String VARIABLE_START = "${";
+  private static final int VARIABLE_START_LENGTH = VARIABLE_START.length();
+  private static final String VARIABLE_END = "}";
+  private static final int VARIABLE_END_LENGTH = VARIABLE_END.length();
+
+  private ZKConfig() {
+  }
+
+  /**
+   * Make a Properties object holding ZooKeeper config.
+   * Parses the corresponding config options from the HBase XML configs
+   * and generates the appropriate ZooKeeper properties.
+   * @param conf Configuration to read from.
+   * @return Properties holding mappings representing ZooKeeper config file.
+   */
+  public static Properties makeZKProps(Configuration conf) {
+    if (conf.getBoolean(HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG, false)) {
+      LOG.warn(
+          "Parsing ZooKeeper's " + HConstants.ZOOKEEPER_CONFIG_NAME +
+          " file for ZK properties " +
+          "has been deprecated. Please instead place all ZK related HBase " +
+          "configuration under the hbase-site.xml, using prefixes " +
+          "of the form '" + HConstants.ZK_CFG_PROPERTY_PREFIX + "', and " +
+          "set property '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG +
+          "' to false");
+      // First check if there is a zoo.cfg in the CLASSPATH. If so, simply read
+      // it and grab its configuration properties.
+      ClassLoader cl = ZKConfig.class.getClassLoader();
+      final InputStream inputStream =
+        cl.getResourceAsStream(HConstants.ZOOKEEPER_CONFIG_NAME);
+      if (inputStream != null) {
+        try {
+          return parseZooCfg(conf, inputStream);
+        } catch (IOException e) {
+          LOG.warn("Cannot read " + HConstants.ZOOKEEPER_CONFIG_NAME +
+                   ", loading from XML files", e);
+        }
+      }
+    } else {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Skipped reading ZK properties file '" + HConstants.ZOOKEEPER_CONFIG_NAME +
+          "' since '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG + "' was not set to true");
+      }
+    }
+
+    // Otherwise, use the configuration options from HBase's XML files.
+    Properties zkProperties = new Properties();
+
+    // Directly map all of the hbase.zookeeper.property.KEY properties.
+    for (Entry<String, String> entry : new Configuration(conf)) { // copy for mt safety
+      String key = entry.getKey();
+      if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
+        String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN);
+        String value = entry.getValue();
+        // If the value has variables substitutions, need to do a get.
+        if (value.contains(VARIABLE_START)) {
+          value = conf.get(key);
+        }
+        zkProperties.put(zkKey, value);
+      }
+    }
+
+    // If clientPort is not set, assign the default.
+    if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) {
+      zkProperties.put(HConstants.CLIENT_PORT_STR,
+          HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+    }
+
+    // Create the server.X properties.
+    int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888);
+    int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888);
+
+    final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
+                                                 HConstants.LOCALHOST);
+    for (int i = 0; i < serverHosts.length; ++i) {
+      String serverHost = serverHosts[i];
+      String address = serverHost + ":" + peerPort + ":" + leaderPort;
+      String key = "server." + i;
+      zkProperties.put(key, address);
+    }
+
+    return zkProperties;
+  }
+
+  /**
+   * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in.
+   * This method is used for testing so we can pass our own InputStream.
+   * @param conf HBaseConfiguration to use for injecting variables.
+   * @param inputStream InputStream to read from.
+   * @return Properties parsed from config stream with variables substituted.
+   * @throws IOException if anything goes wrong parsing config
+   * @deprecated in 0.96 onwards. HBase will no longer rely on zoo.cfg
+   * availability.
+   */
+  @Deprecated
+  public static Properties parseZooCfg(Configuration conf,
+      InputStream inputStream) throws IOException {
+    Properties properties = new Properties();
+    try {
+      properties.load(inputStream);
+    } catch (IOException e) {
+      final String msg = "fail to read properties from "
+        + HConstants.ZOOKEEPER_CONFIG_NAME;
+      LOG.fatal(msg);
+      throw new IOException(msg, e);
+    }
+    for (Entry<Object, Object> entry : properties.entrySet()) {
+      String value = entry.getValue().toString().trim();
+      String key = entry.getKey().toString().trim();
+      StringBuilder newValue = new StringBuilder();
+      int varStart = value.indexOf(VARIABLE_START);
+      int varEnd = 0;
+      while (varStart != -1) {
+        varEnd = value.indexOf(VARIABLE_END, varStart);
+        if (varEnd == -1) {
+          String msg = "variable at " + varStart + " has no end marker";
+          LOG.fatal(msg);
+          throw new IOException(msg);
+        }
+        String variable = value.substring(varStart + VARIABLE_START_LENGTH, varEnd);
+
+        String substituteValue = System.getProperty(variable);
+        if (substituteValue == null) {
+          substituteValue = conf.get(variable);
+        }
+        if (substituteValue == null) {
+          String msg = "variable " + variable + " not set in system property "
+                     + "or hbase configs";
+          LOG.fatal(msg);
+          throw new IOException(msg);
+        }
+
+        newValue.append(substituteValue);
+
+        varEnd += VARIABLE_END_LENGTH;
+        varStart = value.indexOf(VARIABLE_START, varEnd);
+      }
+      // Special case for 'hbase.cluster.distributed' property being 'true'
+      if (key.startsWith("server.")) {
+        boolean mode = conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
+        if (mode == HConstants.CLUSTER_IS_DISTRIBUTED && value.startsWith(HConstants.LOCALHOST)) {
+          String msg = "The server in zoo.cfg cannot be set to localhost " +
+              "in a fully-distributed setup because it won't be reachable. " +
+              "See \"Getting Started\" for more information.";
+          LOG.fatal(msg);
+          throw new IOException(msg);
+        }
+      }
+      newValue.append(value.substring(varEnd));
+      properties.setProperty(key, newValue.toString());
+    }
+    return properties;
+  }
+
+  /**
+   * Return the ZK Quorum servers string given zk properties returned by
+   * makeZKProps
+   * @param properties
+   * @return Quorum servers String
+   */
+  public static String getZKQuorumServersString(Properties properties) {
+    String clientPort = null;
+    List<String> servers = new ArrayList<String>();
+
+    // The clientPort option may come after the server.X hosts, so we need to
+    // grab everything and then create the final host:port comma separated list.
+    boolean anyValid = false;
+    for (Entry<Object,Object> property : properties.entrySet()) {
+      String key = property.getKey().toString().trim();
+      String value = property.getValue().toString().trim();
+      if (key.equals("clientPort")) {
+        clientPort = value;
+      }
+      else if (key.startsWith("server.")) {
+        String host = value.substring(0, value.indexOf(':'));
+        servers.add(host);
+        anyValid = true;
+      }
+    }
+
+    if (!anyValid) {
+      LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
+      return null;
+    }
+
+    if (clientPort == null) {
+      LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
+      return null;
+    }
+
+    if (servers.isEmpty()) {
+      LOG.fatal("No servers were found in provided ZooKeeper configuration. " +
+          "HBase must have a ZooKeeper cluster configured for its " +
+          "operation. Ensure that you've configured '" +
+          HConstants.ZOOKEEPER_QUORUM + "' properly.");
+      return null;
+    }
+
+    StringBuilder hostPortBuilder = new StringBuilder();
+    for (int i = 0; i < servers.size(); ++i) {
+      String host = servers.get(i);
+      if (i > 0) {
+        hostPortBuilder.append(',');
+      }
+      hostPortBuilder.append(host);
+      hostPortBuilder.append(':');
+      hostPortBuilder.append(clientPort);
+    }
+
+    return hostPortBuilder.toString();
+  }
+
+  /**
+   * Return the ZK Quorum servers string given the specified configuration
+   *
+   * @param conf
+   * @return Quorum servers String
+   */
+  private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) {
+    String defaultClientPort = Integer.toString(
+        conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT));
+
+    // Build the ZK quorum server string with "server:clientport" list, separated by ','
+    final String[] serverHosts =
+        conf.getStrings(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
+    return buildZKQuorumServerString(serverHosts, defaultClientPort);
+  }
+
+  /**
+   * Return the ZK Quorum servers string given the specified configuration.
+   * @param conf
+   * @return Quorum servers
+   */
+  public static String getZKQuorumServersString(Configuration conf) {
+    // First try zoo.cfg; if not applicable, then try config XML.
+    Properties zkProperties = makeZKProps(conf);
+
+    if (zkProperties != null) {
+      return getZKQuorumServersString(zkProperties);
+    }
+
+    return getZKQuorumServersStringFromHbaseConfig(conf);
+  }
+
+  /**
+   * Build the ZK quorum server string with "server:clientport" list, separated by ','
+   *
+   * @param serverHosts a list of servers for ZK quorum
+   * @param clientPort the default client port
+   * @return the string for a list of "server:port" separated by ","
+   */
+  public static String buildZKQuorumServerString(String[] serverHosts, String clientPort) {
+    StringBuilder quorumStringBuilder = new StringBuilder();
+    String serverHost;
+    for (int i = 0; i < serverHosts.length; ++i) {
+      if (serverHosts[i].contains(":")) {
+        serverHost = serverHosts[i]; // just use the port specified from the input
+      } else {
+        serverHost = serverHosts[i] + ":" + clientPort;
+      }
+      if (i > 0) {
+        quorumStringBuilder.append(',');
+      }
+      quorumStringBuilder.append(serverHost);
+    }
+    return quorumStringBuilder.toString();
+  }
+
+  /**
+   * Verifies that the given key matches the expected format for a ZooKeeper cluster key.
+   * The Quorum for the ZK cluster can have one of the following formats (see examples below):
+   *
+   * <ol>
+   *   <li>s1,s2,s3 (no client port in the list, the client port could be obtained from
+   *       clientPort)</li>
+   *   <li>s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server,
+   *       in this case, the clientPort would be ignored)</li>
+   *   <li>s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use
+   *       the clientPort; otherwise, it would use the specified port)</li>
+   * </ol>
+   *
+   * @param key the cluster key to validate
+   * @throws IOException if the key could not be parsed
+   */
+  public static void validateClusterKey(String key) throws IOException {
+    transformClusterKey(key);
+  }
+
+  /**
+   * Separate the given key into the three configurations it should contain:
+   * hbase.zookeeper.quorum, hbase.zookeeper.client.port
+   * and zookeeper.znode.parent
+   * @param key
+   * @return the three configurations in the described order
+   * @throws IOException
+   */
+  public static ZKClusterKey transformClusterKey(String key) throws IOException {
+    String[] parts = key.split(":");
+
+    if (parts.length == 3) {
+      return new ZKClusterKey(parts [0], Integer.parseInt(parts [1]), parts [2]);
+    }
+
+    if (parts.length > 3) {
+      // The quorum could contain client port in server:clientport format, try to transform more.
+      String zNodeParent = parts [parts.length - 1];
+      String clientPort = parts [parts.length - 2];
+
+      // The first part length is the total length minus the lengths of other parts and minus 2 ":"
+      int endQuorumIndex = key.length() - zNodeParent.length() - clientPort.length() - 2;
+      String quorumStringInput = key.substring(0, endQuorumIndex);
+      String[] serverHosts = quorumStringInput.split(",");
+
+      // The common case is that every server has its own client port specified - this means
+      // that (total parts - the ZNodeParent part - the ClientPort part) is equal to
+      // (the number of "," + 1) - "+ 1" because the last server has no ",".
+      if ((parts.length - 2) == (serverHosts.length + 1)) {
+        return new ZKClusterKey(quorumStringInput, Integer.parseInt(clientPort), zNodeParent);
+      }
+
+      // For the uncommon case that some servers have no port specified, we need to build the
+      // server:clientport list using default client port for servers without specified port.
+      return new ZKClusterKey(
+          buildZKQuorumServerString(serverHosts, clientPort),
+          Integer.parseInt(clientPort),
+          zNodeParent);
+    }
+
+    throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" +
+        HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_CLIENT_PORT + ":"
+        + HConstants.ZOOKEEPER_ZNODE_PARENT);
+  }
+
+  /**
+   * Get the key to the ZK ensemble for this configuration without
+   * adding a name at the end
+   * @param conf Configuration to use to build the key
+   * @return ensemble key without a name
+   */
+  public static String getZooKeeperClusterKey(Configuration conf) {
+    return getZooKeeperClusterKey(conf, null);
+  }
+
+  /**
+   * Get the key to the ZK ensemble for this configuration and append
+   * a name at the end
+   * @param conf Configuration to use to build the key
+   * @param name Name that should be appended at the end if not empty or null
+   * @return ensemble key with a name (if any)
+   */
+  public static String getZooKeeperClusterKey(Configuration conf, String name) {
+    String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM).replaceAll(
+        "[\\t\\n\\x0B\\f\\r]", "");
+    StringBuilder builder = new StringBuilder(ensemble);
+    builder.append(":");
+    builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
+    builder.append(":");
+    builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+    if (name != null && !name.isEmpty()) {
+      builder.append(",");
+      builder.append(name);
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Standardize the ZK quorum string: make it a "server:clientport" list, separated by ','
+   * @param quorumStringInput a string that contains a list of servers for ZK quorum
+   * @param clientPort the default client port
+   * @return the string for a list of "server:port" separated by ","
+   */
+  @VisibleForTesting
+  public static String standardizeZKQuorumServerString(String quorumStringInput,
+      String clientPort) {
+    String[] serverHosts = quorumStringInput.split(",");
+    return buildZKQuorumServerString(serverHosts, clientPort);
+  }
+
+  // The Quorum for the ZK cluster can have one of the following formats (see examples below):
+  // (1). s1,s2,s3 (no client port in the list, the client port could be obtained from clientPort)
+  // (2). s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server,
+  //      in this case, the clientPort would be ignored)
+  // (3). s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use
+  //      the clientPort; otherwise, it would use the specified port)
+  @VisibleForTesting
+  public static class ZKClusterKey {
+    private String quorumString;
+    private int clientPort;
+    private String znodeParent;
+
+    ZKClusterKey(String quorumString, int clientPort, String znodeParent) {
+      this.quorumString = quorumString;
+      this.clientPort = clientPort;
+      this.znodeParent = znodeParent;
+    }
+
+    public String getQuorumString() {
+      return quorumString;
+    }
+
+    public int getClientPort() {
+      return clientPort;
+    }
+
+    public String getZnodeParent() {
+      return znodeParent;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java
index da3e115..3d8560e 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
@@ -26,6 +27,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.List;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -64,6 +66,34 @@ public class TestHBaseConfiguration {
   }
 
   @Test
+  public void testSubset() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    // subset is used in TableMapReduceUtil#initCredentials to support different security
+    // configurations between source and destination clusters, so we'll use that as an example
+    String prefix = "hbase.mapred.output.";
+    conf.set("hbase.security.authentication", "kerberos");
+    conf.set("hbase.regionserver.kerberos.principal", "hbasesource");
+    HBaseConfiguration.setWithPrefix(conf, prefix,
+        ImmutableMap.of(
+            "hbase.regionserver.kerberos.principal", "hbasedest",
+            "", "shouldbemissing")
+            .entrySet());
+
+    Configuration subsetConf = HBaseConfiguration.subset(conf, prefix);
+    assertNull(subsetConf.get(prefix + "hbase.regionserver.kerberos.principal"));
+    assertEquals("hbasedest", subsetConf.get("hbase.regionserver.kerberos.principal"));
+    assertNull(subsetConf.get("hbase.security.authentication"));
+    assertNull(subsetConf.get(""));
+
+    Configuration mergedConf = HBaseConfiguration.create(conf);
+    HBaseConfiguration.merge(mergedConf, subsetConf);
+
+    assertEquals("hbasedest", mergedConf.get("hbase.regionserver.kerberos.principal"));
+    assertEquals("kerberos", mergedConf.get("hbase.security.authentication"));
+    assertEquals("shouldbemissing", mergedConf.get(prefix));
+  }
+
+  @Test
   public void testGetPassword() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     conf.set(ReflectiveCredentialProviderClient.CREDENTIAL_PROVIDER_PATH,

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
new file mode 100644
index 0000000..7879aea
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category({MiscTests.class, SmallTests.class})
+public class TestZKConfig {
+
+  @Test
+  public void testZKConfigLoading() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    // Test that we read only from the config instance
+    // (i.e. via hbase-default.xml and hbase-site.xml)
+    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
+    Properties props = ZKConfig.makeZKProps(conf);
+    assertEquals("Property client port should have been default from the HBase config",
+                        "2181",
+                        props.getProperty("clientPort"));
+  }
+
+  @Test
+  public void testGetZooKeeperClusterKey() {
+    Configuration conf = HBaseConfiguration.create();
+    conf.set(HConstants.ZOOKEEPER_QUORUM, "\tlocalhost\n");
+    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "3333");
+    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "hbase");
+    String clusterKey = ZKConfig.getZooKeeperClusterKey(conf, "test");
+    assertTrue(!clusterKey.contains("\t") && !clusterKey.contains("\n"));
+    assertEquals("localhost:3333:hbase,test", clusterKey);
+  }
+
+  @Test
+  public void testClusterKey() throws Exception {
+    testKey("server", 2181, "hbase");
+    testKey("server1,server2,server3", 2181, "hbase");
+    try {
+      ZKConfig.validateClusterKey("2181:hbase");
+    } catch (IOException ex) {
+      // OK
+    }
+  }
+
+  @Test
+  public void testClusterKeyWithMultiplePorts() throws Exception {
+    // server has different port than the default port
+    testKey("server1:2182", 2181, "hbase", true);
+    // multiple servers have their own port
+    testKey("server1:2182,server2:2183,server3:2184", 2181, "hbase", true);
+    // one server has no specified port, should use default port
+    testKey("server1:2182,server2,server3:2184", 2181, "hbase", true);
+    // the last server has no specified port, should use default port
+    testKey("server1:2182,server2:2183,server3", 2181, "hbase", true);
+    // multiple servers have no specified port, should use default port for those servers
+    testKey("server1:2182,server2,server3:2184,server4", 2181, "hbase", true);
+    // same server, different ports
+    testKey("server1:2182,server1:2183,server1", 2181, "hbase", true);
+    // mix of same server/different port and different server
+    testKey("server1:2182,server2:2183,server1", 2181, "hbase", true);
+  }
+
+  private void testKey(String ensemble, int port, String znode)
+      throws IOException {
+    testKey(ensemble, port, znode, false); // not support multiple client ports
+  }
+
+  private void testKey(String ensemble, int port, String znode, Boolean multiplePortSupport)
+      throws IOException {
+    Configuration conf = new Configuration();
+    String key = ensemble+":"+port+":"+znode;
+    String ensemble2 = null;
+    ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key);
+    if (multiplePortSupport) {
+      ensemble2 = ZKConfig.standardizeZKQuorumServerString(ensemble,
+          Integer.toString(port));
+      assertEquals(ensemble2, zkClusterKey.getQuorumString());
+    }
+    else {
+      assertEquals(ensemble, zkClusterKey.getQuorumString());
+    }
+    assertEquals(port, zkClusterKey.getClientPort());
+    assertEquals(znode, zkClusterKey.getZnodeParent());
+
+    conf = HBaseConfiguration.createClusterConf(conf, key);
+    assertEquals(zkClusterKey.getQuorumString(), conf.get(HConstants.ZOOKEEPER_QUORUM));
+    assertEquals(zkClusterKey.getClientPort(), conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, -1));
+    assertEquals(zkClusterKey.getZnodeParent(), conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+
+    String reconstructedKey = ZKConfig.getZooKeeperClusterKey(conf);
+    if (multiplePortSupport) {
+      String key2 = ensemble2 + ":" + port + ":" + znode;
+      assertEquals(key2, reconstructedKey);
+    }
+    else {
+      assertEquals(key, reconstructedKey);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
index 37d91b1..7ad8cfd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
@@ -174,8 +173,9 @@ public class SyncTable extends Configured implements Tool {
       
       Configuration conf = context.getConfiguration();
       sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
-      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY);
-      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY);
+      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
+      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY,
+        TableOutputFormat.OUTPUT_CONF_PREFIX);
       sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
       targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
       dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false);
@@ -196,13 +196,12 @@ public class SyncTable extends Configured implements Tool {
       targetHasher = new HashTable.ResultHasher();
     }
   
-    private static HConnection openConnection(Configuration conf, String zkClusterConfKey)
+    private static HConnection openConnection(Configuration conf, String zkClusterConfKey,
+                                             String configPrefix)
       throws IOException {
-        Configuration clusterConf = new Configuration(conf);
         String zkCluster = conf.get(zkClusterConfKey);
-        if (zkCluster != null) {
-          ZKUtil.applyClusterKeyToConf(clusterConf, zkCluster);
-        }
+        Configuration clusterConf = HBaseConfiguration.createClusterConf(conf,
+            zkCluster, configPrefix);
         return HConnectionManager.createConnection(clusterConf);
     }
     

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index 4766efb..574a049 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -18,6 +18,23 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.yammer.metrics.core.MetricsRegistry;
 
@@ -44,31 +61,15 @@ import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
-
 /**
+ * 
  * Utility for {@link TableMapper} and {@link TableReducer}
  */
 @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -460,8 +461,8 @@ public class TableMapReduceUtil {
         String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
         User user = userProvider.getCurrent();
         if (quorumAddress != null) {
-          Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
-          ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
+          Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
+              quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
           HConnection peerConn = HConnectionManager.createConnection(peerConf);
           try {
             TokenUtil.addTokenForJob(peerConn, user, job);
@@ -493,15 +494,30 @@ public class TableMapReduceUtil {
    * @param job The job that requires the permission.
    * @param quorumAddress string that contains the 3 required configuratins
    * @throws IOException When the authentication token cannot be obtained.
+   * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead.
    */
+  @Deprecated
   public static void initCredentialsForCluster(Job job, String quorumAddress)
       throws IOException {
+    Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
+        quorumAddress);
+    initCredentialsForCluster(job, peerConf);
+  }
+
+  /**
+   * Obtain an authentication token, for the specified cluster, on behalf of the current user
+   * and add it to the credentials for the given map reduce job.
+   *
+   * @param job The job that requires the permission.
+   * @param conf The configuration to use in connecting to the peer cluster
+   * @throws IOException When the authentication token cannot be obtained.
+   */
+  public static void initCredentialsForCluster(Job job, Configuration conf)
+      throws IOException {
     UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
     if (userProvider.isHBaseSecurityEnabled()) {
       try {
-        Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
-        ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
-        HConnection peerConn = HConnectionManager.createConnection(peerConf);
+        HConnection peerConn = HConnectionManager.createConnection(conf);
         try {
           TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
         } finally {
@@ -648,7 +664,7 @@ public class TableMapReduceUtil {
     // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
     if (quorumAddress != null) {
       // Calling this will validate the format
-      ZKUtil.transformClusterKey(quorumAddress);
+      ZKConfig.validateClusterKey(quorumAddress);
       conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
     }
     if (serverClass != null && serverImpl != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index 3e7f4c8..d888b3a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -53,8 +52,18 @@ implements Configurable {
 
   private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
 
+  /**
+   * Prefix for configuration property overrides to apply in {@link #setConf(Configuration)}.
+   * For keys matching this prefix, the prefix is stripped, and the value is set in the
+   * configuration with the resulting key, i.e. the entry "hbase.mapred.output.key1 = value1"
+   * would be set in the configuration as "key1 = value1".  Use this to set properties
+   * which should only be applied to the {@code TableOutputFormat} configuration and not the
+   * input configuration.
+   */
+  public static final String OUTPUT_CONF_PREFIX = "hbase.mapred.output.";
+
   /** Job parameter that specifies the output table. */
-  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+  public static final String OUTPUT_TABLE = OUTPUT_CONF_PREFIX + "outputtable";
 
   /**
    * Optional job parameter to specify a peer cluster.
@@ -62,17 +71,15 @@ implements Configurable {
    * source is picked up from <code>hbase-site.xml</code>).
    * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String)
    */
-  public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";
+  public static final String QUORUM_ADDRESS = OUTPUT_CONF_PREFIX + "quorum";
 
   /** Optional job parameter to specify peer cluster's ZK client port */
-  public static final String QUORUM_PORT = "hbase.mapred.output.quorum.port";
+  public static final String QUORUM_PORT = OUTPUT_CONF_PREFIX + "quorum.port";
 
   /** Optional specification of the rs class name of the peer cluster */
-  public static final String
-      REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";
+  public static final String REGION_SERVER_CLASS = OUTPUT_CONF_PREFIX + "rs.class";
   /** Optional specification of the rs impl name of the peer cluster */
-  public static final String
-      REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";
+  public static final String REGION_SERVER_IMPL = OUTPUT_CONF_PREFIX + "rs.impl";
 
   /** The configuration. */
   private Configuration conf = null;
@@ -181,22 +188,19 @@ implements Configurable {
 
   @Override
   public void setConf(Configuration otherConf) {
-    this.conf = HBaseConfiguration.create(otherConf);
-
-    String tableName = this.conf.get(OUTPUT_TABLE);
+    String tableName = otherConf.get(OUTPUT_TABLE);
     if(tableName == null || tableName.length() <= 0) {
       throw new IllegalArgumentException("Must specify table name");
     }
 
-    String address = this.conf.get(QUORUM_ADDRESS);
-    int zkClientPort = this.conf.getInt(QUORUM_PORT, 0);
-    String serverClass = this.conf.get(REGION_SERVER_CLASS);
-    String serverImpl = this.conf.get(REGION_SERVER_IMPL);
+    String address = otherConf.get(QUORUM_ADDRESS);
+    int zkClientPort = otherConf.getInt(QUORUM_PORT, 0);
+    String serverClass = otherConf.get(REGION_SERVER_CLASS);
+    String serverImpl = otherConf.get(REGION_SERVER_IMPL);
 
     try {
-      if (address != null) {
-        ZKUtil.applyClusterKeyToConf(this.conf, address);
-      }
+      this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX);
+
       if (serverClass != null) {
         this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 979cd3a..a900f32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
@@ -70,6 +69,7 @@ public class VerifyReplication extends Configured implements Tool {
       LogFactory.getLog(VerifyReplication.class);
 
   public final static String NAME = "verifyrep";
+  private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
   static long startTime = 0;
   static long endTime = Long.MAX_VALUE;
   static int versions = -1;
@@ -126,8 +126,8 @@ public class VerifyReplication extends Configured implements Tool {
           @Override
           public Void connect(HConnection conn) throws IOException {
             String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
-            Configuration peerConf = HBaseConfiguration.create(conf);
-            ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
+            Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
+                zkClusterKey, PEER_CONFIG_PREFIX);
 
             HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
             scan.setStartRow(tableSplit.getStartRow());
@@ -194,7 +194,8 @@ public class VerifyReplication extends Configured implements Tool {
     }
   }
 
-  private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
+  private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
+      final Configuration conf) throws IOException {
     ZooKeeperWatcher localZKW = null;
     ReplicationPeerZKImpl peer = null;
     try {
@@ -211,8 +212,8 @@ public class VerifyReplication extends Configured implements Tool {
       if (pair == null) {
         throw new IOException("Couldn't get peer conf!");
       }
-      Configuration peerConf = rp.getPeerConf(peerId).getSecond();
-      return ZKUtil.getZooKeeperClusterKey(peerConf);
+
+      return pair;
     } catch (ReplicationException e) {
       throw new IOException(
           "An error occured while trying to connect to the remove peer cluster", e);
@@ -251,9 +252,14 @@ public class VerifyReplication extends Configured implements Tool {
       conf.set(NAME+".families", families);
     }
 
-    String peerQuorumAddress = getPeerQuorumAddress(conf);
+    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
+    ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
+    String peerQuorumAddress = peerConfig.getClusterKey();
+    LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
+        peerConfig.getConfiguration());
     conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
-    LOG.info("Peer Quorum Address: " + peerQuorumAddress);
+    HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
+        peerConfig.getConfiguration().entrySet());
 
     conf.setInt(NAME + ".versions", versions);
     LOG.info("Number of version: " + versions);
@@ -276,8 +282,9 @@ public class VerifyReplication extends Configured implements Tool {
     TableMapReduceUtil.initTableMapperJob(tableName, scan,
         Verifier.class, null, null, job);
 
+    Configuration peerClusterConf = peerConfigPair.getSecond();
     // Obtain the auth token from peer cluster
-    TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress);
+    TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
 
     job.setOutputFormatClass(NullOutputFormat.class);
     job.setNumReduceTasks(0);

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
index e30ab22..f30155e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
@@ -341,33 +341,6 @@ public class TestZooKeeper {
     assertNull(ZKUtil.getDataNoWatch(zkw, "/l1/l2", null));
   }
 
-  @Test
-  public void testClusterKey() throws Exception {
-    testKey("server", "2181", "hbase");
-    testKey("server1,server2,server3", "2181", "hbase");
-    try {
-      ZKUtil.transformClusterKey("2181:hbase");
-    } catch (IOException ex) {
-      // OK
-    }
-  }
-
-  private void testKey(String ensemble, String port, String znode)
-      throws IOException {
-    Configuration conf = new Configuration();
-    String key = ensemble+":"+port+":"+znode;
-    String[] parts = ZKUtil.transformClusterKey(key);
-    assertEquals(ensemble, parts[0]);
-    assertEquals(port, parts[1]);
-    assertEquals(znode, parts[2]);
-    ZKUtil.applyClusterKeyToConf(conf, key);
-    assertEquals(parts[0], conf.get(HConstants.ZOOKEEPER_QUORUM));
-    assertEquals(parts[1], conf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
-    assertEquals(parts[2], conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
-    String reconstructedKey = ZKUtil.getZooKeeperClusterKey(conf);
-    assertEquals(key, reconstructedKey);
-  }
-
   /**
    * A test for HBASE-3238
    * @throws IOException A connection attempt to zk failed

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index 526ba16..0933ff3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -24,7 +24,11 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -33,10 +37,12 @@ import org.junit.experimental.categories.Category;
 
 import com.google.common.collect.Lists;
 
-import static org.junit.Assert.fail;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 
 /**
  * Unit testing of ReplicationAdmin
@@ -114,6 +120,28 @@ public class TestReplicationAdmin {
   }
 
   /**
+   * Tests that the peer configuration used by ReplicationAdmin contains all
+   * the peer's properties.
+   */
+  @Test
+  public void testPeerConfig() throws Exception {
+    ReplicationPeerConfig config = new ReplicationPeerConfig();
+    config.setClusterKey(KEY_ONE);
+    config.getConfiguration().put("key1", "value1");
+    config.getConfiguration().put("key2", "value2");
+    admin.addPeer(ID_ONE, config, null);
+
+    List<ReplicationPeer> peers = admin.listValidReplicationPeers();
+    assertEquals(1, peers.size());
+    ReplicationPeer peerOne = peers.get(0);
+    assertNotNull(peerOne);
+    assertEquals("value1", peerOne.getConfiguration().get("key1"));
+    assertEquals("value2", peerOne.getConfiguration().get("key2"));
+
+    admin.removePeer(ID_ONE);
+  }
+
+  /**
    * basic checks that when we add a peer that it is enabled, and that we can disable
    * @throws Exception
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 51a2416..ae4c8e3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -82,7 +82,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   public void testCustomReplicationEndpoint() throws Exception {
     // test installing a custom replication endpoint other than the default one.
     admin.addPeer("testCustomReplicationEndpoint",
-      new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
         .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
 
     // check whether the class has been constructed and started
@@ -119,8 +119,11 @@ public class TestReplicationEndpoint extends TestReplicationBase {
 
   @Test
   public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
-    admin.addPeer("testReplicationEndpointReturnsFalseOnReplicate",
-      new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
+    Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
+    final String id = "testReplicationEndpointReturnsFalseOnReplicate";
+    admin.addPeer(id,
+      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
         .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
     // now replicate some data.
     doPut(row);
@@ -138,10 +141,10 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
   }
 
-  @Test
+  @Test (timeout=120000)
   public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
     admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
-      new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
         .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
     // now replicate some data.
     doPut(Bytes.toBytes("row1"));

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index f05eceb..696c130 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -27,7 +27,7 @@ import java.util.SortedSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.zookeeper.KeeperException;
 import org.junit.Before;
 import org.junit.Test;
@@ -202,7 +202,7 @@ public abstract class TestReplicationStateBasic {
       fail("There are no connected peers, should have thrown an IllegalArgumentException");
     } catch (IllegalArgumentException e) {
     }
-    assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
+    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
     rp.removePeer(ID_ONE);
     rp.peerRemoved(ID_ONE);
     assertNumberOfPeers(1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/229c430f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index 207201f..86c57c2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -76,7 +77,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
     String fakeRs = ZKUtil.joinZNode(zkw1.rsZNode, "hostname1.example.org:1234");
     ZKUtil.createWithParents(zkw1, fakeRs);
     ZKClusterId.setClusterId(zkw1, new ClusterId());
-    return ZKUtil.getZooKeeperClusterKey(testConf);
+    return ZKConfig.getZooKeeperClusterKey(testConf);
   }
 
   @Before
@@ -91,7 +92,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
     rq3 = ReplicationFactory.getReplicationQueues(zkw, conf, ds3);
     rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1);
     rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
-    OUR_KEY = ZKUtil.getZooKeeperClusterKey(conf);
+    OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
     rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1);
   }
 


Mime
View raw message