hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ramkris...@apache.org
Subject git commit: HBASE-11920 Add CP hooks for ReplicationEndPoint
Date Thu, 25 Sep 2014 16:42:41 GMT
Repository: hbase
Updated Branches:
  refs/heads/master a2e05b9f8 -> 44a27c5cd


HBASE-11920 Add CP hooks for ReplicationEndPoint


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/44a27c5c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/44a27c5c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/44a27c5c

Branch: refs/heads/master
Commit: 44a27c5cd76f44e435671c69d1d8f60c42a2b420
Parents: a2e05b9
Author: Ramkrishna <ramkrishna.s.vasudevan@intel.com>
Authored: Thu Sep 25 22:11:28 2014 +0530
Committer: Ramkrishna <ramkrishna.s.vasudevan@intel.com>
Committed: Thu Sep 25 22:11:28 2014 +0530

----------------------------------------------------------------------
 .../BaseMasterAndRegionObserver.java            | 14 +++----
 .../coprocessor/BaseRegionServerObserver.java   |  7 ++++
 .../hbase/coprocessor/RegionServerObserver.java | 10 +++++
 .../RegionServerCoprocessorHost.java            | 34 +++++++++++++++++
 .../regionserver/ReplicationSourceManager.java  | 40 +++++++++++++-------
 .../hbase/security/access/AccessController.java | 11 +++++-
 6 files changed, 94 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/44a27c5c/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java
index 768481d..a6b9d84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java
@@ -19,23 +19,23 @@
 
 package org.apache.hadoop.hbase.coprocessor;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
 
-import java.io.IOException;
-import java.util.List;
-
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
 @InterfaceStability.Evolving
 public abstract class BaseMasterAndRegionObserver extends BaseRegionObserver

http://git-wip-us.apache.org/repos/asf/hbase/blob/44a27c5c/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java
index 5bc23d3..c21cdf8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 
 /**
  * An abstract class that implements RegionServerObserver.
@@ -76,4 +77,10 @@ public class BaseRegionServerObserver implements RegionServerObserver {
   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment>
ctx)
       throws IOException { }
 
+  @Override
+  public ReplicationEndpoint postCreateReplicationEndPoint(
+      ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint
endpoint) {
+    return endpoint;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/44a27c5c/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
index 8a76d46..5c07fd2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.MetaMutationAnnotation;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 
 public interface RegionServerObserver extends Coprocessor {
 
@@ -121,4 +122,13 @@ public interface RegionServerObserver extends Coprocessor {
   void postRollWALWriterRequest(final ObserverContext<RegionServerCoprocessorEnvironment>
ctx)
       throws IOException;
 
+  /**
+   * This will be called after the replication endpoint is instantiated.
+   * @param ctx
+   * @param endpoint - the base endpoint for replication
+   * @return the endpoint to use during replication.
+   */
+  ReplicationEndpoint postCreateReplicationEndPoint(
+      ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint
endpoint);
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/44a27c5c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
index 54552c6..ec44560 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
 @InterfaceStability.Evolving
@@ -156,6 +157,27 @@ public class RegionServerCoprocessorHost extends
     });
   }
 
+  public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
+      throws IOException {
+    return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
+        : new CoprocessOperationWithResult<ReplicationEndpoint>() {
+          @Override
+          public void call(RegionServerObserver oserver,
+              ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException
{
+            setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
+          }
+        });
+  }
+
+  private <T> T execOperationWithResult(final T defaultValue,
+      final CoprocessOperationWithResult<T> ctx) throws IOException {
+    if (ctx == null)
+      return defaultValue;
+    ctx.setResult(defaultValue);
+    execOperation(ctx);
+    return ctx.getResult();
+  }
+
   private static abstract class CoprocessorOperation
       extends ObserverContext<RegionServerCoprocessorEnvironment> {
     public CoprocessorOperation() {
@@ -168,6 +190,18 @@ public class RegionServerCoprocessorHost extends
     }
   }
 
+  private static abstract class CoprocessOperationWithResult<T> extends CoprocessorOperation
{
+    private T result = null;
+
+    public void setResult(final T result) {
+      this.result = result;
+    }
+
+    public T getResult() {
+      return this.result;
+    }
+  }
+
   private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
     if (ctx == null) return false;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/44a27c5c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a2f1667..cb0f6ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -39,11 +39,13 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationListener;
@@ -84,7 +86,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   // UUID for this cluster
   private final UUID clusterId;
   // All about stopping
-  private final Stoppable stopper;
+  private final Server server;
   // All logs we are currently tracking
   private final Map<String, SortedSet<String>> hlogsById;
   // Logs for recovered sources we are currently tracking
@@ -111,7 +113,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param replicationPeers
    * @param replicationTracker
    * @param conf the configuration to use
