Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C3855D439 for ; Tue, 18 Sep 2012 18:40:46 +0000 (UTC) Received: (qmail 52258 invoked by uid 500); 18 Sep 2012 18:40:46 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 52213 invoked by uid 500); 18 Sep 2012 18:40:46 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 52205 invoked by uid 99); 18 Sep 2012 18:40:46 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Sep 2012 18:40:46 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Sep 2012 18:40:40 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 378FB23888FE for ; Tue, 18 Sep 2012 18:39:56 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1387310 [1/3] - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/thrift/ main/java/org/apache/hadoop/hbase/thrift/generated/ main/java/org/apache/hadoop/hbase/util/ main/reso... Date: Tue, 18 Sep 2012 18:39:55 -0000 To: commits@hbase.apache.org From: mbautin@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120918183956.378FB23888FE@eris.apache.org> Author: mbautin Date: Tue Sep 18 18:39:54 2012 New Revision: 1387310 URL: http://svn.apache.org/viewvc?rev=1387310&view=rev Log: [jira] [HBASE-6807] [89-fb] Add region name to most HBase Thrift calls and use it to bypass client code Author: mbautin Summary: This is an enhancement for Liyin's diff D562373 that adds region name to a few HBase Thrift API functions. As I started testing this with hbc, I realized that the few calls that were overridden in the original diff were not triggered by any of hbc's standard commands. I tested them through the auto-generated Python client and that worked fine, but end-to-end (including hbc) is needed. Therefore, I am adding the regionName parameter to most HBase Thrift API methods except scanner-related methods and checkAnd* methods. I am also removing the copy-paste optimizations in HRegionThriftServer which perform region lookup by row unconditionally. Those should be unnecessary now that the region name is passed from the client. Test Plan: Compile hbc with D564078 and issue put/get commands from the command line. Connect to the regionserver with a debugger and verify that we are hitting the optimized path and not going through HTable. Reviewers: kranganathan, kannan, pritam, aaiyer, avf, liyintang Reviewed By: liyintang CC: ajoyfrank, hbase-eng@, davejwatson Differential Revision: https://phabricator.fb.com/D568325 Task ID: 1430675 Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/Hbase.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/IOError.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/IOError.java.new hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/TRowResult.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java hbase/branches/0.89-fb/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestMutationWriteToWAL.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftMutationAPI.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerLegacy.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java?rev=1387310&r1=1387309&r2=1387310&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java Tue Sep 18 18:39:54 2012 @@ -28,18 +28,16 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; import org.apache.hadoop.hbase.thrift.ThriftServerRunner; -import org.apache.hadoop.hbase.thrift.ThriftUtilities; import org.apache.hadoop.hbase.thrift.generated.Hbase; import org.apache.hadoop.hbase.thrift.generated.IOError; import org.apache.hadoop.hbase.thrift.generated.IllegalArgument; -import org.apache.hadoop.hbase.thrift.generated.TRowResult; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HasThread; import org.apache.thrift.TException; @@ -139,81 +137,79 @@ public class HRegionThriftServer extends } /** - * Get a record. Shortcircuit to get better performance. + * Process a get request. If the region name is set, using the shortcircuit optimization. */ @Override - public List getRowWithColumnsTs(ByteBuffer tableName, ByteBuffer row, - List columns, - long timestamp) - throws IOError { - try { - HTable table = getTable(tableName); - byte[] rowBytes = Bytes.getBytes(row); - HRegionLocation location = table.getRegionLocation(rowBytes); - byte[] regionName = location.getRegionInfo().getRegionName(); + protected Result processGet(ByteBuffer tableName, ByteBuffer regionName, Get get) + throws IOException, IOError { + if (Bytes.isNonEmpty(regionName)) { + metrics.incDirectCalls(); + return rs.get(Bytes.getBytes(regionName), get); + } else { + metrics.incIndirectCalls(); + return super.processGet(tableName, regionName, get); + } + } - if (columns == null) { - Get get = new Get(rowBytes); - get.setTimeRange(Long.MIN_VALUE, timestamp); - Result result = rs.get(regionName, get); - return ThriftUtilities.rowResultFromHBase(result); - } - byte[][] columnArr = columns.toArray(new byte[columns.size()][]); - Get get = new Get(rowBytes); - for (byte[] column : columnArr) { - byte[][] famAndQf = KeyValue.parseColumn(column); - if (famAndQf.length == 1) { - get.addFamily(famAndQf[0]); - } else { - get.addColumn(famAndQf[0], famAndQf[1]); - } - } - get.setTimeRange(Long.MIN_VALUE, timestamp); - Result result = rs.get(regionName, get); - return ThriftUtilities.rowResultFromHBase(result); - } catch (NotServingRegionException e) { - if (!redirect) { - throw new IOError(e.getMessage(), 0); - } - LOG.info("ThriftServer redirecting getRowWithColumnsTs"); - return super.getRowWithColumnsTs(tableName, row, columns, timestamp); - } catch (IOException e) { - throw new IOError(e.getMessage(), 0); + /** + * Process a put request. If the region name is set, using the shortcircuit optimization. + */ + @Override + protected void processPut(ByteBuffer tableName, ByteBuffer regionName, Put put) + throws IOException, IOError { + if (Bytes.isNonEmpty(regionName)) { + metrics.incDirectCalls(); + rs.put(Bytes.getBytes(regionName), put); + } else { + metrics.incIndirectCalls(); + super.processPut(tableName, regionName, put); } } + /** + * Process a delete request. If the region name is set, using the shortcircuit optimization. + */ @Override - public List getRowWithColumnPrefix(ByteBuffer tableName, - ByteBuffer row, ByteBuffer prefix) throws IOError { - return (getRowWithColumnPrefixTs(tableName, row, prefix, - HConstants.LATEST_TIMESTAMP)); + protected void processDelete(ByteBuffer tableName, ByteBuffer regionName, Delete delete) + throws IOException, IOError { + if (Bytes.isNonEmpty(regionName)) { + metrics.incDirectCalls(); + rs.delete(Bytes.getBytes(regionName), delete); + } else { + metrics.incIndirectCalls(); + super.processDelete(tableName, regionName, delete); + } } + /** + * Process the multiGet requests. If the region name is set, using the shortcircuit + * optimization + */ @Override - public List getRowWithColumnPrefixTs(ByteBuffer tableName, - ByteBuffer row, ByteBuffer prefix, long timestamp) throws IOError { - try { - HTable table = getTable(tableName); - byte[] rowBytes = Bytes.getBytes(row); - if (prefix == null) { - Get get = new Get(rowBytes); - get.setTimeRange(Long.MIN_VALUE, timestamp); - Result result = table.get(get); - return ThriftUtilities.rowResultFromHBase(result); - } - Get get = new Get(rowBytes); - byte[][] famAndPrefix = KeyValue.parseColumn(Bytes.getBytes(prefix)); - if (famAndPrefix.length == 2) { - get.addFamily(famAndPrefix[0]); - get.setFilter(new ColumnPrefixFilter(famAndPrefix[1])); - } else { - get.setFilter(new ColumnPrefixFilter(famAndPrefix[0])); - } - get.setTimeRange(Long.MIN_VALUE, timestamp); - Result result = table.get(get); - return ThriftUtilities.rowResultFromHBase(result); - } catch (IOException e) { - throw new IOError(e.getMessage(), 0); + protected Result[] processMultiGet(ByteBuffer tableName, ByteBuffer regionName, List gets) + throws IOException, IOError { + if (Bytes.isNonEmpty(regionName)) { + metrics.incDirectCalls(); + return rs.get(Bytes.getBytes(regionName), gets); + } else { + metrics.incIndirectCalls(); + return super.processMultiGet(tableName, regionName, gets); + } + } + + /** + * Process the multiPut requests. If the region name is set, using the shortcircuit + * optimization + */ + @Override + protected void processMultiPut(ByteBuffer tableName, ByteBuffer regionName, List puts) + throws IOException, IOError { + if (Bytes.isNonEmpty(regionName)) { + metrics.incDirectCalls(); + rs.put(Bytes.getBytes(regionName), puts); + } else { + metrics.incIndirectCalls(); + super.processMultiPut(tableName, regionName, puts); } } } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java?rev=1387310&r1=1387309&r2=1387310&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java Tue Sep 18 18:39:54 2012 @@ -31,6 +31,7 @@ import org.apache.hadoop.metrics.Updater import org.apache.hadoop.metrics.util.MetricsBase; import org.apache.hadoop.metrics.util.MetricsIntValue; import org.apache.hadoop.metrics.util.MetricsRegistry; +import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt; import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate; /** @@ -59,6 +60,8 @@ public class ThriftMetrics implements Up new MetricsTimeVaryingRate("numBatchGetRowKeys", registry); private final MetricsTimeVaryingRate numBatchMutateRowKeys = new MetricsTimeVaryingRate("numBatchMutateRowKeys", registry); + private final MetricsTimeVaryingRate numMultiPutRowKeys = + new MetricsTimeVaryingRate("numMultiPutRowKeys", registry); private final MetricsTimeVaryingRate timeInQueue = new MetricsTimeVaryingRate("timeInQueue", registry); private MetricsTimeVaryingRate thriftCall = @@ -66,7 +69,21 @@ public class ThriftMetrics implements Up private MetricsTimeVaryingRate slowThriftCall = new MetricsTimeVaryingRate("slowThriftCall", registry); - private final ThriftMBean mbean; + /** + * Number of calls that go through HTable. Only updated in the embedded Thrift server. This only + * counts API functions that were capable of accepting a region name from the client and skipping + * the client logic, but the region name was not passed. + */ + private MetricsTimeVaryingInt indirectCalls = + new MetricsTimeVaryingInt("indirectThriftCalls", registry); + + /** + * Number of direct calls to HRegion (preferred way). Only updated in the embedded Thrift server. + */ + private MetricsTimeVaryingInt directCalls = + new MetricsTimeVaryingInt("directThriftCalls", registry); + + private final ThriftMBean mbean; public ThriftMetrics(int port, Configuration conf, Class iface) { slowResponseTime = conf.getLong( @@ -113,6 +130,18 @@ public class ThriftMetrics implements Up numBatchMutateRowKeys.inc(diff); } + public void incNumMultiPutRowKeys(int diff) { + this.numMultiPutRowKeys.inc(diff); + } + + public final void incIndirectCalls() { + this.indirectCalls.inc(); + } + + public final void incDirectCalls() { + this.directCalls.inc(); + } + public void incMethodTime(String name, long time) { MetricsTimeVaryingRate methodTimeMetric = getMethodTimeMetrics(name); if (methodTimeMetric == null) { Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java?rev=1387310&r1=1387309&r2=1387310&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java Tue Sep 18 18:39:54 2012 @@ -424,7 +424,7 @@ public class ThriftServerRunner implemen protected int nextScannerId = 0; protected HashMap scannerMap = null; - private ThriftMetrics metrics; + protected ThriftMetrics metrics; private static ThreadLocal> threadLocalTables = new ThreadLocal>() { @@ -462,7 +462,7 @@ public class ThriftServerRunner implemen */ protected HTable getTable(final byte[] tableName) throws IOError, IOException { - String table = new String(tableName); + String table = Bytes.toString(tableName); Map tables = threadLocalTables.get(); if (!tables.containsKey(table)) { tables.put(table, new HTable(conf, tableName)); @@ -625,25 +625,19 @@ public class ThriftServerRunner implemen @Deprecated @Override - public List get(ByteBuffer tableName, ByteBuffer row, ByteBuffer column) - throws IOError { + public List get(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, + ByteBuffer regionName) throws IOError { byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); - byte[] tableNameBytes = getBytes(tableName); - byte[] rowBytes = getBytes(row); - return get(tableNameBytes, rowBytes, famAndQf[0], getQualifier(famAndQf)); - } - - public List get(byte [] tableName, byte [] row, byte [] family, - byte [] qualifier) throws IOError { + byte[] family = famAndQf[0]; + byte[] qualifier = getQualifier(famAndQf); try { - HTable table = getTable(tableName); - Get get = new Get(row); + Get get = new Get(getBytes(row)); if (qualifier == null || qualifier.length == 0) { get.addFamily(family); } else { get.addColumn(family, qualifier); } - Result result = table.get(get); + Result result = processGet(tableName, regionName, get); return ThriftUtilities.cellFromHBase(result.sorted()); } catch (IOException e) { throw convertIOException(e); @@ -653,20 +647,15 @@ public class ThriftServerRunner implemen @Deprecated @Override public List getVer(ByteBuffer tableName, ByteBuffer row, - ByteBuffer column, int numVersions) throws IOError { + ByteBuffer column, int numVersions, ByteBuffer regionName) throws IOError { byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); - return getVer(getBytes(tableName), getBytes(row), famAndQf[0], - getQualifier(famAndQf), numVersions); - } - - public List getVer(byte [] tableName, byte [] row, byte [] family, - byte [] qualifier, int numVersions) throws IOError { + byte[] family = famAndQf[0]; + byte[] qualifier = getQualifier(famAndQf); try { - HTable table = getTable(tableName); - Get get = new Get(row); + Get get = new Get(getBytes(row)); get.addColumn(family, qualifier); get.setMaxVersions(numVersions); - Result result = table.get(get); + Result result = processGet(tableName, regionName, get); return ThriftUtilities.cellFromHBase(result.sorted()); } catch (IOException e) { throw convertIOException(e); @@ -676,22 +665,17 @@ public class ThriftServerRunner implemen @Override @Deprecated public List getVerTs(ByteBuffer tableName, ByteBuffer row, - ByteBuffer column, long timestamp, int numVersions) throws IOError { + ByteBuffer column, long timestamp, int numVersions, + ByteBuffer regionName) throws IOError { byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); - return getVerTs(getBytes(tableName), getBytes(row), - famAndQf[0], getQualifier(famAndQf), - timestamp, numVersions); - } - - public List getVerTs(byte [] tableName, byte [] row, byte [] family, - byte [] qualifier, long timestamp, int numVersions) throws IOError { + byte[] family = famAndQf[0]; + byte[] qualifier = getQualifier(famAndQf); try { - HTable table = getTable(tableName); - Get get = new Get(row); + Get get = new Get(getBytes(row)); get.addColumn(family, qualifier); get.setTimeRange(Long.MIN_VALUE, timestamp); get.setMaxVersions(numVersions); - Result result = table.get(get); + Result result = processGet(tableName, regionName, get); return ThriftUtilities.cellFromHBase(result.sorted()); } catch (IOException e) { throw convertIOException(e); @@ -699,36 +683,36 @@ public class ThriftServerRunner implemen } @Override - public List getRow(ByteBuffer tableName, ByteBuffer row) + public List getRow(ByteBuffer tableName, ByteBuffer row, ByteBuffer regionName) throws IOError { return getRowWithColumnsTs(tableName, row, null, - HConstants.LATEST_TIMESTAMP); + HConstants.LATEST_TIMESTAMP, regionName); } @Override public List getRowWithColumns(ByteBuffer tableName, ByteBuffer row, - List columns) throws IOError { + List columns, ByteBuffer regionName) throws IOError { return getRowWithColumnsTs(tableName, row, columns, - HConstants.LATEST_TIMESTAMP); + HConstants.LATEST_TIMESTAMP, regionName); } @Override public List getRowTs(ByteBuffer tableName, ByteBuffer row, - long timestamp) throws IOError { + long timestamp, ByteBuffer regionName) throws IOError { return getRowWithColumnsTs(tableName, row, null, - timestamp); + timestamp, regionName); } @Override public List getRowWithColumnsTs(ByteBuffer tableName, ByteBuffer row, - List columns, long timestamp) throws IOError { + List columns, long timestamp, ByteBuffer regionName) throws IOError { try { HTable table = getTable(tableName); byte[] rowBytes = getBytes(row); if (columns == null) { Get get = new Get(rowBytes); get.setTimeRange(Long.MIN_VALUE, timestamp); - Result result = table.get(get); + Result result = processGet(tableName, regionName, get); return ThriftUtilities.rowResultFromHBase(result); } Get get = new Get(rowBytes); @@ -741,7 +725,7 @@ public class ThriftServerRunner implemen } } get.setTimeRange(Long.MIN_VALUE, timestamp); - Result result = table.get(get); + Result result = processGet(tableName, regionName, get); return ThriftUtilities.rowResultFromHBase(result); } catch (IOException e) { throw convertIOException(e); @@ -750,23 +734,23 @@ public class ThriftServerRunner implemen @Override public List getRowWithColumnPrefix(ByteBuffer tableName, ByteBuffer row, - ByteBuffer prefix) throws IOError { + ByteBuffer prefix, ByteBuffer regionName) throws IOError { return (getRowWithColumnPrefixTs(tableName, row, prefix, - HConstants.LATEST_TIMESTAMP)); + HConstants.LATEST_TIMESTAMP, regionName)); } @Override public List getRowWithColumnPrefixTs(ByteBuffer tableName, ByteBuffer row, - ByteBuffer prefix, long timestamp) throws IOError { + ByteBuffer prefix, long timestamp, ByteBuffer regionName) throws IOError { try { - HTable table = getTable(tableName); + byte[] rowBytes = getBytes(row); if (prefix == null) { - Get get = new Get(getBytes(row)); + Get get = new Get(rowBytes); get.setTimeRange(Long.MIN_VALUE, timestamp); - Result result = table.get(get); + Result result = processGet(tableName, regionName, get); return ThriftUtilities.rowResultFromHBase(result); } - Get get = new Get(getBytes(row)); + Get get = new Get(rowBytes); byte [][] famAndPrefix = KeyValue.parseColumn(getBytes(prefix)); if (famAndPrefix.length == 2) { get.addFamily(famAndPrefix[0]); @@ -775,7 +759,7 @@ public class ThriftServerRunner implemen get.setFilter(new ColumnPrefixFilter(famAndPrefix[0])); } get.setTimeRange(Long.MIN_VALUE, timestamp); - Result result = table.get(get); + Result result = processGet(tableName, regionName, get); return ThriftUtilities.rowResultFromHBase(result); } catch (IOException e) { throw convertIOException(e); @@ -783,37 +767,39 @@ public class ThriftServerRunner implemen } @Override - public void deleteAll(ByteBuffer tableName, ByteBuffer row, ByteBuffer column) + public void deleteAll(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, + ByteBuffer regionName) throws IOError { - deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP); + deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP, regionName); } @Override - public List getRows(ByteBuffer tableName, List rows) + public List getRows(ByteBuffer tableName, List rows, + ByteBuffer regionName) throws IOError, TException { return getRowsWithColumnsTs(tableName, rows, null, - HConstants.LATEST_TIMESTAMP); + HConstants.LATEST_TIMESTAMP, regionName); } @Override public List getRowsWithColumns(ByteBuffer tableName, - List rows, List columns) throws IOError { + List rows, List columns, ByteBuffer regionName) throws IOError { return getRowsWithColumnsTs(tableName, rows, columns, - HConstants.LATEST_TIMESTAMP); + HConstants.LATEST_TIMESTAMP, regionName); } @Override public List getRowsTs(ByteBuffer tableName, List rows, - long timestamp) throws IOError { - return getRowsWithColumnsTs(tableName, rows, null, timestamp); + long timestamp, ByteBuffer regionName) throws IOError { + return getRowsWithColumnsTs(tableName, rows, null, timestamp, regionName); } @Override public List getRowsWithColumnsTs(ByteBuffer tableName, - List rows, List columns, long timestamp) throws IOError { + List rows, List columns, long timestamp, + ByteBuffer regionName) throws IOError { try { List gets = new ArrayList(rows.size()); - HTable table = getTable(tableName); if (metrics != null) { metrics.incNumBatchGetRowKeys(rows.size()); } @@ -835,7 +821,7 @@ public class ThriftServerRunner implemen } gets.add(get); } - Result[] result = table.get(gets); + Result[] result = processMultiGet(tableName, regionName, gets); return ThriftUtilities.rowResultFromHBase(result); } catch (IOException e) { throw convertIOException(e); @@ -844,9 +830,8 @@ public class ThriftServerRunner implemen @Override public void deleteAllTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, - long timestamp) throws IOError { + long timestamp, ByteBuffer regionName) throws IOError { try { - HTable table = getTable(tableName); Delete delete = new Delete(getBytes(row)); byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); if (famAndQf.length == 1) { @@ -854,8 +839,7 @@ public class ThriftServerRunner implemen } else { delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp); } - table.delete(delete); - + processDelete(tableName, regionName, delete); } catch (IOException e) { throw convertIOException(e); } @@ -863,16 +847,15 @@ public class ThriftServerRunner implemen @Override public void deleteAllRow(ByteBuffer tableName, ByteBuffer row, - Map attributes) throws IOError { - deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP); + Map attributes, ByteBuffer regionName) throws IOError { + deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, regionName); } - public void deleteAllRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp) + public void deleteAllRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp, + ByteBuffer regionName) throws IOError { try { - HTable table = getTable(tableName); - Delete delete = new Delete(getBytes(row), timestamp, null); - table.delete(delete); + processDelete(tableName, regionName, new Delete(getBytes(row), timestamp, null)); } catch (IOException e) { throw convertIOException(e); } @@ -918,17 +901,18 @@ public class ThriftServerRunner implemen @Override public void mutateRow(ByteBuffer tableName, ByteBuffer row, List mutations, - Map attributes) throws IOError, IllegalArgument { - mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP, attributes); + Map attributes, ByteBuffer regionName) + throws IOError, IllegalArgument { + mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP, attributes, + regionName); } @Override public void mutateRowTs(ByteBuffer tableName, ByteBuffer row, List mutations, long timestamp, - Map attributes) throws IOError, IllegalArgument { - HTable table = null; + Map attributes, ByteBuffer regionName) + throws IOError, IllegalArgument { try { - table = getTable(getBytes(tableName)); byte[] rowBytes = getBytes(row); Put put = null; Delete delete = null; @@ -967,11 +951,11 @@ public class ThriftServerRunner implemen } if (delete != null) { delete.setWriteToWAL(writeToWAL); - table.delete(delete); + processDelete(tableName, regionName, delete); } if (put != null) { put.setWriteToWAL(writeToWAL); - table.put(put); + processPut(tableName, regionName, put); } } catch (IOException e) { throw convertIOException(e); @@ -982,15 +966,16 @@ public class ThriftServerRunner implemen @Override public void mutateRows(ByteBuffer tableName, List rowBatches, - Map attributes) + Map attributes, ByteBuffer regionName) throws IOError, IllegalArgument, TException { - mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes); + mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes, + regionName); } @Override public void mutateRowsTs( ByteBuffer tableName, List rowBatches, long timestamp, - Map attributes) + Map attributes, ByteBuffer regionName) throws IOError, IllegalArgument, TException { List puts = null; List deletes = null; @@ -1059,13 +1044,12 @@ public class ThriftServerRunner implemen HTable table = null; try { - table = getTable(tableName); if (puts != null) { - table.put(puts); + processMultiPut(tableName, regionName, puts); } if (deletes != null) { for (Delete del : deletes) { - table.delete(del); + processDelete(tableName, regionName, del); } } } catch (IOException e) { @@ -1470,6 +1454,38 @@ public class ThriftServerRunner implemen } } + @Override + public void multiPut(ByteBuffer tableName, List tPuts, ByteBuffer regionName) + throws IOError, IllegalArgument, TException { + if (metrics != null) { + metrics.incNumMultiPutRowKeys(tPuts.size()); + } + List puts = new ArrayList(tPuts.size()); + try { + for (BatchMutation batch : tPuts) { + byte[] row = getBytes(batch.row); + List mutations = batch.mutations; + Put put = null; + for (Mutation m : mutations) { + byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column)); + put = new Put(row); + + if (famAndQf.length == 1) { + put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, m.getTimestamp(), + getBytes(m.value)); + } else { + put.add(famAndQf[0], famAndQf[1], m.getTimestamp(), getBytes(m.value)); + } + } + puts.add(put); + } + + processMultiPut(tableName, regionName, puts); + } catch (IOException e) { + throw new IOError(e.getMessage(), 0); + } + } + void initMetrics(ThriftMetrics metrics) { this.metrics = metrics; } @@ -1485,6 +1501,33 @@ public class ThriftServerRunner implemen long timestamp) throws TException { throw new TException("Not implemented"); } + + protected Result processGet(ByteBuffer tableName, ByteBuffer regionName, Get get) + throws IOException, IOError { + return getTable(tableName).get(get); + } + + protected void processPut(ByteBuffer tableName, ByteBuffer regionName, Put put) + throws IOException, IOError { + getTable(tableName).put(put); + } + + protected void processDelete(ByteBuffer tableName, ByteBuffer regionName, Delete delete) + throws IOException, IOError { + getTable(tableName).delete(delete); + } + + protected Result[] processMultiGet(ByteBuffer tableName, ByteBuffer regionName, List gets) + throws IOException, IOError { + HTable table = getTable(tableName); + return table.get(gets); + } + + protected void processMultiPut(ByteBuffer tableName, ByteBuffer regionName, List puts) + throws IOException, IOError { + HTable table = getTable(tableName); + table.put(puts); + } } public static void registerFilters(Configuration conf) { @@ -1514,5 +1557,4 @@ public class ThriftServerRunner implemen } return HConstants.EMPTY_BYTE_ARRAY; } - }