hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From la...@apache.org
Subject svn commit: r1379227 - in /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver: Replication.java ReplicationSink.java
Date Thu, 30 Aug 2012 23:37:23 GMT
Author: larsh
Date: Thu Aug 30 23:37:23 2012
New Revision: 1379227

URL: http://svn.apache.org/viewvc?rev=1379227&view=rev
Log:
HBASE-6550 Refactoring ReplicationSink to make it more responsive of cluster health (Himanshu
Vashishtha)

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1379227&r1=1379226&r2=1379227&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java	(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java	Thu Aug 30 23:37:23 2012
@@ -128,6 +128,7 @@ public class Replication implements WALA
   public void join() {
     if (this.replication) {
       this.replicationManager.join();
+      this.replicationSink.stopReplicationSinkServices();
     }
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1379227&r1=1379226&r2=1379227&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java	(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java	Thu Aug 30 23:37:23 2012
@@ -23,11 +23,16 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -41,6 +46,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This class is responsible for replicating the edits coming
@@ -63,8 +72,8 @@ public class ReplicationSink {
   // Name of the HDFS directory that contains the temporary rep logs
   public static final String REPLICATION_LOG_DIR = ".replogs";
   private final Configuration conf;
-  // Pool used to replicated
-  private final HTablePool pool;
+  private final ExecutorService sharedThreadPool;
+  private final HConnection sharedHtableCon;
   private final ReplicationSinkMetrics metrics;
 
   /**
@@ -76,13 +85,29 @@ public class ReplicationSink {
    */
   public ReplicationSink(Configuration conf, Stoppable stopper)
       throws IOException {
-    this.conf = conf;
-    this.pool = new HTablePool(this.conf,
-        conf.getInt("replication.sink.htablepool.capacity", 10));
+    this.conf = HBaseConfiguration.create(conf);
+    decorateConf();
     this.metrics = new ReplicationSinkMetrics();
+    this.sharedHtableCon = HConnectionManager.createConnection(this.conf);
+    this.sharedThreadPool = new ThreadPoolExecutor(1,
+        conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE),
+        conf.getLong("hbase.htable.threads.keepalivetime", 60), TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(), new DaemonThreadFactory("hbase-repl"));
+    ((ThreadPoolExecutor) this.sharedThreadPool).allowCoreThreadTimeOut(true);
   }
 
   /**
+   * decorate the Configuration object to make replication more receptive to delays:
+   * lessen the timeout and numTries.
+   */
+  private void decorateConf() {
+    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+        this.conf.getInt("replication.sink.client.retries.number", 1));
+    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        this.conf.getInt("replication.sink.client.ops.timeout", 20));
+   }
+
+  /**
    * Replicate this array of entries directly into the local cluster
    * using the native client.
    *
@@ -161,6 +186,27 @@ public class ReplicationSink {
   }
 
   /**
+   * stop the thread pool executor. It is called when the regionserver is stopped.
+   */
+  public void stopReplicationSinkServices() {
+    try {
+      this.sharedThreadPool.shutdown();
+      if (!this.sharedThreadPool.awaitTermination(60000, TimeUnit.MILLISECONDS)) {
+        this.sharedThreadPool.shutdownNow();
+      }
+    }  catch (InterruptedException e) {
+      LOG.warn("Interrupted while closing the table pool", e); // ignoring it as we are closing.
+      Thread.currentThread().interrupt();
+    }
+    try {
+      this.sharedHtableCon.close();
+    } catch (IOException e) {
+      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
+    }
+  }
+
+
+  /**
    * Do the changes and handle the pool
    * @param tableName table to insert into
    * @param rows list of actions
@@ -172,7 +218,7 @@ public class ReplicationSink {
     }
     HTableInterface table = null;
     try {
-      table = this.pool.getTable(tableName);
+      table = new HTable(tableName, this.sharedHtableCon, this.sharedThreadPool);
       table.batch(rows);
     } catch (InterruptedException ix) {
       throw new IOException(ix);



Mime
View raw message