Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2E7CB1091E for ; Thu, 17 Oct 2013 18:10:42 +0000 (UTC) Received: (qmail 27805 invoked by uid 500); 17 Oct 2013 18:10:42 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 27662 invoked by uid 500); 17 Oct 2013 18:10:41 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 27655 invoked by uid 99); 17 Oct 2013 18:10:41 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Oct 2013 18:10:41 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Oct 2013 18:10:37 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9622F2388A29; Thu, 17 Oct 2013 18:10:16 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1533185 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/coprocessor/ test/java/org/apache/hadoop/hbase/coprocessor/ Date: Thu, 17 Oct 2013 18:10:16 -0000 To: commits@hbase.apache.org From: jyates@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131017181016.9622F2388A29@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jyates Date: Thu Oct 17 18:10:16 2013 New Revision: 1533185 URL: http://svn.apache.org/r1533185 Log: HBASE-9749: Custom threadpool for Coprocessor obtained HTables Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java?rev=1533185&r1=1533184&r2=1533185&view=diff ============================================================================== --- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java (original) +++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java Thu Oct 17 18:10:16 2013 @@ -17,6 +17,7 @@ package org.apache.hadoop.hbase; import java.io.IOException; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.HTableInterface; @@ -49,4 +50,11 @@ public interface CoprocessorEnvironment * @throws IOException */ public HTableInterface getTable(byte[] tableName) throws IOException; -} + + /** + * @return an interface for accessing the given table using the passed executor to run batch + * operations + * @throws IOException + */ + public HTableInterface getTable(byte[] tableName, ExecutorService service) throws IOException; +} \ No newline at end of file Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HTable.java URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1533185&r1=1533184&r2=1533185&view=diff ============================================================================== --- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original) +++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HTable.java Thu Oct 17 18:10:16 2013 @@ -175,7 +175,7 @@ public class HTable implements HTableInt this.finishSetup(); } - private static ThreadPoolExecutor getDefaultExecutor(Configuration conf) { + public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) { int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); if (maxThreads == 0) { maxThreads = 1; // is there a better default? Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1533185&r1=1533184&r2=1533185&view=diff ============================================================================== --- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (original) +++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Thu Oct 17 18:10:16 2013 @@ -31,6 +31,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -363,9 +364,10 @@ public abstract class CoprocessorHost(), + Threads.newDaemonThreadFactory("hbase-table")); + pool.allowCoreThreadTimeOut(true); + return pool; + } + + @Override + public void prePut(ObserverContext e, Put put, WALEdit edit, + boolean writeToWAL) throws IOException { + HTableInterface table = e.getEnvironment().getTable(otherTable, getPool()); + Put p = new Put(new byte[] { 'a' }); + p.add(family, null, new byte[] { 'a' }); + try { + table.batch(Collections.singletonList(put)); + } catch (InterruptedException e1) { + throw new IOException(e1); + } + completedWithPool[0] = true; + table.close(); + } + } + + private static HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setupCluster() throws Exception { + UTIL.startMiniCluster(); + } + + @After + public void cleanupTestTable() throws Exception { + UTIL.getHBaseAdmin().disableTable(primaryTable); + UTIL.getHBaseAdmin().deleteTable(primaryTable); + + UTIL.getHBaseAdmin().disableTable(otherTable); + UTIL.getHBaseAdmin().deleteTable(otherTable); + + } + + @AfterClass + public static void teardownCluster() throws Exception{ + UTIL.shutdownMiniCluster(); + } + @Test public void testCoprocessorCanCreateConnectionToRemoteTable() throws Throwable { - HBaseTestingUtility UTIL = new HBaseTestingUtility(); - HTableDescriptor primary = new HTableDescriptor("primary"); + runCoprocessorConnectionToRemoteTable(SendToOtherTableCoprocessor.class, completed); + } + + @Test + public void testCoprocessorCanCreateConnectionToRemoteTableWithCustomPool() throws Throwable { + runCoprocessorConnectionToRemoteTable(CustomThreadPoolCoprocessor.class, completedWithPool); + } + + private void runCoprocessorConnectionToRemoteTable(Class clazz, + boolean[] completeCheck) throws Throwable { + HTableDescriptor primary = new HTableDescriptor(primaryTable); primary.addFamily(new HColumnDescriptor(family)); // add our coprocessor - primary.addCoprocessor(SendToOtherTableCoprocessor.class.getName()); + primary.addCoprocessor(clazz.getName()); HTableDescriptor other = new HTableDescriptor(otherTable); other.addFamily(new HColumnDescriptor(family)); - UTIL.startMiniCluster(); + HBaseAdmin admin = UTIL.getHBaseAdmin(); admin.createTable(primary); admin.createTable(other); - admin.close(); HTable table = new HTable(UTIL.getConfiguration(), "primary"); Put p = new Put(new byte[] { 'a' }); @@ -96,11 +168,10 @@ public class TestOpenTableInCoprocessor table.close(); HTable target = new HTable(UTIL.getConfiguration(), otherTable); - assertTrue("Didn't complete update to target table!", completed); + assertTrue("Didn't complete update to target table!", completeCheck[0]); assertEquals("Didn't find inserted row", 1, getKeyValueCount(target)); target.close(); - UTIL.shutdownMiniCluster(); } /**