hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From syuanji...@apache.org
Subject [20/24] hbase git commit: HBASE-15507 Online modification of enabled ReplicationPeerConfig (Geoffrey Jacoby)
Date Mon, 11 Apr 2016 16:10:45 GMT
HBASE-15507 Online modification of enabled ReplicationPeerConfig (Geoffrey Jacoby)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e0f31ba6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e0f31ba6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e0f31ba6

Branch: refs/heads/hbase-12439
Commit: e0f31ba6e68416db359d31eee21750cd7559909f
Parents: 6ea4994
Author: tedyu <yuzhihong@gmail.com>
Authored: Fri Apr 8 21:26:31 2016 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Fri Apr 8 21:26:31 2016 -0700

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    | 12 +++-
 .../hbase/replication/ReplicationPeer.java      |  2 +
 .../ReplicationPeerConfigListener.java          | 33 +++++++++++
 .../replication/ReplicationPeerZKImpl.java      | 26 ++++++++-
 .../hbase/replication/ReplicationPeers.java     |  2 +
 .../replication/ReplicationPeersZKImpl.java     | 39 +++++++++++++
 .../replication/BaseReplicationEndpoint.java    | 22 ++++++++
 .../hbase/replication/ReplicationEndpoint.java  |  7 +--
 .../regionserver/ReplicationSourceManager.java  |  2 +-
 .../VisibilityReplicationEndpoint.java          |  6 ++
 .../TestReplicationAdminWithClusters.java       | 58 ++++++++++++++++++++
 .../src/main/ruby/hbase/replication_admin.rb    | 29 +++++++++-
 hbase-shell/src/main/ruby/shell.rb              |  1 +
 .../ruby/shell/commands/update_peer_config.rb   | 49 +++++++++++++++++
 .../test/ruby/hbase/replication_admin_test.rb   | 25 +++++++++
 15 files changed, 301 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e0f31ba6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 8ee3a22..a2ad2e7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
@@ -201,7 +200,11 @@ public class ReplicationAdmin implements Closeable {
   public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig)
{
     return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig);
   }
-  
+
+  public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig)
+      throws ReplicationException {
+    this.replicationPeers.updatePeerConfig(id, peerConfig);
+  }
   /**
    * Removes a peer cluster and stops the replication to it.
    * @param id a short name that identifies the cluster
@@ -550,6 +553,11 @@ public class ReplicationAdmin implements Closeable {
   }
 
   @VisibleForTesting
+  public void peerAdded(String id) throws ReplicationException {
+    this.replicationPeers.peerAdded(id);
+  }
+
+  @VisibleForTesting
   List<ReplicationPeer> listReplicationPeers() {
     Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
     if (peers == null || peers.size() <= 0) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0f31ba6/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index 920eea6..3da01fe 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -71,4 +71,6 @@ public interface ReplicationPeer {
    */
   public Map<TableName, List<String>> getTableCFs();
 
+  void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0f31ba6/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
new file mode 100644
index 0000000..4e04186
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface ReplicationPeerConfigListener {
+  /** Callback method for when users update the ReplicationPeerConfig for this peer
+   *
+   * @param rpc The updated ReplicationPeerConfig
+   */
+  void peerConfigUpdated(ReplicationPeerConfig rpc);
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0f31ba6/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
index f7a2411..a33690c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -109,7 +109,7 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
     this.readPeerConfig();
   }
 
