hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbau...@apache.org
Subject svn commit: r1387310 [1/3] - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/thrift/ main/java/org/apache/hadoop/hbase/thrift/generated/ main/java/org/apache/hadoop/hbase/util/ main/reso...
Date Tue, 18 Sep 2012 18:39:55 GMT
Author: mbautin
Date: Tue Sep 18 18:39:54 2012
New Revision: 1387310

URL: http://svn.apache.org/viewvc?rev=1387310&view=rev
Log:
[jira] [HBASE-6807] [89-fb] Add region name to most HBase Thrift calls and use it to bypass
client code

Author: mbautin

Summary: This is an enhancement for Liyin's diff D562373 that adds region name to a few HBase
Thrift API functions. As I started testing this with hbc, I realized that the few calls that
were overridden in the original diff were not triggered by any of hbc's standard commands.
I tested them through the auto-generated Python client and that worked fine, but end-to-end
testing (including hbc) is also needed. Therefore, I am adding the regionName parameter to most HBase Thrift
API methods except scanner-related methods and checkAnd* methods. I am also removing the copy-paste
optimizations in HRegionThriftServer which perform region lookup by row unconditionally. Those
should be unnecessary now that the region name is passed from the client.

Test Plan:
Compile hbc with D564078 and issue put/get commands from the command line.

Connect to the regionserver with a debugger and verify that we are hitting the optimized path
and not going through HTable.

Reviewers: kranganathan, kannan, pritam, aaiyer, avf, liyintang

Reviewed By: liyintang

CC: ajoyfrank, hbase-eng@, davejwatson

Differential Revision: https://phabricator.fb.com/D568325

Task ID: 1430675

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/Hbase.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/IOError.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/IOError.java.new
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/TRowResult.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
    hbase/branches/0.89-fb/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestMutationWriteToWAL.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftMutationAPI.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerLegacy.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java?rev=1387310&r1=1387309&r2=1387310&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java Tue Sep 18 18:39:54 2012
@@ -28,18 +28,16 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
 import org.apache.hadoop.hbase.thrift.ThriftServerRunner;
-import org.apache.hadoop.hbase.thrift.ThriftUtilities;
 import org.apache.hadoop.hbase.thrift.generated.Hbase;
 import org.apache.hadoop.hbase.thrift.generated.IOError;
 import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
-import org.apache.hadoop.hbase.thrift.generated.TRowResult;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.thrift.TException;
@@ -139,81 +137,79 @@ public class HRegionThriftServer extends
     }
 
     /**
-     * Get a record. Shortcircuit to get better performance.
+     * Process a get request. If the region name is set, using the shortcircuit optimization.
      */
     @Override
