hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject hbase git commit: HBASE-15647 Backport HBASE-15507 (update_peer_config) to 0.98
Date Tue, 19 Apr 2016 21:48:08 GMT
Repository: hbase
Updated Branches:
  refs/heads/0.98 e956eb63d -> c9b9f7f66


HBASE-15647 Backport HBASE-15507 (update_peer_config) to 0.98

Signed-off-by: Andrew Purtell <apurtell@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c9b9f7f6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c9b9f7f6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c9b9f7f6

Branch: refs/heads/0.98
Commit: c9b9f7f660519bc9576f73f5a79ef27f55475651
Parents: e956eb6
Author: gjacoby <gjacoby@salesforce.com>
Authored: Wed Apr 13 14:41:26 2016 -0700
Committer: Andrew Purtell <apurtell@apache.org>
Committed: Tue Apr 19 14:47:58 2016 -0700

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    |   9 ++
 .../hbase/replication/ReplicationPeer.java      |   6 +
 .../ReplicationPeerConfigListener.java          |  33 +++++
 .../replication/ReplicationPeerZKImpl.java      |  71 +++++++++-
 .../hbase/replication/ReplicationPeers.java     |   8 ++
 .../replication/ReplicationPeersZKImpl.java     | 139 ++++++-------------
 .../replication/ReplicationSerDeHelper.java     | 124 +++++++++++++++++
 .../replication/ReplicationStateZKBase.java     |   4 +
 .../replication/BaseReplicationEndpoint.java    |  21 +++
 .../hbase/replication/ReplicationEndpoint.java  |   7 +-
 .../regionserver/ReplicationSourceManager.java  |   3 +-
 .../VisibilityReplicationEndpoint.java          |   5 +
 .../TestReplicationAdminWithClusters.java       |  67 +++++++--
 .../hbase/replication/TestReplicationBase.java  |   2 +-
 .../src/main/ruby/hbase/replication_admin.rb    |  25 ++++
 hbase-shell/src/main/ruby/shell.rb              |   1 +
 .../ruby/shell/commands/update_peer_config.rb   |  49 +++++++
 .../test/ruby/hbase/replication_admin_test.rb   |  26 +++-
 18 files changed, 488 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 7e25500..ade261f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -217,6 +217,10 @@ public class ReplicationAdmin implements Closeable {
     return tableCfsStr;
   }
 
+  public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig)
+    throws ReplicationException {
+    this.replicationPeers.updatePeerConfig(id, peerConfig);
+  }
   /**
    * Removes a peer cluster and stops the replication to it.
    * @param id a short name that identifies the cluster
@@ -497,6 +501,11 @@ public class ReplicationAdmin implements Closeable {
   }
 
   @VisibleForTesting
+  public void peerAdded(String id) throws ReplicationException {
+    this.replicationPeers.peerAdded(id);
+  }
+
+  @VisibleForTesting
   List<ReplicationPeer> listReplicationPeers() {
     Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
     if (peers == null || peers.size() <= 0) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index 251c1ed..c99bd62 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -67,6 +67,12 @@ public interface ReplicationPeer {
   public Configuration getConfiguration();
 
   /**
+   * Setup a callback for chanages to the replication peer config
+   * @param listener Listener for config changes, usually a replication endpoint
+   */
+  void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
+
+  /*
    * Get replicable (table, cf-list) map of this peer
    * 
    * @return the replicable (table, cf-list) map

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
new file mode 100644
index 0000000..b10862b
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+    
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface ReplicationPeerConfigListener {
+/** Callback method for when users update the ReplicationPeerConfig for this peer
+  *
+  * @param rpc The updated ReplicationPeerConfig
+  */
+  void peerConfigUpdated(ReplicationPeerConfig rpc);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
index 7c22308..1792443 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -44,7 +44,7 @@ import org.apache.zookeeper.KeeperException.NodeExistsException;
 public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable {
   private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);
 
-  private final ReplicationPeerConfig peerConfig;
+  private ReplicationPeerConfig peerConfig;
   private final String id;
   private volatile PeerState peerState;
   private volatile Map<String, List<String>> tableCFs = new HashMap<String,
