hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1096620 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/client/coprocessor/ src/main/java/org/apache/hadoop/hbase/coprocessor/ src/test/java/org/apache/hadoop/hbase/coprocessor/
Date Mon, 25 Apr 2011 22:02:25 GMT
Author: stack
Date: Mon Apr 25 22:02:24 2011
New Revision: 1096620

URL: http://svn.apache.org/viewvc?rev=1096620&view=rev
Log:
HBASE-1512 Coprocessors: Support aggregate functions

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java
Modified:
    hbase/trunk/CHANGES.txt

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1096620&r1=1096619&r2=1096620&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Mon Apr 25 22:02:24 2011
@@ -185,6 +185,7 @@ Release 0.91.0 - Unreleased
    HBASE-3798  [REST] Allow representation to elide row key and column key
    HBASE-3812  Tidy up naming consistency and documentation in coprocessor
                framework (Mingjie Lai)
+   HBASE-1512  Support aggregate functions (Himanshu Vashishtha)
 
   TASKS
    HBASE-3559  Move report of split to master OFF the heartbeat channel

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java?rev=1096620&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java Mon Apr 25 22:02:24 2011
@@ -0,0 +1,362 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.AggregateProtocol;
+import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * This client class is for invoking the aggregate functions deployed on the
+ * Region Server side via the AggregateProtocol. This class will implement the
+ * supporting functionality for summing/processing the individual results
+ * obtained from the AggregateProtocol for each region.
+ * <p>
+ * This will serve as the client side handler for invoking the aggregate
+ * functions.
+ * <ul>
+ * For all aggregate functions,
+ * <li>start row < end row is an essential condition (if they are not
+ * {@link HConstants#EMPTY_BYTE_ARRAY})
+ * <li>Column family can't be null. In case where multiple families are
+ * provided, an IOException will be thrown. An optional column qualifier can
+ * also be defined.
+ * <li>For methods to find maximum, minimum, sum, rowcount, it returns the
+ * parameter type. For average and std, it returns a double value. For row
+ * count, it returns a long value.
+ */
+public class AggregationClient {
+
+  private static final Log log = LogFactory.getLog(AggregationClient.class);
+  Configuration conf;
+
+  /**
+   * Constructor with Conf object
+   * @param cfg
+   */
+  public AggregationClient(Configuration cfg) {
+    this.conf = cfg;
+  }
+
+  /**
+   * It gives the maximum value of a column for a given column family for the
+   * given range. In case qualifier is null, a max of all values for the given
+   * family is returned.
+   * @param tableName
+   * @param ci
+   * @param scan
+   * @return max val <R>
+   * @throws Throwable
+   *           The caller is supposed to handle the exception as they are thrown
+   *           & propagated to it.
+   */
+  public <R, S> R max(final byte[] tableName, final ColumnInterpreter<R, S> ci,
+      final Scan scan) throws Throwable {
+    validateParameters(scan);
+    HTable table = new HTable(conf, tableName);
+
+    class MaxCallBack implements Batch.Callback<R> {
+      R max = null;
+
+      R getMax() {
+        return max;
+      }
+
+      @Override
+      public void update(byte[] region, byte[] row, R result) {
+        max = ci.compare(max, result) < 0 ? result : max;
+      }
+    }
+    MaxCallBack aMaxCallBack = new MaxCallBack();
+    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
+        .getStopRow(), new Batch.Call<AggregateProtocol, R>() {
+      @Override
+      public R call(AggregateProtocol instance) throws IOException {
+        return instance.getMax(ci, scan);
+      }
+    }, aMaxCallBack);
+    return aMaxCallBack.getMax();
+  }
+
+  private void validateParameters(Scan scan) throws IOException {
+    if (scan == null || 
+        (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes
+                      .equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))){
+      throw new IOException("Agg client Exception: Startrow should be smaller than Stoprow");
+    }else if(scan.getFamilyMap().size() != 1) {
+      throw new IOException("There must be only one family.");
+    }
+  }
+
+  /**
+   * It gives the minimum value of a column for a given column family for the
+   * given range. In case qualifier is null, a min of all values for the given
+   * family is returned.
+   * @param tableName
+   * @param ci
+   * @param scan
+   * @return min val <R>
+   * @throws Throwable
+   */
+  public <R, S> R min(final byte[] tableName, final ColumnInterpreter<R, S> ci,
+      final Scan scan) throws Throwable {
+    validateParameters(scan);
+    class MinCallBack implements Batch.Callback<R> {
+
+      private R min = null;
+
+      public R getMinimum() {
+        return min;
+      }
+
+      @Override
+      public void update(byte[] region, byte[] row, R result) {
+        min = (min == null || ci.compare(result, min) < 0) ? result : min;
+      }
+    }
+    HTable table = new HTable(conf, tableName);
+    MinCallBack minCallBack = new MinCallBack();
+    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
+        .getStopRow(), new Batch.Call<AggregateProtocol, R>() {
+
+      @Override
+      public R call(AggregateProtocol instance) throws IOException {
+        return instance.getMin(ci, scan);
+      }
+    }, minCallBack);
+    log.debug("Min fom all regions is: " + minCallBack.getMinimum());
+    return minCallBack.getMinimum();
+  }
+
+  /**
+   * It gives the row count, by summing up the individual results obtained from
+   * regions. In case the qualifier is null, FirstKEyValueFilter is used to
+   * optimised the operation. In case qualifier is provided, I can't use the
+   * filter as it may set the flag to skip to next row, but the value read is
+   * not of the given filter: in this case, this particular row will not be
+   * counted ==> an error.
+   * @param tableName
+   * @param ci
+   * @param scan
+   * @return
+   * @throws Throwable
+   */
+  public <R, S> long rowCount(final byte[] tableName,
+      final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
+    validateParameters(scan);
+    class RowNumCallback implements Batch.Callback<Long> {
+      private long rowCountL = 0l;
+
+      public long getRowNumCount() {
+        return rowCountL;
+      }
+
+      @Override
+      public void update(byte[] region, byte[] row, Long result) {
+        rowCountL += result.longValue();
+      }
+    }
+    RowNumCallback rowNum = new RowNumCallback();
+    HTable table = new HTable(conf, tableName);
+    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
+        .getStopRow(), new Batch.Call<AggregateProtocol, Long>() {
+      @Override
+      public Long call(AggregateProtocol instance) throws IOException {
+        return instance.getRowNum(ci, scan);
+      }
+    }, rowNum);
+    return rowNum.getRowNumCount();
+  }
+
+  /**
+   * It sums up the value returned from various regions. In case qualifier is
+   * null, summation of all the column qualifiers in the given family is done.
+   * @param tableName
+   * @param ci
+   * @param scan
+   * @return sum <S>
+   * @throws Throwable
+   */
+  public <R, S> S sum(final byte[] tableName, final ColumnInterpreter<R, S> ci,
+      final Scan scan) throws Throwable {
+    validateParameters(scan);
+    class SumCallBack implements Batch.Callback<S> {
+      S sumVal = null;
+
+      public S getSumResult() {
+        return sumVal;
+      }
+
+      @Override
+      public void update(byte[] region, byte[] row, S result) {
+        sumVal = ci.add(sumVal, result);
+      }
+    }
+    SumCallBack sumCallBack = new SumCallBack();
+    HTable table = new HTable(conf, tableName);
+    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
+        .getStopRow(), new Batch.Call<AggregateProtocol, S>() {
+      @Override
+      public S call(AggregateProtocol instance) throws IOException {
+        return instance.getSum(ci, scan);
+      }
+    }, sumCallBack);
+    return sumCallBack.getSumResult();
+  }
+
+  /**
+   * It computes average while fetching sum and row count from all the
+   * corresponding regions. Approach is to compute a global sum of region level
+   * sum and rowcount and then compute the average.
+   * @param tableName
+   * @param scan
+   * @throws Throwable
+   */
+  private <R, S> Pair<S, Long> getAvgArgs(final byte[] tableName,
+      final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
+    validateParameters(scan);
+    class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
+      S sum = null;
+      Long rowCount = 0l;
+
+      public Pair<S, Long> getAvgArgs() {
+        return new Pair<S, Long>(sum, rowCount);
+      }
+
+      @Override
+      public void update(byte[] region, byte[] row, Pair<S, Long> result) {
+        sum = ci.add(sum, result.getFirst());
+        rowCount += result.getSecond();
+      }
+    }
+    AvgCallBack avgCallBack = new AvgCallBack();
+    HTable table = new HTable(conf, tableName);
+    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
+        .getStopRow(), new Batch.Call<AggregateProtocol, Pair<S, Long>>() {
+      @Override
+      public Pair<S, Long> call(AggregateProtocol instance) throws IOException {
+        return instance.getAvg(ci, scan);
+      }
+    }, avgCallBack);
+    return avgCallBack.getAvgArgs();
+  }
+
+  /**
+   * This is the client side interface/handle for calling the average method for
+   * a given cf-cq combination. It was necessary to add one more call stack as
+   * its return type should be a decimal value, irrespective of what
+   * columninterpreter says. So, this methods collects the necessary parameters
+   * to compute the average and returs the double value.
+   * @param tableName
+   * @param ci
+   * @param scan
+   * @return
+   * @throws Throwable
+   */
+  public <R, S> double avg(final byte[] tableName,
+      final ColumnInterpreter<R, S> ci, Scan scan) throws Throwable {
+    Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
+    return ci.divideForAvg(p.getFirst(), p.getSecond());
+  }
+
+  /**
+   * It computes a global standard deviation for a given column and its value.
+   * Standard deviation is square root of (average of squares -
+   * average*average). From individual regions, it obtains sum, square sum and
+   * number of rows. With these, the above values are computed to get the global
+   * std.
+   * @param tableName
+   * @param scan
+   * @return
+   * @throws Throwable
+   */
+  private <R, S> Pair<List<S>, Long> getStdArgs(final byte[] tableName,
+      final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
+    validateParameters(scan);
+    class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
+      long rowCountVal = 0l;
+      S sumVal = null, sumSqVal = null;
+
+      public Pair<List<S>, Long> getStdParams() {
+        List<S> l = new ArrayList<S>();
+        l.add(sumVal);
+        l.add(sumSqVal);
+        Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
+        return p;
+      }
+
+      @Override
+      public void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
+        sumVal = ci.add(sumVal, result.getFirst().get(0));
+        sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
+        rowCountVal += result.getSecond();
+      }
+    }
+    StdCallback stdCallback = new StdCallback();
+    HTable table = new HTable(conf, tableName);
+    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
+        .getStopRow(),
+        new Batch.Call<AggregateProtocol, Pair<List<S>, Long>>() {
+          @Override
+          public Pair<List<S>, Long> call(AggregateProtocol instance)
+              throws IOException {
+            return instance.getStd(ci, scan);
+          }
+
+        }, stdCallback);
+    return stdCallback.getStdParams();
+  }
+
+  /**
+   * This is the client side interface/handle for calling the std method for a
+   * given cf-cq combination. It was necessary to add one more call stack as its
+   * return type should be a decimal value, irrespective of what
+   * columninterpreter says. So, this methods collects the necessary parameters
+   * to compute the std and returns the double value.
+   * @param tableName
+   * @param ci
+   * @param scan
+   * @return
+   * @throws Throwable
+   */
+  public <R, S> double std(final byte[] tableName, ColumnInterpreter<R, S> ci,
+      Scan scan) throws Throwable {
+    Pair<List<S>, Long> p = getStdArgs(tableName, ci, scan);
+    double res = 0d;
+    double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
+    double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
+    res = avgOfSumSq - (avg) * (avg); // variance
+    res = Math.pow(res, 0.5);
+    return res;
+  }
+
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java?rev=1096620&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java Mon Apr 25 22:02:24 2011
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * a concrete column interpreter implementation. The cell value is a Long value
+ * and its promoted data type is also a Long value. For computing aggregation
+ * function, this class is used to find the datatype of the cell value. Client
+ * is supposed to instantiate it and passed along as a parameter. See
+ * {@link TestAggregateProtocol} methods for its sample usage.
+ * Its methods handle null arguments gracefully. 
+ */
+public class LongColumnInterpreter implements ColumnInterpreter<Long, Long> {
+
+  public Long getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
+      throws IOException {
+    if (kv == null || kv.getValue().length != Bytes.SIZEOF_LONG)
+      return null;
+    return Bytes.toLong(kv.getValue());
+  }
+
+   @Override
+  public Long add(Long l1, Long l2) {
+    if (l1 == null ^ l2 == null) {
+      return (l1 == null) ? l2 : l1; // either of one is null.
+    } else if (l1 == null) // both are null
+      return null;
+    return l1 + l2;
+  }
+
+  @Override
+  public int compare(final Long l1, final Long l2) {
+    if (l1 == null ^ l2 == null) {
+      return l1 == null ? -1 : 1; // either of one is null.
+    } else if (l1 == null)
+      return 0; // both are null
+    return l1.compareTo(l2); // natural ordering.
+  }
+
+  @Override
+  public Long getMaxValue() {
+    return Long.MAX_VALUE;
+  }
+
+  @Override
+  public Long increment(Long o) {
+    return o == null ? null : (o + 1l);
+  }
+
+  @Override
+  public Long multiply(Long l1, Long l2) {
+    return (l1 == null || l2 == null) ? null : l1 * l2;
+  }
+
+  @Override
+  public Long getMinValue() {
+    return Long.MIN_VALUE;
+  }
+
+  @Override
+  public void readFields(DataInput arg0) throws IOException {
+    // nothing to serialize
+  }
+
+  @Override
+  public void write(DataOutput arg0) throws IOException {
+     // nothing to serialize
+  }
+
+  @Override
+  public double divideForAvg(Long l1, Long l2) {
+    return (l2 == null || l1 == null) ? Double.NaN : (l1.doubleValue() / l2
+        .doubleValue());
+  }
+
+  @Override
+  public Long castToReturnType(Long o) {
+    return o;
+  }
+
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java?rev=1096620&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java Mon Apr 25 22:02:24 2011
@@ -0,0 +1,224 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * A concrete AggregateProtocol implementation. Its system level coprocessor
+ * that computes the aggregate function at a region level.
+ */
+public class AggregateImplementation extends BaseEndpointCoprocessor implements
+    AggregateProtocol {
+  protected static Log log = LogFactory.getLog(AggregateImplementation.class);
+
+  @Override
+  public <T, S> T getMax(ColumnInterpreter<T, S> ci, Scan scan)
+      throws IOException {
+    T temp;
+    T max = null;
+    InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
+        .getRegion().getScanner(scan);
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    byte[] colFamily = scan.getFamilies()[0];
+    byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
+    // qualifier can be null.
+    try {
+      boolean hasMoreRows = false;
+      do {
+        hasMoreRows = scanner.next(results);
+        for (KeyValue kv : results) {
+          temp = ci.getValue(colFamily, qualifier, kv);
+          max = (max == null || ci.compare(temp, max) > 0) ? temp : max;
+        }
+        results.clear();
+      } while (hasMoreRows);
+    } finally {
+      scanner.close();
+    }
+    log.info("Maximum from this region is "
+        + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
+            .getRegionNameAsString() + ": " + max);
+    return max;
+  }
+
+  @Override
+  public <T, S> T getMin(ColumnInterpreter<T, S> ci, Scan scan)
+      throws IOException {
+    T min = null;
+    T temp;
+    InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
+        .getRegion().getScanner(scan);
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    byte[] colFamily = scan.getFamilies()[0];
+    byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
+    try {
+      boolean hasMoreRows = false;
+      do {
+        hasMoreRows = scanner.next(results);
+        for (KeyValue kv : results) {
+          temp = ci.getValue(colFamily, qualifier, kv);
+          min = (min == null || ci.compare(temp, min) < 0) ? temp : min;
+        }
+        results.clear();
+      } while (hasMoreRows);
+    } finally {
+      scanner.close();
+    }
+    log.info("Minimum from this region is "
+        + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
+            .getRegionNameAsString() + ": " + min);
+    return min;
+  }
+
+  @Override
+  public <T, S> S getSum(ColumnInterpreter<T, S> ci, Scan scan)
+      throws IOException {
+    long sum = 0l;
+    S sumVal = null;
+    T temp;
+    InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
+        .getRegion().getScanner(scan);
+    byte[] colFamily = scan.getFamilies()[0];
+    byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    try {
+      boolean hasMoreRows = false;
+      do {
+        hasMoreRows = scanner.next(results);
+        for (KeyValue kv : results) {
+          temp = ci.getValue(colFamily, qualifier, kv);
+          if (temp != null)
+            sumVal = ci.add(sumVal, ci.castToReturnType(temp));
+        }
+        results.clear();
+      } while (hasMoreRows);
+    } finally {
+      scanner.close();
+    }
+    log.debug("Sum from this region is "
+        + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
+            .getRegionNameAsString() + ": " + sum);
+    return sumVal;
+  }
+
+  @Override
+  public <T, S> long getRowNum(ColumnInterpreter<T, S> ci, Scan scan)
+      throws IOException {
+    long counter = 0l;
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    byte[] colFamily = scan.getFamilies()[0];
+    byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
+    if (scan.getFilter() == null && qualifier == null)
+      scan.setFilter(new FirstKeyOnlyFilter());
+    InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
+        .getRegion().getScanner(scan);
+    try {
+      boolean hasMoreRows = false;
+      do {
+        hasMoreRows = scanner.next(results);
+        if (results.size() > 0) {
+          counter++;
+        }
+        results.clear();
+      } while (hasMoreRows);
+    } finally {
+      scanner.close();
+    }
+    log.info("Row counter from this region is "
+        + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
+            .getRegionNameAsString() + ": " + counter);
+    return counter;
+  }
+
+  @Override
+  public <T, S> Pair<S, Long> getAvg(ColumnInterpreter<T, S> ci, Scan scan)
+      throws IOException {
+    S sumVal = null;
+    Long rowCountVal = 0l;
+    InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
+        .getRegion().getScanner(scan);
+    byte[] colFamily = scan.getFamilies()[0];
+    byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    boolean hasMoreRows = false;
+    try {
+      do {
+        results.clear();
+        hasMoreRows = scanner.next(results);
+        for (KeyValue kv : results) {
+          sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
+              qualifier, kv)));
+        }
+        rowCountVal++;
+      } while (hasMoreRows);
+    } finally {
+      scanner.close();
+    }
+    Pair<S, Long> pair = new Pair<S, Long>(sumVal, rowCountVal);
+    return pair;
+  }
+
+  @Override
+  public <T, S> Pair<List<S>, Long> getStd(ColumnInterpreter<T, S> ci, Scan scan)
+      throws IOException {
+    S sumVal = null, sumSqVal = null, tempVal = null;
+    long rowCountVal = 0l;
+    InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
+        .getRegion().getScanner(scan);
+    byte[] colFamily = scan.getFamilies()[0];
+    byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
+    List<KeyValue> results = new ArrayList<KeyValue>();
+
+    boolean hasMoreRows = false;
+    try {
+      do {
+        tempVal = null;
+        hasMoreRows = scanner.next(results);
+        for (KeyValue kv : results) {
+          tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
+              qualifier, kv)));
+        }
+        results.clear();
+        sumVal = ci.add(sumVal, tempVal);
+        sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal));
+        rowCountVal++;
+      } while (hasMoreRows);
+    } finally {
+      scanner.close();
+    }
+    List<S> l = new ArrayList<S>();
+    l.add(sumVal);
+    l.add(sumSqVal);
+    Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
+    return p;
+  }
+
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java?rev=1096620&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java Mon Apr 25 22:02:24 2011
@@ -0,0 +1,129 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.util.Pair;
+
/**
 * Defines the aggregation functions that are to be supported in this
 * Coprocessor. For each method, it takes a Scan object and a columnInterpreter.
 * The scan object should have a column family (else an exception will be
 * thrown), and an optional column qualifier. In the current implementation
 * {@link AggregateImplementation}, only one column family and column qualifier
 * combination is served. In case there are more than one, only first one will
 * be picked. Refer to {@link AggregationClient} for some general conditions on
 * input parameters.
 */