-    public List<TRowResult> getRowWithColumnsTs(ByteBuffer tableName, ByteBuffer row,
-                                                List<ByteBuffer> columns,
-                                                long timestamp)
-      throws IOError {
-      try {
-        HTable table = getTable(tableName);
-        byte[] rowBytes = Bytes.getBytes(row);
-        HRegionLocation location = table.getRegionLocation(rowBytes);
-        byte[] regionName = location.getRegionInfo().getRegionName();
+    protected Result processGet(ByteBuffer tableName, ByteBuffer regionName, Get get)
+        throws IOException, IOError {
+      if (Bytes.isNonEmpty(regionName)) {
+        metrics.incDirectCalls();
+        return rs.get(Bytes.getBytes(regionName), get);
+      } else {
+        metrics.incIndirectCalls();
+        return super.processGet(tableName, regionName, get);
+      }
+    }
 
-        if (columns == null) {
-          Get get = new Get(rowBytes);
-          get.setTimeRange(Long.MIN_VALUE, timestamp);
-          Result result = rs.get(regionName, get);
-          return ThriftUtilities.rowResultFromHBase(result);
-        }
-        byte[][] columnArr = columns.toArray(new byte[columns.size()][]);
-        Get get = new Get(rowBytes);
-        for (byte[] column : columnArr) {
-          byte[][] famAndQf = KeyValue.parseColumn(column);
-          if (famAndQf.length == 1) {
-            get.addFamily(famAndQf[0]);
-          } else {
-            get.addColumn(famAndQf[0], famAndQf[1]);
-          }
-        }
-        get.setTimeRange(Long.MIN_VALUE, timestamp);
-        Result result = rs.get(regionName, get);
-        return ThriftUtilities.rowResultFromHBase(result);
-      } catch (NotServingRegionException e) {
-        if (!redirect) {
-          throw new IOError(e.getMessage(), 0);
-        }
-        LOG.info("ThriftServer redirecting getRowWithColumnsTs");
-        return super.getRowWithColumnsTs(tableName, row, columns, timestamp);
-      } catch (IOException e) {
-        throw new IOError(e.getMessage(), 0);
+    /**
+     * Process a put request. If the region name is set, using the shortcircuit optimization.
+     */
+    @Override
+    protected void processPut(ByteBuffer tableName, ByteBuffer regionName, Put put)
+        throws IOException, IOError {
+      if (Bytes.isNonEmpty(regionName)) {
+        metrics.incDirectCalls();
+        rs.put(Bytes.getBytes(regionName), put);
+      } else {
+        metrics.incIndirectCalls();
+        super.processPut(tableName, regionName, put);
       }
     }
 
+    /**
+     * Process a delete request. If the region name is set, using the shortcircuit optimization.
+     */
     @Override
-    public List<TRowResult> getRowWithColumnPrefix(ByteBuffer tableName,
-        ByteBuffer row, ByteBuffer prefix) throws IOError {
-      return (getRowWithColumnPrefixTs(tableName, row, prefix,
-          HConstants.LATEST_TIMESTAMP));
+    protected void processDelete(ByteBuffer tableName, ByteBuffer regionName, Delete delete)
+        throws IOException, IOError {
+      if (Bytes.isNonEmpty(regionName)) {
+        metrics.incDirectCalls();
+        rs.delete(Bytes.getBytes(regionName), delete);
+      } else {
+        metrics.incIndirectCalls();
+        super.processDelete(tableName, regionName, delete);
+      }
     }
 
+    /**
+     * Process the multiGet requests. If the region name is set, using the shortcircuit 
+     * optimization
+     */
     @Override
-    public List<TRowResult> getRowWithColumnPrefixTs(ByteBuffer tableName,
-        ByteBuffer row, ByteBuffer prefix, long timestamp) throws IOError {
-      try {
-        HTable table = getTable(tableName);
-        byte[] rowBytes = Bytes.getBytes(row);
-        if (prefix == null) {
-          Get get = new Get(rowBytes);
-          get.setTimeRange(Long.MIN_VALUE, timestamp);
-          Result result = table.get(get);
-          return ThriftUtilities.rowResultFromHBase(result);
-        }
-        Get get = new Get(rowBytes);
-        byte[][] famAndPrefix = KeyValue.parseColumn(Bytes.getBytes(prefix));
-        if (famAndPrefix.length == 2) {
-          get.addFamily(famAndPrefix[0]);
-          get.setFilter(new ColumnPrefixFilter(famAndPrefix[1]));
-        } else {
-          get.setFilter(new ColumnPrefixFilter(famAndPrefix[0]));
-        }
-        get.setTimeRange(Long.MIN_VALUE, timestamp);
-        Result result = table.get(get);
-        return ThriftUtilities.rowResultFromHBase(result);
-      } catch (IOException e) {
-        throw new IOError(e.getMessage(), 0);
+    protected Result[] processMultiGet(ByteBuffer tableName, ByteBuffer regionName, List<Get> gets)
+        throws IOException, IOError {
+      if (Bytes.isNonEmpty(regionName)) {
+        metrics.incDirectCalls();
+        return rs.get(Bytes.getBytes(regionName), gets);
+      } else {
+        metrics.incIndirectCalls();
+        return super.processMultiGet(tableName, regionName, gets);
+      }
+    }
+
+    /**
+     * Process the multiPut requests. If the region name is set, using the shortcircuit
+     * optimization
+     */
+    @Override
+    protected void processMultiPut(ByteBuffer tableName, ByteBuffer regionName, List<Put> puts)
+        throws IOException, IOError {
+      if (Bytes.isNonEmpty(regionName)) {
+        metrics.incDirectCalls();
+        rs.put(Bytes.getBytes(regionName), puts);
+      } else {
+        metrics.incIndirectCalls();
+        super.processMultiPut(tableName, regionName, puts);
       }
     }
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java?rev=1387310&r1=1387309&r2=1387310&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java Tue Sep 18 18:39:54 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.metrics.Updater
 import org.apache.hadoop.metrics.util.MetricsBase;
 import org.apache.hadoop.metrics.util.MetricsIntValue;
 import org.apache.hadoop.metrics.util.MetricsRegistry;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 
 /**
@@ -59,6 +60,8 @@ public class ThriftMetrics implements Up
       new MetricsTimeVaryingRate("numBatchGetRowKeys", registry);
   private final MetricsTimeVaryingRate numBatchMutateRowKeys =
       new MetricsTimeVaryingRate("numBatchMutateRowKeys", registry);
+  private final MetricsTimeVaryingRate numMultiPutRowKeys =
+      new MetricsTimeVaryingRate("numMultiPutRowKeys", registry);
   private final MetricsTimeVaryingRate timeInQueue =
       new MetricsTimeVaryingRate("timeInQueue", registry);
   private MetricsTimeVaryingRate thriftCall =
@@ -66,7 +69,21 @@ public class ThriftMetrics implements Up
   private MetricsTimeVaryingRate slowThriftCall =
       new MetricsTimeVaryingRate("slowThriftCall", registry);
 
-   private final ThriftMBean mbean;
+  /**
+   * Number of calls that go through HTable. Only updated in the embedded Thrift server. This only
+   * counts API functions that were capable of accepting a region name from the client and skipping
+   * the client logic, but the region name was not passed.
+   */
+  private MetricsTimeVaryingInt indirectCalls =
+      new MetricsTimeVaryingInt("indirectThriftCalls", registry);
+
+  /**
+   * Number of direct calls to HRegion (preferred way). Only updated in the embedded Thrift server.
+   */
+  private MetricsTimeVaryingInt directCalls =
+      new MetricsTimeVaryingInt("directThriftCalls", registry);
+
+  private final ThriftMBean mbean;
   
   public ThriftMetrics(int port, Configuration conf, Class<?> iface) {
     slowResponseTime = conf.getLong(
@@ -113,6 +130,18 @@ public class ThriftMetrics implements Up
     numBatchMutateRowKeys.inc(diff);
   }
 
+  public void incNumMultiPutRowKeys(int diff) {
+    this.numMultiPutRowKeys.inc(diff);
+  }
+
+  public final void incIndirectCalls() {
+    this.indirectCalls.inc();
+  }
+
+  public final void incDirectCalls() {
+    this.directCalls.inc();
+  }
+
   public void incMethodTime(String name, long time) {
     MetricsTimeVaryingRate methodTimeMetric = getMethodTimeMetrics(name);
     if (methodTimeMetric == null) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java?rev=1387310&r1=1387309&r2=1387310&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java Tue Sep 18 18:39:54 2012
@@ -424,7 +424,7 @@ public class ThriftServerRunner implemen
     protected int nextScannerId = 0;
     protected HashMap<Integer, ResultScanner> scannerMap = null;
 
-    private ThriftMetrics metrics;
+    protected ThriftMetrics metrics;
 
     private static ThreadLocal<Map<String, HTable>> threadLocalTables =
         new ThreadLocal<Map<String, HTable>>() {
@@ -462,7 +462,7 @@ public class ThriftServerRunner implemen
      */
     protected HTable getTable(final byte[] tableName) throws IOError,
         IOException {
-      String table = new String(tableName);
+      String table = Bytes.toString(tableName);
       Map<String, HTable> tables = threadLocalTables.get();
       if (!tables.containsKey(table)) {
         tables.put(table, new HTable(conf, tableName));
@@ -625,25 +625,19 @@ public class ThriftServerRunner implemen
 
     @Deprecated
     @Override
-    public List<TCell> get(ByteBuffer tableName, ByteBuffer row, ByteBuffer column)
-        throws IOError {
+    public List<TCell> get(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
+        ByteBuffer regionName) throws IOError {
       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
-      byte[] tableNameBytes = getBytes(tableName);
-      byte[] rowBytes = getBytes(row);
-      return get(tableNameBytes, rowBytes, famAndQf[0], getQualifier(famAndQf));
-    }
-
-    public List<TCell> get(byte [] tableName, byte [] row, byte [] family,
-        byte [] qualifier) throws IOError {
+      byte[] family = famAndQf[0];
+      byte[] qualifier = getQualifier(famAndQf);
       try {
-        HTable table = getTable(tableName);
-        Get get = new Get(row);
+        Get get = new Get(getBytes(row));
         if (qualifier == null || qualifier.length == 0) {
           get.addFamily(family);
         } else {
           get.addColumn(family, qualifier);
         }
-        Result result = table.get(get);
+        Result result = processGet(tableName, regionName, get);
         return ThriftUtilities.cellFromHBase(result.sorted());
       } catch (IOException e) {
         throw convertIOException(e);
@@ -653,20 +647,15 @@ public class ThriftServerRunner implemen
     @Deprecated
     @Override
     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row,
-        ByteBuffer column, int numVersions) throws IOError {
+        ByteBuffer column, int numVersions, ByteBuffer regionName) throws IOError {
       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
-      return getVer(getBytes(tableName), getBytes(row), famAndQf[0],
-          getQualifier(famAndQf), numVersions);
-    }
-
-    public List<TCell> getVer(byte [] tableName, byte [] row, byte [] family,
-        byte [] qualifier, int numVersions) throws IOError {
+      byte[] family = famAndQf[0];
+      byte[] qualifier = getQualifier(famAndQf);
       try {
-        HTable table = getTable(tableName);
-        Get get = new Get(row);
+        Get get = new Get(getBytes(row));
         get.addColumn(family, qualifier);
         get.setMaxVersions(numVersions);
-        Result result = table.get(get);
+        Result result = processGet(tableName, regionName, get);
         return ThriftUtilities.cellFromHBase(result.sorted());
       } catch (IOException e) {
         throw convertIOException(e);
@@ -676,22 +665,17 @@ public class ThriftServerRunner implemen
     @Override
     @Deprecated
     public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row,
-        ByteBuffer column, long timestamp, int numVersions) throws IOError {
+        ByteBuffer column, long timestamp, int numVersions,
+        ByteBuffer regionName) throws IOError {
       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
-      return getVerTs(getBytes(tableName), getBytes(row),
-          famAndQf[0], getQualifier(famAndQf),
-          timestamp, numVersions);
-    }
-
-    public List<TCell> getVerTs(byte [] tableName, byte [] row, byte [] family,
-        byte [] qualifier, long timestamp, int numVersions) throws IOError {
+      byte[] family = famAndQf[0];
+      byte[] qualifier = getQualifier(famAndQf);
       try {
-        HTable table = getTable(tableName);
-        Get get = new Get(row);
+        Get get = new Get(getBytes(row));
         get.addColumn(family, qualifier);
         get.setTimeRange(Long.MIN_VALUE, timestamp);
         get.setMaxVersions(numVersions);
-        Result result = table.get(get);
+        Result result = processGet(tableName, regionName, get);
         return ThriftUtilities.cellFromHBase(result.sorted());
       } catch (IOException e) {
         throw convertIOException(e);
@@ -699,36 +683,36 @@ public class ThriftServerRunner implemen
     }
 
     @Override
-    public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row)
+    public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row, ByteBuffer regionName)
         throws IOError {
       return getRowWithColumnsTs(tableName, row, null,
-                                 HConstants.LATEST_TIMESTAMP);
+                                 HConstants.LATEST_TIMESTAMP, regionName);
     }
 
     @Override
     public List<TRowResult> getRowWithColumns(ByteBuffer tableName, ByteBuffer row,
-        List<ByteBuffer> columns) throws IOError {
+        List<ByteBuffer> columns, ByteBuffer regionName) throws IOError {
       return getRowWithColumnsTs(tableName, row, columns,
-                                 HConstants.LATEST_TIMESTAMP);
+                                 HConstants.LATEST_TIMESTAMP, regionName);
     }
 
     @Override
     public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
-        long timestamp) throws IOError {
+        long timestamp, ByteBuffer regionName) throws IOError {
       return getRowWithColumnsTs(tableName, row, null,
-                                 timestamp);
+                                 timestamp, regionName);
     }
 
     @Override
     public List<TRowResult> getRowWithColumnsTs(ByteBuffer tableName, ByteBuffer row,
-        List<ByteBuffer> columns, long timestamp) throws IOError {
+        List<ByteBuffer> columns, long timestamp, ByteBuffer regionName) throws IOError {
       try {
         HTable table = getTable(tableName);
         byte[] rowBytes = getBytes(row);
         if (columns == null) {
           Get get = new Get(rowBytes);
           get.setTimeRange(Long.MIN_VALUE, timestamp);
-          Result result = table.get(get);
+          Result result = processGet(tableName, regionName, get);
           return ThriftUtilities.rowResultFromHBase(result);
         }
         Get get = new Get(rowBytes);
@@ -741,7 +725,7 @@ public class ThriftServerRunner implemen
           }
         }
         get.setTimeRange(Long.MIN_VALUE, timestamp);
-        Result result = table.get(get);
+        Result result = processGet(tableName, regionName, get);
         return ThriftUtilities.rowResultFromHBase(result);
       } catch (IOException e) {
         throw convertIOException(e);
@@ -750,23 +734,23 @@ public class ThriftServerRunner implemen
 
     @Override
     public List<TRowResult> getRowWithColumnPrefix(ByteBuffer tableName, ByteBuffer row,
-        ByteBuffer prefix) throws IOError {
+        ByteBuffer prefix, ByteBuffer regionName) throws IOError {
       return (getRowWithColumnPrefixTs(tableName, row, prefix,
-                                       HConstants.LATEST_TIMESTAMP));
+                                       HConstants.LATEST_TIMESTAMP, regionName));
     }
 
     @Override
     public List<TRowResult> getRowWithColumnPrefixTs(ByteBuffer tableName, ByteBuffer row,
-        ByteBuffer prefix, long timestamp) throws IOError {
+        ByteBuffer prefix, long timestamp, ByteBuffer regionName) throws IOError {
       try {
-        HTable table = getTable(tableName);
+        byte[] rowBytes = getBytes(row);
         if (prefix == null) {
-          Get get = new Get(getBytes(row));
+          Get get = new Get(rowBytes);
           get.setTimeRange(Long.MIN_VALUE, timestamp);
-          Result result = table.get(get);
+          Result result = processGet(tableName, regionName, get);
           return ThriftUtilities.rowResultFromHBase(result);
         }
-        Get get = new Get(getBytes(row));
+        Get get = new Get(rowBytes);
         byte [][] famAndPrefix = KeyValue.parseColumn(getBytes(prefix));
         if (famAndPrefix.length == 2) {
           get.addFamily(famAndPrefix[0]);
@@ -775,7 +759,7 @@ public class ThriftServerRunner implemen
           get.setFilter(new ColumnPrefixFilter(famAndPrefix[0]));
         }
         get.setTimeRange(Long.MIN_VALUE, timestamp);
-        Result result = table.get(get);
+        Result result = processGet(tableName, regionName, get);
         return ThriftUtilities.rowResultFromHBase(result);
       } catch (IOException e) {
         throw convertIOException(e);
@@ -783,37 +767,39 @@ public class ThriftServerRunner implemen
     }
 
     @Override
-    public void deleteAll(ByteBuffer tableName, ByteBuffer row, ByteBuffer column)
+    public void deleteAll(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
+        ByteBuffer regionName)
         throws IOError {
-      deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP);
+      deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP, regionName);
     }
 
     @Override
-    public List<TRowResult> getRows(ByteBuffer tableName, List<ByteBuffer> rows)
+    public List<TRowResult> getRows(ByteBuffer tableName, List<ByteBuffer> rows,
+        ByteBuffer regionName)
         throws IOError, TException {
       return getRowsWithColumnsTs(tableName, rows, null,
-          HConstants.LATEST_TIMESTAMP);
+          HConstants.LATEST_TIMESTAMP, regionName);
     }
 
     @Override
     public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
-        List<ByteBuffer> rows, List<ByteBuffer> columns) throws IOError {
+        List<ByteBuffer> rows, List<ByteBuffer> columns, ByteBuffer regionName) throws IOError {
       return getRowsWithColumnsTs(tableName, rows, columns,
-          HConstants.LATEST_TIMESTAMP);
+          HConstants.LATEST_TIMESTAMP, regionName);
     }
 
     @Override
     public List<TRowResult> getRowsTs(ByteBuffer tableName, List<ByteBuffer> rows,
-        long timestamp) throws IOError {
-      return getRowsWithColumnsTs(tableName, rows, null, timestamp);
+        long timestamp, ByteBuffer regionName) throws IOError {
+      return getRowsWithColumnsTs(tableName, rows, null, timestamp, regionName);
     }
 
     @Override
     public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
-        List<ByteBuffer> rows, List<ByteBuffer> columns, long timestamp) throws IOError {
+        List<ByteBuffer> rows, List<ByteBuffer> columns, long timestamp,
+        ByteBuffer regionName) throws IOError {
       try {
         List<Get> gets = new ArrayList<Get>(rows.size());
-        HTable table = getTable(tableName);
         if (metrics != null) {
           metrics.incNumBatchGetRowKeys(rows.size());
         }
@@ -835,7 +821,7 @@ public class ThriftServerRunner implemen
           }
           gets.add(get);
         }
-        Result[] result = table.get(gets);
+        Result[] result = processMultiGet(tableName, regionName, gets);
         return ThriftUtilities.rowResultFromHBase(result);
       } catch (IOException e) {
         throw convertIOException(e);
@@ -844,9 +830,8 @@ public class ThriftServerRunner implemen
 
     @Override
     public void deleteAllTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
-        long timestamp) throws IOError {
+        long timestamp, ByteBuffer regionName) throws IOError {
       try {
-        HTable table = getTable(tableName);
         Delete delete  = new Delete(getBytes(row));
         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
         if (famAndQf.length == 1) {
@@ -854,8 +839,7 @@ public class ThriftServerRunner implemen
         } else {
           delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
         }
-        table.delete(delete);
-
+        processDelete(tableName, regionName, delete);
       } catch (IOException e) {
         throw convertIOException(e);
       }
@@ -863,16 +847,15 @@ public class ThriftServerRunner implemen
 
     @Override
     public void deleteAllRow(ByteBuffer tableName, ByteBuffer row,
-        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-      deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP);
+        Map<ByteBuffer, ByteBuffer> attributes, ByteBuffer regionName) throws IOError {
+      deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, regionName);
     }
 
