hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jya...@apache.org
Subject svn commit: r1533185 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/coprocessor/ test/java/org/apache/hadoop/hbase/coprocessor/
Date Thu, 17 Oct 2013 18:10:16 GMT
Author: jyates
Date: Thu Oct 17 18:10:16 2013
New Revision: 1533185

URL: http://svn.apache.org/r1533185
Log:
HBASE-9749: Custom threadpool for Coprocessor obtained HTables

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java?rev=1533185&r1=1533184&r2=1533185&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java Thu Oct 17 18:10:16 2013
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hbase;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -49,4 +50,11 @@ public interface CoprocessorEnvironment 
    * @throws IOException
    */
   public HTableInterface getTable(byte[] tableName) throws IOException;
-}
+
+  /**
+   * @return an interface for accessing the given table using the passed executor to run batch
+   *         operations
+   * @throws IOException
+   */
+  public HTableInterface getTable(byte[] tableName, ExecutorService service) throws IOException;
+}
\ No newline at end of file

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1533185&r1=1533184&r2=1533185&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HTable.java Thu Oct 17 18:10:16 2013
@@ -175,7 +175,7 @@ public class HTable implements HTableInt
     this.finishSetup();
   }
 
-  private static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
+  public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
     if (maxThreads == 0) {
       maxThreads = 1; // is there a better default?

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1533185&r1=1533184&r2=1533185&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Thu Oct 17 18:10:16 2013
@@ -31,6 +31,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -363,9 +364,10 @@ public abstract class CoprocessorHost<E 
       private HTable table;
       private HConnection connection;
 
-      public HTableWrapper(byte[] tableName, HConnection connection) throws IOException {
+      public HTableWrapper(byte[] tableName, HConnection connection, ExecutorService executor)
+          throws IOException {
         this.tableName = tableName;
-        this.table = new HTable(tableName, connection);
+        this.table = new HTable(tableName, connection, executor);
         this.connection = connection;
         openTables.add(this);
       }
@@ -680,7 +682,19 @@ public abstract class CoprocessorHost<E 
      */
     @Override
     public HTableInterface getTable(byte[] tableName) throws IOException {
-      return new HTableWrapper(tableName, CoprocessorHConnection.getConnectionForEnvironment(this));
+      return this.getTable(tableName, HTable.getDefaultExecutor(getConfiguration()));
+    }
+
+    /**
+     * Open a table from within the Coprocessor environment
+     * @param tableName the table name
+     * @return an interface for manipulating the table
+     * @exception java.io.IOException Exception
+     */
+    @Override
+    public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
+      return new HTableWrapper(tableName, CoprocessorHConnection.getConnectionForEnvironment(this),
+          pool);
     }
   }
 

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java?rev=1533185&r1=1533184&r2=1533185&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java Thu Oct 17 18:10:16 2013
@@ -21,6 +21,11 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -33,11 +38,12 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -48,9 +54,10 @@ import org.junit.experimental.categories
 public class TestOpenTableInCoprocessor {
 
   private static final byte[] otherTable = Bytes.toBytes("otherTable");
+  private static final byte[] primaryTable = Bytes.toBytes("primary");
   private static final byte[] family = new byte[] { 'f' };
 
-  private static boolean completed = false;
+  private static boolean [] completed = new boolean[1];
 
   /**
    * Custom coprocessor that just copies the write to another table.
@@ -65,28 +72,93 @@ public class TestOpenTableInCoprocessor 
       p.add(family, null, new byte[] { 'a' });
       table.put(put);
       table.flushCommits();
-      completed = true;
+      completed[0] = true;
       table.close();
     }
 
   }
 
+  private static boolean []  completedWithPool = new boolean [1] ;
+
+  public static class CustomThreadPoolCoprocessor extends BaseRegionObserver {
+
+    /**
+     * Get a pool that has only ever one thread. A second action added to the pool (running
+     * concurrently), will cause an exception.
+     * @return
+     */
+    private ExecutorService getPool() {
+      int maxThreads = 1;
+      long keepAliveTime = 60;
+      ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
+          TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+          Threads.newDaemonThreadFactory("hbase-table"));
+      pool.allowCoreThreadTimeOut(true);
+      return pool;
+    }
+
+    @Override
+    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
+        boolean writeToWAL) throws IOException {
+      HTableInterface table = e.getEnvironment().getTable(otherTable, getPool());
+      Put p = new Put(new byte[] { 'a' });
+      p.add(family, null, new byte[] { 'a' });
+      try {
+        table.batch(Collections.singletonList(put));
+      } catch (InterruptedException e1) {
+        throw new IOException(e1);
+      }
+      completedWithPool[0] = true;
+      table.close();
+    }
+  }
+
+  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    UTIL.startMiniCluster();
+  }
+
+  @After
+  public void cleanupTestTable() throws Exception {
+    UTIL.getHBaseAdmin().disableTable(primaryTable);
+    UTIL.getHBaseAdmin().deleteTable(primaryTable);
+
+    UTIL.getHBaseAdmin().disableTable(otherTable);
+    UTIL.getHBaseAdmin().deleteTable(otherTable);
+
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception{
+    UTIL.shutdownMiniCluster();
+  }
+
   @Test
   public void testCoprocessorCanCreateConnectionToRemoteTable() throws Throwable {
-    HBaseTestingUtility UTIL = new HBaseTestingUtility();
-    HTableDescriptor primary = new HTableDescriptor("primary");
+    runCoprocessorConnectionToRemoteTable(SendToOtherTableCoprocessor.class, completed);
+  }
+
+  @Test
+  public void testCoprocessorCanCreateConnectionToRemoteTableWithCustomPool() throws Throwable {
+    runCoprocessorConnectionToRemoteTable(CustomThreadPoolCoprocessor.class, completedWithPool);
+  }
+
+  private void runCoprocessorConnectionToRemoteTable(Class<? extends BaseRegionObserver> clazz,
+      boolean[] completeCheck) throws Throwable {
+    HTableDescriptor primary = new HTableDescriptor(primaryTable);
     primary.addFamily(new HColumnDescriptor(family));
     // add our coprocessor
-    primary.addCoprocessor(SendToOtherTableCoprocessor.class.getName());
+    primary.addCoprocessor(clazz.getName());
 
     HTableDescriptor other = new HTableDescriptor(otherTable);
     other.addFamily(new HColumnDescriptor(family));
-    UTIL.startMiniCluster();
+
 
     HBaseAdmin admin = UTIL.getHBaseAdmin();
     admin.createTable(primary);
     admin.createTable(other);
-    admin.close();
 
     HTable table = new HTable(UTIL.getConfiguration(), "primary");
     Put p = new Put(new byte[] { 'a' });
@@ -96,11 +168,10 @@ public class TestOpenTableInCoprocessor 
     table.close();
 
     HTable target = new HTable(UTIL.getConfiguration(), otherTable);
-    assertTrue("Didn't complete update to target table!", completed);
+    assertTrue("Didn't complete update to target table!", completeCheck[0]);
     assertEquals("Didn't find inserted row", 1, getKeyValueCount(target));
     target.close();
 
-    UTIL.shutdownMiniCluster();
   }
 
   /**



Mime
View raw message