hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From la...@apache.org
Subject svn commit: r1379229 - in /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver: Replication.java ReplicationSink.java
Date Thu, 30 Aug 2012 23:39:16 GMT
Author: larsh
Date: Thu Aug 30 23:39:15 2012
New Revision: 1379229

URL: http://svn.apache.org/viewvc?rev=1379229&view=rev
Log:
HBASE-6550 Refactoring ReplicationSink to make it more responsive of cluster health (Himanshu
Vashishtha)

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1379229&r1=1379228&r2=1379229&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
(original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
Thu Aug 30 23:39:15 2012
@@ -126,6 +126,7 @@ public class Replication implements WALA
   public void join() {
     if (this.replication) {
       this.replicationManager.join();
+      this.replicationSink.stopReplicationSinkServices();
     }
   }
 

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1379229&r1=1379228&r2=1379229&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
(original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
Thu Aug 30 23:39:15 2012
@@ -22,15 +22,20 @@ package org.apache.hadoop.hbase.replicat
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.Stoppable;
 
 import java.io.IOException;
@@ -38,6 +43,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This class is responsible for replicating the edits coming
@@ -59,8 +68,8 @@ public class ReplicationSink {
   // Name of the HDFS directory that contains the temporary rep logs
   public static final String REPLICATION_LOG_DIR = ".replogs";
   private final Configuration conf;
-  // Pool used to replicated
-  private final HTablePool pool;
+  private final ExecutorService sharedThreadPool;
+  private final HConnection sharedHtableCon;
   private final ReplicationSinkMetrics metrics;
 
   /**
@@ -72,13 +81,29 @@ public class ReplicationSink {
    */
   public ReplicationSink(Configuration conf, Stoppable stopper)
       throws IOException {
-    this.conf = conf;
-    this.pool = new HTablePool(this.conf,
-        conf.getInt("replication.sink.htablepool.capacity", 10));
+    this.conf = HBaseConfiguration.create(conf);
+    decorateConf();
+    this.sharedHtableCon = HConnectionManager.createConnection(this.conf);
+    this.sharedThreadPool = new ThreadPoolExecutor(1, 
+        conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE), 
+        conf.getLong("hbase.htable.threads.keepalivetime", 60), TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("hbase-repl"));
+    ((ThreadPoolExecutor)this.sharedThreadPool).allowCoreThreadTimeOut(true);
     this.metrics = new ReplicationSinkMetrics();
   }
 
   /**
+   * decorate the Configuration object to make replication more receptive to
+   * delays: lessen the timeout and numTries.
+   */
+  private void decorateConf() {
+    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+        this.conf.getInt("replication.sink.client.retries.number", 1));
+    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        this.conf.getInt("replication.sink.client.ops.timeout", 20));
+  }
+
+  /**
    * Replicate this array of entries directly into the local cluster
    * using the native client.
    *
@@ -157,6 +182,26 @@ public class ReplicationSink {
   }
 
   /**
+   * stop the thread pool executor. It is called when the regionserver is stopped.
+   */
+  public void stopReplicationSinkServices() {
+    try {
+      this.sharedThreadPool.shutdown();
+      if (!this.sharedThreadPool.awaitTermination(60000, TimeUnit.MILLISECONDS)) {
+        this.sharedThreadPool.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while closing the table pool", e); // ignoring it as we are closing.
+      Thread.currentThread().interrupt();
+    }
+    try {
+      this.sharedHtableCon.close();
+    } catch (IOException e) {
+      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
+    }
+  }  
+
+  /**
    * Do the changes and handle the pool
    * @param tableName table to insert into
    * @param rows list of actions
@@ -168,7 +213,7 @@ public class ReplicationSink {
     }
     HTableInterface table = null;
     try {
-      table = this.pool.getTable(tableName);
+      table = new HTable(tableName, this.sharedHtableCon, this.sharedThreadPool);
       table.batch(rows);
       this.metrics.appliedOpsRate.inc(rows.size());
     } catch (InterruptedException ix) {



Mime
View raw message