hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jg...@apache.org
Subject svn commit: r1176935 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/replication/regionserver/
Date Wed, 28 Sep 2011 15:56:22 GMT
Author: jgray
Date: Wed Sep 28 15:56:22 2011
New Revision: 1176935

URL: http://svn.apache.org/viewvc?rev=1176935&view=rev
Log:
HBASE-4131  Make the Replication Service pluggable via a standard interface definition (dhruba
via jgray)

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1176935&r1=1176934&r2=1176935&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Sep 28 15:56:22 2011
@@ -6,6 +6,8 @@ Release 0.93.0 - Unreleased
    HBASE-4461  Expose getRowOrBefore via Thrift (jgray)
    HBASE-4433  avoid extra next (potentially a seek) if done with column/row
                (kannan via jgray)
+   HBASE-4131  Make the Replication Service pluggable via a standard
+               interface definition (dhruba via jgray)
 
   BUG FIXES
    HBASE-4488  Store could miss rows during flush (Lars H via jgray)

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1176935&r1=1176934&r2=1176935&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java Wed Sep 28 15:56:22
2011
@@ -473,8 +473,17 @@ public final class HConstants {
    */
   public static int DEFAULT_HBASE_RPC_TIMEOUT = 60000;
 
+  /*
+   * cluster replication constants.
+   */
   public static final String
       REPLICATION_ENABLE_KEY = "hbase.replication";
+  public static final String 
+      REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service";
+  public static final String 
+      REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service";
+  public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT =
+    "org.apache.hadoop.hbase.replication.regionserver.Replication";
 
   /** HBCK special code name used as server name when manipulating ZK nodes */
   public static final String HBCK_CODE_NAME = "HBCKServerName";

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1176935&r1=1176934&r2=1176935&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed
Sep 28 15:56:22 2011
@@ -54,6 +54,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.hbase.ClockOutOfSyncException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -285,7 +286,8 @@ public class HRegionServer implements HR
   private ExecutorService service;
 
   // Replication services. If no replication, this handler will be null.
-  private Replication replicationHandler;
+  private ReplicationSourceService replicationSourceHandler;
+  private ReplicationSinkService replicationSinkHandler;
 
   private final RegionServerAccounting regionServerAccounting;
 
@@ -1177,12 +1179,7 @@ public class HRegionServer implements HR
 
     // Instantiate replication manager if replication enabled.  Pass it the
     // log directories.
-    try {
-      this.replicationHandler = Replication.isReplication(this.conf)?
-        new Replication(this, this.fs, logdir, oldLogDir): null;
-    } catch (KeeperException e) {
-      throw new IOException("Failed replication handler create", e);
-    }
+    createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
     return instantiateHLog(logdir, oldLogDir);
   }
 
@@ -1209,9 +1206,10 @@ public class HRegionServer implements HR
     // Log roller.
     this.hlogRoller = new LogRoller(this, this);
     listeners.add(this.hlogRoller);
-    if (this.replicationHandler != null) {
+    if (this.replicationSourceHandler != null &&
+        this.replicationSourceHandler.getWALActionsListener() != null) {
       // Replication handler is an implementation of WALActionsListener.
-      listeners.add(this.replicationHandler);
+      listeners.add(this.replicationSourceHandler.getWALActionsListener());
     }
     return listeners;
   }
@@ -1359,8 +1357,13 @@ public class HRegionServer implements HR
     // that port is occupied. Adjust serverInfo if this is the case.
     this.webuiport = putUpWebUI();
 
-    if (this.replicationHandler != null) {
-      this.replicationHandler.startReplicationServices();
+    if (this.replicationSourceHandler == this.replicationSinkHandler &&
+        this.replicationSourceHandler != null) {
+      this.replicationSourceHandler.startReplicationService();
+    } else if (this.replicationSourceHandler != null) {
+      this.replicationSourceHandler.startReplicationService();
+    } else if (this.replicationSinkHandler != null) {
+      this.replicationSinkHandler.startReplicationService();
     }
 
     // Start Server.  This service is like leases in that it internally runs
@@ -1570,12 +1573,33 @@ public class HRegionServer implements HR
       this.compactSplitThread.join();
     }
     if (this.service != null) this.service.shutdown();