public interface AggregateProtocol extends CoprocessorProtocol {

  /**
   * Gives the maximum for a given combination of column qualifier and column
   * family, in the given row range as defined in the Scan object. In its
   * current implementation, it takes one column family and one column qualifier
   * (if provided). In case of null column qualifier, maximum value for the
   * entire column family will be returned.
   * @param ci column interpreter used to decode and compare cell values
   * @param scan scan defining the row range and column family/qualifier
   * @return max value as mentioned above
   * @throws IOException
   */
  <T, S> T getMax(ColumnInterpreter<T, S> ci, Scan scan) throws IOException;

  /**
   * Gives the minimum for a given combination of column qualifier and column
   * family, in the given row range as defined in the Scan object. In its
   * current implementation, it takes one column family and one column qualifier
   * (if provided). In case of null column qualifier, minimum value for the
   * entire column family will be returned.
   * @param ci column interpreter used to decode and compare cell values
   * @param scan scan defining the row range and column family/qualifier
   * @return min as mentioned above
   * @throws IOException
   */
  <T, S> T getMin(ColumnInterpreter<T, S> ci, Scan scan) throws IOException;

  /**
   * Gives the sum for a given combination of column qualifier and column
   * family, in the given row range as defined in the Scan object. In its
   * current implementation, it takes one column family and one column qualifier
   * (if provided). In case of null column qualifier, sum for the entire column
   * family will be returned.
   * @param ci column interpreter used to decode and add cell values
   * @param scan scan defining the row range and column family/qualifier
   * @return sum of values as defined by the column interpreter
   * @throws IOException
   */
  <T, S> S getSum(ColumnInterpreter<T, S> ci, Scan scan) throws IOException;

