hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1398175 [1/2] - in /hbase/trunk/hbase-server/src/main: java/org/apache/hadoop/hbase/client/coprocessor/ java/org/apache/hadoop/hbase/coprocessor/ java/org/apache/hadoop/hbase/protobuf/generated/ protobuf/
Date Mon, 15 Oct 2012 02:32:09 GMT
Author: tedyu
Date: Mon Oct 15 02:32:09 2012
New Revision: 1398175

URL: http://svn.apache.org/viewvc?rev=1398175&view=rev
Log:
HBASE-6785 Convert AggregateProtocol to protobuf defined coprocessor service (Devaraj Das)


Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AggregateProtos.java
    hbase/trunk/hbase-server/src/main/protobuf/Aggregate.proto
Removed:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java?rev=1398175&r1=1398174&r2=1398175&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java Mon Oct 15 02:32:09 2012
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.client.coprocessor;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -39,16 +40,23 @@ import org.apache.hadoop.hbase.client.HT
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.AggregateProtocol;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateArgument;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
 import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 
+import com.google.protobuf.ByteString;
+
 /**
  * This client class is for invoking the aggregate functions deployed on the
- * Region Server side via the AggregateProtocol. This class will implement the
+ * Region Server side via the AggregateService. This class will implement the
  * supporting functionality for summing/processing the individual results
- * obtained from the AggregateProtocol for each region.
+ * obtained from the AggregateService for each region.
  * <p>
  * This will serve as the client side handler for invoking the aggregate
  * functions.
@@ -92,7 +100,7 @@ public class AggregationClient {
    */
   public <R, S> R max(final byte[] tableName, final ColumnInterpreter<R, S> ci,
       final Scan scan) throws Throwable {
-    validateParameters(scan);
+    final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     class MaxCallBack implements Batch.Callback<R> {
       R max = null;
 
@@ -109,11 +117,24 @@ public class AggregationClient {
     HTable table = null;
     try {
       table = new HTable(conf, tableName);
-      table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
-          scan.getStopRow(), new Batch.Call<AggregateProtocol, R>() {
+      table.coprocessorService(AggregateService.class, scan.getStartRow(),
+          scan.getStopRow(), new Batch.Call<AggregateService, R>() {
             @Override
-            public R call(AggregateProtocol instance) throws IOException {
-              return instance.getMax(ci, scan);
+            public R call(AggregateService instance) throws IOException {
+              ServerRpcController controller = new ServerRpcController();
+              BlockingRpcCallback<AggregateResponse> rpcCallback =
+                  new BlockingRpcCallback<AggregateResponse>();
+              instance.getMax(controller, requestArg, rpcCallback);
+              AggregateResponse response = rpcCallback.get();
+              if (controller.failedOnException()) {
+                throw controller.getFailedOn();
+              }
+              if (response.getFirstPartCount() > 0) {
+                return ci.castToCellType(
+                          ci.parseResponseAsPromotedType(
+                              getBytesFromResponse(response.getFirstPart(0))));
+              }
+              return null;
             }
           }, aMaxCallBack);
     } finally {
@@ -149,7 +170,7 @@ public class AggregationClient {
    */
   public <R, S> R min(final byte[] tableName, final ColumnInterpreter<R, S> ci,
       final Scan scan) throws Throwable {
-    validateParameters(scan);
+    final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     class MinCallBack implements Batch.Callback<R> {
 
       private R min = null;
@@ -167,12 +188,25 @@ public class AggregationClient {
     HTable table = null;
     try {
       table = new HTable(conf, tableName);
-      table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
-          scan.getStopRow(), new Batch.Call<AggregateProtocol, R>() {
+      table.coprocessorService(AggregateService.class, scan.getStartRow(),
+          scan.getStopRow(), new Batch.Call<AggregateService, R>() {
 
             @Override
-            public R call(AggregateProtocol instance) throws IOException {
-              return instance.getMin(ci, scan);
+            public R call(AggregateService instance) throws IOException {
+              ServerRpcController controller = new ServerRpcController();
+              BlockingRpcCallback<AggregateResponse> rpcCallback =
+                  new BlockingRpcCallback<AggregateResponse>();
+              instance.getMin(controller, requestArg, rpcCallback);
+              AggregateResponse response = rpcCallback.get();
+              if (controller.failedOnException()) {
+                throw controller.getFailedOn();
+              }
+              if (response.getFirstPartCount() > 0) {
+                return ci.castToCellType(
+                  ci.parseResponseAsPromotedType(
+                      getBytesFromResponse(response.getFirstPart(0))));
+              }
+              return null;
             }
           }, minCallBack);
     } finally {
@@ -199,7 +233,7 @@ public class AggregationClient {
    */
   public <R, S> long rowCount(final byte[] tableName,
       final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
-    validateParameters(scan);
+    final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     class RowNumCallback implements Batch.Callback<Long> {
       private final AtomicLong rowCountL = new AtomicLong(0);
 
@@ -216,11 +250,22 @@ public class AggregationClient {
     HTable table = null;
     try {
       table = new HTable(conf, tableName);
-      table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
-          scan.getStopRow(), new Batch.Call<AggregateProtocol, Long>() {
+      table.coprocessorService(AggregateService.class, scan.getStartRow(),
+          scan.getStopRow(), new Batch.Call<AggregateService, Long>() {
             @Override
-            public Long call(AggregateProtocol instance) throws IOException {
-              return instance.getRowNum(ci, scan);
+            public Long call(AggregateService instance) throws IOException {
+              ServerRpcController controller = new ServerRpcController();
+              BlockingRpcCallback<AggregateResponse> rpcCallback =
+                  new BlockingRpcCallback<AggregateResponse>();
+              instance.getRowNum(controller, requestArg, rpcCallback);
+              AggregateResponse response = rpcCallback.get();
+              if (controller.failedOnException()) {
+                throw controller.getFailedOn();
+              }
+              byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
+              ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
+              bb.rewind();
+              return bb.getLong();
             }
           }, rowNum);
     } finally {
@@ -242,7 +287,8 @@ public class AggregationClient {
    */
   public <R, S> S sum(final byte[] tableName, final ColumnInterpreter<R, S> ci,
       final Scan scan) throws Throwable {
-    validateParameters(scan);
+    final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
+    
     class SumCallBack implements Batch.Callback<S> {
       S sumVal = null;
 
@@ -259,11 +305,23 @@ public class AggregationClient {
     HTable table = null;
     try {
       table = new HTable(conf, tableName);
-      table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
-          scan.getStopRow(), new Batch.Call<AggregateProtocol, S>() {
+      table.coprocessorService(AggregateService.class, scan.getStartRow(),
+          scan.getStopRow(), new Batch.Call<AggregateService, S>() {
             @Override
-            public S call(AggregateProtocol instance) throws IOException {
-              return instance.getSum(ci, scan);
+            public S call(AggregateService instance) throws IOException {
+              ServerRpcController controller = new ServerRpcController();
+              BlockingRpcCallback<AggregateResponse> rpcCallback =
+                  new BlockingRpcCallback<AggregateResponse>();
+              instance.getSum(controller, requestArg, rpcCallback);
+              AggregateResponse response = rpcCallback.get();
+              if (controller.failedOnException()) {
+                throw controller.getFailedOn();
+              }
+              if (response.getFirstPartCount() == 0) {
+                return null;
+              }
+              return ci.parseResponseAsPromotedType(
+                  getBytesFromResponse(response.getFirstPart(0)));
             }
           }, sumCallBack);
     } finally {
@@ -284,7 +342,7 @@ public class AggregationClient {
    */
   private <R, S> Pair<S, Long> getAvgArgs(final byte[] tableName,
       final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
-    validateParameters(scan);
+    final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
       S sum = null;
       Long rowCount = 0l;
@@ -303,13 +361,31 @@ public class AggregationClient {
     HTable table = null;
     try {
       table = new HTable(conf, tableName);
-      table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
+      table.coprocessorService(AggregateService.class, scan.getStartRow(),
           scan.getStopRow(),
-          new Batch.Call<AggregateProtocol, Pair<S, Long>>() {
+          new Batch.Call<AggregateService, Pair<S, Long>>() {
             @Override
-            public Pair<S, Long> call(AggregateProtocol instance)
+            public Pair<S, Long> call(AggregateService instance)
                 throws IOException {
-              return instance.getAvg(ci, scan);
+              ServerRpcController controller = new ServerRpcController();
+              BlockingRpcCallback<AggregateResponse> rpcCallback =
+                  new BlockingRpcCallback<AggregateResponse>();
+              instance.getAvg(controller, requestArg, rpcCallback);
+              AggregateResponse response = rpcCallback.get();
+              if (controller.failedOnException()) {
+                throw controller.getFailedOn();
+              }
+              Pair<S,Long> pair = new Pair<S, Long>(null, 0L);
+              if (response.getFirstPartCount() == 0) {
+                return pair;
+              }
+              pair.setFirst(ci.parseResponseAsPromotedType(
+                  getBytesFromResponse(response.getFirstPart(0))));
+              ByteBuffer bb = ByteBuffer.allocate(8).put(
+                  getBytesFromResponse(response.getSecondPart()));
+              bb.rewind();
+              pair.setSecond(bb.getLong());
+              return pair;
             }
           }, avgCallBack);
     } finally {
@@ -351,7 +427,7 @@ public class AggregationClient {
    */
   private <R, S> Pair<List<S>, Long> getStdArgs(final byte[] tableName,
       final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
-    validateParameters(scan);
+    final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
       long rowCountVal = 0l;
       S sumVal = null, sumSqVal = null;
@@ -366,24 +442,48 @@ public class AggregationClient {
 
       @Override
       public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
-        sumVal = ci.add(sumVal, result.getFirst().get(0));
-        sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
-        rowCountVal += result.getSecond();
+        if (result.getFirst().size() > 0) {
+          sumVal = ci.add(sumVal, result.getFirst().get(0));
+          sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
+          rowCountVal += result.getSecond();
+        }
       }
     }
     StdCallback stdCallback = new StdCallback();
     HTable table = null;
     try {
       table = new HTable(conf, tableName);
-      table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
+      table.coprocessorService(AggregateService.class, scan.getStartRow(),
           scan.getStopRow(),
-          new Batch.Call<AggregateProtocol, Pair<List<S>, Long>>() {
+          new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
             @Override
-            public Pair<List<S>, Long> call(AggregateProtocol instance)
+            public Pair<List<S>, Long> call(AggregateService instance)
                 throws IOException {
-              return instance.getStd(ci, scan);
+              ServerRpcController controller = new ServerRpcController();
+              BlockingRpcCallback<AggregateResponse> rpcCallback =
+                  new BlockingRpcCallback<AggregateResponse>();
+              instance.getStd(controller, requestArg, rpcCallback);
+              AggregateResponse response = rpcCallback.get();
+              if (controller.failedOnException()) {
+                throw controller.getFailedOn();
+              }
+              Pair<List<S>,Long> pair = 
+                  new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
+              if (response.getFirstPartCount() == 0) {
+                return pair;
+              }
+              List<S> list = new ArrayList<S>();
+              for (int i = 0; i < response.getFirstPartCount(); i++) {
+                list.add(ci.parseResponseAsPromotedType(
+                    getBytesFromResponse(response.getFirstPart(i))));
+              }
+              pair.setFirst(list);
+              ByteBuffer bb = ByteBuffer.allocate(8).put(
+                  getBytesFromResponse(response.getSecondPart()));
+              bb.rewind();
+              pair.setSecond(bb.getLong());
+              return pair;
             }
-
           }, stdCallback);
     } finally {
       if (table != null) {
@@ -431,7 +531,7 @@ public class AggregationClient {
   private <R, S> Pair<NavigableMap<byte[], List<S>>, List<S>>
   getMedianArgs(final byte[] tableName,
       final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
-    validateParameters(scan);
+    final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     final NavigableMap<byte[], List<S>> map =
       new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
     class StdCallback implements Batch.Callback<List<S>> {
@@ -457,11 +557,25 @@ public class AggregationClient {
     HTable table = null;
     try {
       table = new HTable(conf, tableName);
-      table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
-          scan.getStopRow(), new Batch.Call<AggregateProtocol, List<S>>() {
+      table.coprocessorService(AggregateService.class, scan.getStartRow(),
+          scan.getStopRow(), new Batch.Call<AggregateService, List<S>>() {
             @Override
-            public List<S> call(AggregateProtocol instance) throws IOException {
-              return instance.getMedian(ci, scan);
+            public List<S> call(AggregateService instance) throws IOException {
+              ServerRpcController controller = new ServerRpcController();
+              BlockingRpcCallback<AggregateResponse> rpcCallback =
+                  new BlockingRpcCallback<AggregateResponse>();
+              instance.getMedian(controller, requestArg, rpcCallback);
+              AggregateResponse response = rpcCallback.get();
+              if (controller.failedOnException()) {
+                throw controller.getFailedOn();
+              }
+
+              List<S> list = new ArrayList<S>();
+              for (int i = 0; i < response.getFirstPartCount(); i++) {
+                list.add(ci.parseResponseAsPromotedType(
+                    getBytesFromResponse(response.getFirstPart(i))));
+              }
+              return list;
             }
 
           }, stdCallback);
@@ -557,4 +671,30 @@ public class AggregationClient {
     }
     return null;
   }
+
+  <R,S>AggregateArgument validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S> ci)
+      throws IOException {
+    validateParameters(scan);
+    final AggregateArgument.Builder requestBuilder = 
+        AggregateArgument.newBuilder();
+    requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
+    if (ci.columnInterpreterSpecificData() != null) {
+      requestBuilder.setInterpreterSpecificBytes(
+        ci.columnInterpreterSpecificData());
+    }
+    requestBuilder.setScan(ProtobufUtil.toScan(scan));
+    return requestBuilder.build();
+  }
+
+  byte[] getBytesFromResponse(ByteString response) {
+    ByteBuffer bb = response.asReadOnlyByteBuffer();
+    bb.rewind();
+    byte[] bytes;
+    if (bb.hasArray()) {
+      bytes = bb.array();
+    } else {
+      bytes = response.toByteArray();
+    }
+    return bytes;
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java?rev=1398175&r1=1398174&r2=1398175&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java Mon Oct 15 02:32:09 2012
@@ -18,9 +18,8 @@
  */
 package org.apache.hadoop.hbase.client.coprocessor;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -28,6 +27,8 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import com.google.protobuf.ByteString;
+
 /**
  * a concrete column interpreter implementation. The cell value is a Long value
  * and its promoted data type is also a Long value. For computing aggregation
@@ -86,24 +87,55 @@ public class LongColumnInterpreter imple
   }
 
   @Override
-  public void readFields(DataInput arg0) throws IOException {
-    // nothing to serialize
+  public double divideForAvg(Long l1, Long l2) {
+    return (l2 == null || l1 == null) ? Double.NaN : (l1.doubleValue() / l2
+        .doubleValue());
   }
 
   @Override
-  public void write(DataOutput arg0) throws IOException {
-     // nothing to serialize
+  public Long castToReturnType(Long o) {
+    return o;
   }
 
+
   @Override
-  public double divideForAvg(Long l1, Long l2) {
-    return (l2 == null || l1 == null) ? Double.NaN : (l1.doubleValue() / l2
-        .doubleValue());
+  public Long parseResponseAsPromotedType(byte[] response) {
+    ByteBuffer b = ByteBuffer.allocate(8).put(response);
+    b.rewind();
+    long l = b.getLong();
+    return l;
   }
 
   @Override
-  public Long castToReturnType(Long o) {
-    return o;
+  public Long castToCellType(Long l) {
+    return l;
   }
 
-}
+  @Override
+  public ByteString columnInterpreterSpecificData() {
+    // nothing
+    return null;
+  }
+
+  @Override
+  public void initialize(ByteString bytes) {
+    // nothing
+  }
+
+  @Override
+  public ByteString getProtoForCellType(Long t) {
+    return getProtoForPromotedOrCellType(t);
+  }
+
+  @Override
+  public ByteString getProtoForPromotedType(Long s) {
+    return getProtoForPromotedOrCellType(s);
+  }
+
+  private ByteString getProtoForPromotedOrCellType(Long s) {
+    ByteBuffer bb = ByteBuffer.allocate(8).putLong(s);
+    bb.rewind();
+    ByteString bs = ByteString.copyFrom(bb);
+    return bs;
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java?rev=1398175&r1=1398174&r2=1398175&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java Mon Oct 15 02:32:09 2012
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.coprocessor;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
@@ -27,45 +28,59 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.ipc.ProtocolSignature;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateArgument;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
 
 /**
  * A concrete AggregateProtocol implementation. Its system level coprocessor
  * that computes the aggregate function at a region level.
+ * @param <T>
+ * @param <S>
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class AggregateImplementation extends BaseEndpointCoprocessor implements
-    AggregateProtocol {
+public class AggregateImplementation<T, S> extends AggregateService implements
+    CoprocessorService, Coprocessor {
   protected static Log log = LogFactory.getLog(AggregateImplementation.class);
+  private RegionCoprocessorEnvironment env;
 
+  /**
+   * Gives the maximum for a given combination of column qualifier and column
+   * family, in the given row range as defined in the Scan object. In its
+   * current implementation, it takes one column family and one column qualifier
+   * (if provided). In case of null column qualifier, maximum value for the
+   * entire column family will be returned.
+   */
   @Override
-  public ProtocolSignature getProtocolSignature(
-      String protocol, long version, int clientMethodsHashCode)
-  throws IOException {
-    if (AggregateProtocol.class.getName().equals(protocol)) {
-      return new ProtocolSignature(AggregateProtocol.VERSION, null);
-    }
-    throw new IOException("Unknown protocol: " + protocol);
-  }
-
-  @Override
-  public <T, S> T getMax(ColumnInterpreter<T, S> ci, Scan scan)
-      throws IOException {
-    T temp;
+  public void getMax(RpcController controller, AggregateArgument request,
+      RpcCallback<AggregateResponse> done) {
+    InternalScanner scanner = null;
+    AggregateResponse response = null;
     T max = null;
-    InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
-        .getRegion().getScanner(scan);
-    List<KeyValue> results = new ArrayList<KeyValue>();
-    byte[] colFamily = scan.getFamilies()[0];
-    byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
-    // qualifier can be null.
     try {
+      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      T temp;
+      Scan scan = ProtobufUtil.toScan(request.getScan());
+      scanner = env.getRegion().getScanner(scan);
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      byte[] colFamily = scan.getFamilies()[0];
+      byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
+      // qualifier can be null.
       boolean hasMoreRows = false;
       do {
         hasMoreRows = scanner.next(results);
@@ -75,26 +90,46 @@ public class AggregateImplementation ext
         }
         results.clear();
       } while (hasMoreRows);
+      if (max != null) {
+        AggregateResponse.Builder builder = AggregateResponse.newBuilder();
+        builder.addFirstPart(ci.getProtoForCellType(max));
+        response = builder.build();
+      }
+    } catch (IOException e) {
+      ResponseConverter.setControllerException(controller, e);
     } finally {
-      scanner.close();
+      if (scanner != null) {
+        try {
+          scanner.close();
+        } catch (IOException ignored) {}
+      }
     }
     log.info("Maximum from this region is "
-        + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
-            .getRegionNameAsString() + ": " + max);
-    return max;
+        + env.getRegion().getRegionNameAsString() + ": " + max);
+    done.run(response);
   }
 
+  /**
+   * Gives the minimum for a given combination of column qualifier and column
+   * family, in the given row range as defined in the Scan object. In its
+   * current implementation, it takes one column family and one column qualifier
+   * (if provided). In case of null column qualifier, minimum value for the
+   * entire column family will be returned.
+   */
   @Override
-  public <T, S> T getMin(ColumnInterpreter<T, S> ci, Scan scan)
-      throws IOException {
+  public void getMin(RpcController controller, AggregateArgument request,
+      RpcCallback<AggregateResponse> done) {
+    AggregateResponse response = null;
+    InternalScanner scanner = null;
     T min = null;
-    T temp;
-    InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
-        .getRegion().getScanner(scan);
-    List<KeyValue> results = new ArrayList<KeyValue>();
-    byte[] colFamily = scan.getFamilies()[0];
-    byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
     try {
+      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      T temp;
+      Scan scan = ProtobufUtil.toScan(request.getScan());
+      scanner = env.getRegion().getScanner(scan);
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      byte[] colFamily = scan.getFamilies()[0];
+      byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
       boolean hasMoreRows = false;
       do {
         hasMoreRows = scanner.next(results);
@@ -104,27 +139,46 @@ public class AggregateImplementation ext
         }
         results.clear();
       } while (hasMoreRows);
+      if (min != null) {
+        response = AggregateResponse.newBuilder().addFirstPart( 
+          ci.getProtoForCellType(min)).build();
+      }
+    } catch (IOException e) {
+      ResponseConverter.setControllerException(controller, e);
     } finally {
-      scanner.close();
+      if (scanner != null) {
+        try {
+          scanner.close();
+        } catch (IOException ignored) {}
+      }
     }
     log.info("Minimum from this region is "
-        + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
-            .getRegionNameAsString() + ": " + min);
-    return min;
+        + env.getRegion().getRegionNameAsString() + ": " + min);
+    done.run(response);
   }
 
+  /**
+   * Gives the sum for a given combination of column qualifier and column
+   * family, in the given row range as defined in the Scan object. In its
+   * current implementation, it takes one column family and one column qualifier
+   * (if provided). In case of null column qualifier, sum for the entire column
+   * family will be returned.
+   */
   @Override
-  public <T, S> S getSum(ColumnInterpreter<T, S> ci, Scan scan)
-      throws IOException {
+  public void getSum(RpcController controller, AggregateArgument request,
+      RpcCallback<AggregateResponse> done) {
+    AggregateResponse response = null;
+    InternalScanner scanner = null;
     long sum = 0l;
-    S sumVal = null;
-    T temp;
-    InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
-        .getRegion().getScanner(scan);
-    byte[] colFamily = scan.getFamilies()[0];
-    byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
-    List<KeyValue> results = new ArrayList<KeyValue>();
     try {
+      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      S sumVal = null;
+      T temp;
+      Scan scan = ProtobufUtil.toScan(request.getScan());
+      scanner = env.getRegion().getScanner(scan);
+      byte[] colFamily = scan.getFamilies()[0];
+      byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
+      List<KeyValue> results = new ArrayList<KeyValue>();
       boolean hasMoreRows = false;
       do {
         hasMoreRows = scanner.next(results);
@@ -135,27 +189,43 @@ public class AggregateImplementation ext
         }
         results.clear();
       } while (hasMoreRows);
+      if (sumVal != null) {
+        response = AggregateResponse.newBuilder().addFirstPart( 
+          ci.getProtoForPromotedType(sumVal)).build();
+      }
+    } catch (IOException e) {
+      ResponseConverter.setControllerException(controller, e);
     } finally {
-      scanner.close();
+      if (scanner != null) {
+        try {
+          scanner.close();
+        } catch (IOException ignored) {}
+      }
     }
     log.debug("Sum from this region is "
-        + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
-            .getRegionNameAsString() + ": " + sum);
-    return sumVal;
+        + env.getRegion().getRegionNameAsString() + ": " + sum);
+    done.run(response);
   }
 
+  /**
+   * Gives the row count for the given column family and column qualifier, in
+   * the given row range as defined in the Scan object.
+   * @throws IOException
+   */
   @Override
-  public <T, S> long getRowNum(ColumnInterpreter<T, S> ci, Scan scan)
-      throws IOException {
+  public void getRowNum(RpcController controller, AggregateArgument request,
+      RpcCallback<AggregateResponse> done) {
+    AggregateResponse response = null;
     long counter = 0l;
     List<KeyValue> results = new ArrayList<KeyValue>();
-    byte[] colFamily = scan.getFamilies()[0];
-    byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
-    if (scan.getFilter() == null && qualifier == null)
-      scan.setFilter(new FirstKeyOnlyFilter());
-    InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
-        .getRegion().getScanner(scan);
+    InternalScanner scanner = null;
     try {
+      Scan scan = ProtobufUtil.toScan(request.getScan());
+      byte[] colFamily = scan.getFamilies()[0];
+      byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
+      if (scan.getFilter() == null && qualifier == null)
+        scan.setFilter(new FirstKeyOnlyFilter());
+      scanner = env.getRegion().getScanner(scan);
       boolean hasMoreRows = false;
       do {
         hasMoreRows = scanner.next(results);
@@ -164,27 +234,53 @@ public class AggregateImplementation ext
         }
         results.clear();
       } while (hasMoreRows);
+      ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter);
+      bb.rewind();
+      response = AggregateResponse.newBuilder().addFirstPart( 
+          ByteString.copyFrom(bb)).build();
+    } catch (IOException e) {
+      ResponseConverter.setControllerException(controller, e);
     } finally {
-      scanner.close();
+      if (scanner != null) {
+        try {
+          scanner.close();
+        } catch (IOException ignored) {}
+      }
     }
     log.info("Row counter from this region is "
-        + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
-            .getRegionNameAsString() + ": " + counter);
-    return counter;
+        + env.getRegion().getRegionNameAsString() + ": " + counter);
+    done.run(response);
   }
 
+  /**
+   * Gives a Pair with first object as Sum and second object as row count,
+   * computed for a given combination of column qualifier and column family in
+   * the given row range as defined in the Scan object. In its current
+   * implementation, it takes one column family and one column qualifier (if
+   * provided). In case of null column qualifier, an aggregate sum over the
+   * entire column family will be returned.
+   * <p>
+   * The average is computed in
+   * {@link AggregationClient#avg(byte[], ColumnInterpreter, Scan)} by
+   * processing results from all regions, so it's "ok" to pass sum and a Long
+   * type.
+   */
   @Override
-  public <T, S> Pair<S, Long> getAvg(ColumnInterpreter<T, S> ci, Scan scan)
-      throws IOException {
-    S sumVal = null;
-    Long rowCountVal = 0l;
-    InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
-        .getRegion().getScanner(scan);
-    byte[] colFamily = scan.getFamilies()[0];
-    byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
-    List<KeyValue> results = new ArrayList<KeyValue>();
-    boolean hasMoreRows = false;
+  public void getAvg(RpcController controller, AggregateArgument request,
+      RpcCallback<AggregateResponse> done) {
+    AggregateResponse response = null;
+    InternalScanner scanner = null;
     try {
+      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      S sumVal = null;
+      Long rowCountVal = 0l;
+      Scan scan = ProtobufUtil.toScan(request.getScan());
+      scanner = env.getRegion().getScanner(scan);
+      byte[] colFamily = scan.getFamilies()[0];
+      byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      boolean hasMoreRows = false;
+    
       do {
         results.clear();
         hasMoreRows = scanner.next(results);
@@ -194,26 +290,53 @@ public class AggregateImplementation ext
         }
         rowCountVal++;
       } while (hasMoreRows);
+      if (sumVal != null) {
+        ByteString first = ci.getProtoForPromotedType(sumVal);
+        AggregateResponse.Builder pair = AggregateResponse.newBuilder();
+        pair.addFirstPart(first);
+        ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
+        bb.rewind();
+        pair.setSecondPart(ByteString.copyFrom(bb));
+        response = pair.build();
+      }
+    } catch (IOException e) {
+      ResponseConverter.setControllerException(controller, e);
     } finally {
-      scanner.close();
+      if (scanner != null) {
+        try {
+          scanner.close();
+        } catch (IOException ignored) {}
+      }
     }
-    Pair<S, Long> pair = new Pair<S, Long>(sumVal, rowCountVal);
-    return pair;
+    done.run(response);
   }
 
+  /**
+   * Gives a Pair with first object a List containing Sum and sum of squares,
+   * and the second object as row count. It is computed for a given combination of
+   * column qualifier and column family in the given row range as defined in the
+   * Scan object. In its current implementation, it takes one column family and
+   * one column qualifier (if provided). The idea is to get the value of variance
+   * first: the average of the squares less the square of the average; the
+   * standard deviation is the square root of the variance.
+   */
   @Override
-  public <T, S> Pair<List<S>, Long> getStd(ColumnInterpreter<T, S>
ci, Scan scan)
-      throws IOException {
-    S sumVal = null, sumSqVal = null, tempVal = null;
-    long rowCountVal = 0l;
-    InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
-        .getRegion().getScanner(scan);
-    byte[] colFamily = scan.getFamilies()[0];
-    byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
-    List<KeyValue> results = new ArrayList<KeyValue>();
-
-    boolean hasMoreRows = false;
+  public void getStd(RpcController controller, AggregateArgument request,
+      RpcCallback<AggregateResponse> done) {
+    InternalScanner scanner = null;
+    AggregateResponse response = null;
     try {
+      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      S sumVal = null, sumSqVal = null, tempVal = null;
+      long rowCountVal = 0l;
+      Scan scan = ProtobufUtil.toScan(request.getScan());
+      scanner = env.getRegion().getScanner(scan);
+      byte[] colFamily = scan.getFamilies()[0];
+      byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
+      List<KeyValue> results = new ArrayList<KeyValue>();
+
+      boolean hasMoreRows = false;
+    
       do {
         tempVal = null;
         hasMoreRows = scanner.next(results);
@@ -226,32 +349,56 @@ public class AggregateImplementation ext
         sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal));
         rowCountVal++;
       } while (hasMoreRows);
+      if (sumVal != null) {
+        ByteString first_sumVal = ci.getProtoForPromotedType(sumVal);
+        ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal);
+        AggregateResponse.Builder pair = AggregateResponse.newBuilder();
+        pair.addFirstPart(first_sumVal);
+        pair.addFirstPart(first_sumSqVal);
+        ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
+        bb.rewind();
+        pair.setSecondPart(ByteString.copyFrom(bb));
+        response = pair.build();
+      }
+    } catch (IOException e) {
+      ResponseConverter.setControllerException(controller, e);
     } finally {
-      scanner.close();
+      if (scanner != null) {
+        try {
+          scanner.close();
+        } catch (IOException ignored) {}
+      }
     }
-    List<S> l = new ArrayList<S>();
-    l.add(sumVal);
-    l.add(sumSqVal);
-    Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
-    return p;
+    done.run(response);
   }
 
+  /**
+   * Gives a List containing sum of values and sum of weights.
+   * It is computed for the combination of column
+   * family and column qualifier(s) in the given row range as defined in the
+   * Scan object. In its current implementation, it takes one column family and
+   * two column qualifiers. The first qualifier is for the values column and
+   * the second qualifier (optional) is for the weight column.
+   */
   @Override
-  public <T, S> List<S> getMedian(ColumnInterpreter<T, S> ci, Scan scan)
-  throws IOException {
-    S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
-
-    InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
-    .getRegion().getScanner(scan);
-    byte[] colFamily = scan.getFamilies()[0];
-    NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
-    byte[] valQualifier = quals.pollFirst();
-    // if weighted median is requested, get qualifier for the weight column
-    byte[] weightQualifier = quals.size() > 1 ? quals.pollLast() : null;
-    List<KeyValue> results = new ArrayList<KeyValue>();
-
-    boolean hasMoreRows = false;
+  public void getMedian(RpcController controller, AggregateArgument request,
+      RpcCallback<AggregateResponse> done) {
+    AggregateResponse response = null;
+    InternalScanner scanner = null;
     try {
+      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
+      Scan scan = ProtobufUtil.toScan(request.getScan());
+      scanner = env.getRegion().getScanner(scan);
+      byte[] colFamily = scan.getFamilies()[0];
+      NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
+      byte[] valQualifier = quals.pollFirst();
+      // if weighted median is requested, get qualifier for the weight column
+      byte[] weightQualifier = quals.size() > 1 ? quals.pollLast() : null;
+      List<KeyValue> results = new ArrayList<KeyValue>();
+
+      boolean hasMoreRows = false;
+    
       do {
         tempVal = null;
         tempWeight = null;
@@ -268,13 +415,73 @@ public class AggregateImplementation ext
         sumVal = ci.add(sumVal, tempVal);
         sumWeights = ci.add(sumWeights, tempWeight);
       } while (hasMoreRows);
+      ByteString first_sumVal = ci.getProtoForPromotedType(sumVal);
+      S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights;
+      ByteString first_sumWeights = ci.getProtoForPromotedType(s);
+      AggregateResponse.Builder pair = AggregateResponse.newBuilder();
+      pair.addFirstPart(first_sumVal);
+      pair.addFirstPart(first_sumWeights); 
+      response = pair.build();
+    } catch (IOException e) {
+      ResponseConverter.setControllerException(controller, e);
     } finally {
-      scanner.close();
+      if (scanner != null) {
+        try {
+          scanner.close();
+        } catch (IOException ignored) {}
+      }
+    }
+    done.run(response);
+  }
+
+  @SuppressWarnings("unchecked")
+  ColumnInterpreter<T,S> constructColumnInterpreterFromRequest(
+      AggregateArgument request) throws IOException {
+    String className = request.getInterpreterClassName();
+    Class<?> cls;
+    try {
+      cls = Class.forName(className);
+      ColumnInterpreter<T,S> ci = (ColumnInterpreter<T, S>) cls.newInstance();
+      if (request.hasInterpreterSpecificBytes()) {
+        ci.initialize(request.getInterpreterSpecificBytes());
+      }
+      return ci;
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public Service getService() {
+    return this;
+  }
+
+  /**
+   * Stores a reference to the coprocessor environment provided by the
+   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where
this
+   * coprocessor is loaded.  Since this is a coprocessor endpoint, it always expects to be
loaded
+   * on a table region, so always expects this to be an instance of
+   * {@link RegionCoprocessorEnvironment}.
+   * @param env the environment provided by the coprocessor host
+   * @throws IOException if the provided environment is not an instance of
+   * {@code RegionCoprocessorEnvironment}
+   */
+  @Override
+  public void start(CoprocessorEnvironment env) throws IOException {
+    if (env instanceof RegionCoprocessorEnvironment) {
+      this.env = (RegionCoprocessorEnvironment)env;
+    } else {
+      throw new CoprocessorException("Must be loaded on a table region!");
     }
-    List<S> l = new ArrayList<S>();
-    l.add(sumVal);
-    l.add(sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights);
-    return l;
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+    // nothing to do
   }
   
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java?rev=1398175&r1=1398174&r2=1398175&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java
Mon Oct 15 02:32:09 2012
@@ -25,7 +25,8 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
-import org.apache.hadoop.io.Writable;
+
+import com.google.protobuf.ByteString;
 
 /**
  * Defines how value for specific column is interpreted and provides utility
@@ -48,7 +49,7 @@ import org.apache.hadoop.io.Writable;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public interface ColumnInterpreter<T, S> extends Writable {
+public interface ColumnInterpreter<T, S> {
 
   /**
    * @param colFamily
@@ -114,4 +115,50 @@ public interface ColumnInterpreter<T, S>
    * @return Average
    */
   double divideForAvg(S o, Long l);
-}
+
+  /**
+   * This method should return any additional data that is needed on the
+   * server side to construct the ColumnInterpreter. The server
+   * will pass this to the {@link #initialize(org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.ColumnInterpreter)}
+   * method. If there is no ColumnInterpreter-specific data (e.g., for
+   * {@link LongColumnInterpreter}), then null should be returned.
+   * @return the PB message
+   */
+  ByteString columnInterpreterSpecificData();
+
+  /**
+   * Return the PB for type T
+   * @param t
+   * @return PB-message
+   */
+  ByteString getProtoForCellType(T t);
+
+  /**
+   * Return the PB for type S
+   * @param s
+   * @return PB-message
+   */
+  ByteString getProtoForPromotedType(S s);
+
+  /**
+   * This method should initialize any field(s) of the ColumnInterpreter with
+   * a parsing of the passed message bytes (used on the server side).
+   * @param bytes
+   */
+  void initialize(ByteString bytes);
+  
+  /**
+   * Converts the bytes in the server's response to the expected type S
+   * @param response
+   * @return response of type S constructed from the message
+   */
+  S parseResponseAsPromotedType(byte[] response);
+  
+  /**
+   * The response message comes as type S. This will convert/cast it to T.
+   * In some sense, performs the opposite of {@link #castToReturnType(Object)}
+   * @param response
+   * @return cast
+   */
+  T castToCellType(S response);
+}
\ No newline at end of file



Mime
View raw message