-    if (this.replicationHandler != null) {
-      this.replicationHandler.join();
+    if (this.replicationSourceHandler != null &&
+        this.replicationSourceHandler == this.replicationSinkHandler) {
+      this.replicationSourceHandler.stopReplicationService();
+    } else if (this.replicationSourceHandler != null) {
+      this.replicationSourceHandler.stopReplicationService();
+    } else if (this.replicationSinkHandler != null) {
+      this.replicationSinkHandler.stopReplicationService();
     }
   }
 
   /**
+   * @return Return the object that implements the replication
+   * source service.
+   */
+  ReplicationSourceService getReplicationSourceService() {
+    return replicationSourceHandler;
+  }
+
+  /**
+   * @return Return the object that implements the replication
+   * sink service.
+   */
+  ReplicationSinkService getReplicationSinkService() {
+    return replicationSinkHandler;
+  }
+
+  /**
    * Get the current master from ZooKeeper and open the RPC connection to it.
    *
    * Method will block until a master is available. You can break from this
@@ -3077,6 +3101,63 @@ public class HRegionServer implements HR
   //
 
   /**
+   * Load the replication service objects, if any
+   */
+  static private void createNewReplicationInstance(Configuration conf,
+    HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
+
+    // If replication is not enabled, then return immediately.
+    if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
+      return;
+    }
+
+    // read in the name of the source replication class from the config file.
+    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
+                               HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+    
+    // read in the name of the sink replication class from the config file.
+    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
+                             HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+
+    // If both the sink and the source class names are the same, then instantiate
+    // only one object.
+    if (sourceClassname.equals(sinkClassname)) {
+      server.replicationSourceHandler = (ReplicationSourceService)
+                                         newReplicationInstance(sourceClassname,
+                                         conf, server, fs, logDir, oldLogDir);
+      server.replicationSinkHandler = (ReplicationSinkService)
+                                         server.replicationSourceHandler;
+    }
+    else {
+      server.replicationSourceHandler = (ReplicationSourceService)
+                                         newReplicationInstance(sourceClassname,
+                                         conf, server, fs, logDir, oldLogDir);
+      server.replicationSinkHandler = (ReplicationSinkService)
+                                         newReplicationInstance(sinkClassname,
+                                         conf, server, fs, logDir, oldLogDir);
+    }
+  }
+
+  static private ReplicationService newReplicationInstance(String classname,
+    Configuration conf, HRegionServer server, FileSystem fs, Path logDir, 
+    Path oldLogDir) throws IOException{
+
+    Class<?> clazz = null;
+    try {
+      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+      clazz = Class.forName(classname, true, classLoader);
+    } catch (java.lang.ClassNotFoundException nfe) {
+      throw new IOException("Cound not find class for " + classname);
+    }
+
+    // create an instance of the replication object.
+    ReplicationService service = (ReplicationService)
+                              ReflectionUtils.newInstance(clazz, conf);
+    service.initialize(server, fs, logDir, oldLogDir);
+    return service;
+  }
+
+  /**
    * @param hrs
    * @return Thread the RegionServer is running in correctly named.
    * @throws IOException
@@ -3129,8 +3210,8 @@ public class HRegionServer implements HR
   public void replicateLogEntries(final HLog.Entry[] entries)
   throws IOException {
     checkOpen();
-    if (this.replicationHandler == null) return;
-    this.replicationHandler.replicateLogEntries(entries);
+    if (this.replicationSinkHandler == null) return;
+    this.replicationSinkHandler.replicateLogEntries(entries);
   }
 
   /**

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java?rev=1176935&r1=1176934&r2=1176935&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
Wed Sep 28 15:56:22 2011
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Gateway to Cluster Replication.  
+ * Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
+ * One application of this interface is a cross-datacenter
+ * replication service that can keep two HBase clusters in sync.
+ */
+public interface ReplicationService {
+
+  /**
+   * Initializes the replication service object.
+   * @throws IOException
+   */
+  public void initialize(Server rs, FileSystem fs, Path logdir,
+                         Path oldLogDir) throws IOException;
+
+  /**
+   * Start replication services.
+   * @throws IOException
+   */
+  public void startReplicationService() throws IOException;
+
+  /**
+   * Stops replication service.
+   */
+  public void stopReplicationService();
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java?rev=1176935&r1=1176934&r2=1176935&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
Wed Sep 28 15:56:22 2011
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+
+/**
+ * A sink for a replication stream has to expose this service.
+ * This service allows an application to hook into the
+ * regionserver and behave as a replication sink.
+ */
+public interface ReplicationSinkService extends ReplicationService {
+
+  /**
+   * Pass the list of log entries down to the sink.
+   * @param entries list of entries to replicate
+   * @throws IOException
+   */
+  public void replicateLogEntries(HLog.Entry[] entries) throws IOException;
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java?rev=1176935&r1=1176934&r2=1176935&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
Wed Sep 28 15:56:22 2011
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+
+/**
+ * A source for a replication stream has to expose this service.
+ * This service allows an application to hook into the
+ * regionserver and watch for new transactions.
+ */
+public interface ReplicationSourceService extends ReplicationService {
+
+  /**
+   * Returns a WALActionsListener for the service. This is needed to
+   * observe log rolls and log archival events.
+   */
+  public WALActionsListener getWALActionsListener();
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1176935&r1=1176934&r2=1176935&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
Wed Sep 28 15:56:22 2011
@@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
+import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -47,15 +49,16 @@ import static org.apache.hadoop.hbase.HC
 /**
  * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
  */
-public class Replication implements WALActionsListener {
-  private final boolean replication;
-  private final ReplicationSourceManager replicationManager;
+public class Replication implements WALActionsListener, 
+  ReplicationSourceService, ReplicationSinkService {
+  private boolean replication;
+  private ReplicationSourceManager replicationManager;
   private final AtomicBoolean replicating = new AtomicBoolean(true);
-  private final ReplicationZookeeper zkHelper;
-  private final Configuration conf;
+  private ReplicationZookeeper zkHelper;
+  private Configuration conf;
   private ReplicationSink replicationSink;
   // Hosting server
-  private final Server server;
+  private Server server;
 
   /**
    * Instantiate the replication management (if rep is enabled).
@@ -64,16 +67,30 @@ public class Replication implements WALA
    * @param logDir
    * @param oldLogDir directory where logs are archived
    * @throws IOException
-   * @throws KeeperException 
    */
   public Replication(final Server server, final FileSystem fs,
-      final Path logDir, final Path oldLogDir)
-  throws IOException, KeeperException {
+      final Path logDir, final Path oldLogDir) throws IOException{
+    initialize(server, fs, logDir, oldLogDir);
+  }
+
+  /**
+   * Empty constructor
+   */
+  public Replication() {
+  }
+
+  public void initialize(final Server server, final FileSystem fs,
+      final Path logDir, final Path oldLogDir) throws IOException {
     this.server = server;
     this.conf = this.server.getConfiguration();
     this.replication = isReplication(this.conf);
     if (replication) {
-      this.zkHelper = new ReplicationZookeeper(server, this.replicating);
+      try {
+        this.zkHelper = new ReplicationZookeeper(server, this.replicating);
+      } catch (KeeperException ke) {
+        throw new IOException("Failed replication handler create " +
+           "(replicating=" + this.replicating, ke);
+      }
       this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
           this.server, fs, this.replicating, logDir, oldLogDir) ;
     } else {
@@ -82,14 +99,27 @@ public class Replication implements WALA
     }
   }
 
-  /**
-   * @param c Configuration to look at
-   * @return True if replication is enabled.
-   */
+   /**
+    * @param c Configuration to look at
+    * @return True if replication is enabled.
+    */
   public static boolean isReplication(final Configuration c) {
     return c.getBoolean(REPLICATION_ENABLE_KEY, false);
   }
 
+   /**
+    * Returns an object to listen to new hlog changes.
+    */
+  public WALActionsListener getWALActionsListener() {
+    return this;
+  }
+  /**
+   * Stops replication service.
+   */
+  public void stopReplicationService() {
+    join();
+  }
+
   /**
    * Join with the replication threads
    */
@@ -115,7 +145,7 @@ public class Replication implements WALA
    * it starts
    * @throws IOException
    */
-  public void startReplicationServices() throws IOException {
+  public void startReplicationService() throws IOException {
     if (this.replication) {
       this.replicationManager.init();
       this.replicationSink = new ReplicationSink(this.conf, this.server);



Mime
View raw message