hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject hbase git commit: HBASE-14866 VerifyReplication and ReplicationAdmin should use full peer configuration for peer connection
Date Wed, 09 Dec 2015 23:55:58 GMT
Repository: hbase
Updated Branches:
  refs/heads/master ba3aa9a9b -> c1e0fcc26


HBASE-14866 VerifyReplication and ReplicationAdmin should use full peer configuration for peer connection


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c1e0fcc2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c1e0fcc2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c1e0fcc2

Branch: refs/heads/master
Commit: c1e0fcc26d7e7b10f6ce609e1ff0e4e9378dcf4b
Parents: ba3aa9a
Author: Gary Helmling <garyh@apache.org>
Authored: Wed Dec 9 15:52:27 2015 -0800
Committer: Gary Helmling <garyh@apache.org>
Committed: Wed Dec 9 15:52:27 2015 -0800

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    |  14 +-
 .../replication/ReplicationPeersZKImpl.java     |   7 +-
 .../replication/ReplicationStateZKBase.java     |   3 +-
 .../apache/hadoop/hbase/zookeeper/ZKConfig.java | 155 ----------
 .../apache/hadoop/hbase/zookeeper/ZKUtil.java   | 124 --------
 .../hadoop/hbase/zookeeper/TestZKUtil.java      |  11 -
 .../apache/hadoop/hbase/HBaseConfiguration.java |  78 ++++-
 .../apache/hadoop/hbase/zookeeper/ZKConfig.java | 301 +++++++++++++++++++
 .../hadoop/hbase/TestHBaseConfiguration.java    |  10 +-
 .../hadoop/hbase/zookeeper/TestZKConfig.java    | 126 ++++++++
 .../hadoop/hbase/mapreduce/SyncTable.java       |  15 +-
 .../hbase/mapreduce/TableMapReduceUtil.java     |  34 ++-
 .../hbase/mapreduce/TableOutputFormat.java      |  22 +-
 .../replication/VerifyReplication.java          |  25 +-
 .../hbase/util/ServerRegionReplicaUtil.java     |   4 +-
 .../org/apache/hadoop/hbase/TestZooKeeper.java  |  65 ----
 .../replication/TestReplicationAdmin.java       |  36 ++-
 .../replication/TestReplicationEndpoint.java    |  10 +-
 .../replication/TestReplicationStateBasic.java  |   4 +-
 .../replication/TestReplicationStateZKImpl.java |   5 +-
 .../TestRegionReplicaReplicationEndpoint.java   |   8 +-
 .../hadoop/hbase/zookeeper/TestZKConfig.java    |  45 ---
 22 files changed, 620 insertions(+), 482 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 8bd1267..a0bea8b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -626,7 +625,8 @@ public class ReplicationAdmin implements Closeable {
     }
   }
 