-    public void deleteAllRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp)
+    public void deleteAllRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp,
+        ByteBuffer regionName)
         throws IOError {
       try {
-        HTable table = getTable(tableName);
-        Delete delete  = new Delete(getBytes(row), timestamp, null);
-        table.delete(delete);
+        processDelete(tableName, regionName, new Delete(getBytes(row), timestamp, null));
       } catch (IOException e) {
         throw convertIOException(e);
       }
@@ -918,17 +901,18 @@ public class ThriftServerRunner implemen
 
     @Override
     public void mutateRow(ByteBuffer tableName, ByteBuffer row, List<Mutation> mutations,
-        Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument {
-      mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP, attributes);
+        Map<ByteBuffer, ByteBuffer> attributes, ByteBuffer regionName)
+    throws IOError, IllegalArgument {
+      mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP, attributes,
+          regionName);
     }
 
     @Override
     public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
         List<Mutation> mutations, long timestamp,
-        Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument {
-      HTable table = null;
+        Map<ByteBuffer, ByteBuffer> attributes, ByteBuffer regionName)
+    throws IOError, IllegalArgument {
       try {
-        table = getTable(getBytes(tableName));
         byte[] rowBytes = getBytes(row);
         Put put = null;
         Delete delete = null;
@@ -967,11 +951,11 @@ public class ThriftServerRunner implemen
         }
         if (delete != null) {
           delete.setWriteToWAL(writeToWAL);
-          table.delete(delete);
+          processDelete(tableName, regionName, delete);
         }
         if (put != null) {
           put.setWriteToWAL(writeToWAL);
-          table.put(put);
+          processPut(tableName, regionName, put);
         }
       } catch (IOException e) {
         throw convertIOException(e);
@@ -982,15 +966,16 @@ public class ThriftServerRunner implemen
 
     @Override
     public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
-        Map<ByteBuffer, ByteBuffer> attributes)
+        Map<ByteBuffer, ByteBuffer> attributes, ByteBuffer regionName)
         throws IOError, IllegalArgument, TException {
-      mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
+      mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes,
+          regionName);
     }
 
     @Override
     public void mutateRowsTs(
         ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
-        Map<ByteBuffer, ByteBuffer> attributes)
+        Map<ByteBuffer, ByteBuffer> attributes, ByteBuffer regionName)
         throws IOError, IllegalArgument, TException {
       List<Put> puts = null;
       List<Delete> deletes = null;
@@ -1059,13 +1044,12 @@ public class ThriftServerRunner implemen
 
       HTable table = null;
       try {
-        table = getTable(tableName);
         if (puts != null) {
-          table.put(puts);
+          processMultiPut(tableName, regionName, puts);
         }
         if (deletes != null) {
           for (Delete del : deletes) {
-            table.delete(del);
+            processDelete(tableName, regionName, del);
           }
         }
       } catch (IOException e) {
@@ -1470,6 +1454,38 @@ public class ThriftServerRunner implemen
       }
     }
 
