hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1526987 - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/
Date Fri, 27 Sep 2013 17:31:48 GMT
Author: tedyu
Date: Fri Sep 27 17:31:47 2013
New Revision: 1526987

URL: http://svn.apache.org/r1526987
Log:
HBASE-9605 Allow AggregationClient to skip specifying column family for row count aggregate


Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java?rev=1526987&r1=1526986&r2=1526987&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java Fri Sep 27 17:31:47 2013
@@ -127,7 +127,7 @@ public class AggregationClient {
   public <R, S, P extends Message, Q extends Message, T extends Message> 
   R max(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
       final Scan scan) throws Throwable {
-    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci);
+    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
     class MaxCallBack implements Batch.Callback<R> {
       R max = null;
 
@@ -164,7 +164,11 @@ public class AggregationClient {
     return aMaxCallBack.getMax();
   }
 
-  private void validateParameters(Scan scan) throws IOException {
+  /*
+   * @param scan
+   * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
+   */
+  private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
     if (scan == null
         || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes
             .equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))
@@ -172,8 +176,10 @@ public class AggregationClient {
         	!Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
       throw new IOException(
           "Agg client Exception: Startrow should be smaller than Stoprow");
-    } else if (scan.getFamilyMap().size() != 1) {
-      throw new IOException("There must be only one family.");
+    } else if (!canFamilyBeAbsent) {
+      if (scan.getFamilyMap().size() != 1) {
+        throw new IOException("There must be only one family.");
+      }
     }
   }
 
@@ -214,7 +220,7 @@ public class AggregationClient {
   public <R, S, P extends Message, Q extends Message, T extends Message> 
   R min(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
       final Scan scan) throws Throwable {
-    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci);
+    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
     class MinCallBack implements Batch.Callback<R> {
 
       private R min = null;
@@ -297,7 +303,7 @@ public class AggregationClient {
   public <R, S, P extends Message, Q extends Message, T extends Message> 
   long rowCount(final HTable table,
       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
-    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci);
+    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true);
     class RowNumCallback implements Batch.Callback<Long> {
       private final AtomicLong rowCountL = new AtomicLong(0);
 
@@ -367,7 +373,7 @@ public class AggregationClient {
   public <R, S, P extends Message, Q extends Message, T extends Message> 
   S sum(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
       final Scan scan) throws Throwable {
-    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci);
+    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
     
     class SumCallBack implements Batch.Callback<S> {
       S sumVal = null;
@@ -439,7 +445,7 @@ public class AggregationClient {
   private <R, S, P extends Message, Q extends Message, T extends Message>
   Pair<S, Long> getAvgArgs(final HTable table,
       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
-    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci);
+    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
     class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
       S sum = null;
       Long rowCount = 0l;
@@ -536,7 +542,7 @@ public class AggregationClient {
   private <R, S, P extends Message, Q extends Message, T extends Message>
   Pair<List<S>, Long> getStdArgs(final HTable table,
       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
-    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci);
+    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
     class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
       long rowCountVal = 0l;
       S sumVal = null, sumSqVal = null;
@@ -658,7 +664,7 @@ public class AggregationClient {
   Pair<NavigableMap<byte[], List<S>>, List<S>>
   getMedianArgs(final HTable table,
       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
-    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci);
+    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
     final NavigableMap<byte[], List<S>> map =
       new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
     class StdCallback implements Batch.Callback<List<S>> {
@@ -814,9 +820,9 @@ public class AggregationClient {
   }
 
   <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest
-  validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci)
+  validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean canFamilyBeAbsent)
       throws IOException {
-    validateParameters(scan);
+    validateParameters(scan, canFamilyBeAbsent);
     final AggregateRequest.Builder requestBuilder = 
         AggregateRequest.newBuilder();
     requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java?rev=1526987&r1=1526986&r2=1526987&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java Fri Sep 27 17:31:47 2013
@@ -237,8 +237,10 @@ extends AggregateService implements Copr
     InternalScanner scanner = null;
     try {
       Scan scan = ProtobufUtil.toScan(request.getScan());
-      byte[] colFamily = scan.getFamilies()[0];
-      NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
+      byte[][] colFamilies = scan.getFamilies();
+      byte[] colFamily = colFamilies != null ? colFamilies[0] : null;
+      NavigableSet<byte[]> qualifiers = colFamilies != null ?
+          scan.getFamilyMap().get(colFamily) : null;
       byte[] qualifier = null;
       if (qualifiers != null && !qualifiers.isEmpty()) {
         qualifier = qualifiers.pollFirst();

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java?rev=1526987&r1=1526986&r2=1526987&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java Fri Sep 27 17:31:47 2013
@@ -173,7 +173,6 @@ public class TestAggregateProtocol {
   public void testRowCountAllTable() throws Throwable {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
-    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
     final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
         new LongColumnInterpreter();
     long rowCount = aClient.rowCount(TEST_TABLE, ci,
@@ -190,7 +189,6 @@ public class TestAggregateProtocol {
   public void testRowCountWithInvalidRange1() {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
-    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[2]);
 
@@ -215,7 +213,6 @@ public class TestAggregateProtocol {
   public void testRowCountWithInvalidRange2() {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
-    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[5]);
 
@@ -230,26 +227,6 @@ public class TestAggregateProtocol {
     assertEquals(0, rowCount);
   }
 
-  /**
-   * This should return a 0
-   */
-  @Test (timeout=300000)
-  public void testRowCountWithNullCF() {
-    AggregationClient aClient = new AggregationClient(conf);
-    Scan scan = new Scan();
-    scan.setStartRow(ROWS[5]);
-    scan.setStopRow(ROWS[15]);
-    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
-        new LongColumnInterpreter();
-    long rowCount = -1;
-    try {
-      rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
-    } catch (Throwable e) {
-       rowCount = 0;
-    }
-    assertEquals(0, rowCount);
-  }
-
   @Test (timeout=300000)
   public void testRowCountWithNullCQ() throws Throwable {
     AggregationClient aClient = new AggregationClient(conf);



Mime
View raw message