hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jya...@apache.org
Subject svn commit: r1533187 - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ hbase-server/src/test/java/org/apache...
Date Thu, 17 Oct 2013 18:13:18 GMT
Author: jyates
Date: Thu Oct 17 18:13:18 2013
New Revision: 1533187

URL: http://svn.apache.org/r1533187
Log:
HBASE-9749: Custom threadpool for Coprocessor obtained HTables

Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java?rev=1533187&r1=1533186&r2=1533187&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
(original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
Thu Oct 17 18:13:18 2013
@@ -16,6 +16,7 @@
 package org.apache.hadoop.hbase;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -50,4 +51,11 @@ public interface CoprocessorEnvironment 
    * @throws IOException
    */
   HTableInterface getTable(TableName tableName) throws IOException;
+
+  /**
+   * @return an interface for accessing the given table using the passed executor to run
batch
+   *         operations
+   * @throws IOException
+   */
+  public HTableInterface getTable(TableName tableName, ExecutorService service) throws IOException;
 }

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1533187&r1=1533186&r2=1533187&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Thu
Oct 17 18:13:18 2013
@@ -210,7 +210,7 @@ public class HTable implements HTableInt
     this.finishSetup();
   }
    
-  private static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
+  public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
     if (maxThreads == 0) {
       maxThreads = 1; // is there a better default?

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1533187&r1=1533186&r2=1533187&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
Thu Oct 17 18:13:18 2013
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -376,9 +377,10 @@ public abstract class CoprocessorHost<E 
       private HTable table;
       private HConnection connection;
 
-      public HTableWrapper(TableName tableName, HConnection connection) throws IOException
{
+      public HTableWrapper(TableName tableName, HConnection connection, ExecutorService pool)
+          throws IOException {
         this.tableName = tableName;
-        this.table = new HTable(tableName, connection);
+        this.table = new HTable(tableName, connection, pool);
         this.connection = connection;
         openTables.add(this);
       }
@@ -709,7 +711,19 @@ public abstract class CoprocessorHost<E 
      */
     @Override
     public HTableInterface getTable(TableName tableName) throws IOException {
-      return new HTableWrapper(tableName, CoprocessorHConnection.getConnectionForEnvironment(this));
+      return this.getTable(tableName, HTable.getDefaultExecutor(getConfiguration()));
+    }
+
+    /**
+     * Open a table from within the Coprocessor environment
+     * @param tableName the table name
+     * @return an interface for manipulating the table
+     * @exception java.io.IOException Exception
+     */
+    @Override
+    public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException
{
+      return new HTableWrapper(tableName, CoprocessorHConnection.getConnectionForEnvironment(this),
+          pool);
     }
   }
 

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java?rev=1533187&r1=1533186&r2=1533187&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java
(original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java
Thu Oct 17 18:13:18 2013
@@ -22,12 +22,17 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
@@ -37,7 +42,10 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -47,53 +55,113 @@ import org.junit.experimental.categories
 @Category(MediumTests.class)
 public class TestOpenTableInCoprocessor {
 
-  private static final TableName otherTable =
-      TableName.valueOf("otherTable");
+  private static final TableName otherTable = TableName.valueOf("otherTable");
+  private static final TableName primaryTable = TableName.valueOf("primary");
   private static final byte[] family = new byte[] { 'f' };
 
-  private static boolean completed = false;
-
+  private static boolean[] completed = new boolean[1];
   /**
    * Custom coprocessor that just copies the write to another table.
    */
   public static class SendToOtherTableCoprocessor extends BaseRegionObserver {
 
     @Override
-    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit
edit,
-        final Durability durability) throws IOException {
+    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final
Put put,
+        final WALEdit edit, final Durability durability) throws IOException {
       HTableInterface table = e.getEnvironment().getTable(otherTable);
       Put p = new Put(new byte[] { 'a' });
       p.add(family, null, new byte[] { 'a' });
       table.put(put);
       table.flushCommits();
-      completed = true;
+      completed[0] = true;
+      table.close();
+    }
+
+  }
+
+  private static boolean[] completedWithPool = new boolean[1];
+  /**
+   * Coprocessor that creates an HTable with a pool to write to another table
+   */
+  public static class CustomThreadPoolCoprocessor extends BaseRegionObserver {
+
+    /**
+     * Get a pool that has only ever one thread. A second action added to the pool (running
+     * concurrently), will cause an exception.
+     * @return
+     */
+    private ExecutorService getPool() {
+      int maxThreads = 1;
+      long keepAliveTime = 60;
+      ThreadPoolExecutor pool =
+          new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
+              new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("hbase-table"));
+      pool.allowCoreThreadTimeOut(true);
+      return pool;
+    }
+
+    @Override
+    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final
Put put,
+        final WALEdit edit, final Durability durability) throws IOException {
+      HTableInterface table = e.getEnvironment().getTable(otherTable, getPool());
+      Put p = new Put(new byte[] { 'a' });
+      p.add(family, null, new byte[] { 'a' });
+      try {
+        table.batch(Collections.singletonList(put));
+      } catch (InterruptedException e1) {
+        throw new IOException(e1);
+      }
+      completedWithPool[0] = true;
       table.close();
     }
+  }
+
+  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    UTIL.startMiniCluster();
   }
 