List<String>>();
@@ -52,7 +52,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable,
Closea
 
   private PeerStateTracker peerStateTracker;
   private TableCFsTracker tableCFsTracker;
-
+  private PeerConfigTracker peerConfigTracker;
   /**
    * Constructor that takes all the objects required to communicate with the
    * specified peer, except for the region server addresses.
@@ -157,12 +157,36 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable,
Closea
 
     return tableCFsMap;
   }
+  /**
+   * start a table-cfs tracker to listen the (table, cf-list) map change
+   * @param zookeeper
+   * @param peerConfigNode path to zk node which stores table-cfs
+   * @throws KeeperException
+   */
+  public void startPeerConfigTracker(ZooKeeperWatcher zookeeper, String peerConfigNode)
+      throws KeeperException {
+    this.peerConfigTracker = new PeerConfigTracker(peerConfigNode, zookeeper,
+        this);
+    this.peerConfigTracker.start();
+    this.readPeerConfig();
+  }
 
   private void readTableCFsZnode() {
     String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
     this.tableCFs = parseTableCFsFromConfig(currentTableCFs);
   }
 
+  private ReplicationPeerConfig readPeerConfig() {
+    try {
+      byte[] data = peerConfigTracker.getData(false);
+      if (data != null) {
+        this.peerConfig = ReplicationSerDeHelper.parsePeerFrom(data);
+      }
+    } catch (DeserializationException e) {
+      LOG.error("", e);
+    }
+    return this.peerConfig;
+  }
   @Override
   public PeerState getPeerState() {
     return peerState;
@@ -205,6 +229,13 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable,
Closea
   }
 
   @Override
+  public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
+    if (this.peerConfigTracker != null){
+      this.peerConfigTracker.setListener(listener);
+    }
+  }
+
+  @Override
   public void abort(String why, Throwable e) {
     LOG.fatal("The ReplicationPeer coresponding to peer " + peerConfig
         + " was aborted for the following reason(s):" + why, e);
@@ -323,4 +354,40 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable,
Closea
       }
     }
   }