  /**
   * Gives the row count for the given column family and column qualifier, in
   * the given row range as defined in the Scan object.
   * @param ci column interpreter used to decode cell values
   * @param scan scan defining the row range and column family/qualifier
   * @return number of rows in this region that match the scan
   * @throws IOException
   */
  <T, S> long getRowNum(ColumnInterpreter<T, S> ci, Scan scan)
      throws IOException;

  /**
   * Gives a Pair with first object as Sum and second object as row count,
   * computed for a given combination of column qualifier and column family in
   * the given row range as defined in the Scan object. In its current
   * implementation, it takes one column family and one column qualifier (if
   * provided). In case of null column qualifier, an aggregate sum over the
   * entire column family will be returned.
   * <p>
   * The average is computed in
   * {@link AggregationClient#avg(byte[], ColumnInterpreter, Scan)} by
   * processing results from all regions, so its "ok" to pass sum and a Long
   * type.
   * @param ci column interpreter used to decode and add cell values
   * @param scan scan defining the row range and column family/qualifier
   * @return pair of (region-level sum, region-level row count)
   * @throws IOException
   */
  <T, S> Pair<S, Long> getAvg(ColumnInterpreter<T, S> ci, Scan scan)
      throws IOException;

  /**
   * Gives a Pair with first object a List containing Sum and sum of squares,
   * and the second object as row count. It is computed for a given combination
   * of column qualifier and column family in the given row range as defined in
   * the Scan object. In its current implementation, it takes one column family
   * and one column qualifier (if provided). The idea is to get the variance
   * first: the average of the squares less the square of the average; the
   * standard deviation is the square root of the variance.
   * @param ci column interpreter used to decode, add and multiply cell values
   * @param scan scan defining the row range and column family/qualifier
   * @return pair of ([region sum, region sum of squares], region row count)
   * @throws IOException
   */
  <T, S> Pair<List<S>, Long> getStd(ColumnInterpreter<T, S> ci, Scan scan)
      throws IOException;

}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java?rev=1096620&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java Mon Apr 25 22:02:24 2011
@@ -0,0 +1,118 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Defines how the value for a specific column is interpreted, and provides
+ * utility methods like compare, add, multiply etc. for it. Takes a column
+ * family and a column qualifier and returns the cell value. Concrete
+ * implementations should handle the null case gracefully. Refer to
+ * {@link LongColumnInterpreter} for an example.
+ * <p>
+ * Takes two generic parameters. The cell value type of the interpreter is
+ * {@code T}. During some computations like sum and average, the return type
+ * can be different from the cell value data type; for example, a sum of int
+ * cell values might overflow an int result, in which case Long should be used
+ * for the result. Therefore, this class mandates a different (promoted) data
+ * type {@code S} for the result of these computations. All computations are
+ * performed on the promoted data type {@code S}. The conversion method
+ * {@link ColumnInterpreter#castToReturnType(Object)} takes a {@code T} value
+ * and returns an {@code S} value.
+ * @param <T> cell value data type
+ * @param <S> promoted data type used for computation results
+ */
+public interface ColumnInterpreter<T, S> extends Writable {
+
+  /**
+   * Extracts the cell value of type T from the given KeyValue.
+   * @param colFamily column family of the cell
+   * @param colQualifier column qualifier of the cell
+   * @param kv the KeyValue holding the cell value to interpret
+   * @return value of type T (implementations may return null — see class doc)
+   * @throws IOException if the cell value cannot be decoded
+   */
+  T getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
+      throws IOException;
+
+  /**
+   * Returns the sum of the two arguments, or the non-null argument if exactly
+   * one of them is null; otherwise (both null) returns null.
+   * @param l1 first operand, may be null
+   * @param l2 second operand, may be null
+   * @return sum, the single non-null operand, or null if both are null
+   */
+  public S add(S l1, S l2);
+
+  /**
+   * Returns the maximum possible value for the cell value type T.
+   * @return the maximum value of type T
+   */
+
+  T getMaxValue();
+
+  /**
+   * Returns the minimum possible value for the cell value type T.
+   * @return the minimum value of type T
+   */
+
+  T getMinValue();
+
+  /**
+   * Multiplies the two promoted values.
+   * @param o1 first operand
+   * @param o2 second operand
+   * @return product of o1 and o2
+   */
+  S multiply(S o1, S o2);
+
+  /**
+   * Increments the given promoted value (presumably by one — see the concrete
+   * interpreter for exact semantics).
+   * @param o value to increment
+   * @return the incremented value
+   */
+  S increment(S o);
+
+  /**
+   * Provides a casting opportunity between the data types: converts a cell
+   * value of type T into the promoted computation type S.
+   * @param o value of the cell data type T
+   * @return equivalent value as the promoted type S
+   */
+  S castToReturnType(T o);
+
+  /**
+   * Compares two values, taking care of null arguments. Returns 0 if they are
+   * equal or both are null;
+   * <ul>
+   * <li>&gt; 0 if l1 &gt; l2, or l1 is not null and l2 is null;
+   * <li>&lt; 0 if l1 &lt; l2, or l1 is null and l2 is not null.
+   * </ul>
+   * @param l1 first value, may be null
+   * @param l2 second value, may be null
+   * @return comparison result as described above
+   */
+  int compare(final T l1, final T l2);
+
+  /**
+   * Used for computing the average of {@code S} data values. A divide method
+   * that takes two {@code S} values is not provided, as it is not needed as
+   * of now.
+   * @param o accumulated sum (promoted type)
+   * @param l count to divide by
+   * @return o divided by l, as a double
+   */
+  double divideForAvg(S o, Long l);
+}
\ No newline at end of file

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java?rev=1096620&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java Mon Apr 25 22:02:24 2011
@@ -0,0 +1,785 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
+import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * A test class to cover aggregate functions that can be implemented using
+ * Coprocessors.
+ */
+public class TestAggregateProtocol {
+  protected static Log myLog = LogFactory.getLog(TestAggregateProtocol.class);
+
+  /**
+   * Test fixtures: table name, column family and qualifiers shared by all
+   * tests.
+   */
+  private static final byte[] TEST_TABLE = Bytes.toBytes("TestTable");
+  private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
+  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
+  private static final byte[] TEST_MULTI_CQ = Bytes.toBytes("TestMultiCQ");
+
+  private static byte[] ROW = Bytes.toBytes("testRow");
+  private static final int ROWSIZE = 20;
+  // Row indexes at which the test table is split into three regions.
+  // NOTE(review): "Seperator" is a typo for "Separator"; kept as-is.
+  private static final int rowSeperator1 = 5;
+  private static final int rowSeperator2 = 12;
+  private static byte[][] ROWS = makeN(ROW, ROWSIZE);
+
+  private static HBaseTestingUtility util = new HBaseTestingUtility();
+  private static MiniHBaseCluster cluster = null;
+  private static Configuration conf = util.getConfiguration();
+
+  /**
+   * A set up method to start the test cluster. AggregateProtocolImpl is
+   * registered and will be loaded during region startup.
+   * @throws Exception on cluster start-up or table-creation failure
+   */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        "org.apache.hadoop.hbase.coprocessor.AggregateImplementation");
+
+    util.startMiniCluster(2);
+    cluster = util.getMiniHBaseCluster();
+    HTable table = util.createTable(TEST_TABLE, TEST_FAMILY);
+    util.createMultiRegions(util.getConfiguration(), table, TEST_FAMILY,
+        new byte[][] { HConstants.EMPTY_BYTE_ARRAY, ROWS[rowSeperator1],
+            ROWS[rowSeperator2] });
+    /**
+     * The test table has one CQ which is always populated (holding the row
+     * index i) and one variable CQ per row (holding i * 10); e.g.
+     * rowKey1: CF:CQ and CF:CQ1, rowKey2: CF:CQ and CF:CQ2.
+     */
+    for (int i = 0; i < ROWSIZE; i++) {
+      Put put = new Put(ROWS[i]);
+      Long l = new Long(i);
+      put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(l));
+      table.put(put);
+      Put p2 = new Put(ROWS[i]);
+      p2.add(TEST_FAMILY, Bytes.add(TEST_MULTI_CQ, Bytes.toBytes(l)), Bytes
+          .toBytes(l * 10));
+      table.put(p2);
+    }
+  }
+
+  /**
+   * Shutting down the cluster.
+   * @throws Exception on shutdown failure
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  /**
+   * An infrastructure method to prepare row keys for the test table.
+   * @param base common prefix for every generated row key
+   * @param n number of row keys to generate
+   * @return n row keys, each being base followed by the serialized index
+   */
+  private static byte[][] makeN(byte[] base, int n) {
+    byte[][] ret = new byte[n][];
+    for (int i = 0; i < n; i++) {
+      ret[i] = Bytes.add(base, Bytes.toBytes(i));
+    }
+    return ret;
+  }
+
+  /**
+   * **************************** ROW COUNT Test cases *******************
+   */
+
+  /**
+   * This will test rowcount with a valid range, i.e., a subset of rows. It will
+   * be the most common use case.
+   * @throws Throwable
+   */
+  @Test
+  public void testRowCountWithValidRange() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
+    scan.setStartRow(ROWS[2]);
+    scan.setStopRow(ROWS[14]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
+    assertEquals(12, rowCount);
+  }
+
+  /**
+   * This will test the row count on the entire table. Startrow and endrow will
+   * be null.
+   * @throws Throwable
+   */
+  @Test
+  public void testRowCountAllTable() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long rowCount = aClient.rowCount(TEST_TABLE, ci,
+        scan);
+    assertEquals(ROWSIZE, rowCount);
+  }
+
+  /**
+   * This will test the row count with startrow > endrow. The result should be
+   * -1.
+   * @throws Throwable
+   */
+  @Test
+  public void testRowCountWithInvalidRange1() {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
+    scan.setStartRow(ROWS[5]);
+    scan.setStopRow(ROWS[2]);
+
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long rowCount = -1;
+    try {
+      rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
+    } catch (Throwable e) {
+      // NOTE(review): e.getStackTrace() returns an array, so this logs the
+      // array's toString(), not the actual trace; pass e as the second
+      // argument to error() instead.
+      myLog.error("Exception thrown in the invalidRange method"
+          + e.getStackTrace());
+    }
+    assertEquals(-1, rowCount);
+  }
+
+  /**
+   * This will test the row count with startrow = endrow and they will be
+   * non-null. The result should be 0, as it assumes a non-get query.
+   * @throws Throwable
+   */
+  @Test
+  public void testRowCountWithInvalidRange2() {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
+    scan.setStartRow(ROWS[5]);
+    scan.setStopRow(ROWS[5]);
+
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long rowCount = -1;
+    try {
+      rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
+    } catch (Throwable e) {
+      rowCount = 0;
+    }
+    assertEquals(0, rowCount);
+  }
+
+  /**
+   * Row count with no column family on the scan: the coprocessor rejects the
+   * scan and the observed count should be 0.
+   */
+  @Test
+  public void testRowCountWithNullCF() {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.setStartRow(ROWS[5]);
+    scan.setStopRow(ROWS[15]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long rowCount = -1;
+    try {
+      rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
+    } catch (Throwable e) {
+       rowCount = 0;
+    }
+    assertEquals(0, rowCount);
+  }
+
+  /**
+   * Row count with a column family but no column qualifier: all rows count.
+   * @throws Throwable
+   */
+  @Test
+  public void testRowCountWithNullCQ() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long rowCount = aClient.rowCount(TEST_TABLE, ci,
+        scan);
+    assertEquals(20, rowCount);
+  }
+
+  /**
+   * Row count with a prefix filter that matches no row: count should be 0.
+   * @throws Throwable
+   */
+  @Test
+  public void testRowCountWithPrefixFilter() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
+    scan.setFilter(f);
+    long rowCount = aClient.rowCount(TEST_TABLE, ci,
+        scan);
+    assertEquals(0, rowCount);
+  }
+
+  /**
+   * ***************Test cases for Maximum *******************
+   */
+
+  /**
+   * Give max for the entire table.
+   * @throws Throwable
+   */
+  @Test
+  public void testMaxWithValidRange() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long maximum = aClient.max(TEST_TABLE, ci, scan);
+    assertEquals(19, maximum);
+  }
+
+  /**
+   * Max over a sub-range of rows.
+   * @throws Throwable
+   */
+  @Test
+  public void testMaxWithValidRange2() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
+    scan.setStartRow(ROWS[5]);
+    scan.setStopRow(ROWS[15]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long max = aClient.max(TEST_TABLE, ci, scan);
+    assertEquals(14, max);
+  }
+
+  /**
+   * Max over the whole family (no qualifier); includes the i * 10 values.
+   * @throws Throwable
+   */
+  @Test
+  public void testMaxWithValidRangeWithNoCQ() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long maximum = aClient.max(TEST_TABLE, ci, scan);
+    assertEquals(190, maximum);
+  }
+
+  /**
+   * Max over a single-row range with no qualifier.
+   * @throws Throwable
+   */
+  @Test
+  public void testMaxWithValidRange2WithNoCQ() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    scan.setStartRow(ROWS[6]);
+    scan.setStopRow(ROWS[7]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long max = aClient.max(TEST_TABLE, ci, scan);
+    assertEquals(60, max);
+  }
+
+  /**
+   * Max with no column family on the scan: the call should fail and max stays
+   * null.
+   */
+  @Test
+  public void testMaxWithValidRangeWithNullCF() {
+    AggregationClient aClient = new AggregationClient(conf);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    Scan scan = new Scan();
+    Long max = null;
+    try {
+      max = aClient.max(TEST_TABLE, ci, scan);
+    } catch (Throwable e) {
+      max = null;
+    }
+    assertEquals(null, max);// CP will throw an IOException about the
+    // null column family, so max remains null
+  }
+
+  /**
+   * Max with startrow > stoprow: the call should fail.
+   */
+  @Test
+  public void testMaxWithInvalidRange() {
+    AggregationClient aClient = new AggregationClient(conf);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    Scan scan = new Scan();
+    scan.setStartRow(ROWS[4]);
+    scan.setStopRow(ROWS[2]);
+    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
+    long max = Long.MIN_VALUE;
+    try {
+      max = aClient.max(TEST_TABLE, ci, scan);
+    } catch (Throwable e) {
+      max = 0;
+    }
+    assertEquals(0, max);// control should go to the catch block
+  }
+
+  /**
+   * Max with startrow = stoprow: the call should fail.
+   * @throws Throwable
+   */
+  @Test
+  public void testMaxWithInvalidRange2() throws Throwable {
+    long max = Long.MIN_VALUE;
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
+    scan.setStartRow(ROWS[4]);
+    scan.setStopRow(ROWS[4]);
+    try {
+      AggregationClient aClient = new AggregationClient(conf);
+      final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+      max = aClient.max(TEST_TABLE, ci, scan);
+    } catch (Exception e) {
+      max = 0;
+    }
+    assertEquals(0, max);// control should go to the catch block
+  }
+
+  /**
+   * Max with a filter that matches nothing: the result should be null.
+   * @throws Throwable
+   */
+  @Test
+  public void testMaxWithFilter() throws Throwable {
+    Long max = 0l;
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
+    Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
+    scan.setFilter(f);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    max = aClient.max(TEST_TABLE, ci, scan);
+    assertEquals(null, max);
+  }
+
+  /**
+   * **************************Test cases for Minimum ***********************
+   */
+
+  /**
+   * Min over the entire table (explicit empty start/stop rows).
+   * @throws Throwable
+   */
+  @Test
+  public void testMinWithValidRange() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
+    scan.setStartRow(HConstants.EMPTY_START_ROW);
+    scan.setStopRow(HConstants.EMPTY_END_ROW);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    Long min = aClient.min(TEST_TABLE, ci,
+        scan);
+    assertEquals(0l, min.longValue());
+  }
+
+  /**
+   * Min over a sub-range of rows.
+   * @throws Throwable
+   */
+  @Test
+  public void testMinWithValidRange2() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
+    scan.setStartRow(ROWS[5]);
+    scan.setStopRow(ROWS[15]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long min = aClient.min(TEST_TABLE, ci, scan);
+    assertEquals(5, min);
+  }
+
+  /**
+   * Min over the whole family (no qualifier).
+   * @throws Throwable
+   */
+  @Test
+  public void testMinWithValidRangeWithNoCQ() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    scan.setStartRow(HConstants.EMPTY_START_ROW);
+    scan.setStopRow(HConstants.EMPTY_END_ROW);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long min = aClient.min(TEST_TABLE, ci,
+        scan);
+    assertEquals(0, min);
+  }
+
+  /**
+   * Min over a single-row range with no qualifier.
+   * @throws Throwable
+   */
+  @Test
+  public void testMinWithValidRange2WithNoCQ() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    scan.setStartRow(ROWS[6]);
+    scan.setStopRow(ROWS[7]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long min = aClient.min(TEST_TABLE, ci, scan);
+    assertEquals(6, min);
+  }
+
+  /**
+   * Min with no column family on the scan: the call should fail and min stays
+   * null.
+   */
+  @Test
+  public void testMinWithValidRangeWithNullCF() {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.setStartRow(ROWS[5]);
+    scan.setStopRow(ROWS[15]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    Long min = null;
+    try {
+      min = aClient.min(TEST_TABLE, ci, scan);
+    } catch (Throwable e) {
+    }
+    assertEquals(null, min);// CP will throw an IOException about the
+    // null column family, so min remains null
+  }
+
+  /**
+   * Min with startrow > stoprow: the call should fail.
+   */
+  @Test
+  public void testMinWithInvalidRange() {
+    AggregationClient aClient = new AggregationClient(conf);
+    Long min = null;
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    scan.setStartRow(ROWS[4]);
+    scan.setStopRow(ROWS[2]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    try {
+      min = aClient.min(TEST_TABLE, ci, scan);
+    } catch (Throwable e) {
+    }
+    assertEquals(null, min);// control should go to the catch block
+  }
+
+  /**
+   * Min with startrow = stoprow: the call should fail.
+   */
+  @Test
+  public void testMinWithInvalidRange2() {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    scan.setStartRow(ROWS[6]);
+    scan.setStopRow(ROWS[6]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    Long min = null;
+    try {
+      min = aClient.min(TEST_TABLE, ci, scan);
+    } catch (Throwable e) {
+    }
+    assertEquals(null, min);// control should go to the catch block
+  }
+
+  /**
+   * Min with a filter that matches nothing: the result should be null.
+   * @throws Throwable
+   */
+  @Test
+  public void testMinWithFilter() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
+    Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
+    scan.setFilter(f);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    Long min = null;
+    min = aClient.min(TEST_TABLE, ci, scan);
+    assertEquals(null, min);
+  }
+
+  /**
+   * *************** Test cases for Sum *********************
+   */
+  /**
+   * Sum over the entire table.
+   * @throws Throwable
+   */
+  @Test
+  public void testSumWithValidRange() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long sum = aClient.sum(TEST_TABLE, ci,
+        scan);
+    assertEquals(190, sum);
+  }
+
+  /**
+   * Sum over a sub-range of rows.
+   * @throws Throwable
+   */
+  @Test
+  public void testSumWithValidRange2() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
+    scan.setStartRow(ROWS[5]);
+    scan.setStopRow(ROWS[15]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long sum = aClient.sum(TEST_TABLE, ci, scan);
+    assertEquals(95, sum);
+  }
+
+  /**
+   * Sum over the whole family (no qualifier); includes the i * 10 values.
+   * @throws Throwable
+   */
+  @Test
+  public void testSumWithValidRangeWithNoCQ() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long sum = aClient.sum(TEST_TABLE, ci,
+        scan);
+    assertEquals(190 + 1900, sum);
+  }
+
+  /**
+   * Sum over a single-row range with no qualifier.
+   * @throws Throwable
+   */
+  @Test
+  public void testSumWithValidRange2WithNoCQ() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    scan.setStartRow(ROWS[6]);
+    scan.setStopRow(ROWS[7]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long sum = aClient.sum(TEST_TABLE, ci, scan);
+    assertEquals(6 + 60, sum);
+  }
+
+  /**
+   * Sum with no column family on the scan: the call should fail and sum stays
+   * null.
+   */
+  @Test
+  public void testSumWithValidRangeWithNullCF() {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.setStartRow(ROWS[6]);
+    scan.setStopRow(ROWS[7]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    Long sum = null;
+    try {
+      sum = aClient.sum(TEST_TABLE, ci, scan);
+    } catch (Throwable e) {
+    }
+    assertEquals(null, sum);// CP will throw an IOException about the
+    // null column family, so sum remains null
+  }
+
+  /**
+   * Sum with startrow > stoprow: the call should fail.
+   */
+  @Test
+  public void testSumWithInvalidRange() {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    scan.setStartRow(ROWS[6]);
+    scan.setStopRow(ROWS[2]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    Long sum = null;
+    try {
+      sum = aClient.sum(TEST_TABLE, ci, scan);
+    } catch (Throwable e) {
+    }
+    assertEquals(null, sum);// control should go to the catch block
+  }
+
+  /**
+   * Sum with a filter that matches nothing: the result should be null.
+   * @throws Throwable
+   */
+  @Test
+  public void testSumWithFilter() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    scan.setFilter(f);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    Long sum = null;
+    sum = aClient.sum(TEST_TABLE, ci, scan);
+    assertEquals(null, sum);
+  }
+
+  /**
+   * ****************************** Test Cases for Avg **************
+   */
+  /**
+   * Average over the entire table.
+   * @throws Throwable
+   */
+  @Test
+  public void testAvgWithValidRange() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    double avg = aClient.avg(TEST_TABLE, ci,
+        scan);
+    assertEquals(9.5, avg, 0);
+  }
+
+  /**
+   * Average over a sub-range of rows.
+   * @throws Throwable
+   */
+  @Test
+  public void testAvgWithValidRange2() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
+    scan.setStartRow(ROWS[5]);
+    scan.setStopRow(ROWS[15]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    double avg = aClient.avg(TEST_TABLE, ci, scan);
+    assertEquals(9.5, avg, 0);
+  }
+
+  /**
+   * Average over the whole family (no qualifier).
+   * @throws Throwable
+   */
+  @Test
+  public void testAvgWithValidRangeWithNoCQ() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    double avg = aClient.avg(TEST_TABLE, ci,
+        scan);
+    assertEquals(104.5, avg, 0);
+  }
+
+  /**
+   * Average over a single-row range with no qualifier.
+   * @throws Throwable
+   */
+  @Test
+  public void testAvgWithValidRange2WithNoCQ() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    scan.setStartRow(ROWS[6]);
+    scan.setStopRow(ROWS[7]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    double avg = aClient.avg(TEST_TABLE, ci, scan);
+    assertEquals(6 + 60, avg, 0);
+  }
+
+  /**
+   * Average with no column family on the scan: the call should fail and avg
+   * stays null.
+   */
+  @Test
+  public void testAvgWithValidRangeWithNullCF() {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    Double avg = null;
+    try {
+      avg = aClient.avg(TEST_TABLE, ci, scan);
+    } catch (Throwable e) {
+    }
+    assertEquals(null, avg);// CP will throw an IOException about the
+    // null column family, so avg remains null
+  }
+
+  /**
+   * Average with startrow > stoprow: the call should fail.
+   */
+  @Test
+  public void testAvgWithInvalidRange() {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
+    scan.setStartRow(ROWS[5]);
+    scan.setStopRow(ROWS[1]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    Double avg = null;
+    try {
+      avg = aClient.avg(TEST_TABLE, ci, scan);
+    } catch (Throwable e) {
+    }
+    assertEquals(null, avg);// control should go to the catch block
+  }
+
+  /**
+   * Average with a filter that matches nothing: the result should be NaN
+   * (0 / 0 row count).
+   * @throws Throwable
+   */
+  @Test
+  public void testAvgWithFilter() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
+    Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
+    scan.setFilter(f);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    Double avg = null;
+    avg = aClient.avg(TEST_TABLE, ci, scan);
+    assertEquals(Double.NaN, avg, 0);
+  }
+
+  /**
+   * ****************** Test cases for STD **********************
+   */
+  /**
+   * Standard deviation over the entire table.
+   * @throws Throwable
+   */
+  @Test
+  public void testStdWithValidRange() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    double std = aClient.std(TEST_TABLE, ci,
+        scan);
+    assertEquals(5.766, std, 0.05d);
+  }
+
+  /**
+   * Standard deviation over a sub-range of rows.
+   * @throws Throwable
+   */
+  @Test
+  public void testStdWithValidRange2() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
+    scan.setStartRow(ROWS[5]);
+    scan.setStopRow(ROWS[15]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    double std = aClient.std(TEST_TABLE, ci, scan);
+    assertEquals(2.87, std, 0.05d);
+  }
+
+  /**
+   * Standard deviation over the whole family (no qualifier).
+   * @throws Throwable
+   */
+  @Test
+  public void testStdWithValidRangeWithNoCQ() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    double std = aClient.std(TEST_TABLE, ci,
+        scan);
+    assertEquals(63.42, std, 0.05d);
+  }
+
+  /**
+   * Standard deviation over a single-row range: should be 0.
+   * @throws Throwable
+   */
+  @Test
+  public void testStdWithValidRange2WithNoCQ() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    scan.setStartRow(ROWS[6]);
+    scan.setStopRow(ROWS[7]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    double std = aClient.std(TEST_TABLE, ci, scan);
+    assertEquals(0, std, 0);
+  }
+
+  /**
+   * Standard deviation with no column family on the scan: the call should
+   * fail and std stays null.
+   */
+  @Test
+  public void testStdWithValidRangeWithNullCF() {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.setStartRow(ROWS[6]);
+    scan.setStopRow(ROWS[17]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    Double std = null;
+    try {
+      std = aClient.std(TEST_TABLE, ci, scan);
+    } catch (Throwable e) {
+    }
+    assertEquals(null, std);// CP will throw an IOException about the
+    // null column family, so std remains null
+  }
+
+  /**
+   * Standard deviation with startrow > stoprow: the call should fail.
+   */
+  @Test
+  public void testStdWithInvalidRange() {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    scan.setStartRow(ROWS[6]);
+    scan.setStopRow(ROWS[1]);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    Double std = null;
+    try {
+      std = aClient.std(TEST_TABLE, ci, scan);
+    } catch (Throwable e) {
+    }
+    assertEquals(null, std);// control should go to the catch block
+  }
+
+  /**
+   * Standard deviation with a filter that matches nothing: the result should
+   * be NaN.
+   * @throws Throwable
+   */
+  @Test
+  public void testStdWithFilter() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    scan.setFilter(f);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    Double std = null;
+    std = aClient.std(TEST_TABLE, ci, scan);
+    assertEquals(Double.NaN, std, 0);
+  }
+}



Mime
View raw message