-  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  @After
+  public void cleanupTestTable() throws Exception {
+    UTIL.getHBaseAdmin().disableTable(primaryTable);
+    UTIL.getHBaseAdmin().deleteTable(primaryTable);
+
+    UTIL.getHBaseAdmin().disableTable(otherTable);
+    UTIL.getHBaseAdmin().deleteTable(otherTable);
+
+  }
 
   @AfterClass
-  public static void cleanup() throws Exception {
-    UTIL.getHBaseAdmin().close();
+  public static void teardownCluster() throws Exception {
+    UTIL.shutdownMiniCluster();
   }
 
   @Test
   public void testCoprocessorCanCreateConnectionToRemoteTable() throws Throwable {
-    HTableDescriptor primary = new HTableDescriptor(TableName.valueOf("primary"));
+    runCoprocessorConnectionToRemoteTable(SendToOtherTableCoprocessor.class, completed);
+  }
+
+  @Test
+  public void testCoprocessorCanCreateConnectionToRemoteTableWithCustomPool() throws Throwable
{
+    runCoprocessorConnectionToRemoteTable(CustomThreadPoolCoprocessor.class, completedWithPool);
+  }
+
+  private void runCoprocessorConnectionToRemoteTable(Class<? extends BaseRegionObserver>
clazz,
+      boolean[] completeCheck) throws Throwable {
+    HTableDescriptor primary = new HTableDescriptor(primaryTable);
     primary.addFamily(new HColumnDescriptor(family));
     // add our coprocessor
-    primary.addCoprocessor(SendToOtherTableCoprocessor.class.getName());
+    primary.addCoprocessor(clazz.getName());
 
     HTableDescriptor other = new HTableDescriptor(otherTable);
     other.addFamily(new HColumnDescriptor(family));
-    UTIL.startMiniCluster();
+
 
     HBaseAdmin admin = UTIL.getHBaseAdmin();
     admin.createTable(primary);
     admin.createTable(other);
-    admin.close();
 
     HTable table = new HTable(UTIL.getConfiguration(), "primary");
     Put p = new Put(new byte[] { 'a' });
@@ -103,11 +171,9 @@ public class TestOpenTableInCoprocessor 
     table.close();
 
     HTable target = new HTable(UTIL.getConfiguration(), otherTable);
-    assertTrue("Didn't complete update to target table!", completed);
+    assertTrue("Didn't complete update to target table!", completeCheck[0]);
     assertEquals("Didn't find inserted row", 1, getKeyValueCount(target));
     target.close();
-
-    UTIL.shutdownMiniCluster();
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java?rev=1533187&r1=1533186&r2=1533187&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
(original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
Thu Oct 17 18:13:18 2013
@@ -28,6 +28,7 @@ import java.security.PrivilegedException
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -215,6 +216,12 @@ public class TestTokenAuthentication {
         @Override
         public HTableInterface getTable(TableName tableName) throws IOException
           { return null; }
+
+        @Override
+        public HTableInterface getTable(TableName tableName, ExecutorService service)
+            throws IOException {
+          return null;
+        }
       });
 
       started = true;



Mime
View raw message