+
+  /**
+   * Tracker for PeerConfigNode of this peer
+   */
+  public class PeerConfigTracker extends ZooKeeperNodeTracker {
+
+    private ReplicationPeerConfigListener listener;
+
+    public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher,
+                             Abortable abortable) {
+      super(watcher, peerConfigNode, abortable);
+    }
+
+    public synchronized void setListener(ReplicationPeerConfigListener listener){
+      this.listener = listener;
+    }
+
+    @Override
+    public synchronized void nodeCreated(String path) {
+      if (path.equals(node)) {
+        super.nodeCreated(path);
+        ReplicationPeerConfig config = readPeerConfig();
+        if (listener != null){
+          listener.peerConfigUpdated(config);
+        }
+      }
+    }
+
+    @Override
+    public synchronized void nodeDataChanged(String path) {
+      //superclass calls nodeCreated
+      if (path.equals(node)) {
+        super.nodeDataChanged(path);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 10f8932..4d5ac9a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -156,4 +156,12 @@ public interface ReplicationPeers {
    * @return the configuration for the peer cluster, null if it was unable to get the configuration
    */
   Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException;
+
+  /**
+   * Updates replication peer configuration and/or peer data
+   * @param id a short that identifies the cluster
+   * @param peerConfig configuration for the replication slave cluster
+   * @throws ReplicationException
+   */
+  void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 4d977c0..632894c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -35,9 +35,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -47,8 +44,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
-import com.google.protobuf.ByteString;
-
 /**
  * This class provides an implementation of the ReplicationPeers interface using Zookeeper.
The
  * peers znode contains a list of all peer replication clusters and the current replication
state of
@@ -118,9 +113,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements
Re
       
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
       List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
-      ZKUtilOp op1 =
-          ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
-            toByteArray(peerConfig));
+      ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
+          ReplicationSerDeHelper.toByteArray(peerConfig));
       // There is a race (if hbase.zookeeper.useMulti is false)
       // b/w PeerWatcher and ReplicationZookeeper#add method to create the
       // peer-state znode. This happens while adding a peer
@@ -275,7 +269,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements
Re
     }
 
     try {
-      return parsePeerFrom(data);
+      return ReplicationSerDeHelper.parsePeerFrom(data);
     } catch (DeserializationException e) {
       LOG.warn("Failed to parse cluster key from peerId=" + peerId
           + ", specifically the content from the following znode: " + znode);
@@ -365,6 +359,43 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements
Re
     }
   }
 
+  @Override
+  public void updatePeerConfig(String id, ReplicationPeerConfig newConfig)
+    throws ReplicationException {
+    ReplicationPeer peer = getPeer(id);
+    if (peer == null){
+      throw new ReplicationException("Could not find peer Id " + id);
+    }
+    ReplicationPeerConfig existingConfig = peer.getPeerConfig();
+    if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty()
&&
+        !newConfig.getClusterKey().equals(existingConfig.getClusterKey())){
+      throw new ReplicationException("Changing the cluster key on an existing peer is not
allowed."
+          + " Existing key '" + existingConfig.getClusterKey() + "' does not match new key
'"
+          + newConfig.getClusterKey() +
+      "'");
+    }
+    String existingEndpointImpl = existingConfig.getReplicationEndpointImpl();
+    if (newConfig.getReplicationEndpointImpl() != null &&
+        !newConfig.getReplicationEndpointImpl().isEmpty() &&
+        !newConfig.getReplicationEndpointImpl().equals(existingEndpointImpl)){
+      throw new ReplicationException("Changing the replication endpoint implementation class
" +
+          "on an existing peer is not allowed. Existing class '"
+          + existingConfig.getReplicationEndpointImpl()
+          + "' does not match new class '" + newConfig.getReplicationEndpointImpl() + "'");
+    }
+    //Update existingConfig's peer config and peer data with the new values, but don't touch
config
+    // or data that weren't explicitly changed
+    existingConfig.getConfiguration().putAll(newConfig.getConfiguration());
+    existingConfig.getPeerData().putAll(newConfig.getPeerData());
+    try {
+      ZKUtil.setData(this.zookeeper, getPeerNode(id),
+          ReplicationSerDeHelper.toByteArray(existingConfig));
+    }
+    catch(KeeperException ke){
+      throw new ReplicationException("There was a problem trying to save changes to the "
+
+          "replication peer " + id, ke);
+    }
+  }
   /**
    * Attempt to connect to a new remote slave cluster.
    * @param peerId a short that identifies the cluster
@@ -461,91 +492,13 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements
Re
           peerId, e);
     }
 
-    return peer;
-  }
-
-  /**
-   * @param bytes Content of a peer znode.
-   * @return ClusterKey parsed from the passed bytes.
-   * @throws DeserializationException
-   */
-  private static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
-      throws DeserializationException {
-    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
-      int pblen = ProtobufUtil.lengthOfPBMagic();
-      ZooKeeperProtos.ReplicationPeer.Builder builder =
-          ZooKeeperProtos.ReplicationPeer.newBuilder();
-      ZooKeeperProtos.ReplicationPeer peer;
-      try {
-        ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
-        peer = builder.build();
-      } catch (IOException e) {
-        throw new DeserializationException(e);
-      }
-      return convert(peer);
-    } else {
-      if (bytes.length > 0) {
-        return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
-      }
-      return new ReplicationPeerConfig().setClusterKey("");
-    }
-  }
-
-  private static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) {
-    ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
-    if (peer.hasClusterkey()) {
-      peerConfig.setClusterKey(peer.getClusterkey());
-    }
-    if (peer.hasReplicationEndpointImpl()) {
-      peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
-    }
-
-    for (BytesBytesPair pair : peer.getDataList()) {
-      peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
-    }
-
-    for (NameStringPair pair : peer.getConfigurationList()) {
-      peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
-    }
-    return peerConfig;
-  }
-
-  private static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig  peerConfig)
{
-    ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
-    if (peerConfig.getClusterKey() != null) {
-      builder.setClusterkey(peerConfig.getClusterKey());
-    }
-    if (peerConfig.getReplicationEndpointImpl() != null) {
-      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
-    }
-
-    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
-      builder.addData(BytesBytesPair.newBuilder()
-        .setFirst(ByteString.copyFrom(entry.getKey()))
-        .setSecond(ByteString.copyFrom(entry.getValue()))
-          .build());
+    try {
+      peer.startPeerConfigTracker(this.zookeeper, this.getPeerNode(peerId));
+    } catch(KeeperException e) {
+      throw new ReplicationException("Error starting the peer config tracker for peerId="
+
+          peerId, e);
     }
-
-    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet())
{
-      builder.addConfiguration(NameStringPair.newBuilder()
-        .setName(entry.getKey())
-        .setValue(entry.getValue())
-        .build());
+      return peer;
     }
 