+    @Override
+    public void multiPut(ByteBuffer tableName, List<BatchMutation> tPuts, ByteBuffer regionName)
+        throws IOError, IllegalArgument, TException {
+      if (metrics != null) {
+        metrics.incNumMultiPutRowKeys(tPuts.size());
+      }
+      List<Put> puts = new ArrayList<Put>(tPuts.size());
+      try {
+        for (BatchMutation batch : tPuts) {
+          byte[] row = getBytes(batch.row);
+          List<Mutation> mutations = batch.mutations;
+          Put put = null;
+          for (Mutation m : mutations) {
+            byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
+            put = new Put(row);
+
+            if (famAndQf.length == 1) {
+              put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, m.getTimestamp(),
+                  getBytes(m.value));
+            } else {
+              put.add(famAndQf[0], famAndQf[1], m.getTimestamp(), getBytes(m.value));
+            }
+          }
+          puts.add(put);
+        }
+
+        processMultiPut(tableName, regionName, puts);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage(), 0);
+      }
+    }
+
     void initMetrics(ThriftMetrics metrics) {
       this.metrics = metrics;
     }
@@ -1485,6 +1501,33 @@ public class ThriftServerRunner implemen
         long timestamp) throws TException {
       throw new TException("Not implemented");
     }
+
+    protected Result processGet(ByteBuffer tableName, ByteBuffer regionName, Get get)
+        throws IOException, IOError {
+      return getTable(tableName).get(get);
+    }
+
+    protected void processPut(ByteBuffer tableName, ByteBuffer regionName, Put put)
+        throws IOException, IOError {
+      getTable(tableName).put(put);
+    }
+
+    protected void processDelete(ByteBuffer tableName, ByteBuffer regionName, Delete delete)
+        throws IOException, IOError {
+      getTable(tableName).delete(delete);
+    }
+
+    protected Result[] processMultiGet(ByteBuffer tableName, ByteBuffer regionName, List<Get> gets)
+        throws IOException, IOError {
+      HTable table = getTable(tableName);
+      return table.get(gets);
+    }
+
+    protected void processMultiPut(ByteBuffer tableName, ByteBuffer regionName, List<Put> puts)
+        throws IOException, IOError {
+      HTable table = getTable(tableName);
+      table.put(puts);
+    }
   }
 
   public static void registerFilters(Configuration conf) {
@@ -1514,5 +1557,4 @@ public class ThriftServerRunner implemen
     }
     return HConstants.EMPTY_BYTE_ARRAY;
   }
-
 }



Mime
View raw message