-   * @param stopper the stopper object for this region server
+   * @param server the server for this region server
    * @param fs the file system to use
    * @param logDir the directory that contains all hlog directories of live RSs
    * @param oldLogDir the directory where old logs are archived
@@ -119,7 +121,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
       final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
-      final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path
logDir,
+      final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
       final Path oldLogDir, final UUID clusterId) {
     //CopyOnWriteArrayList is thread-safe.
     //Generally, reading is more than modifying.
@@ -127,7 +129,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.replicationQueues = replicationQueues;
     this.replicationPeers = replicationPeers;
     this.replicationTracker = replicationTracker;
-    this.stopper = stopper;
+    this.server = server;
     this.hlogsById = new HashMap<String, SortedSet<String>>();
     this.hlogsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
@@ -243,7 +245,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     ReplicationPeer peer = replicationPeers.getPeer(id);
     ReplicationSourceInterface src =
         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
-          this.replicationPeers, stopper, id, this.clusterId, peerConfig, peer);
+          this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
     synchronized (this.hlogsById) {
       this.sources.add(src);
       this.hlogsById.put(id, new TreeSet<String>());
@@ -257,7 +259,7 @@ public class ReplicationSourceManager implements ReplicationListener {
           String message =
               "Cannot add log to queue when creating a new source, queueId="
                   + src.getPeerClusterZnode() + ", filename=" + name;
-          stopper.stop(message);
+          server.stop(message);
           throw e;
         }
         src.enqueueLog(this.latestPath);
@@ -359,7 +361,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param conf the configuration to use
    * @param fs the file system to use
    * @param manager the manager to use
-   * @param stopper the stopper object for this region server
+   * @param server the server object for this region server
    * @param peerId the id of the peer cluster
    * @return the created source
    * @throws IOException
@@ -367,9 +369,13 @@ public class ReplicationSourceManager implements ReplicationListener
{
   protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
       final FileSystem fs, final ReplicationSourceManager manager,
       final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
-      final Stoppable stopper, final String peerId, final UUID clusterId,
+      final Server server, final String peerId, final UUID clusterId,
       final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
           throws IOException {
+    RegionServerCoprocessorHost rsServerHost = null;
+    if (server instanceof HRegionServer) {
+      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
+    }
     ReplicationSourceInterface src;
     try {
       @SuppressWarnings("rawtypes")
@@ -392,6 +398,14 @@ public class ReplicationSourceManager implements ReplicationListener
{
       @SuppressWarnings("rawtypes")
       Class c = Class.forName(replicationEndpointImpl);
       replicationEndpoint = (ReplicationEndpoint) c.newInstance();
+      if(rsServerHost != null) {
+        ReplicationEndpoint newReplicationEndPoint = rsServerHost
+            .postCreateReplicationEndPoint(replicationEndpoint);
+        if(newReplicationEndPoint != null) {
+          // Override the newly created endpoint from the hook with configured end point
+          replicationEndpoint = newReplicationEndPoint;
+        }
+      }
     } catch (Exception e) {
       LOG.warn("Passed replication endpoint implementation throws errors", e);
       throw new IOException(e);
@@ -399,7 +413,7 @@ public class ReplicationSourceManager implements ReplicationListener {
 
     MetricsSource metrics = new MetricsSource(peerId);
     // init replication source
-    src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId,
+    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
       clusterId, replicationEndpoint, metrics);
 
     // init replication endpoint
@@ -542,7 +556,7 @@ public class ReplicationSourceManager implements ReplicationListener {
         Thread.currentThread().interrupt();
       }
       // We try to lock that rs' queue directory
-      if (stopper.isStopped()) {
+      if (server.isStopped()) {
         LOG.info("Not transferring queue since we are shutting down");
         return;
       }
@@ -578,7 +592,7 @@ public class ReplicationSourceManager implements ReplicationListener {
 
           ReplicationSourceInterface src =
               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
-                stopper, peerId, this.clusterId, peerConfig, peer);
+                server, peerId, this.clusterId, peerConfig, peer);
           if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
             src.terminate("Recovered queue doesn't belong to any current peer");
             break;

http://git-wip-us.apache.org/repos/asf/hbase/blob/44a27c5c/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 95cd72a..96c912a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -30,7 +30,7 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -45,13 +45,13 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -2252,4 +2253,10 @@ public class AccessController extends BaseMasterAndRegionObserver
       final String namespace, final Quotas quotas) throws IOException {
     requirePermission("setNamespaceQuota", Action.ADMIN);
   }
+  
+  @Override
+  public ReplicationEndpoint postCreateReplicationEndPoint(
+      ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint
endpoint) {
+    return endpoint;
+  }
 }


Mime
View raw message