-    return builder.build();
-  }
-
-  /**
-   * @param peerConfig
-   * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix
prepended suitable
-   *         for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
-   *         /hbase/replication/peers/PEER_ID
-   */
-  private static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
-    byte[] bytes = convert(peerConfig).toByteArray();
-    return ProtobufUtil.prependPBMagic(bytes);
-  }
-
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java
new file mode 100644
index 0000000..05f909d
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.Map;
+
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public final class ReplicationSerDeHelper {
+  private static final Log LOG = LogFactory.getLog(ReplicationSerDeHelper.class);
+
+  private ReplicationSerDeHelper() {}
+
+  /**
+   * @param bytes Content of a peer znode.
+   * @return ClusterKey parsed from the passed bytes.
+   * @throws DeserializationException
+   */
+  public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
+      throws DeserializationException {
+    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+      int pblen = ProtobufUtil.lengthOfPBMagic();
+      ZooKeeperProtos.ReplicationPeer.Builder builder =
+          ZooKeeperProtos.ReplicationPeer.newBuilder();
+      ZooKeeperProtos.ReplicationPeer peer;
+      try {
+        ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
+        peer = builder.build();
+      } catch (IOException e) {
+        throw new DeserializationException(e);
+      }
+      return convert(peer);
+    } else {
+      if (bytes.length > 0) {
+        return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
+      }
+      return new ReplicationPeerConfig().setClusterKey("");
+    }
+  }
+
+  private static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) {
+    ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
+    if (peer.hasClusterkey()) {
+      peerConfig.setClusterKey(peer.getClusterkey());
+    }
+    if (peer.hasReplicationEndpointImpl()) {
+      peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
+    }
+
+    for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
+      peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
+    }
+
+    for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
+      peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
+    }
+    return peerConfig;
+  }
+
+  /**
+   * @param peerConfig
+   * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix
prepended suitable
+   *         for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
+   *         /hbase/replication/peers/PEER_ID
+   */
+  public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
+    byte[] bytes = convert(peerConfig).toByteArray();
+    return ProtobufUtil.prependPBMagic(bytes);
+  }
+
+  private static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig  peerConfig)
{
+    ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
+    if (peerConfig.getClusterKey() != null) {
+      builder.setClusterkey(peerConfig.getClusterKey());
+    }
+    if (peerConfig.getReplicationEndpointImpl() != null) {
+      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
+    }
+
+    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
+      builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
+          .setFirst(ByteString.copyFrom(entry.getKey()))
+          .setSecond(ByteString.copyFrom(entry.getValue()))
+          .build());
+    }
+
+    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet())
{
+      builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder()
+          .setName(entry.getKey())
+          .setValue(entry.getValue())
+          .build());
+    }
+
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
index 7d8455b..fd253da 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
@@ -110,4 +110,8 @@ public abstract class ReplicationStateZKBase {
   protected boolean isPeerPath(String path) {
     return path.split("/").length == peersZNode.split("/").length + 1;
   }
+
+  protected String getPeerNode(String id) {
+    return ZKUtil.joinZNode(this.peersZNode, id);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
index 67051ab..bc73f74 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.replication;
 import java.io.IOException;
 import java.util.ArrayList;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import com.google.common.collect.Lists;
@@ -35,11 +37,30 @@ import com.google.common.util.concurrent.AbstractService;
 public abstract class BaseReplicationEndpoint extends AbstractService
   implements ReplicationEndpoint {
 
+  private static final Log LOG = LogFactory.getLog(BaseReplicationEndpoint.class);
   protected Context ctx;
 
   @Override
   public void init(Context context) throws IOException {
     this.ctx = context;
+
+    if (this.ctx != null){
+      ReplicationPeer peer = this.ctx.getReplicationPeer();
+      if (peer != null){
+        peer.trackPeerConfigChanges(this);
+      } else {
+        LOG.warn("Not tracking replication peer config changes for Peer Id " + this.ctx.getPeerId()
+
+            " because there's no such peer");
+      }
+    }
+  }
+
+  @Override
+  /**
+   * No-op implementation for subclasses to override if they wish to execute logic if their
config changes
+   */
+  public void peerConfigUpdated(ReplicationPeerConfig rpc){
+
   }
 
   /** Returns a default set of filters */

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index 5a2e96f..60645d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -45,13 +45,12 @@ import com.google.common.util.concurrent.Service;
  * and persisting of the WAL entries in the other cluster.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-public interface ReplicationEndpoint extends Service {
+public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListener {
 
   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
   class Context {
     private final Configuration conf;
     private final FileSystem fs;
-    private final ReplicationPeerConfig peerConfig;
     private final ReplicationPeer replicationPeer;
     private final String peerId;
     private final UUID clusterId;
@@ -61,12 +60,10 @@ public interface ReplicationEndpoint extends Service {
     public Context(
         final Configuration conf,
         final FileSystem fs,
-        final ReplicationPeerConfig peerConfig,
         final String peerId,
         final UUID clusterId,
         final ReplicationPeer replicationPeer,
         final MetricsSource metrics) {
-      this.peerConfig = peerConfig;
       this.conf = conf;
       this.fs = fs;
       this.clusterId = clusterId;
@@ -87,7 +84,7 @@ public interface ReplicationEndpoint extends Service {
       return peerId;
     }
     public ReplicationPeerConfig getPeerConfig() {
-      return peerConfig;
+      return replicationPeer.getPeerConfig();
     }
     public ReplicationPeer getReplicationPeer() {
       return replicationPeer;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 42b0cfc..95f7635 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -421,8 +421,7 @@ public class ReplicationSourceManager implements ReplicationListener {
 
     // init replication endpoint
     replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
-      fs, peerConfig, peerId, clusterId, replicationPeer, metrics));
-
+      fs, peerId, clusterId, replicationPeer, metrics));
     return src;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
index 75da263..e927ebb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
 
@@ -158,4 +159,8 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint
{
     return delegator.stopAndWait();
   }
 
+  @Override
+  public void peerConfigUpdated(ReplicationPeerConfig rpc) {
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
index ffec60f..0f8b563 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
@@ -29,6 +29,9 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import java.util.UUID;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 
 /**
  * Unit testing of ReplicationAdmin with clusters
@@ -40,6 +43,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase
{
   static HConnection connection2;
   static HBaseAdmin admin1;
   static HBaseAdmin admin2;
+  static ReplicationAdmin adminExt;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -48,6 +52,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase
{
     connection2 = HConnectionManager.createConnection(conf2);
     admin1 = new HBaseAdmin(connection1.getConfiguration());
     admin2 = new HBaseAdmin(connection2.getConfiguration());
+    adminExt = new ReplicationAdmin(conf1);
   }
 
   @AfterClass
@@ -64,7 +69,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase
{
     admin2.disableTable(tableName);
     admin2.deleteTable(tableName);
     assertFalse(admin2.tableExists(tableName));
-    ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
     adminExt.enableTableRep(TableName.valueOf(tableName));
     assertTrue(admin2.tableExists(tableName));
   }
@@ -83,7 +87,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase
{
     admin2.modifyTable(tableName, table);
     admin2.enableTable(tableName);
 
-    ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
     adminExt.enableTableRep(TableName.valueOf(tableName));
     table = admin1.getTableDescriptor(tableName);
     for (HColumnDescriptor fam : table.getColumnFamilies()) {
@@ -100,7 +103,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase
{
     admin2.modifyTable(tableName, table);
     admin2.enableTable(tableName);
 
-    ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
     try {
       adminExt.enableTableRep(TableName.valueOf(tableName));
       fail("Exception should be thrown if table descriptors in the clusters are not same.");
@@ -119,7 +121,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase
{
 
   @Test(timeout = 300000)
   public void testDisableAndEnableReplication() throws Exception {
-    ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
     adminExt.disableTableRep(TableName.valueOf(tableName));
     HTableDescriptor table = admin1.getTableDescriptor(tableName);
     for (HColumnDescriptor fam : table.getColumnFamilies()) {
@@ -138,25 +139,75 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase
{
 
   @Test(timeout = 300000, expected = TableNotFoundException.class)
   public void testDisableReplicationForNonExistingTable() throws Exception {
-    ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
     adminExt.disableTableRep(TableName.valueOf("nonExistingTable"));
   }
 
   @Test(timeout = 300000, expected = TableNotFoundException.class)
   public void testEnableReplicationForNonExistingTable() throws Exception {
-    ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
     adminExt.enableTableRep(TableName.valueOf("nonExistingTable"));
   }
 
   @Test(timeout = 300000, expected = IllegalArgumentException.class)
   public void testDisableReplicationWhenTableNameAsNull() throws Exception {
-    ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
     adminExt.disableTableRep(null);
   }
 
   @Test(timeout = 300000, expected = IllegalArgumentException.class)
   public void testEnableReplicationWhenTableNameAsNull() throws Exception {
-    ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
     adminExt.enableTableRep(null);
   }
+
+  @Test(timeout=300000)
+  public void testReplicationPeerConfigUpdateCallback() throws Exception {
+    String peerId = "1";
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(utility2.getClusterKey());
+    rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName());
+    rpc.getConfiguration().put("key1", "value1");
+    adminExt.addPeer(peerId, rpc, null);
+    adminExt.peerAdded(peerId);
+    rpc.getConfiguration().put("key1", "value2");
+    adminExt.updatePeerConfig(peerId, rpc);
+    if (!TestUpdatableReplicationEndpoint.hasCalledBack()) {
+      synchronized(TestUpdatableReplicationEndpoint.class) {
+        TestUpdatableReplicationEndpoint.class.wait(2000L);
+      }
+    }
+    assertEquals(true, TestUpdatableReplicationEndpoint.hasCalledBack());
+    adminExt.removePeer(peerId);
+  }
+
+  public static class TestUpdatableReplicationEndpoint extends BaseReplicationEndpoint {
+    private static boolean calledBack = false;
+
+    public static boolean hasCalledBack() {
+      return calledBack;
+    }
+
+    @Override
+    public synchronized void peerConfigUpdated(ReplicationPeerConfig rpc) {
+      calledBack = true;
+      notifyAll();
+    }
+
+    @Override
+    protected void doStart() {
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      notifyStopped();
+    }
+
+    @Override
+    public UUID getPeerUUID() {
+      return UUID.randomUUID();
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      return false;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 3405f72..71c9814 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -103,7 +103,7 @@ public class TestReplicationBase {
     MiniZooKeeperCluster miniZK = utility1.getZkCluster();
     // Have to reget conf1 in case zk cluster location different
     // than default
-    conf1 = utility1.getConfiguration();  
+    conf1 = utility1.getConfiguration();
     zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null, true);
     admin = new ReplicationAdmin(conf1);
     LOG.info("Setup first Zk");

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-shell/src/main/ruby/hbase/replication_admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index 3c39bdf..ea82384 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -168,5 +168,30 @@ module Hbase
     def get_peer_config(id)
       @replication_admin.get_peer_config(id)
     end
+    def peer_added(id)
+      @replication_admin.peer_added(id)
+    end
+
+    def update_peer_config(id, args={})
+      # Optional parameters
+      config = args.fetch(CONFIG, nil)
+      data = args.fetch(DATA, nil)
+
+      # Create and populate a ReplicationPeerConfig
+      replication_peer_config = ReplicationPeerConfig.new
+      unless config.nil?
+        replication_peer_config.get_configuration.put_all(config)
+      end
+
+      unless data.nil?
+        # Convert Strings to Bytes for peer_data
+        peer_data = replication_peer_config.get_peer_data
+        data.each{|key, val|
+          peer_data.put(Bytes.to_bytes(key), Bytes.to_bytes(val))
+        }
+      end
+
+      @replication_admin.update_peer_config(id, replication_peer_config)
+    end
   end
 end

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-shell/src/main/ruby/shell.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index 2398aaa..7a90eaa 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -345,6 +345,7 @@ Shell.load_command_group(
     disable_table_replication
     get_peer_config
     list_peer_configs
+    update_peer_config
   ]
 )
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-shell/src/main/ruby/shell/commands/update_peer_config.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/update_peer_config.rb b/hbase-shell/src/main/ruby/shell/commands/update_peer_config.rb
new file mode 100644
index 0000000..bcecb91
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/update_peer_config.rb
@@ -0,0 +1,49 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+    module Commands
+        class UpdatePeerConfig< Command
+            def help
+                return <<-EOF
+A peer can either be another HBase cluster or a custom replication endpoint. In either case
an id
+must be specified to identify the peer. This command does not interrupt processing on an
enabled replication peer.
+
+Two optional arguments are DATA and CONFIG which can be specified to set different values
for either
+the peer_data or configuration for a custom replication endpoint. Any existing values not
updated by this command
+are left unchanged.
+
+CLUSTER_KEY, REPLICATION_ENDPOINT, and TABLE_CFs cannot be updated with this command.
+To update TABLE_CFs, see the append_peer_tableCFs and remove_peer_tableCFs commands.
+
+  hbase> update_peer_config '1', DATA => { "key1" => 1 }
+  hbase> update_peer_config '2', CONFIG => { "config1" => "value1", "config2" =>
"value2" }
+  hbase> update_peer_config '3', DATA => { "key1" => 1 }, CONFIG => { "config1"
=> "value1", "config2" => "value2" },
+
+        EOF
+      end
+
+      def command(id, args = {})
+        format_simple_command do
+          replication_admin.update_peer_config(id, args)
+        end
+      end
+    end
+  end
+end
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9b9f7f6/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
index 14f4a0b..660ac91 100644
--- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
@@ -30,7 +30,6 @@ module Hbase
     include TestHelpers
 
     def setup
-      @test_name = "hbase_shell_tests_table"
       @peer_id = '1'
 
       setup_hbase
@@ -211,6 +210,31 @@ module Hbase
       replication_admin.remove_peer(peer_id_second)
     end
 
+    define_test "update_peer_config: can update peer config and data" do
+      repl_impl = "org.apache.hadoop.hbase.replication.ReplicationEndpointForTest"
+      config_params = { "config1" => "value1", "config2" => "value2" }
+      data_params = {"data1" => "value1", "data2" => "value2"}
+      args = { ENDPOINT_CLASSNAME => repl_impl, CONFIG => config_params, DATA =>
data_params}
+      replication_admin.add_peer(@peer_id, args)
+
+      #Normally the ReplicationSourceManager will call ReplicationPeer#peer_added, but here
we have to do it ourselves
+      replication_admin.peer_added(@peer_id)
+
+      new_config_params = { "config1" => "new_value1" }
+      new_data_params = {"data1" => "new_value1"}
+      new_args = {CONFIG => new_config_params, DATA => new_data_params}
+      replication_admin.update_peer_config(@peer_id, new_args)
+
+      #Make sure the updated key/value pairs in config and data were successfully updated,
and that those we didn't
+      #update are still there and unchanged
+      peer_config = replication_admin.get_peer_config(@peer_id)
+      replication_admin.remove_peer(@peer_id)
+      assert_equal("new_value1", peer_config.get_configuration.get("config1"))
+      assert_equal("value2", peer_config.get_configuration.get("config2"))
+      assert_equal("new_value1", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data1"))))
+      assert_equal("value2", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data2"))))
+    end
+
     # assert_raise fails on native exceptions - https://jira.codehaus.org/browse/JRUBY-5279
     # Can't catch native Java exception with assert_raise in JRuby 1.6.8 as in the test below.
     # define_test "add_peer: adding a second peer with same id should error" do


Mime
View raw message