-  private void readPeerConfig() {
+  private ReplicationPeerConfig readPeerConfig() {
     try {
       byte[] data = peerConfigTracker.getData(false);
       if (data != null) {
@@ -118,6 +118,7 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
     } catch (DeserializationException e) {
       LOG.error("", e);
     }
+    return this.peerConfig;
   }
 
   @Override
@@ -163,6 +164,13 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
   }
 
   @Override
+  public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
+    if (this.peerConfigTracker != null){
+      this.peerConfigTracker.setListener(listener);
+    }
+  }
+
+  @Override
   public void abort(String why, Throwable e) {
     LOG.fatal("The ReplicationPeer coresponding to peer " + peerConfig
         + " was aborted for the following reason(s):" + why, e);
@@ -260,24 +268,36 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
    */
   public class PeerConfigTracker extends ZooKeeperNodeTracker {
 
+    ReplicationPeerConfigListener listener;
+
     public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher,
         Abortable abortable) {
       super(watcher, peerConfigNode, abortable);
     }
-    
+
+    public synchronized void setListener(ReplicationPeerConfigListener listener){
+      this.listener = listener;
+    }
+
     @Override
     public synchronized void nodeCreated(String path) {
       if (path.equals(node)) {
         super.nodeCreated(path);
-        readPeerConfig();
+        ReplicationPeerConfig config = readPeerConfig();
+        if (listener != null){
+          listener.peerConfigUpdated(config);
+        }
       }
     }
 
     @Override
     public synchronized void nodeDataChanged(String path) {
+      //superclass calls nodeCreated
       if (path.equals(node)) {
         super.nodeDataChanged(path);
       }
+
     }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0f31ba6/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 1961a65..9f70d95 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -151,4 +151,6 @@ public interface ReplicationPeers {
    * @return the configuration for the peer cluster, null if it was unable to get the configuration
    */
   Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException;
+
+  void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0f31ba6/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 367c688..916eaf8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -351,6 +351,45 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements
Re
     return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, otherConf);
   }
 