-  private List<ReplicationPeer> listValidReplicationPeers() {
+  @VisibleForTesting
+  List<ReplicationPeer> listValidReplicationPeers() {
     Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
     if (peers == null || peers.size() <= 0) {
       return null;
@@ -634,18 +634,16 @@ public class ReplicationAdmin implements Closeable {
     List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size());
     for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
       String peerId = peerEntry.getKey();
-      String clusterKey = peerEntry.getValue().getClusterKey();
-      Configuration peerConf = new Configuration(this.connection.getConfiguration());
       Stat s = null;
       try {
-        ZKUtil.applyClusterKeyToConf(peerConf, clusterKey);
         Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
+        Configuration peerConf = pair.getSecond();
         ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
         s =
             zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
               null);
         if (null == s) {
-          LOG.info(peerId + ' ' + clusterKey + " is invalid now.");
+          LOG.info(peerId + ' ' + pair.getFirst().getClusterKey() + " is invalid now.");
           continue;
         }
         validPeers.add(peer);
@@ -664,10 +662,6 @@ public class ReplicationAdmin implements Closeable {
         LOG.debug("Failure details to get valid replication peers.", e);
         Thread.currentThread().interrupt();
         continue;
-      } catch (IOException e) {
-        LOG.warn("Failed to get valid replication peers due to IOException.");
-        LOG.debug("Failure details to get valid replication peers.", e);
-        continue;
       }
     }
     return validPeers;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 1884469..63f9ac3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
@@ -318,11 +319,9 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       return null;
     }
 
-    Configuration otherConf = new Configuration(this.conf);
+    Configuration otherConf;
     try {
-      if (peerConfig.getClusterKey() != null && !peerConfig.getClusterKey().isEmpty()) {
-        ZKUtil.applyClusterKeyToConf(otherConf, peerConfig.getClusterKey());
-      }
+      otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
     } catch (IOException e) {
       LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
       return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
index 1691b3f..4fbac0f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -69,7 +70,7 @@ public abstract class ReplicationStateZKBase {
     String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
     String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
     this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
-    this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
+    this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
     this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
     this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
     this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
deleted file mode 100644
index a8f1182..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.zookeeper;
-
-import java.util.Map.Entry;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Utility methods for reading, and building the ZooKeeper configuration.
- *
- * The order and priority for reading the config are as follows:
- * (1). Property with "hbase.zookeeper.property." prefix from HBase XML
- * (2). other zookeeper related properties in HBASE XML
- */
-@InterfaceAudience.Private
-public class ZKConfig {
-
-  private static final String VARIABLE_START = "${";
-
-  /**
-   * Make a Properties object holding ZooKeeper config.
-   * Parses the corresponding config options from the HBase XML configs
-   * and generates the appropriate ZooKeeper properties.
-   * @param conf Configuration to read from.
-   * @return Properties holding mappings representing ZooKeeper config file.
-   */
-  public static Properties makeZKProps(Configuration conf) {
-    return makeZKPropsFromHbaseConfig(conf);
-  }
-
-  /**
-   * Make a Properties object holding ZooKeeper config.
-   * Parses the corresponding config options from the HBase XML configs
-   * and generates the appropriate ZooKeeper properties.
-   *
-   * @param conf Configuration to read from.
-   * @return Properties holding mappings representing ZooKeeper config file.
-   */
-  private static Properties makeZKPropsFromHbaseConfig(Configuration conf) {
-    Properties zkProperties = new Properties();
-
-    // Directly map all of the hbase.zookeeper.property.KEY properties.
-    // Synchronize on conf so no loading of configs while we iterate
-    synchronized (conf) {
-      for (Entry<String, String> entry : conf) {
-        String key = entry.getKey();
-        if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
-          String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN);
-          String value = entry.getValue();
-          // If the value has variables substitutions, need to do a get.
-          if (value.contains(VARIABLE_START)) {
-            value = conf.get(key);
-          }
-          zkProperties.setProperty(zkKey, value);
-        }
-      }
-    }
-
-    // If clientPort is not set, assign the default.
-    if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) {
-      zkProperties.put(HConstants.CLIENT_PORT_STR,
-          HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
-    }
-
-    // Create the server.X properties.
-    int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888);
-    int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888);
-
-    final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
-                                                 HConstants.LOCALHOST);
-    String serverHost;
-    String address;
-    String key;
-    for (int i = 0; i < serverHosts.length; ++i) {
-      if (serverHosts[i].contains(":")) {
-        serverHost = serverHosts[i].substring(0, serverHosts[i].indexOf(':'));
-      } else {
-        serverHost = serverHosts[i];
-      }
-      address = serverHost + ":" + peerPort + ":" + leaderPort;
-      key = "server." + i;
-      zkProperties.put(key, address);
-    }
-
-    return zkProperties;
-  }
-
-  /**
-   * Return the ZK Quorum servers string given the specified configuration
-   *
-   * @param conf
-   * @return Quorum servers String
-   */
-  private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) {
-    String defaultClientPort = Integer.toString(
-        conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT));
-
-    // Build the ZK quorum server string with "server:clientport" list, separated by ','
-    final String[] serverHosts =
-        conf.getStrings(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
-    return buildQuorumServerString(serverHosts, defaultClientPort);
-  }
-
-  /**
-   * Build the ZK quorum server string with "server:clientport" list, separated by ','
-   *
-   * @param serverHosts a list of servers for ZK quorum
-   * @param clientPort the default client port
-   * @return the string for a list of "server:port" separated by ","
-   */
-  public static String buildQuorumServerString(String[] serverHosts, String clientPort) {
-    StringBuilder quorumStringBuilder = new StringBuilder();
-    String serverHost;
-    for (int i = 0; i < serverHosts.length; ++i) {
-      if (serverHosts[i].contains(":")) {
-        serverHost = serverHosts[i]; // just use the port specified from the input
-      } else {
-        serverHost = serverHosts[i] + ":" + clientPort;
-      }
-      if (i > 0) {
-        quorumStringBuilder.append(',');
-      }
-      quorumStringBuilder.append(serverHost);
-    }
-    return quorumStringBuilder.toString();
-  }
-
-  /**
-   * Return the ZK Quorum servers string given the specified configuration.
-   * @return Quorum servers
-   */
-  public static String getZKQuorumServersString(Configuration conf) {
-    return getZKQuorumServersStringFromHbaseConfig(conf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 633525f..c268268 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -76,7 +76,6 @@ import org.apache.zookeeper.proto.DeleteRequest;
 import org.apache.zookeeper.proto.SetDataRequest;
 import org.apache.zookeeper.server.ZooKeeperSaslServer;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
@@ -96,25 +95,6 @@ public class ZKUtil {
   public static final char ZNODE_PATH_SEPARATOR = '/';
   private static int zkDumpConnectionTimeOut;
 
-  // The Quorum for the ZK cluster can have one the following format (see examples below):
-  // (1). s1,s2,s3 (no client port in the list, the client port could be obtained from clientPort)
-  // (2). s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server,
-  //      in this case, the clientPort would be ignored)
-  // (3). s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use
-  //      the clientPort; otherwise, it would use the specified port)
-  @VisibleForTesting
-  public static class ZKClusterKey {
-    public String quorumString;
-    public int clientPort;
-    public String znodeParent;
-
-    ZKClusterKey(String quorumString, int clientPort, String znodeParent) {
-      this.quorumString = quorumString;
-      this.clientPort = clientPort;
-      this.znodeParent = znodeParent;
-    }
-  }
-
   /**
    * Creates a new connection to ZooKeeper, pulling settings and ensemble config
    * from the specified configuration object using methods from {@link ZKConfig}.
@@ -361,110 +341,6 @@ public class ZKUtil {
     return path.substring(path.lastIndexOf("/")+1);
   }
 
-  /**
-   * Get the key to the ZK ensemble for this configuration without
-   * adding a name at the end
-   * @param conf Configuration to use to build the key
-   * @return ensemble key without a name
-   */
-  public static String getZooKeeperClusterKey(Configuration conf) {
-    return getZooKeeperClusterKey(conf, null);
-  }
-
-  /**
-   * Get the key to the ZK ensemble for this configuration and append
-   * a name at the end
-   * @param conf Configuration to use to build the key
-   * @param name Name that should be appended at the end if not empty or null
-   * @return ensemble key with a name (if any)
-   */
-  public static String getZooKeeperClusterKey(Configuration conf, String name) {
-    String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM).replaceAll(
-        "[\\t\\n\\x0B\\f\\r]", "");
-    StringBuilder builder = new StringBuilder(ensemble);
-    builder.append(":");
-    builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
-    builder.append(":");
-    builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
-    if (name != null && !name.isEmpty()) {
-      builder.append(",");
-      builder.append(name);
-    }
-    return builder.toString();
-  }
-
-  /**
-   * Apply the settings in the given key to the given configuration, this is
-   * used to communicate with distant clusters
-   * @param conf configuration object to configure
-   * @param key string that contains the 3 required configuratins
-   * @throws IOException
-   */
-  public static void applyClusterKeyToConf(Configuration conf, String key)
-      throws IOException{
-    ZKClusterKey zkClusterKey = transformClusterKey(key);
-    conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.quorumString);
-    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.clientPort);
-    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.znodeParent);
-  }
-
-  /**
-   * Separate the given key into the three configurations it should contain:
-   * hbase.zookeeper.quorum, hbase.zookeeper.client.port
-   * and zookeeper.znode.parent
-   * @param key
-   * @return the three configuration in the described order
-   * @throws IOException
-   */
-  public static ZKClusterKey transformClusterKey(String key) throws IOException {
-    String[] parts = key.split(":");
-
-    if (parts.length == 3) {
-      return new ZKClusterKey(parts [0], Integer.parseInt(parts [1]), parts [2]);
-    }
-
-    if (parts.length > 3) {
-      // The quorum could contain client port in server:clientport format, try to transform more.
-      String zNodeParent = parts [parts.length - 1];
-      String clientPort = parts [parts.length - 2];
-
-      // The first part length is the total length minus the lengths of other parts and minus 2 ":"
-      int endQuorumIndex = key.length() - zNodeParent.length() - clientPort.length() - 2;
-      String quorumStringInput = key.substring(0, endQuorumIndex);
-      String[] serverHosts = quorumStringInput.split(",");
-
-      // The common case is that every server has its own client port specified - this means
-      // that (total parts - the ZNodeParent part - the ClientPort part) is equal to
-      // (the number of "," + 1) - "+ 1" because the last server has no ",".
-      if ((parts.length - 2) == (serverHosts.length + 1)) {
-        return new ZKClusterKey(quorumStringInput, Integer.parseInt(clientPort), zNodeParent);
-      }
-
-      // For the uncommon case that some servers has no port specified, we need to build the
-      // server:clientport list using default client port for servers without specified port.
-      return new ZKClusterKey(
-        ZKConfig.buildQuorumServerString(serverHosts, clientPort),
-        Integer.parseInt(clientPort),
-        zNodeParent);
-    }
-
-    throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" +
-          HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_CLIENT_PORT + ":"
-          + HConstants.ZOOKEEPER_ZNODE_PARENT);
-  }
-
-  /**
-   * Standardize the ZK quorum string: make it a "server:clientport" list, separated by ','
-   * @param quorumStringInput a string contains a list of servers for ZK quorum
-   * @param clientPort the default client port
-   * @return the string for a list of "server:port" separated by ","
-   */
-  @VisibleForTesting
-  public static String standardizeQuorumServerString(String quorumStringInput, String clientPort) {
-    String[] serverHosts = quorumStringInput.split(",");
-    return ZKConfig.buildQuorumServerString(serverHosts, clientPort);
-  }
-
   //
   // Existence checks and watches
   //

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
index 72de935..eb629f2 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
@@ -41,17 +41,6 @@ import org.junit.experimental.categories.Category;
 public class TestZKUtil {
 
   @Test
-  public void testGetZooKeeperClusterKey() {
-    Configuration conf = HBaseConfiguration.create();
-    conf.set(HConstants.ZOOKEEPER_QUORUM, "\tlocalhost\n");
-    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "3333");
-    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "hbase");
-    String clusterKey = ZKUtil.getZooKeeperClusterKey(conf, "test");
-    Assert.assertTrue(!clusterKey.contains("\t") && !clusterKey.contains("\n"));
-    Assert.assertEquals("localhost:3333:hbase,test", clusterKey);
-  }
-  
-  @Test
   public void testCreateACL() throws ZooKeeperConnectionException, IOException {
     Configuration conf = HBaseConfiguration.create();
     conf.set(Superusers.SUPERUSER_CONF_KEY, "user1,@group1,user2,@group2,user3");

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
index 7a037f4..7b94c3d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hbase;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.Map.Entry;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
 import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 
 /**
  * Adds HBase configuration files to a Configuration
@@ -115,7 +116,7 @@ public class HBaseConfiguration extends Configuration {
    * @param srcConf the source configuration
    **/
   public static void merge(Configuration destConf, Configuration srcConf) {
-    for (Entry<String, String> e : srcConf) {
+    for (Map.Entry<String, String> e : srcConf) {
       destConf.set(e.getKey(), e.getValue());
     }
   }
@@ -129,7 +130,7 @@ public class HBaseConfiguration extends Configuration {
    */
   public static Configuration subset(Configuration srcConf, String prefix) {
     Configuration newConf = new Configuration(false);
-    for (Entry<String, String> entry : srcConf) {
+    for (Map.Entry<String, String> entry : srcConf) {
       if (entry.getKey().startsWith(prefix)) {
         String newKey = entry.getKey().substring(prefix.length());
         // avoid entries that would produce an empty key
@@ -142,6 +143,18 @@ public class HBaseConfiguration extends Configuration {
   }
 
   /**
+   * Sets all the entries in the provided {@code Map<String, String>} as properties in the
+   * given {@code Configuration}.  Each property will have the specified prefix prepended,
+   * so that the configuration entries are keyed by {@code prefix + entry.getKey()}.
+   */
+  public static void setWithPrefix(Configuration conf, String prefix,
+                                   Iterable<Map.Entry<String, String>> properties) {
+    for (Map.Entry<String, String> entry : properties) {
+      conf.set(prefix + entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
    * @return whether to show HBase Configuration in servlet
    */
   public static boolean isShowConfInServlet() {
@@ -236,6 +249,65 @@ public class HBaseConfiguration extends Configuration {
   }
 
   /**
+   * Generates a {@link Configuration} instance by applying the ZooKeeper cluster key
+   * to the base Configuration.  Note that additional configuration properties may be needed
+   * for a remote cluster, so it is preferable to use
+   * {@link #createClusterConf(Configuration, String, String)}.
+   *
+   * @param baseConf the base configuration to use, containing prefixed override properties
+   * @param clusterKey the ZooKeeper quorum cluster key to apply, or {@code null} if none
+   *
+   * @return the merged configuration with override properties and cluster key applied
+   *
+   * @see #createClusterConf(Configuration, String, String)
+   */
+  public static Configuration createClusterConf(Configuration baseConf, String clusterKey)
+      throws IOException {
+    return createClusterConf(baseConf, clusterKey, null);
+  }
+
+  /**
+   * Generates a {@link Configuration} instance by applying property overrides prefixed by
+   * a cluster profile key to the base Configuration.  Override properties are extracted by
+   * the {@link #subset(Configuration, String)} method, then merged on top of the base
+   * Configuration and returned.
+   *
+   * @param baseConf the base configuration to use, containing prefixed override properties
+   * @param clusterKey the ZooKeeper quorum cluster key to apply, or {@code null} if none
+   * @param overridePrefix the property key prefix to match for override properties,
+   *     or {@code null} if none
+   * @return the merged configuration with override properties and cluster key applied
+   */
+  public static Configuration createClusterConf(Configuration baseConf, String clusterKey,
+                                                String overridePrefix) throws IOException {
+    Configuration clusterConf = HBaseConfiguration.create(baseConf);
+    if (clusterKey != null && !clusterKey.isEmpty()) {
+      applyClusterKeyToConf(clusterConf, clusterKey);
+    }
+
+    if (overridePrefix != null && !overridePrefix.isEmpty()) {
+      Configuration clusterSubset = HBaseConfiguration.subset(clusterConf, overridePrefix);
+      HBaseConfiguration.merge(clusterConf, clusterSubset);
+    }
+    return clusterConf;
+  }
+
+  /**
+   * Apply the settings in the given key to the given configuration, this is
+   * used to communicate with distant clusters
+   * @param conf configuration object to configure
+   * @param key string that contains the 3 required configurations
+   * @throws IOException
+   */
+  private static void applyClusterKeyToConf(Configuration conf, String key)
+      throws IOException{
+    ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key);
+    conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.getQuorumString());
+    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.getClientPort());
+    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.getZnodeParent());
+  }
+
+  /**
    * For debugging.  Dump configurations to system output as xml format.
    * Master and RS configurations can also be dumped using
    * http services. e.g. "curl http://master:16010/dump"

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
new file mode 100644
index 0000000..fe7396a
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
@@ -0,0 +1,301 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Utility methods for reading, and building the ZooKeeper configuration.
+ *
+ * The order and priority for reading the config are as follows:
+ * (1). Property with "hbase.zookeeper.property." prefix from HBase XML
+ * (2). other zookeeper related properties in HBASE XML
+ */
+@InterfaceAudience.Private
+public final class ZKConfig {
+
+  private static final String VARIABLE_START = "${";
+
+  private ZKConfig() {
+  }
+
+  /**
+   * Make a Properties object holding ZooKeeper config.
+   * Public entry point; the work of reading the HBase configuration and generating
+   * the ZooKeeper properties is delegated to {@code makeZKPropsFromHbaseConfig}.
+   * @param conf Configuration to read from.
+   * @return Properties holding mappings representing ZooKeeper config file.
+   */
+  public static Properties makeZKProps(Configuration conf) {
+    return makeZKPropsFromHbaseConfig(conf);
+  }
+
+  /**
+   * Make a Properties object holding ZooKeeper config.
+   * Copies every hbase.zookeeper.property.KEY entry as KEY, defaults clientPort when it
+   * is missing, and adds one server.N entry (host:peerPort:leaderPort) per quorum host.
+   *
+   * @param conf Configuration to read from.
+   * @return Properties holding mappings representing ZooKeeper config file.
+   */
+  private static Properties makeZKPropsFromHbaseConfig(Configuration conf) {
+    Properties zkProperties = new Properties();
+
+    // Directly map all of the hbase.zookeeper.property.KEY properties.
+    // Synchronize on conf so no loading of configs while we iterate
+    synchronized (conf) {
+      for (Entry<String, String> entry : conf) {
+        String key = entry.getKey();
+        if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
+          String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN);
+          String value = entry.getValue();
+          // If the value has variables substitutions, need to do a get.
+          if (value.contains(VARIABLE_START)) {
+            value = conf.get(key);
+          }
+          zkProperties.setProperty(zkKey, value);
+        }
+      }
+    }
+
+    // If clientPort is not set, assign the default (as a String, so getProperty() can read it).
+    if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) {
+      zkProperties.setProperty(HConstants.CLIENT_PORT_STR,
+          Integer.toString(HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT));
+    }
+
+    // Create the server.X properties.
+    int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888);
+    int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888);
+
+    final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
+                                                 HConstants.LOCALHOST);
+    String serverHost;
+    String address;
+    String key;
+    for (int i = 0; i < serverHosts.length; ++i) {
+      if (serverHosts[i].contains(":")) {
+        serverHost = serverHosts[i].substring(0, serverHosts[i].indexOf(':'));
+      } else {
+        serverHost = serverHosts[i];
+      }
+      address = serverHost + ":" + peerPort + ":" + leaderPort;
+      key = "server." + i;
+      zkProperties.setProperty(key, address);
+    }
+
+    return zkProperties;
+  }
+
+  /**
+   * Builds the "server:clientport" list for the ZooKeeper quorum named in the configuration.
+   * Hosts that carry no port of their own get the configured (or default) client port.
+   *
+   * @param conf configuration supplying the quorum hosts and the client port
+   * @return comma-separated "server:port" entries
+   */
+  private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) {
+    // Read the fallback port first; it is only applied to hosts listed without a port.
+    int fallbackPort =
+        conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+    String[] quorumHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
+
+    return buildZKQuorumServerString(quorumHosts, String.valueOf(fallbackPort));
+  }
+
+  /**
+   * Return the ZK Quorum servers string ("server:port" list) for the given configuration.
+   * @return Quorum servers, as built by {@code getZKQuorumServersStringFromHbaseConfig}
+   */
+  public static String getZKQuorumServersString(Configuration conf) {
+    return getZKQuorumServersStringFromHbaseConfig(conf);
+  }
+
+  /**
+   * Build the ZK quorum server string with "server:clientport" list, separated by ','.
+   * Entries that already contain ':' are copied as given; every other entry gets
+   * ':' plus the supplied client port appended.
+   *
+   * @param serverHosts a list of servers for ZK quorum
+   * @param clientPort the default client port
+   * @return the string for a list of "server:port" separated by ","
+   */
+  public static String buildZKQuorumServerString(String[] serverHosts, String clientPort) {
+    StringBuilder joined = new StringBuilder();
+    String separator = ""; // nothing precedes the first entry
+
+    for (String host : serverHosts) {
+      joined.append(separator).append(host);
+      if (host.indexOf(':') < 0) {
+        // no port specified in the input, fall back to the default client port
+        joined.append(':').append(clientPort);
+      }
+      separator = ",";
+    }
+    return joined.toString();
+  }
+
+  /**
+   * Verifies that the given key matches the expected format for a ZooKeeper cluster key.
+   * The Quorum for the ZK cluster can have one of the following formats (see examples below):
+   *
+   * <ol>
+   *   <li>s1,s2,s3 (no client port in the list, the client port could be obtained from
+   *       clientPort)</li>
+   *   <li>s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server,
+   *       in this case, the clientPort would be ignored)</li>
+   *   <li>s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use
+   *       the clientPort; otherwise, it would use the specified port)</li>
+   * </ol>
+   *
+   * @param key the cluster key to validate
+   * @throws IOException if {@code transformClusterKey} rejects the key
+   */
+  public static void validateClusterKey(String key) throws IOException {
+    transformClusterKey(key);
+  }
+
+  /**
+   * Separate the given key into the three configurations it should contain:
+   * hbase.zookeeper.quorum, hbase.zookeeper.client.port
+   * and zookeeper.znode.parent
+   * @param key cluster key in the form quorum:clientPort:znodeParent
+   * @return the three configuration in the described order
+   * @throws IOException if the key has fewer than three parts or a non-numeric client port
+   */
+  public static ZKClusterKey transformClusterKey(String key) throws IOException {
+    String[] parts = key.split(":");
+    if (parts.length < 3) {
+      throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" +
+          HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_CLIENT_PORT + ":"
+          + HConstants.ZOOKEEPER_ZNODE_PARENT);
+    }
+    // The last two parts are always the client port and the parent znode.
+    String zNodeParent = parts[parts.length - 1];
+    String clientPort = parts[parts.length - 2];
+    int port;
+    try {
+      port = Integer.parseInt(clientPort);
+    } catch (NumberFormatException e) {
+      // Report a bad port as the declared IOException instead of an unchecked exception.
+      throw new IOException("Cluster key passed " + key + " has an invalid client port: "
+          + clientPort, e);
+    }
+
+    if (parts.length == 3) {
+      return new ZKClusterKey(parts[0], port, zNodeParent);
+    }
+
+    // The quorum could contain client port in server:clientport format, try to transform more.
+    // The first part length is the total length minus the lengths of other parts and minus 2 ":"
+    int endQuorumIndex = key.length() - zNodeParent.length() - clientPort.length() - 2;
+    String quorumStringInput = key.substring(0, endQuorumIndex);
+    String[] serverHosts = quorumStringInput.split(",");
+
+    // When every server names its own port, splitting on ":" yields one part per server plus
+    // one extra, so the quorum is already standardized; otherwise fill in the default port.
+    if ((parts.length - 2) == (serverHosts.length + 1)) {
+      return new ZKClusterKey(quorumStringInput, port, zNodeParent);
+    }
+    return new ZKClusterKey(buildZKQuorumServerString(serverHosts, clientPort), port, zNodeParent);
+  }
+
+  /**
+   * Get the key to the ZK ensemble for this configuration without
+   * adding a name at the end (delegates to the two-argument overload with a null name)
+   * @param conf Configuration to use to build the key
+   * @return ensemble key without a name
+   */
+  public static String getZooKeeperClusterKey(Configuration conf) {
+    return getZooKeeperClusterKey(conf, null);
+  }
+
+  /**
+   * Builds the cluster key of the ZK ensemble described by the configuration, as
+   * quorum, client port and parent znode joined by ':', optionally followed by ','
+   * and a name. Tab, newline, vertical-tab, form-feed and carriage-return characters
+   * are removed from the quorum value first.
+   * @param conf Configuration to use to build the key
+   * @param name Name that should be appended at the end if not empty or null
+   * @return ensemble key with a name (if any)
+   */
+  public static String getZooKeeperClusterKey(Configuration conf, String name) {
+    String quorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
+    String key = quorum.replaceAll("[\\t\\n\\x0B\\f\\r]", "")
+        + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
+        + ":" + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+
+    // Only a non-empty name is appended, separated from the key by a comma.
+    if (name == null || name.isEmpty()) {
+      return key;
+    }
+    return key + "," + name;
+  }
+
+  /**
+   * Normalizes a comma-separated quorum string into a "server:clientport" list.
+   * @param quorumStringInput comma-separated list of servers for the ZK quorum
+   * @param clientPort the default client port, used for servers listed without one
+   * @return the string for a list of "server:port" separated by ","
+   */
+  @VisibleForTesting
+  public static String standardizeZKQuorumServerString(String quorumStringInput,
+      String clientPort) {
+    // Split on ',' only; each entry keeps its own ":port" suffix if it has one.
+    return buildZKQuorumServerString(quorumStringInput.split(","), clientPort);
+  }
+
+  // The Quorum for the ZK cluster can have one of the following formats (see examples below):
+  // (1). s1,s2,s3 (no client port in the list, the client port could be obtained from clientPort)
+  // (2). s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server,
+  //      in this case, the clientPort would be ignored)
+  // (3). s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use
+  //      the clientPort; otherwise, it would use the specified port)
+  @VisibleForTesting
+  public static class ZKClusterKey {
+    private final String quorumString; // ZK quorum part of the cluster key
+    private final int clientPort;      // ZK client port part of the cluster key
+    private final String znodeParent;  // parent znode part of the cluster key
+
+    ZKClusterKey(String quorumString, int clientPort, String znodeParent) {
+      this.quorumString = quorumString;
+      this.clientPort = clientPort;
+      this.znodeParent = znodeParent;
+    }
+
+    public String getQuorumString() {
+      return quorumString;
+    }
+
+    public int getClientPort() {
+      return clientPort;
+    }
+
+    public String getZnodeParent() {
+      return znodeParent;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java
index c11916f..6c14ef9 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
@@ -27,11 +28,13 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.List;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -73,8 +76,11 @@ public class TestHBaseConfiguration {
     String prefix = "hbase.mapred.output.";
     conf.set("hbase.security.authentication", "kerberos");
     conf.set("hbase.regionserver.kerberos.principal", "hbasesource");
-    conf.set(prefix + "hbase.regionserver.kerberos.principal", "hbasedest");
-    conf.set(prefix, "shouldbemissing");
+    HBaseConfiguration.setWithPrefix(conf, prefix,
+        ImmutableMap.of(
+            "hbase.regionserver.kerberos.principal", "hbasedest",
+            "", "shouldbemissing")
+            .entrySet());
 
     Configuration subsetConf = HBaseConfiguration.subset(conf, prefix);
     assertNull(subsetConf.get(prefix + "hbase.regionserver.kerberos.principal"));

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
new file mode 100644
index 0000000..7879aea
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category({MiscTests.class, SmallTests.class})
+public class TestZKConfig {
+
+  @Test
+  public void testZKConfigLoading() throws Exception {
+    // The ZooKeeper properties must be read only from the config instance
+    // (i.e. via hbase-default.xml and hbase-site.xml).
+    Configuration hbaseConf = HBaseConfiguration.create();
+    hbaseConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
+
+    String clientPort = ZKConfig.makeZKProps(hbaseConf).getProperty("clientPort");
+    assertEquals("Property client port should have been default from the HBase config",
+        "2181", clientPort);
+  }
+
+  @Test
+  public void testGetZooKeeperClusterKey() {
+    Configuration zkConf = HBaseConfiguration.create();
+    zkConf.set(HConstants.ZOOKEEPER_QUORUM, "\tlocalhost\n"); // whitespace must not survive
+    zkConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "3333");
+    zkConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "hbase");
+    String actualKey = ZKConfig.getZooKeeperClusterKey(zkConf, "test");
+    assertTrue(!(actualKey.contains("\t") || actualKey.contains("\n")));
+    assertEquals("localhost:3333:hbase,test", actualKey);
+  }
+
+  @Test
+  public void testClusterKey() throws Exception {
+    testKey("server", 2181, "hbase");
+    testKey("server1,server2,server3", 2181, "hbase");
+    try {
+      ZKConfig.validateClusterKey("2181:hbase");
+      Assert.fail("Expected an IOException for a cluster key with only two parts");
+    } catch (IOException expected) { // OK: the malformed key was rejected
+    }
+  }
+
+  @Test
+  public void testClusterKeyWithMultiplePorts() throws Exception {
+    // a single server whose own port differs from the default port
+    testKey("server1:2182", 2181, "hbase", true);
+    // multiple servers, each with its own port
+    testKey("server1:2182,server2:2183,server3:2184", 2181, "hbase", true);
+    // one server in the middle has no specified port, should use default port
+    testKey("server1:2182,server2,server3:2184", 2181, "hbase", true);
+    // the last server has no specified port, should use default port
+    testKey("server1:2182,server2:2183,server3", 2181, "hbase", true);
+    // multiple servers have no specified port, should use default port for those servers
+    testKey("server1:2182,server2,server3:2184,server4", 2181, "hbase", true);
+    // same server listed several times with different ports
+    testKey("server1:2182,server1:2183,server1", 2181, "hbase", true);
+    // mix of same server/different port and different server
+    testKey("server1:2182,server2:2183,server1", 2181, "hbase", true);
+  }
+
+  private void testKey(String ensemble, int port, String znode)
+      throws IOException {
+    testKey(ensemble, port, znode, false); // quorum without per-server client ports
+  }
+
+  /**
+   * Parses a cluster key, applies it to a configuration and rebuilds it, checking that
+   * quorum, client port and parent znode survive each step.
+   * @param ensemble quorum part of the key
+   * @param port client port part of the key
+   * @param znode parent znode part of the key
+   * @param multiplePortSupport whether the quorum is expected back as "server:port" entries
+   * @throws IOException if the key cannot be parsed
+   */
+  private void testKey(String ensemble, int port, String znode, boolean multiplePortSupport)
+      throws IOException {
+    String key = ensemble + ":" + port + ":" + znode;
+    // With per-server ports the quorum comes back standardized; otherwise unchanged.
+    String expectedQuorum = multiplePortSupport
+        ? ZKConfig.standardizeZKQuorumServerString(ensemble, Integer.toString(port))
+        : ensemble;
+
+    ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key);
+    assertEquals(expectedQuorum, zkClusterKey.getQuorumString());
+    assertEquals(port, zkClusterKey.getClientPort());
+    assertEquals(znode, zkClusterKey.getZnodeParent());
+
+    Configuration conf = HBaseConfiguration.createClusterConf(new Configuration(), key);
+    assertEquals(zkClusterKey.getQuorumString(), conf.get(HConstants.ZOOKEEPER_QUORUM));
+    assertEquals(zkClusterKey.getClientPort(), conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, -1));
+    assertEquals(zkClusterKey.getZnodeParent(), conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+
+    // Rebuilding the key from the configuration must reproduce the expected key.
+    String reconstructedKey = ZKConfig.getZooKeeperClusterKey(conf);
+    assertEquals(expectedQuorum + ":" + port + ":" + znode, reconstructedKey);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
index 20d6e24..1658ba4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
@@ -174,8 +173,9 @@ public class SyncTable extends Configured implements Tool {
       
       Configuration conf = context.getConfiguration();
       sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
-      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY);
-      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY);
+      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
+      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY,
+          TableOutputFormat.OUTPUT_CONF_PREFIX);
       sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
       targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
       dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false);
@@ -196,13 +196,12 @@ public class SyncTable extends Configured implements Tool {
       targetHasher = new HashTable.ResultHasher();
     }
   
-    private static Connection openConnection(Configuration conf, String zkClusterConfKey)
+    private static Connection openConnection(Configuration conf, String zkClusterConfKey,
+                                             String configPrefix)
       throws IOException {
-        Configuration clusterConf = new Configuration(conf);
         String zkCluster = conf.get(zkClusterConfKey);
-        if (zkCluster != null) {
-          ZKUtil.applyClusterKeyToConf(clusterConf, zkCluster);
-        }
+        Configuration clusterConf = HBaseConfiguration.createClusterConf(conf,
+            zkCluster, configPrefix);
         return ConnectionFactory.createConnection(clusterConf);
     }
     

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index cc8a35c..a48871f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -43,12 +43,11 @@ import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
-import com.google.protobuf.InvalidProtocolBufferException;
 
 import java.io.File;
 import java.io.IOException;
@@ -485,12 +484,8 @@ public class TableMapReduceUtil {
         String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
         User user = userProvider.getCurrent();
         if (quorumAddress != null) {
-          Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
-          ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
-          // apply any "hbase.mapred.output." configuration overrides
-          Configuration outputOverrides =
-              HBaseConfiguration.subset(peerConf, TableOutputFormat.OUTPUT_CONF_PREFIX);
-          HBaseConfiguration.merge(peerConf, outputOverrides);
+          Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
+              quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
           Connection peerConn = ConnectionFactory.createConnection(peerConf);
           try {
             TokenUtil.addTokenForJob(peerConn, user, job);
@@ -523,15 +518,30 @@ public class TableMapReduceUtil {
    * @param job The job that requires the permission.
    * @param quorumAddress string that contains the 3 required configuratins
    * @throws IOException When the authentication token cannot be obtained.
+   * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead.
    */
+  @Deprecated
   public static void initCredentialsForCluster(Job job, String quorumAddress)
       throws IOException {
+    Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
+        quorumAddress);
+    initCredentialsForCluster(job, peerConf);
+  }
+
+  /**
+   * Obtain an authentication token, for the specified cluster, on behalf of the current user
+   * and add it to the credentials for the given map reduce job.
+   *
+   * @param job The job that requires the permission.
+   * @param conf The configuration to use in connecting to the peer cluster
+   * @throws IOException When the authentication token cannot be obtained.
+   */
+  public static void initCredentialsForCluster(Job job, Configuration conf)
+      throws IOException {
     UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
     if (userProvider.isHBaseSecurityEnabled()) {
       try {
-        Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
-        ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
-        Connection peerConn = ConnectionFactory.createConnection(peerConf);
+        Connection peerConn = ConnectionFactory.createConnection(conf);
         try {
           TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
         } finally {
@@ -680,7 +690,7 @@ public class TableMapReduceUtil {
     // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
     if (quorumAddress != null) {
       // Calling this will validate the format
-      ZKUtil.transformClusterKey(quorumAddress);
+      ZKConfig.validateClusterKey(quorumAddress);
       conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
     }
     if (serverClass != null && serverImpl != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index 201e78f..998d700 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -195,22 +194,19 @@ implements Configurable {
 
   @Override
   public void setConf(Configuration otherConf) {
-    this.conf = HBaseConfiguration.create(otherConf);
-
-    String tableName = this.conf.get(OUTPUT_TABLE);
+    String tableName = otherConf.get(OUTPUT_TABLE);
     if(tableName == null || tableName.length() <= 0) {
       throw new IllegalArgumentException("Must specify table name");
     }
 
-    String address = this.conf.get(QUORUM_ADDRESS);
-    int zkClientPort = this.conf.getInt(QUORUM_PORT, 0);
-    String serverClass = this.conf.get(REGION_SERVER_CLASS);
-    String serverImpl = this.conf.get(REGION_SERVER_IMPL);
+    String address = otherConf.get(QUORUM_ADDRESS);
+    int zkClientPort = otherConf.getInt(QUORUM_PORT, 0);
+    String serverClass = otherConf.get(REGION_SERVER_CLASS);
+    String serverImpl = otherConf.get(REGION_SERVER_IMPL);
 
     try {
-      if (address != null) {
-        ZKUtil.applyClusterKeyToConf(this.conf, address);
-      }
+      this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX);
+
       if (serverClass != null) {
         this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
       }
@@ -221,9 +217,5 @@ implements Configurable {
       LOG.error(e);
       throw new RuntimeException(e);
     }
-
-    // finally apply any remaining "hbase.mapred.output." configuration overrides
-    Configuration outputOverrides = HBaseConfiguration.subset(otherConf, OUTPUT_CONF_PREFIX);
-    HBaseConfiguration.merge(this.conf, outputOverrides);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 76ac541..e6b4802 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
@@ -70,6 +69,7 @@ public class VerifyReplication extends Configured implements Tool {
       LogFactory.getLog(VerifyReplication.class);
 
   public final static String NAME = "verifyrep";
+  private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
   static long startTime = 0;
   static long endTime = Long.MAX_VALUE;
   static int versions = -1;
@@ -130,8 +130,8 @@ public class VerifyReplication extends Configured implements Tool {
         final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
 
         String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
-        Configuration peerConf = HBaseConfiguration.create(conf);
-        ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
+        Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
+            zkClusterKey, PEER_CONFIG_PREFIX);
 
         TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
         connection = ConnectionFactory.createConnection(peerConf);
@@ -211,7 +211,8 @@ public class VerifyReplication extends Configured implements Tool {
     }
   }
 
-  private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
+  private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
+      final Configuration conf) throws IOException {
     ZooKeeperWatcher localZKW = null;
     ReplicationPeerZKImpl peer = null;
     try {
@@ -228,8 +229,8 @@ public class VerifyReplication extends Configured implements Tool {
       if (pair == null) {
         throw new IOException("Couldn't get peer conf!");
       }
-      Configuration peerConf = rp.getPeerConf(peerId).getSecond();
-      return ZKUtil.getZooKeeperClusterKey(peerConf);
+
+      return pair;
     } catch (ReplicationException e) {
       throw new IOException(
           "An error occured while trying to connect to the remove peer cluster", e);
@@ -268,9 +269,14 @@ public class VerifyReplication extends Configured implements Tool {
       conf.set(NAME+".families", families);
     }
 
-    String peerQuorumAddress = getPeerQuorumAddress(conf);
+    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
+    ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
+    String peerQuorumAddress = peerConfig.getClusterKey();
+    LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
+        peerConfig.getConfiguration());
     conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
-    LOG.info("Peer Quorum Address: " + peerQuorumAddress);
+    HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
+        peerConfig.getConfiguration().entrySet());
 
     conf.setInt(NAME + ".versions", versions);
     LOG.info("Number of version: " + versions);
@@ -293,8 +299,9 @@ public class VerifyReplication extends Configured implements Tool {
     TableMapReduceUtil.initTableMapperJob(tableName, scan,
         Verifier.class, null, null, job);
 
+    Configuration peerClusterConf = peerConfigPair.getSecond();
     // Obtain the auth token from peer cluster
-    TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress);
+    TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
 
     job.setOutputFormatClass(NullOutputFormat.class);
     job.setNumReduceTasks(0);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index 5c61afb..2ba1b47 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 
 /**
  * Similar to {@link RegionReplicaUtil} but for the server side
@@ -148,7 +148,7 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
     try {
       if (repAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER) == null) {
         ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
-        peerConfig.setClusterKey(ZKUtil.getZooKeeperClusterKey(conf));
+        peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf));
         peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName());
         repAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, null);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
index 4e1599a..77d01e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
@@ -343,71 +343,6 @@ public class TestZooKeeper {
     assertNull(ZKUtil.getDataNoWatch(zkw, "/l1/l2", null));
   }
 
-  @Test
-  public void testClusterKey() throws Exception {
-    testKey("server", 2181, "hbase");
-    testKey("server1,server2,server3", 2181, "hbase");
-    try {
-      ZKUtil.transformClusterKey("2181:hbase");
-    } catch (IOException ex) {
-      // OK
-    }
-  }
-
-  @Test
-  public void testClusterKeyWithMultiplePorts() throws Exception {
-    // server has different port than the default port
-    testKey("server1:2182", 2181, "hbase", true);
-    // multiple servers have their own port
-    testKey("server1:2182,server2:2183,server3:2184", 2181, "hbase", true);
-    // one server has no specified port, should use default port
-    testKey("server1:2182,server2,server3:2184", 2181, "hbase", true);
-    // the last server has no specified port, should use default port
-    testKey("server1:2182,server2:2183,server3", 2181, "hbase", true);
-    // multiple servers have no specified port, should use default port for those servers
-    testKey("server1:2182,server2,server3:2184,server4", 2181, "hbase", true);
-    // same server, different ports
-    testKey("server1:2182,server1:2183,server1", 2181, "hbase", true);
-    // mix of same server/different port and different server
-    testKey("server1:2182,server2:2183,server1", 2181, "hbase", true);
-  }
-
-  private void testKey(String ensemble, int port, String znode)
-      throws IOException {
-    testKey(ensemble, port, znode, false); // not support multiple client ports
-  }
-
-  private void testKey(String ensemble, int port, String znode, Boolean multiplePortSupport)
-      throws IOException {
-    Configuration conf = new Configuration();
-    String key = ensemble+":"+port+":"+znode;
-    String ensemble2 = null;
-    ZKUtil.ZKClusterKey zkClusterKey = ZKUtil.transformClusterKey(key);
-    if (multiplePortSupport) {
-      ensemble2 = ZKUtil.standardizeQuorumServerString(ensemble, Integer.toString(port));
-      assertEquals(ensemble2, zkClusterKey.quorumString);
-    }
-    else {
-      assertEquals(ensemble, zkClusterKey.quorumString);
-    }
-    assertEquals(port, zkClusterKey.clientPort);
-    assertEquals(znode, zkClusterKey.znodeParent);
-
-    ZKUtil.applyClusterKeyToConf(conf, key);
-    assertEquals(zkClusterKey.quorumString, conf.get(HConstants.ZOOKEEPER_QUORUM));
-    assertEquals(zkClusterKey.clientPort, conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, -1));
-    assertEquals(zkClusterKey.znodeParent, conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
-
-    String reconstructedKey = ZKUtil.getZooKeeperClusterKey(conf);
-    if (multiplePortSupport) {
-      String key2 = ensemble2 + ":" + port + ":" + znode;
-      assertEquals(key2, reconstructedKey);
-    }
-    else {
-      assertEquals(key, reconstructedKey);
-    }
-  }
-
   /**
    * A test for HBASE-3238
    * @throws IOException A connection attempt to zk failed

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index e187b9b..e18220d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -24,9 +24,13 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -38,10 +42,12 @@ import org.junit.experimental.categories.Category;
 
 import com.google.common.collect.Lists;
 
-import static org.junit.Assert.fail;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 
 /**
  * Unit testing of ReplicationAdmin
@@ -117,7 +123,29 @@ public class TestReplicationAdmin {
     admin.removePeer(ID_SECOND);
     assertEquals(0, admin.getPeersCount());
   }
-  
+
+  /**
+   * Tests that the peer configuration used by ReplicationAdmin contains all
+   * the peer's properties.
+   */
+  @Test
+  public void testPeerConfig() throws Exception {
+    ReplicationPeerConfig config = new ReplicationPeerConfig();
+    config.setClusterKey(KEY_ONE);
+    config.getConfiguration().put("key1", "value1");
+    config.getConfiguration().put("key2", "value2");
+    admin.addPeer(ID_ONE, config, null);
+
+    List<ReplicationPeer> peers = admin.listValidReplicationPeers();
+    assertEquals(1, peers.size());
+    ReplicationPeer peerOne = peers.get(0);
+    assertNotNull(peerOne);
+    assertEquals("value1", peerOne.getConfiguration().get("key1"));
+    assertEquals("value2", peerOne.getConfiguration().get("key2"));
+
+    admin.removePeer(ID_ONE);
+  }
+
   @Test
   public void testAddPeerWithUnDeletedQueues() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 52fb41c..a5a4e73 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -115,7 +115,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   public void testCustomReplicationEndpoint() throws Exception {
     // test installing a custom replication endpoint other than the default one.
     admin.addPeer("testCustomReplicationEndpoint",
-      new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
         .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
 
     // check whether the class has been constructed and started
@@ -157,7 +157,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     int peerCount = admin.getPeersCount();
     final String id = "testReplicationEndpointReturnsFalseOnReplicate";
     admin.addPeer(id,
-      new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
         .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
     // This test is flakey and then there is so much stuff flying around in here its, hard to
     // debug.  Peer needs to be up for the edit to make it across. This wait on
@@ -209,7 +209,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     }
 
     admin.addPeer(id,
-        new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf2))
+        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf2))
             .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
         null);
 
@@ -234,7 +234,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   @Test (timeout=120000)
   public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
     admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
-      new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
         .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
     // now replicate some data.
     try (Connection connection = ConnectionFactory.createConnection(conf1)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index f05eceb..696c130 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -27,7 +27,7 @@ import java.util.SortedSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.zookeeper.KeeperException;
 import org.junit.Before;
 import org.junit.Test;
@@ -202,7 +202,7 @@ public abstract class TestReplicationStateBasic {
       fail("There are no connected peers, should have thrown an IllegalArgumentException");
     } catch (IllegalArgumentException e) {
     }
-    assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
+    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
     rp.removePeer(ID_ONE);
     rp.peerRemoved(ID_ONE);
     assertNumberOfPeers(1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index fff6c9d..4587c61 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -79,7 +80,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
     String fakeRs = ZKUtil.joinZNode(zkw1.rsZNode, "hostname1.example.org:1234");
     ZKUtil.createWithParents(zkw1, fakeRs);
     ZKClusterId.setClusterId(zkw1, new ClusterId());
-    return ZKUtil.getZooKeeperClusterKey(testConf);
+    return ZKConfig.getZooKeeperClusterKey(testConf);
   }
 
   @Before
@@ -94,7 +95,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
     rq3 = ReplicationFactory.getReplicationQueues(zkw, conf, ds3);
     rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1);
     rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
-    OUR_KEY = ZKUtil.getZooKeeperClusterKey(conf);
+    OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
     rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
index 2231f0e..65600ff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.log4j.Level;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -129,7 +129,8 @@ public class TestRegionReplicaReplicationEndpoint {
     // assert peer configuration is correct
     peerConfig = admin.getPeerConfig(peerId);
     assertNotNull(peerConfig);
-    assertEquals(peerConfig.getClusterKey(), ZKUtil.getZooKeeperClusterKey(HTU.getConfiguration()));
+    assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
+        HTU.getConfiguration()));
     assertEquals(peerConfig.getReplicationEndpointImpl(),
       RegionReplicaReplicationEndpoint.class.getName());
     admin.close();
@@ -162,7 +163,8 @@ public class TestRegionReplicaReplicationEndpoint {
     // assert peer configuration is correct
     peerConfig = admin.getPeerConfig(peerId);
     assertNotNull(peerConfig);
-    assertEquals(peerConfig.getClusterKey(), ZKUtil.getZooKeeperClusterKey(HTU.getConfiguration()));
+    assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
+        HTU.getConfiguration()));
     assertEquals(peerConfig.getReplicationEndpointImpl(),
       RegionReplicaReplicationEndpoint.class.getName());
     admin.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
deleted file mode 100644
index 8f5961f..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.zookeeper;
-
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({MiscTests.class, SmallTests.class})
-public class TestZKConfig {
-
-  @Test
-  public void testZKConfigLoading() throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    // Test that we read only from the config instance
-    // (i.e. via hbase-default.xml and hbase-site.xml)
-    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
-    Properties props = ZKConfig.makeZKProps(conf);
-    Assert.assertEquals("Property client port should have been default from the HBase config",
-                        "2181",
-                        props.getProperty("clientPort"));
-  }
-}


Mime
View raw message