+  @Override
+  public void updatePeerConfig(String id, ReplicationPeerConfig newConfig)
+      throws ReplicationException {
+    ReplicationPeer peer = getPeer(id);
+    if (peer == null){
+      throw new ReplicationException("Could not find peer Id " + id);
+    }
+    ReplicationPeerConfig existingConfig = peer.getPeerConfig();
+    if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty()
&&
+        !newConfig.getClusterKey().equals(existingConfig.getClusterKey())){
+      throw new ReplicationException("Changing the cluster key on an existing peer is not
allowed."
+          + " Existing key '" + existingConfig.getClusterKey() + "' does not match new key
'"
+          + newConfig.getClusterKey() +
+      "'");
+    }
+    String existingEndpointImpl = existingConfig.getReplicationEndpointImpl();
+    if (newConfig.getReplicationEndpointImpl() != null &&
+        !newConfig.getReplicationEndpointImpl().isEmpty() &&
+        !newConfig.getReplicationEndpointImpl().equals(existingEndpointImpl)){
+      throw new ReplicationException("Changing the replication endpoint implementation class
" +
+          "on an existing peer is not allowed. Existing class '"
+          + existingConfig.getReplicationEndpointImpl()
+          + "' does not match new class '" + newConfig.getReplicationEndpointImpl() + "'");
+    }
+    //Update existingConfig's peer config and peer data with the new values, but don't touch
config
+    // or data that weren't explicitly changed
+    existingConfig.getConfiguration().putAll(newConfig.getConfiguration());
+    existingConfig.getPeerData().putAll(newConfig.getPeerData());
+
+    try {
+      ZKUtil.setData(this.zookeeper, getPeerNode(id),
+          ReplicationSerDeHelper.toByteArray(existingConfig));
+    }
+    catch(KeeperException ke){
+      throw new ReplicationException("There was a problem trying to save changes to the "
+
+          "replication peer " + id, ke);
+    }
+  }
+
   /**
    * List all registered peer clusters and set a watch on their znodes.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0f31ba6/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
index 67051ab..d667269 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
@@ -21,10 +21,13 @@ package org.apache.hadoop.hbase.replication;
 import java.io.IOException;
 import java.util.ArrayList;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AbstractService;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 
 /**
  * A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending
this
@@ -35,11 +38,30 @@ import com.google.common.util.concurrent.AbstractService;
 public abstract class BaseReplicationEndpoint extends AbstractService
   implements ReplicationEndpoint {
 
+  private static final Log LOG = LogFactory.getLog(BaseReplicationEndpoint.class);
   protected Context ctx;
 
   @Override
   public void init(Context context) throws IOException {
     this.ctx = context;
+
+    if (this.ctx != null){
+      ReplicationPeer peer = this.ctx.getReplicationPeer();
+      if (peer != null){
+        peer.trackPeerConfigChanges(this);
+      } else {
+        LOG.warn("Not tracking replication peer config changes for Peer Id " + this.ctx.getPeerId()
+
+            " because there's no such peer");
+      }
+    }
+  }
+
+  @Override
+  /**
+   * No-op implementation for subclasses to override if they wish to execute logic if their
config changes
+   */
+  public void peerConfigUpdated(ReplicationPeerConfig rpc){
+
   }
 
   /** Returns a default set of filters */

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0f31ba6/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index ac1257f..c92b53d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -46,14 +46,13 @@ import com.google.common.util.concurrent.Service;
  * and persisting of the WAL entries in the other cluster.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-public interface ReplicationEndpoint extends Service {
+public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListener {
 
   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
   class Context {
     private final Configuration conf;
     private final FileSystem fs;
     private final TableDescriptors tableDescriptors;
-    private final ReplicationPeerConfig peerConfig;
     private final ReplicationPeer replicationPeer;
     private final String peerId;
     private final UUID clusterId;
@@ -63,13 +62,11 @@ public interface ReplicationEndpoint extends Service {
     public Context(
         final Configuration conf,
         final FileSystem fs,
-        final ReplicationPeerConfig peerConfig,
         final String peerId,
         final UUID clusterId,
         final ReplicationPeer replicationPeer,
         final MetricsSource metrics,
         final TableDescriptors tableDescriptors) {
-      this.peerConfig = peerConfig;
       this.conf = conf;
       this.fs = fs;
       this.clusterId = clusterId;
@@ -91,7 +88,7 @@ public interface ReplicationEndpoint extends Service {
       return peerId;
     }
     public ReplicationPeerConfig getPeerConfig() {
-      return peerConfig;
+      return replicationPeer.getPeerConfig();
     }
     public ReplicationPeer getReplicationPeer() {
       return replicationPeer;

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0f31ba6/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 9ff4b2d..83e0205 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -494,7 +494,7 @@ public class ReplicationSourceManager implements ReplicationListener {
 
     // init replication endpoint
     replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
-      fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors));
+      fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors));
 
     return src;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0f31ba6/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
index 3db54c6..2ac515a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
@@ -57,6 +58,11 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint
{
   }
 
   @Override
+  public void peerConfigUpdated(ReplicationPeerConfig rpc){
+
+  }
+
+  @Override
   public boolean replicate(ReplicateContext replicateContext) {
     if (!delegator.canReplicateToSameCluster()) {
       // Only when the replication is inter cluster replication we need to

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0f31ba6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
index e7bd72c..a56276d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
@@ -18,6 +18,7 @@ import static org.junit.Assert.fail;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -27,6 +28,8 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -195,4 +198,59 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase
{
       adminExt.disableTableRep(tableName);
     }
   }
+
+  @Test(timeout=300000)
+  public void testReplicationPeerConfigUpdateCallback() throws Exception {
+    String peerId = "1";
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(utility2.getClusterKey());
+    rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName());
+    rpc.getConfiguration().put("key1", "value1");
+
+    admin.addPeer(peerId, rpc);
+    admin.peerAdded(peerId);
+
+    rpc.getConfiguration().put("key1", "value2");
+    admin.updatePeerConfig(peerId, rpc);
+    if (!TestUpdatableReplicationEndpoint.hasCalledBack()) {
+      synchronized(TestUpdatableReplicationEndpoint.class) {
+        TestUpdatableReplicationEndpoint.class.wait(2000L);
+      }
+    }
+
+    assertEquals(true, TestUpdatableReplicationEndpoint.hasCalledBack());
+  }
+
+  public static class TestUpdatableReplicationEndpoint extends BaseReplicationEndpoint {
+    private static boolean calledBack = false;
+    public static boolean hasCalledBack(){
+      return calledBack;
+    }
+    @Override
+    public synchronized void peerConfigUpdated(ReplicationPeerConfig rpc){
+      calledBack = true;
+      notifyAll();
+    }
+
+    @Override
+    protected void doStart() {
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      notifyStopped();
+    }
+
+
+    @Override
+    public UUID getPeerUUID() {
+      return UUID.randomUUID();
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      return false;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0f31ba6/hbase-shell/src/main/ruby/hbase/replication_admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index f441a99..e91a4f7 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -92,8 +92,9 @@ module Hbase
           table_cfs.each{|key, val|
             map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
           }
+          replication_peer_config.set_table_cfs_map(map)
         end
-        @replication_admin.add_peer(id, replication_peer_config, map)
+        @replication_admin.add_peer(id, replication_peer_config)
       else
         raise(ArgumentError, "args must be a Hash")
       end
@@ -202,5 +203,31 @@ module Hbase
     def get_peer_config(id)
       @replication_admin.get_peer_config(id)
     end
+
+    def peer_added(id)
+      @replication_admin.peer_added(id)
+    end
+
+    def update_peer_config(id, args={})
+      # Optional parameters
+      config = args.fetch(CONFIG, nil)
+      data = args.fetch(DATA, nil)
+
+      # Create and populate a ReplicationPeerConfig
+      replication_peer_config = ReplicationPeerConfig.new
+      unless config.nil?
+        replication_peer_config.get_configuration.put_all(config)
+      end
+
+      unless data.nil?
+        # Convert Strings to Bytes for peer_data
+        peer_data = replication_peer_config.get_peer_data
+        data.each{|key, val|
+          peer_data.put(Bytes.to_bytes(key), Bytes.to_bytes(val))
+        }
+      end
+
+      @replication_admin.update_peer_config(id, replication_peer_config)
+    end
   end
 end

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0f31ba6/hbase-shell/src/main/ruby/shell.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index e5c9a31..adcd8f2 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -365,6 +365,7 @@ Shell.load_command_group(
     disable_table_replication
     get_peer_config
     list_peer_configs
+    update_peer_config
   ]
 )
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0f31ba6/hbase-shell/src/main/ruby/shell/commands/update_peer_config.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/update_peer_config.rb b/hbase-shell/src/main/ruby/shell/commands/update_peer_config.rb
new file mode 100644
index 0000000..5d721fd
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/update_peer_config.rb
@@ -0,0 +1,49 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class UpdatePeerConfig< Command
+      def help
+        return <<-EOF
+A peer can either be another HBase cluster or a custom replication endpoint. In either case
an id
+must be specified to identify the peer. This command does not interrupt processing on an
enabled replication peer.
+
+Two optional arguments are DATA and CONFIG which can be specified to set different values
for either
+the peer_data or configuration for a custom replication endpoint. Any existing values not
updated by this command
+are left unchanged.
+
+CLUSTER_KEY, REPLICATION_ENDPOINT, and TABLE_CFs cannot be updated with this command.
+To update TABLE_CFs, see the append_peer_tableCFs and remove_peer_tableCFs commands.
+
+  hbase> update_peer_config '1', DATA => { "key1" => 1 }
+  hbase> update_peer_config '2', CONFIG => { "config1" => "value1", "config2" =>
"value2" }
+  hbase> update_peer_config '3', DATA => { "key1" => 1 }, CONFIG => { "config1"
=> "value1", "config2" => "value2" },
+
+        EOF
+      end
+
+      def command(id, args = {})
+        format_simple_command do
+          replication_admin.update_peer_config(id, args)
+        end
+      end
+    end
+  end
+end

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0f31ba6/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
index 8f08dc0..0c026d6 100644
--- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
@@ -200,6 +200,31 @@ module Hbase
       replication_admin.remove_peer(peer_id_second)
     end
 
+    define_test "update_peer_config: can update peer config and data" do
+      repl_impl = "org.apache.hadoop.hbase.replication.ReplicationEndpointForTest"
+      config_params = { "config1" => "value1", "config2" => "value2" }
+      data_params = {"data1" => "value1", "data2" => "value2"}
+      args = { ENDPOINT_CLASSNAME => repl_impl, CONFIG => config_params, DATA =>
data_params}
+      replication_admin.add_peer(@peer_id, args)
+
+      #Normally the ReplicationSourceManager will call ReplicationPeer#peer_added, but here
we have to do it ourselves
+      replication_admin.peer_added(@peer_id)
+
+      new_config_params = { "config1" => "new_value1" }
+      new_data_params = {"data1" => "new_value1"}
+      new_args = {CONFIG => new_config_params, DATA => new_data_params}
+      replication_admin.update_peer_config(@peer_id, new_args)
+
+      #Make sure the updated key/value pairs in config and data were successfully updated,
and that those we didn't
+      #update are still there and unchanged
+      peer_config = replication_admin.get_peer_config(@peer_id)
+      replication_admin.remove_peer(@peer_id)
+      assert_equal("new_value1", peer_config.get_configuration.get("config1"))
+      assert_equal("value2", peer_config.get_configuration.get("config2"))
+      assert_equal("new_value1", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data1"))))
+      assert_equal("value2", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data2"))))
+
+    end
     # assert_raise fails on native exceptions - https://jira.codehaus.org/browse/JRUBY-5279
     # Can't catch native Java exception with assert_raise in JRuby 1.6.8 as in the test below.
     # define_test "add_peer: adding a second peer with same id should error" do


Mime
View raw message