eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [47/52] [abbrv] incubator-eagle git commit: [EAGLE-520] Fix and decouple co-processor from eagle aggreation query service
Date Wed, 07 Sep 2016 17:42:43 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateClientImpl.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateClientImpl.java
index 5835738..0e92c64 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateClientImpl.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateClientImpl.java
@@ -18,8 +18,8 @@ package org.apache.eagle.storage.hbase.query.coprocessor.impl;
 
 import org.apache.eagle.log.entity.meta.EntityDefinition;
 import org.apache.eagle.query.aggregate.AggregateFunctionType;
-import org.apache.eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;
 import org.apache.eagle.storage.hbase.query.coprocessor.*;
+import org.apache.eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -32,115 +32,111 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Not thread safe
- *
- * @since : 11/2/14,2014
+ * Not thread safe.
  */
 public class AggregateClientImpl implements AggregateClient {
-	private final static Logger LOG = LoggerFactory.getLogger(AggregateClient.class);
-	private AggregateResultCallback callback;
+    private static final Logger LOG = LoggerFactory.getLogger(AggregateClient.class);
+    private AggregateResultCallback callback;
 
-	private void checkNotNull(Object obj,String name) {
-		if(obj==null) throw new NullPointerException(name+" is null");
-	}
+    private void checkNotNull(Object obj, String name) {
+        if (obj == null) {
+            throw new NullPointerException(name + " is null");
+        }
+    }
 
-	@Override
-	public AggregateResult aggregate(final HTableInterface table,
-	                                       final EntityDefinition entityDefinition,
-	                                       final Scan scan,
-	                                       final List<String> groupbyFields,
-	                                       final List<AggregateFunctionType> aggregateFuncTypes,
-	                                       final List<String> aggregatedFields,
-	                                       final boolean timeSeries,
-	                                       final long startTime,
-	                                       final long endTime,
-	                                       final long intervalMin) throws IOException {
-		checkNotNull(entityDefinition,"entityDefinition");
-		final List<AggregateFunctionType> _aggregateFuncTypes = convertToCoprocessorAggregateFunc(aggregateFuncTypes);
-		final List<byte[]> _aggregateFuncTypesBytes = AggregateFunctionType.toBytesList(_aggregateFuncTypes);
-//		if(timeSeries) TimeSeriesAggregator.validateTimeRange(startTime,endTime,intervalMin);
-		callback = new AggregateResultCallbackImpl(aggregateFuncTypes);
-		try{
-			if(!LOG.isDebugEnabled()){
-				LOG.info("Going to exec coprocessor: "+AggregateProtocol.class.getSimpleName());
-			}else{
-				LOG.debug("Going to exec coprocessor: "+AggregateProtocol.class.getName());
-			}
+    @Override
+    public AggregateResult aggregate(final HTableInterface table,
+                                     final EntityDefinition entityDefinition,
+                                     final Scan scan,
+                                     final List<String> groupbyFields,
+                                     final List<AggregateFunctionType> aggregateFuncTypes,
+                                     final List<String> aggregatedFields,
+                                     final boolean timeSeries,
+                                     final long startTime,
+                                     final long endTime,
+                                     final long intervalMin) throws IOException {
+        checkNotNull(entityDefinition, "entityDefinition");
+        final List<AggregateFunctionType> _aggregateFuncTypes = convertToCoprocessorAggregateFunc(aggregateFuncTypes);
+        final List<byte[]> _aggregateFuncTypesBytes = AggregateFunctionType.toBytesList(_aggregateFuncTypes);
+        // if(timeSeries) TimeSeriesAggregator.validateTimeRange(startTime,endTime,intervalMin);
+        callback = new AggregateResultCallbackImpl(aggregateFuncTypes);
+        try {
+            if (!LOG.isDebugEnabled()) {
+                LOG.info("Going to exec coprocessor: " + AggregateProtocol.class.getSimpleName());
+            } else {
+                LOG.debug("Going to exec coprocessor: " + AggregateProtocol.class.getName());
+            }
 
-//			table.coprocessorExec(AggregateProtocol.class,scan.getStartRow(),scan.getStopRow(),new Batch.Call<AggregateProtocol, AggregateResult>(){
-//				@Override
-//				public AggregateResult call(AggregateProtocol instance) throws IOException {
-//					if(timeSeries){
-//						return instance.aggregate(entityDefinition, scan, groupbyFields, _aggregateFuncTypesBytes, aggregatedFields,startTime,endTime,intervalMin);
-//					}else{
-//						return instance.aggregate(entityDefinition, scan, groupbyFields, _aggregateFuncTypesBytes, aggregatedFields);
-//					}
-//				}
-//			},callback);
+            //  table.coprocessorExec(AggregateProtocol.class,scan.getStartRow(),scan.getStopRow(),new Batch.Call<AggregateProtocol, AggregateResult>(){
+            //  @Override
+            //  public AggregateResult call(AggregateProtocol instance) throws IOException {
+            //      if(timeSeries){
+            //   return instance.aggregate(entityDefinition, scan, groupbyFields, _aggregateFuncTypesBytes, aggregatedFields,startTime,endTime,intervalMin);
+            //      }else{
+            //   return instance.aggregate(entityDefinition, scan, groupbyFields, _aggregateFuncTypesBytes, aggregatedFields);
+            //      }
+            //  }
+            //  },callback);
 
-          table.coprocessorService(AggregateProtos.AggregateProtocol.class, scan.getStartRow(), scan.getStopRow(), new Batch.Call<AggregateProtos.AggregateProtocol, AggregateProtos.AggregateResult>() {
-              @Override
-              public AggregateProtos.AggregateResult call(AggregateProtos.AggregateProtocol instance) throws IOException {
-                  BlockingRpcCallback<AggregateProtos.AggregateResult> rpcCallback = new BlockingRpcCallback<AggregateProtos.AggregateResult>();
-                  if(timeSeries){
-                      AggregateProtos.TimeSeriesAggregateRequest timeSeriesAggregateRequest = ProtoBufConverter
-                              .toPBTimeSeriesRequest(
-                                      entityDefinition,
-                                      scan,
-                                      groupbyFields,
-                                      _aggregateFuncTypesBytes,
-                                      aggregatedFields,
-                                      startTime,
-                                      endTime,
-                                      intervalMin);
-                      instance.timeseriesAggregate(null, timeSeriesAggregateRequest, rpcCallback);
-                      return rpcCallback.get();
-					}else{
-                      AggregateProtos.AggregateRequest aggregateRequest = ProtoBufConverter.toPBRequest(
-                                      entityDefinition, scan, groupbyFields, _aggregateFuncTypesBytes, aggregatedFields);
-                      instance.aggregate(null, aggregateRequest, rpcCallback);
-                      return rpcCallback.get();
-					}
-              }
-          }, callback);
-		} catch (Throwable t){
-			LOG.error(t.getMessage(),t);
-			throw new IOException(t);
-		}
-		return callback.result();
-	}
-	
-//	@Override
-//	public void result(final GroupbyKeyValueCreationListener[] listeners) {
-//		callback.asyncRead(Arrays.asList(listeners));
-//	}
+            table.coprocessorService(AggregateProtos.AggregateProtocol.class,
+                scan.getStartRow(), scan.getStopRow(), new Batch.Call<AggregateProtos.AggregateProtocol, AggregateProtos.AggregateResult>() {
+                    @Override
+                    public AggregateProtos.AggregateResult call(AggregateProtos.AggregateProtocol instance) throws IOException {
+                        BlockingRpcCallback<AggregateProtos.AggregateResult> rpcCallback = new BlockingRpcCallback<>();
+                        if (timeSeries) {
+                            AggregateProtos.TimeSeriesAggregateRequest timeSeriesAggregateRequest = ProtoBufConverter
+                                    .toPBTimeSeriesRequest(
+                                            entityDefinition,
+                                            scan,
+                                            groupbyFields,
+                                            _aggregateFuncTypesBytes,
+                                            aggregatedFields,
+                                            startTime,
+                                            endTime,
+                                            intervalMin);
+                            instance.timeseriesAggregate(null, timeSeriesAggregateRequest, rpcCallback);
+                            return rpcCallback.get();
+                        } else {
+                            AggregateProtos.AggregateRequest aggregateRequest = ProtoBufConverter.toPBRequest(
+                                    entityDefinition, scan, groupbyFields, _aggregateFuncTypesBytes, aggregatedFields);
+                            instance.aggregate(null, aggregateRequest, rpcCallback);
+                            return rpcCallback.get();
+                        }
+                    }
+                }, callback);
+        } catch (Throwable t) {
+            LOG.error(t.getMessage(), t);
+            throw new IOException(t);
+        }
+        return callback.result();
+    }
 
-	@Override
-	public AggregateResult  aggregate(HTableInterface table, EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<AggregateFunctionType> aggregateFuncTypes, List<String> aggregatedFields) throws IOException {
-		return this.aggregate(table,entityDefinition,scan,groupbyFields,aggregateFuncTypes,aggregatedFields,false,0,0,0);
-	}
+    @Override
+    public AggregateResult aggregate(HTableInterface table, EntityDefinition entityDefinition, Scan scan,
+                                     List<String> groupbyFields, List<AggregateFunctionType> aggregateFuncTypes, List<String> aggregatedFields) throws IOException {
+        return this.aggregate(table, entityDefinition, scan, groupbyFields, aggregateFuncTypes, aggregatedFields, false, 0, 0, 0);
+    }
 
-	/**
-	 *
-	 * <h4>
-	 *   Convert client side funcs to server side funcs, especially for <b>avg</b>
-	 * </h4>
-	 * <ul>
-	 *  <li><b>avg</b>:
-	 *    Coprocessor[ <b>&lt;sum,count&gt;</b>] => Callback[(sum<SUB>1</SUB>+sum<SUB>2</SUB>+...+sum<SUB>n</SUB>)/(count<SUB>1</SUB>+count<SUB>2</SUB>+...+count<SUB>n</SUB>)]
-	 * </li>
-	 * </ul>
-	 * @param funcs List&lt;AggregateFunctionType&gt;
-	 * @return
-	 */
-	private List<AggregateFunctionType> convertToCoprocessorAggregateFunc(List<AggregateFunctionType> funcs){
-		List<AggregateFunctionType> copy = new ArrayList<AggregateFunctionType>(funcs);
-		for(int i=0;i<funcs.size();i++){
-			AggregateFunctionType func = copy.get(i);
-			if(AggregateFunctionType.avg.equals(func)){
-				copy.set(i,AggregateFunctionType.sum);
-			}
-		}
-		return copy;
-	}
+    /**
+     * <h4>
+     * Convert client side funcs to server side funcs, especially for <b>avg</b>
+     * </h4>
+     * <ul>
+     * <li><b>avg</b>:
+     * Coprocessor[ <b>&lt;sum,count&gt;</b>] => Callback[(sum<SUB>1</SUB>+sum<SUB>2</SUB>+...+sum<SUB>n</SUB>)/(count<SUB>1</SUB>+count<SUB>2</SUB>+...+count<SUB>n</SUB>)]
+     * </li>
+     * </ul>
+     *
+     * @param funcs List&lt;AggregateFunctionType&gt;
+     */
+    private List<AggregateFunctionType> convertToCoprocessorAggregateFunc(List<AggregateFunctionType> funcs) {
+        List<AggregateFunctionType> copy = new ArrayList<AggregateFunctionType>(funcs);
+        for (int i = 0; i < funcs.size(); i++) {
+            AggregateFunctionType func = copy.get(i);
+            if (AggregateFunctionType.avg.equals(func)) {
+                copy.set(i, AggregateFunctionType.sum);
+            }
+        }
+        return copy;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateResultCallbackImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateResultCallbackImpl.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateResultCallbackImpl.java
index 2e0248f..5d7011f 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateResultCallbackImpl.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateResultCallbackImpl.java
@@ -16,13 +16,13 @@
  */
 package org.apache.eagle.storage.hbase.query.coprocessor.impl;
 
+import org.apache.eagle.common.ByteUtil;
 import org.apache.eagle.query.aggregate.AggregateFunctionType;
+import org.apache.eagle.query.aggregate.raw.*;
 import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResult;
 import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResultCallback;
 import org.apache.eagle.storage.hbase.query.coprocessor.ProtoBufConverter;
 import org.apache.eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;
-import org.apache.eagle.common.ByteUtil;
-import org.apache.eagle.query.aggregate.raw.*;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,111 +33,113 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-/**
- * @since : 11/3/14,2014
- */
 public class AggregateResultCallbackImpl implements AggregateResultCallback {
-	private final static Logger LOG = LoggerFactory.getLogger(AggregateResultCallback.class);
-	private Map<GroupbyKey,List<Function>> groupedFuncs = new HashMap<GroupbyKey, List<Function>>();
-	private List<FunctionFactory> functionFactories = new ArrayList<FunctionFactory>();
-	private int numFuncs = 0;
-	private long kvCounter = 0;
-	private int regionCounter = 0;
-	private long startTimestamp;
-	private long stopTimestamp;
-	
-	// Start RPC call time (i.e constructor initialized time)
-	private final long _start;
-	
-	public AggregateResultCallbackImpl(List<AggregateFunctionType> aggregateFunctionTypes){
-		this.numFuncs = aggregateFunctionTypes.size();
-		for(AggregateFunctionType type: aggregateFunctionTypes){
-			 functionFactories.add(FunctionFactory.locateFunctionFactory(type));
-		}
-		this._start = System.currentTimeMillis();
-	}
+    private static final Logger LOG = LoggerFactory.getLogger(AggregateResultCallback.class);
+    private Map<GroupbyKey, List<Function>> groupedFuncs = new HashMap<GroupbyKey, List<Function>>();
+    private List<FunctionFactory> functionFactories = new ArrayList<FunctionFactory>();
+    private int numFuncs = 0;
+    private long kvCounter = 0;
+    private int regionCounter = 0;
+    private long startTimestamp;
+    private long stopTimestamp;
 
-//	@Override
-	public void update(byte[] region, byte[] row, AggregateResult result) {
-		AggregateResult _result = result;
-		regionCounter ++;
-		kvCounter += _result.getKeyValues().size();
-		if(this.startTimestamp == 0 || this.startTimestamp > _result.getStartTimestamp()){
-			this.startTimestamp = _result.getStartTimestamp();
-		}
-		if(this.stopTimestamp == 0 || this.stopTimestamp < _result.getStopTimestamp()){
-			this.stopTimestamp = _result.getStopTimestamp();
-		}
-		for(GroupbyKeyValue keyValue:_result.getKeyValues()){
-			update(keyValue);
-		}
-	}
+    // Start RPC call time (i.e constructor initialized time)
+    private final long _start;
 
-	public void update(GroupbyKeyValue keyValue) {
-		// Incr kvCounter if call #update(GroupbyKeyValue) directly
-		// instead of #update(byte[] region, byte[] row, AggregateResult result)
-		if(this.getKVCounter() == 0) this.kvCounter ++;
-		// Accumulate key value for GroubyKey mapped Functions
-		GroupbyKey groupedKey = keyValue.getKey();
-		List<Function> funcs = groupedFuncs.get(groupedKey);
-		if(funcs==null){
-			funcs = new ArrayList<Function>();
-			for(FunctionFactory functionFactory:this.functionFactories){
-				funcs.add(functionFactory.createFunction());
-			}
-			groupedFuncs.put(groupedKey, funcs);
-		}
-		for(int i=0;i<this.numFuncs;i++){
-			int intCount = 1;
-			byte[] count = keyValue.getValue().getMeta(i).getBytes();
-			if(count != null){
-				intCount = ByteUtil.bytesToInt(count);
-			}
-			funcs.get(i).run(keyValue.getValue().get(i).get(), intCount);
-		}
-	}
+    public AggregateResultCallbackImpl(List<AggregateFunctionType> aggregateFunctionTypes) {
+        this.numFuncs = aggregateFunctionTypes.size();
+        for (AggregateFunctionType type : aggregateFunctionTypes) {
+            functionFactories.add(FunctionFactory.locateFunctionFactory(type));
+        }
+        this._start = System.currentTimeMillis();
+    }
 
-	public long getKVCounter(){
-		return this.kvCounter;
-	}
+    public long getKVCounter() {
+        return this.kvCounter;
+    }
 
-	public long getRegionCounter(){
-		return this.regionCounter;
-	}
+    public long getRegionCounter() {
+        return this.regionCounter;
+    }
 
-	public AggregateResult result(){
-		List<GroupbyKeyValue> mergedKeyValues = new ArrayList<GroupbyKeyValue>();
-		for(Map.Entry<GroupbyKey,List<Function>> entry:this.groupedFuncs.entrySet()){
-			GroupbyValue value = new GroupbyValue(this.numFuncs);
-			for(Function func:entry.getValue()){
-				double _result = func.result();
-				int _count = func.count();
-				value.add(_result);
-				value.addMeta(_count);
-			}
-			mergedKeyValues.add(new GroupbyKeyValue(entry.getKey(),value));
-		}
-		
-		final long _stop = System.currentTimeMillis();
-		if(this.getRegionCounter() > 0) {
-			LOG.info(String.format("result = %d rows, startTime = %d, endTime = %d, source = %d rows, regions = %d, , spend = %d ms", mergedKeyValues.size(),this.startTimestamp,this.stopTimestamp, this.getKVCounter(), this.getRegionCounter(),(_stop - _start)));
-		}else{
-			LOG.info(String.format("result = %d rows, startTime = %d, endTime = %d, source = %d rows, spend = %d ms", mergedKeyValues.size(),this.startTimestamp,this.stopTimestamp,this.getKVCounter(), (_stop - _start)));
-		}
-		AggregateResult result = new AggregateResult();
-		result.setKeyValues(mergedKeyValues);
-		result.setStartTimestamp(this.startTimestamp);
-		result.setStopTimestamp(this.stopTimestamp);
-		return result;
-	}
+    public AggregateResult result() {
+        List<GroupbyKeyValue> mergedKeyValues = new ArrayList<GroupbyKeyValue>();
+        for (Map.Entry<GroupbyKey, List<Function>> entry : this.groupedFuncs.entrySet()) {
+            GroupbyValue value = new GroupbyValue(this.numFuncs);
+            for (Function func : entry.getValue()) {
+                double _result = func.result();
+                int _count = func.count();
+                value.add(_result);
+                value.addMeta(_count);
+            }
+            mergedKeyValues.add(new GroupbyKeyValue(entry.getKey(), value));
+        }
+
+        final long _stop = System.currentTimeMillis();
+        if (this.getRegionCounter() > 0) {
+            LOG.info(String.format("result = %d rows, startTime = %d, endTime = %d, source = %d rows, regions = %d, , spend = %d ms",
+                mergedKeyValues.size(), this.startTimestamp, this.stopTimestamp, this.getKVCounter(), this.getRegionCounter(), (_stop - _start)));
+        } else {
+            LOG.info(String.format("result = %d rows, startTime = %d, endTime = %d, source = %d rows, spend = %d ms",
+                mergedKeyValues.size(), this.startTimestamp, this.stopTimestamp, this.getKVCounter(), (_stop - _start)));
+        }
+        AggregateResult result = new AggregateResult();
+        result.setKeyValues(mergedKeyValues);
+        result.setStartTimestamp(this.startTimestamp);
+        result.setStopTimestamp(this.stopTimestamp);
+        return result;
+    }
 
     @Override
     public void update(byte[] region, byte[] row, AggregateProtos.AggregateResult result) {
         try {
-            if(result == null) throw new IllegalStateException(new CoprocessorException("result is null"));
-            this.update(region,row, ProtoBufConverter.fromPBResult(result));
+            if (result == null) {
+                throw new IllegalStateException(new CoprocessorException("result is null"));
+            }
+            this.update(region, row, ProtoBufConverter.fromPBResult(result));
         } catch (IOException e) {
-            LOG.error("Failed to convert PB-Based message",e);
+            LOG.error("Failed to convert PB-Based message", e);
+        }
+    }
+
+    public void update(GroupbyKeyValue keyValue) {
+        // Incr kvCounter if call #update(GroupbyKeyValue) directly
+        // instead of #update(byte[] region, byte[] row, AggregateResult result)
+        if (this.getKVCounter() == 0) {
+            this.kvCounter++;
+        }
+        // Accumulate key value for GroubyKey mapped Functions
+        GroupbyKey groupedKey = keyValue.getKey();
+        List<Function> funcs = groupedFuncs.get(groupedKey);
+        if (funcs == null) {
+            funcs = new ArrayList<Function>();
+            for (FunctionFactory functionFactory : this.functionFactories) {
+                funcs.add(functionFactory.createFunction());
+            }
+            groupedFuncs.put(groupedKey, funcs);
+        }
+        for (int i = 0; i < this.numFuncs; i++) {
+            int intCount = 1;
+            byte[] count = keyValue.getValue().getMeta(i).getBytes();
+            if (count != null) {
+                intCount = ByteUtil.bytesToInt(count);
+            }
+            funcs.get(i).run(keyValue.getValue().get(i).get(), intCount);
+        }
+    }
+
+    public void update(byte[] region, byte[] row, AggregateResult result) {
+        AggregateResult _result = result;
+        regionCounter++;
+        kvCounter += _result.getKeyValues().size();
+        if (this.startTimestamp == 0 || this.startTimestamp > _result.getStartTimestamp()) {
+            this.startTimestamp = _result.getStartTimestamp();
+        }
+        if (this.stopTimestamp == 0 || this.stopTimestamp < _result.getStopTimestamp()) {
+            this.stopTimestamp = _result.getStopTimestamp();
+        }
+        for (GroupbyKeyValue keyValue : _result.getKeyValues()) {
+            update(keyValue);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/package-info.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/package-info.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/package-info.java
index 7d81872..09cf983 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/package-info.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/package-info.java
@@ -14,35 +14,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 /**
- *
  * <h1>Eagle Aggregation Coprocessor</h1>
- *
+ * <p>
  * <h2>Deployment and Usage</h2>
  * <ol>
- * 	<li>
- *  Firstly deploy jar files to cluster on local file system or HDFS.<br/>
- * 	</li>
- * 	<li>
- * 	Secondly configure in <code>hbase-site.xml</code> as following:
- * 	<pre>&lt;property&gt;
+ * <li>
+ * Firstly deploy jar files to cluster on local file system or HDFS.<br/>
+ * </li>
+ * <li>
+ * Secondly configure in <code>hbase-site.xml</code> as following:
+ * <pre>&lt;property&gt;
  *   &lt;name>hbase.coprocessor.region.classes&lt;/name&gt;
  *   &lt;value>AggregateProtocolEndPoint&lt;/value&gt;
  * &lt;/property&gt;
- * 	</pre>
- * 	Or register on related hbase tables
- * 	<pre> hbase(main):005:0>  alter 't1', METHOD => 'table_att', 'coprocessor'=>'hdfs:///foo.jar|AggregateProtocolEndPoint|1001|'</pre>
- * 	</li>
- * 	<li>
+ * </pre>
+ * Or register on related hbase tables
+ * <pre> hbase(main):005:0>  alter 't1', METHOD => 'table_att', 'coprocessor'=>'hdfs:///foo.jar|AggregateProtocolEndPoint|1001|'</pre>
+ * </li>
+ * <li>
  * <code>
  * AggregateClient client = new AggregateClientImpl();
  * client.aggregate
  * AggregateResult result = client.result
- * 
+ *
  * </code>
  * </li>
  * </ol>
- * 
+ *
  * <h2>Performance</h2>
  *
  * <b>NOTE:</b>
@@ -56,56 +56,53 @@
  * <b>A simple benchmark report for reference</b>
  * <br/>
  * <table border="1">
- *     <thead>
- *         <tr>
- *             <th>Region Servers</th> <th>Record Count</th>
- *             <th>Coprocessor</th><th>No-Coprocessor</th><th>Aggregation</th>
- *         </tr>
- *     </thead>
- *     <tbody>
- *         <tr>
- *             <td rowspan="10">1</td><td rowspan="10">1000,000</td>
- *             <td>10193 ms</td><td>21988 ms</td><td><@cluster,@datacenter>{count}</td>
- *         </tr>
- *         <tr>
- *             <td>10010 ms</td><td>22547 ms</td><td><@cluster,@datacenter>{sum(numTotalMaps)}</td>
- *         </tr>
- *         <tr>
- *             <td>10334 ms</td><td>23433 ms</td><td><@cluster,@datacenter>{avg(numTotalMaps)}</td>
- *         </tr>
- *         <tr>
- *             <td>10045 ms</td><td>22690 ms</td><td><@cluster,@datacenter>{max(numTotalMaps)}</td>
- *         </tr>
- *         <tr>
- *             <td>10190 ms</td><td>21902 ms</td><td><@cluster,@datacenter>{min(numTotalMaps)}</td>
- *         </tr>
- *     </tbody>
+ * <thead>
+ * <tr>
+ * <th>Region Servers</th> <th>Record Count</th>
+ * <th>Coprocessor</th><th>No-Coprocessor</th><th>Aggregation</th>
+ * </tr>
+ * </thead>
+ * <tbody>
+ * <tr>
+ * <td rowspan="10">1</td><td rowspan="10">1000,000</td>
+ * <td>10193 ms</td><td>21988 ms</td><td><@cluster,@datacenter>{count}</td>
+ * </tr>
+ * <tr>
+ * <td>10010 ms</td><td>22547 ms</td><td><@cluster,@datacenter>{sum(numTotalMaps)}</td>
+ * </tr>
+ * <tr>
+ * <td>10334 ms</td><td>23433 ms</td><td><@cluster,@datacenter>{avg(numTotalMaps)}</td>
+ * </tr>
+ * <tr>
+ * <td>10045 ms</td><td>22690 ms</td><td><@cluster,@datacenter>{max(numTotalMaps)}</td>
+ * </tr>
+ * <tr>
+ * <td>10190 ms</td><td>21902 ms</td><td><@cluster,@datacenter>{min(numTotalMaps)}</td>
+ * </tr>
+ * </tbody>
  * </table>
  * <h2>Reference</h2>
  * <a href="https://blogs.apache.org/hbase/entry/coprocessor_introduction">
- * 	Coprocessor Introduction 
+ * Coprocessor Introduction
  * </a>
  * (Trend Micro Hadoop Group: Mingjie Lai, Eugene Koontz, Andrew Purtell)
- * 
+ *
  * <h2>TO-DO</h2>
  * <ol>
  * <li>
- *   TODO: Pass writable self-described entity definition into HBase coprocessor instead of serviceName in String
+ * TODO: Pass writable self-described entity definition into HBase coprocessor instead of serviceName in String
  *
- *   Because using serviceName to get entity definition will reply on entity API code under eagle-app, so that
- *   when modifying or creating new entities, we have to update coprocessor jar in HBase side
- *   (hchen9@xyz.com)
+ * Because using serviceName to get entity definition will reply on entity API code under eagle-app, so that
+ * when modifying or creating new entities, we have to update coprocessor jar in HBase side
+ * (hchen9@xyz.com)
  * </li>
  * <li>
- * 	 TODO: Using String.format instead substrings addition for long log to avoid recreating string objects
+ * TODO: Using String.format instead substrings addition for long log to avoid recreating string objects
  * </li>
  * </ol>
  *
  * </table>
- * @see eagle.query.aggregate.coprocessor.AggregateClient
- * @see eagle.query.aggregate.coprocessor.AggregateResult
- * @see eagle.query.aggregate.coprocessor.AggregateProtocol
- * 
- * @since   : 11/10/14,2014
+ *
+ * @since : 11/10/14,2014
  */
 package org.apache.eagle.storage.hbase.query.coprocessor;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/spi/HBaseStorageServiceProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/spi/HBaseStorageServiceProvider.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/spi/HBaseStorageServiceProvider.java
index 1ee1c52..3f3f831 100644
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/spi/HBaseStorageServiceProvider.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/spi/HBaseStorageServiceProvider.java
@@ -20,11 +20,8 @@ import org.apache.eagle.storage.DataStorage;
 import org.apache.eagle.storage.hbase.HBaseStorage;
 import org.apache.eagle.storage.spi.DataStorageServiceProvider;
 
-/**
- * @since 3/20/15
- */
 public final class HBaseStorageServiceProvider implements DataStorageServiceProvider {
-    private final static String HBASE = "hbase";
+    private static final String HBASE = "hbase";
 
     @Override
     public String getType() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/tools/CoprocessorTool.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/tools/CoprocessorTool.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/tools/CoprocessorTool.java
new file mode 100644
index 0000000..c5aecab
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/tools/CoprocessorTool.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.storage.hbase.tools;
+
+import org.apache.eagle.storage.hbase.query.coprocessor.AggregateProtocolEndPoint;
+
+import org.apache.commons.cli.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+/**
+ * Coprocessor CLI Tool.
+ */
+public class CoprocessorTool extends Configured implements Tool {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CoprocessorTool.class);
+
+    public static void main(String[] args) throws Exception {
+        System.exit(ToolRunner.run(new CoprocessorTool(), args));
+    }
+
+    private void unregisterCoprocessor(String tableName) throws IOException {
+        Configuration configuration = getConf();
+        TableName table = TableName.valueOf(tableName);
+        try (HBaseAdmin admin = new HBaseAdmin(configuration)) {
+            HTableDescriptor tableDescriptor = admin.getTableDescriptor(table);
+            LOGGER.info("Table {} found", tableName);
+            if (tableDescriptor.hasCoprocessor(AggregateProtocolEndPoint.class.getName())) {
+                LOGGER.warn("No coprocessor was registered on table '{}'", tableName);
+                throw new IOException("No coprocessor was registered on table " + tableName);
+            } else {
+                tableDescriptor.removeCoprocessor(AggregateProtocolEndPoint.class.getName());
+                admin.modifyTable(table, tableDescriptor);
+                LOGGER.info("Succeed to remove coprocessor from table " + tableName);
+            }
+        }
+    }
+
+    private void registerCoprocessor(String jarPath, String tableName, String localJarPath) throws IOException {
+        Configuration configuration = getConf();
+        try (FileSystem fs = FileSystem.get(configuration); HBaseAdmin admin = new HBaseAdmin(configuration)) {
+            Path path = new Path(fs.getUri() + Path.SEPARATOR + jarPath);
+            LOGGER.info("Checking path {} ... ", path.toString());
+            if (!fs.exists(path)) {
+                LOGGER.info("Path: {} not exist, uploading jar ...", path.toString());
+                if (localJarPath == null) {
+                    throw new IOException("local jar path is not given, please manually upload coprocessor jar onto hdfs at " + jarPath
+                            + " and retry, or provide local coprocessor jar path through CLI argument and upload automatically");
+                }
+                LOGGER.info("Copying from local {} to {}", localJarPath, jarPath);
+                fs.copyFromLocalFile(new Path(localJarPath), path);
+                LOGGER.info("Succeed to copied coprocessor jar to {}", path.toString());
+            } else {
+                LOGGER.info("Path {} already exists", path.toString());
+            }
+            LOGGER.info("Checking hbase table {}", tableName);
+            TableName table = TableName.valueOf(tableName);
+            HTableDescriptor tableDescriptor = admin.getTableDescriptor(table);
+            LOGGER.info("Table {} found", tableName);
+            if (tableDescriptor.hasCoprocessor(AggregateProtocolEndPoint.class.getName())) {
+                LOGGER.warn("Table '" + tableName + "' already registered coprocessor: " + AggregateProtocolEndPoint.class.getName() + ", removing firstly");
+                tableDescriptor.removeCoprocessor(AggregateProtocolEndPoint.class.getName());
+                admin.modifyTable(table, tableDescriptor);
+                tableDescriptor = admin.getTableDescriptor(table);
+            }
+            tableDescriptor.addCoprocessor(AggregateProtocolEndPoint.class.getName(),
+                    path, Coprocessor.PRIORITY_USER, new HashMap<>());
+            admin.modifyTable(table, tableDescriptor);
+            LOGGER.info("Succeed to enable coprocessor on table " + tableName);
+        }
+    }
+
+    private void printHelpMessage(Options cmdOptions) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("java " + CoprocessorTool.class.getName() + " [--register/--unregister] [OPTIONS]", cmdOptions);
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options cmdOptions = new Options();
+        cmdOptions.addOption(new Option("register", false, "Register coprocessor"));
+        cmdOptions.addOption(new Option("unregister", false, "Unregister coprocessor"));
+
+        cmdOptions.addOption("table", true, "HBase table name, separated with comma, for example, table1,table2,..");
+        cmdOptions.addOption("jar", true, "Coprocessor target jar path");
+        cmdOptions.addOption("localJar", true, "Coprocessor local source jar path");
+        cmdOptions.addOption("config", true, "Configuration file");
+
+        cmdOptions.getOption("table").setType(String.class);
+        cmdOptions.getOption("table").setRequired(true);
+        cmdOptions.getOption("jar").setType(String.class);
+        cmdOptions.getOption("jar").setRequired(false);
+        cmdOptions.getOption("localJar").setType(String.class);
+        cmdOptions.getOption("localJar").setRequired(false);
+        cmdOptions.getOption("config").setType(String.class);
+        cmdOptions.getOption("config").setRequired(false);
+
+        GnuParser parser = new GnuParser();
+        CommandLine cmdCli = parser.parse(cmdOptions, args);
+        String tableName = cmdCli.getOptionValue("table");
+        String configFile = cmdCli.getOptionValue("config");
+
+        if (configFile != null) {
+            Configuration.addDefaultResource(configFile);
+        }
+
+        if (cmdCli.hasOption("register")) {
+            if (args.length < 3) {
+                System.err.println("Error: coprocessor jar path is missing");
+                System.err.println("Usage: java " + CoprocessorTool.class.getName() + " enable " + tableName + " [jarOnHdfs] [jarOnLocal]");
+                return 1;
+            }
+            String jarPath = cmdCli.getOptionValue("jar");
+            LOGGER.info("Table name: {}", tableName);
+            LOGGER.info("Coprocessor jar on hdfs: {}", jarPath);
+            String localJarPath = cmdCli.getOptionValue("localJar");
+            LOGGER.info("Coprocessor jar on local: {}", localJarPath);
+
+            String[] tableNames = tableName.split(",\\s*");
+            for (String table : tableNames) {
+                LOGGER.info("Registering coprocessor for table {}", table);
+                registerCoprocessor(jarPath, table, localJarPath);
+            }
+        } else if (cmdCli.hasOption("unregister")) {
+            unregisterCoprocessor(tableName);
+        } else {
+            System.err.println("command is required, --register/--unregister");
+            printHelpMessage(cmdOptions);
+        }
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/protobuf/AggregateProtocol.proto
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/protobuf/AggregateProtocol.proto b/eagle-core/eagle-query/eagle-storage-hbase/src/main/protobuf/AggregateProtocol.proto
index c3385a1..da5846c 100644
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/protobuf/AggregateProtocol.proto
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/protobuf/AggregateProtocol.proto
@@ -27,7 +27,7 @@ option optimize_for = SPEED;
  */
 import "Client.proto";
 
-//
+
 //message ScanWrapper{
 //    required bytes byte_array = 1;
 //}
@@ -43,7 +43,7 @@ message AggregateResult {
 message AggregateRequest {
     required EntityDefinition entity_definition = 1;
     required Scan scan = 2;
-    repeated string groupby_fields= 3;
+    repeated string groupby_fields = 3;
     repeated bytes aggregate_func_types = 4;
     repeated string aggregated_fields = 5;
 }
@@ -51,7 +51,7 @@ message AggregateRequest {
 message TimeSeriesAggregateRequest {
     required EntityDefinition entity_definition = 1;
     required Scan scan = 2;
-    repeated string groupby_fields= 3;
+    repeated string groupby_fields = 3;
     repeated bytes aggregate_func_types = 4;
     repeated string aggregated_fields = 5;
     required int64 start_time = 6;
@@ -60,6 +60,6 @@ message TimeSeriesAggregateRequest {
 }
 
 service AggregateProtocol {
-    rpc aggregate(AggregateRequest) returns (AggregateResult);
-    rpc timeseriesAggregate(TimeSeriesAggregateRequest) returns (AggregateResult);
+    rpc aggregate (AggregateRequest) returns (AggregateResult);
+    rpc timeseriesAggregate (TimeSeriesAggregateRequest) returns (AggregateResult);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStatement.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStatement.java b/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStatement.java
index 079cf12..6dcbc78 100644
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStatement.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStatement.java
@@ -67,18 +67,18 @@ public class TestHBaseStatement extends TestHBaseBase {
             {
                 put("cluster", "test");
                 put("datacenter", "test");
-                put("name","unit.test.name");
+                put("name", "unit.test.name");
             }
         });
 
         entities.add(entity);
 
-        CreateStatement createStatement = new CreateStatement(entities,"TestTimeSeriesAPIEntity");
+        CreateStatement createStatement = new CreateStatement(entities, "TestTimeSeriesAPIEntity");
         ModifyResult resultSet = createStatement.execute(DataStorageManager.newDataStorage("hbase"));
 
         Assert.assertEquals(1, resultSet.getIdentifiers().size());
 
-        createStatement = new CreateStatement(entities,"TestTimeSeriesAPIEntity");
+        createStatement = new CreateStatement(entities, "TestTimeSeriesAPIEntity");
         resultSet = createStatement.execute(DataStorageManager.newDataStorage("hbase"));
 
         Assert.assertEquals(1, resultSet.getIdentifiers().size());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStorage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStorage.java b/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStorage.java
deleted file mode 100644
index 76cc507..0000000
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStorage.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.storage.hbase;
-
-import org.junit.Test;
-
-/**
- * @since 3/23/15
- */
-public class TestHBaseStorage {
-    @Test
-    public void testCreate(){
-
-    }
-
-    @Test
-    public void testQuery(){
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestAggregateResultCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestAggregateResultCallback.java b/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestAggregateResultCallback.java
index ba1b781..df5dcd9 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestAggregateResultCallback.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestAggregateResultCallback.java
@@ -16,26 +16,24 @@
  */
 package org.apache.eagle.storage.hbase.aggregate.coprocessor;
 
+import org.apache.eagle.common.ByteUtil;
 import org.apache.eagle.query.aggregate.AggregateFunctionType;
-import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResult;
-import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResultCallback;
-import org.apache.eagle.storage.hbase.query.coprocessor.impl.AggregateResultCallbackImpl;
 import org.apache.eagle.query.aggregate.raw.GroupbyKey;
 import org.apache.eagle.query.aggregate.raw.GroupbyKeyValue;
 import org.apache.eagle.query.aggregate.raw.GroupbyValue;
-import org.apache.eagle.common.ByteUtil;
+import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResult;
+import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResultCallback;
+import org.apache.eagle.storage.hbase.query.coprocessor.impl.AggregateResultCallbackImpl;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-@Ignore
 public class TestAggregateResultCallback {
     @Test
-    public void testUpdate(){
+    public void testUpdate() {
         // -----------------------------------------------------------------------------
         // key      |       max      min        count       avg         sum      | count
         // -----------------------------------------------------------------------------
@@ -49,89 +47,88 @@ public class TestAggregateResultCallback {
         // -----------------------------------------------------------------------------
 
         AggregateResultCallback callback = new AggregateResultCallbackImpl(Arrays.asList(
-                        AggregateFunctionType.max,
-                        AggregateFunctionType.min,
-                        AggregateFunctionType.count,
-                        AggregateFunctionType.avg,
-                        AggregateFunctionType.sum));
+                AggregateFunctionType.max,
+                AggregateFunctionType.min,
+                AggregateFunctionType.count,
+                AggregateFunctionType.avg,
+                AggregateFunctionType.sum));
         AggregateResult result1 = AggregateResult.build(
                 Arrays.asList(
-                    new String[]{"a","b"},
-                    new String[]{"a","b"},
-                    new String[]{"a","b","c"},
-                    new String[]{"a","b","c"}
+                        new String[]{"a", "b"},
+                        new String[]{"a", "b"},
+                        new String[]{"a", "b", "c"},
+                        new String[]{"a", "b", "c"}
                 ),
                 Arrays.asList(
-                    new double[]{1.0,2.0,3.0,4.0,5.0},
-                    new double[]{2.0,3.0,6.0,5.0,6.0},
-                    new double[]{3.0,3.0,5.0,5.0,6.0},
-                    new double[]{4.0,5.0,5.0,5.0,7.0}
+                        new double[]{1.0, 2.0, 3.0, 4.0, 5.0},
+                        new double[]{2.0, 3.0, 6.0, 5.0, 6.0},
+                        new double[]{3.0, 3.0, 5.0, 5.0, 6.0},
+                        new double[]{4.0, 5.0, 5.0, 5.0, 7.0}
                 ),
-                Arrays.asList(3,6,5,5),
+                Arrays.asList(3, 6, 5, 5),
                 System.currentTimeMillis(),
                 System.currentTimeMillis()
         );
-        callback.update(null,null,result1);
+        callback.update(null, null, result1);
         AggregateResult callbackResult = callback.result();
-        Assert.assertEquals(2,callbackResult.getKeyValues().size());
+        Assert.assertEquals(2, callbackResult.getKeyValues().size());
 
         // == ROW-#0 ==
         // Should be:
         // key      |       max      min        count       avg         sum      | count
         // -----------------------------------------------------------------------------
-        // a,b,c    |       4        3          10          1           13       | 10
+        // a,b      |       2        2          9           1           11       | 9
         GroupbyKeyValue row0 = callbackResult.getKeyValues().get(0);
-//        Assert.assertEquals("a",new String(row0.getKey().getValue().get(0).copyBytes()));
-//        Assert.assertEquals("b",new String(row0.getKey().getValue().get(1).copyBytes()));
-        Assert.assertEquals(new GroupbyKey(Arrays.asList("a".getBytes(),"b".getBytes(),"c".getBytes())),row0.getKey());
-        Assert.assertEquals(4.0,row0.getValue().get(0).get(), 0.00001);
-        Assert.assertEquals(10, ByteUtil.bytesToInt(row0.getValue().getMeta(0).getBytes()));
-        Assert.assertEquals(3.0, row0.getValue().get(1).get(), 0.00001);
-        Assert.assertEquals(10, ByteUtil.bytesToInt(row0.getValue().getMeta(1).getBytes()));
-        Assert.assertEquals(10.0,row0.getValue().get(2).get(), 0.00001);
-        Assert.assertEquals(10, ByteUtil.bytesToInt(row0.getValue().getMeta(2).getBytes()));
-        Assert.assertEquals(1.0,row0.getValue().get(3).get(), 0.00001);
-        Assert.assertEquals(10, ByteUtil.bytesToInt(row0.getValue().getMeta(3).getBytes()));
-        Assert.assertEquals(13.0,row0.getValue().get(4).get(), 0.00001);
-        Assert.assertEquals(10, ByteUtil.bytesToInt(row0.getValue().getMeta(4).getBytes()));
+        Assert.assertEquals(new GroupbyKey(Arrays.asList("a".getBytes(), "b".getBytes())), row0.getKey());
+        Assert.assertEquals(2.0, row0.getValue().get(0).get(), 0.00001);
+        Assert.assertEquals(9, ByteUtil.bytesToInt(row0.getValue().getMeta(4).getBytes()));
+        Assert.assertEquals(2.0, row0.getValue().get(1).get(), 0.00001);
+        Assert.assertEquals(9, ByteUtil.bytesToInt(row0.getValue().getMeta(4).getBytes()));
+        Assert.assertEquals(9.0, row0.getValue().get(2).get(), 0.00001);
+        Assert.assertEquals(9, ByteUtil.bytesToInt(row0.getValue().getMeta(4).getBytes()));
+        Assert.assertEquals(1.0, row0.getValue().get(3).get(), 0.00001);
+        Assert.assertEquals(9, ByteUtil.bytesToInt(row0.getValue().getMeta(4).getBytes()));
+        Assert.assertEquals(11.0, row0.getValue().get(4).get(), 0.00001);
+        Assert.assertEquals(9, ByteUtil.bytesToInt(row0.getValue().getMeta(4).getBytes()));
 
         // == ROW-#1 ==
         // Should be:
         // key      |       max      min        count       avg         sum      | count
         // -----------------------------------------------------------------------------
-        // a,b      |       2        2          9           1           11       | 9
+        // a,b,c    |       4        3          10          1           13       | 10
         GroupbyKeyValue row1 = callbackResult.getKeyValues().get(1);
-        Assert.assertEquals(new GroupbyKey(Arrays.asList("a".getBytes(),"b".getBytes())),row1.getKey());
-        Assert.assertEquals(2.0,row1.getValue().get(0).get(), 0.00001);
-        Assert.assertEquals(9, ByteUtil.bytesToInt(row1.getValue().getMeta(4).getBytes()));
-        Assert.assertEquals(2.0, row1.getValue().get(1).get(), 0.00001);
-        Assert.assertEquals(9, ByteUtil.bytesToInt(row1.getValue().getMeta(4).getBytes()));
-        Assert.assertEquals(9.0,row1.getValue().get(2).get(), 0.00001);
-        Assert.assertEquals(9, ByteUtil.bytesToInt(row1.getValue().getMeta(4).getBytes()));
-        Assert.assertEquals(1.0,row1.getValue().get(3).get(), 0.00001);
-        Assert.assertEquals(9, ByteUtil.bytesToInt(row1.getValue().getMeta(4).getBytes()));
-        Assert.assertEquals(11.0,row1.getValue().get(4).get(), 0.00001);
-        Assert.assertEquals(9, ByteUtil.bytesToInt(row1.getValue().getMeta(4).getBytes()));
+        Assert.assertEquals(new GroupbyKey(Arrays.asList("a".getBytes(), "b".getBytes(), "c".getBytes())), row1.getKey());
+        Assert.assertEquals(4.0, row1.getValue().get(0).get(), 0.00001);
+        Assert.assertEquals(10, ByteUtil.bytesToInt(row1.getValue().getMeta(0).getBytes()));
+        Assert.assertEquals(3.0, row1.getValue().get(1).get(), 0.00001);
+        Assert.assertEquals(10, ByteUtil.bytesToInt(row1.getValue().getMeta(1).getBytes()));
+        Assert.assertEquals(10.0, row1.getValue().get(2).get(), 0.00001);
+        Assert.assertEquals(10, ByteUtil.bytesToInt(row1.getValue().getMeta(2).getBytes()));
+        Assert.assertEquals(1.0, row1.getValue().get(3).get(), 0.00001);
+        Assert.assertEquals(10, ByteUtil.bytesToInt(row1.getValue().getMeta(3).getBytes()));
+        Assert.assertEquals(13.0, row1.getValue().get(4).get(), 0.00001);
+        Assert.assertEquals(10, ByteUtil.bytesToInt(row1.getValue().getMeta(4).getBytes()));
+
     }
 
     @Test
-    public void testAggregateResultTimestamp(){
+    public void testAggregateResultTimestamp() {
         AggregateResult result1 = new AggregateResult();
-        result1.setStartTimestamp(2l);
-        result1.setStopTimestamp(4l);
+        result1.setStartTimestamp(2L);
+        result1.setStopTimestamp(4L);
         AggregateResult result2 = new AggregateResult();
-        result2.setStartTimestamp(1l);
-        result2.setStopTimestamp(3l);
-        AggregateResultCallback  callback = new AggregateResultCallbackImpl(new ArrayList<AggregateFunctionType>());
-        callback.update(null,null,result1);
-        callback.update(null,null,result2);
+        result2.setStartTimestamp(1L);
+        result2.setStopTimestamp(3L);
+        AggregateResultCallback callback = new AggregateResultCallbackImpl(new ArrayList<AggregateFunctionType>());
+        callback.update(null, null, result1);
+        callback.update(null, null, result2);
         AggregateResult result3 = callback.result();
-        Assert.assertEquals(1l,result3.getStartTimestamp());
-        Assert.assertEquals(4l,result3.getStopTimestamp());
+        Assert.assertEquals(1L, result3.getStartTimestamp());
+        Assert.assertEquals(4L, result3.getStopTimestamp());
     }
 
     @Test
-    public void testUpdatePerformance(){
+    public void testUpdatePerformance() {
         AggregateResultCallback callback = new AggregateResultCallbackImpl(
                 Arrays.asList(
                         AggregateFunctionType.max,
@@ -139,10 +136,10 @@ public class TestAggregateResultCallback {
                         AggregateFunctionType.count,
                         AggregateFunctionType.avg));
 
-        for(int i=0;i<1000000;i++) {
+        for (int i = 0; i < 1000000; i++) {
             AggregateResult result1 = new AggregateResult();
             result1.setStartTimestamp(System.currentTimeMillis());
-            List<GroupbyKeyValue> keyValues = new ArrayList<GroupbyKeyValue>();
+            final List<GroupbyKeyValue> keyValues = new ArrayList<GroupbyKeyValue>();
 
             // <a,b> - <1*3, 2*3, 3*3, 4*3>
             GroupbyKey key = new GroupbyKey();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestGroupAggregateClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestGroupAggregateClient.java b/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestGroupAggregateClient.java
index 2a9ed0d..86c08fd 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestGroupAggregateClient.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestGroupAggregateClient.java
@@ -31,10 +31,7 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DoubleWritable;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,201 +52,201 @@ import org.apache.eagle.service.hbase.TestHBaseBase;
  */
 @Ignore
 public class TestGroupAggregateClient extends TestHBaseBase {
-	HTableInterface table;
-	long startTime;
-	long endTime;
-	List<String> rowkeys;
-	AggregateClient client;
-	Scan scan;
-	int num = 200;
-
-	private final static Logger LOG = LoggerFactory.getLogger(TestGroupAggregateClient.class);
-
-	@Before
-	public void setUp(){
-		hbase.createTable("unittest", "f");
-		startTime = System.currentTimeMillis();
-		try {
-			rowkeys = prepareData(num);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-		endTime = System.currentTimeMillis();
-		table = EagleConfigFactory.load().getHTable("unittest");
-		client = new AggregateClientImpl();
-		scan = new Scan();
-		scan.setCaching(200);
-		
-		ListQueryCompiler compiler = null;
-		try {
-			compiler = new ListQueryCompiler("TestLogAPIEntity[@cluster=\"test4UT\" and @datacenter=\"dc1\"]{@field1,@field2}");
-		} catch (Exception e) {
-			Assert.fail(e.getMessage());
-		}
-		scan.setFilter(compiler.filter());
-	}
-	
-	@After
-	public void shutdown(){
-		try {
-			hbase.deleteTable("unittest");
-			new HTableFactory().releaseHTableInterface(table);
-		} catch (IOException e) {
-			LOG.error(e.getMessage(),e);
-		}
-	}
-	
-	private List<String> prepareData(int count) throws Exception {
-		List<TaggedLogAPIEntity> list = new ArrayList<TaggedLogAPIEntity>();
-		EntityDefinition ed = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class);
-
-		if (ed == null) {
-			EntityDefinitionManager.registerEntity(TestLogAPIEntity.class);
-			ed = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class);
-		}
-		ed.setTimeSeries(true);
-		for(int i=0;i<count;i++){
-			TestLogAPIEntity e = new TestLogAPIEntity();
-			e.setTimestamp(System.currentTimeMillis());
-			e.setField1(1);
-			e.setField2(2);
-			e.setField3(3);
-			e.setField4(4L);
-			e.setField5(5.0);
-			e.setField6(5.0);
-			e.setField7("7");
-			e.setTags(new HashMap<String, String>());
-			e.getTags().put("cluster", "test4UT");
-			e.getTags().put("datacenter", "dc1");
-			e.getTags().put("index", ""+i);
-			e.getTags().put("jobId", "job_"+System.currentTimeMillis());
-			list.add(e);
-
-		}
-		GenericEntityWriter writer = new GenericEntityWriter(ed.getService());
-		LOG.info("Writing "+list.size()+" TestLogAPIEntity entities");
-		List<String> result = writer.write(list);
-		LOG.info("Finish writing test entities");
-		return result;
-	}
-
-	//@Test
-	public void testGroupAggregateCountClient(){
-		try {
-			EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
-			List<GroupbyKeyValue> result = client.aggregate(table,ed,scan, Arrays.asList("cluster","datacenter"),Arrays.asList(AggregateFunctionType.count),Arrays.asList("field2")).getKeyValues();
-			if(LOG.isDebugEnabled()) LOG.debug("COUNT");
-			logGroupbyKeyValue(result);
-			Assert.assertNotNull(result);
-			Assert.assertTrue(result.size()>0);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-
-	//@Test
-	public void testGroupAggregateAvgClient(){
-		try {
-			EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
-			List<GroupbyKeyValue> result = client.aggregate(table,ed,scan, Arrays.asList("cluster","datacenter"),Arrays.asList(AggregateFunctionType.avg),Arrays.asList("field2")).getKeyValues();
-			if(LOG.isDebugEnabled()) LOG.debug("AVG");
-			logGroupbyKeyValue(result);
-			Assert.assertNotNull(result);
-			Assert.assertTrue(result.size()>0);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-
-	//@Test
-	public void testGroupAggregateMaxClient(){
-		try {
-			EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
-			List<GroupbyKeyValue> result = client.aggregate(table,ed,scan, Arrays.asList("cluster","datacenter"),Arrays.asList(AggregateFunctionType.max),Arrays.asList("field1")).getKeyValues();
-			if(LOG.isDebugEnabled()) LOG.debug("MAX");
-			logGroupbyKeyValue(result);
-			Assert.assertNotNull(result);
-			Assert.assertTrue(result.size()>0);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-
-	//@Test
-	public void testGroupAggregateSumClient(){
-		try {
-			EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
-			List<GroupbyKeyValue> result = client.aggregate(table,ed,scan, Arrays.asList("cluster","datacenter"),Arrays.asList(AggregateFunctionType.sum),Arrays.asList("field2")).getKeyValues();
-			if(LOG.isDebugEnabled()) LOG.debug("MAX");
-			logGroupbyKeyValue(result);
-			Assert.assertNotNull(result);
-			Assert.assertTrue(result.size()>0);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-
-	//@Test
-	public void testGroupAggregateMinClient(){
-
-		try {
-			EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
-			List<GroupbyKeyValue> result = client.aggregate(table,ed,scan, Arrays.asList("cluster","datacenter"),Arrays.asList(AggregateFunctionType.min),Arrays.asList("field2")).getKeyValues();
-			if(LOG.isDebugEnabled()) LOG.debug("MIN");
-			logGroupbyKeyValue(result);
-			Assert.assertNotNull(result);
-			Assert.assertTrue(result.size()>0);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-
-	//@Test
-	public void testGroupAggregateMultipleClient(){
-		try {
-			EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
-			List<GroupbyKeyValue> result = client.aggregate(table,ed,scan, Arrays.asList("cluster","datacenter"),
-					Arrays.asList(AggregateFunctionType.min,
-							AggregateFunctionType.max,
-							AggregateFunctionType.avg,
-							AggregateFunctionType.count,
-							AggregateFunctionType.sum),
-					Arrays.asList("field2","field2","field2","field2","field2")).getKeyValues();
-			logGroupbyKeyValue(result);
-			Assert.assertNotNull(result);
-			Assert.assertTrue(result.size() > 0);
-			Assert.assertEquals("test4UT", new String(result.get(0).getKey().getValue().get(0).copyBytes()));
-			Assert.assertEquals("dc1", new String(result.get(0).getKey().getValue().get(1).copyBytes()));
-			Assert.assertEquals(2.0, result.get(0).getValue().get(0).get(), 0.00001);
-			Assert.assertEquals(2.0, result.get(0).getValue().get(1).get(), 0.00001);
-			Assert.assertEquals(2.0, result.get(0).getValue().get(2).get(), 0.00001);
-			Assert.assertTrue(num <= result.get(0).getValue().get(3).get());
-			Assert.assertTrue(2.0 * num <= result.get(0).getValue().get(4).get());
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-
-	private void logGroupbyKeyValue(List<GroupbyKeyValue> keyValues){
-		for(GroupbyKeyValue keyValue:keyValues){
-			GroupbyKey key = keyValue.getKey();
-			List<String> keys = new ArrayList<String>();
-			for(BytesWritable bytes:key.getValue()){
-				keys.add(new String(bytes.copyBytes()));
-			}
-			List<Double> vals = new ArrayList<Double>();
-			GroupbyValue val = keyValue.getValue();
-			for(DoubleWritable dw:val.getValue()){
-				vals.add(dw.get());
-			}
-			if(LOG.isDebugEnabled()) LOG.debug("KEY: "+keys+", VALUE: "+vals);
-		}
-	}
+    HTableInterface table;
+    long startTime;
+    long endTime;
+    List<String> rowkeys;
+    AggregateClient client;
+    Scan scan;
+    int num = 200;
+
+    private final static Logger LOG = LoggerFactory.getLogger(TestGroupAggregateClient.class);
+
+    @Before
+    public void setUp() {
+        hbase.createTable("unittest", "f");
+        startTime = System.currentTimeMillis();
+        try {
+            rowkeys = prepareData(num);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+        endTime = System.currentTimeMillis();
+        table = EagleConfigFactory.load().getHTable("unittest");
+        client = new AggregateClientImpl();
+        scan = new Scan();
+        scan.setCaching(200);
+
+        ListQueryCompiler compiler = null;
+        try {
+            compiler = new ListQueryCompiler("TestLogAPIEntity[@cluster=\"test4UT\" and @datacenter=\"dc1\"]{@field1,@field2}");
+        } catch (Exception e) {
+            Assert.fail(e.getMessage());
+        }
+        scan.setFilter(compiler.filter());
+    }
+
+    @After
+    public void shutdown() {
+        try {
+            hbase.deleteTable("unittest");
+            new HTableFactory().releaseHTableInterface(table);
+        } catch (IOException e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    private List<String> prepareData(int count) throws Exception {
+        List<TaggedLogAPIEntity> list = new ArrayList<TaggedLogAPIEntity>();
+        EntityDefinition ed = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class);
+
+        if (ed == null) {
+            EntityDefinitionManager.registerEntity(TestLogAPIEntity.class);
+            ed = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class);
+        }
+        ed.setTimeSeries(true);
+        for (int i = 0; i < count; i++) {
+            TestLogAPIEntity e = new TestLogAPIEntity();
+            e.setTimestamp(System.currentTimeMillis());
+            e.setField1(1);
+            e.setField2(2);
+            e.setField3(3);
+            e.setField4(4L);
+            e.setField5(5.0);
+            e.setField6(5.0);
+            e.setField7("7");
+            e.setTags(new HashMap<String, String>());
+            e.getTags().put("cluster", "test4UT");
+            e.getTags().put("datacenter", "dc1");
+            e.getTags().put("index", "" + i);
+            e.getTags().put("jobId", "job_" + System.currentTimeMillis());
+            list.add(e);
+
+        }
+        GenericEntityWriter writer = new GenericEntityWriter(ed.getService());
+        LOG.info("Writing " + list.size() + " TestLogAPIEntity entities");
+        List<String> result = writer.write(list);
+        LOG.info("Finish writing test entities");
+        return result;
+    }
+
+    @Test
+    public void testGroupAggregateCountClient() {
+        try {
+            EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
+            List<GroupbyKeyValue> result = client.aggregate(table, ed, scan, Arrays.asList("cluster", "datacenter"), Arrays.asList(AggregateFunctionType.count), Arrays.asList("field2")).getKeyValues();
+            if (LOG.isDebugEnabled()) LOG.debug("COUNT");
+            logGroupbyKeyValue(result);
+            Assert.assertNotNull(result);
+            Assert.assertTrue(result.size() > 0);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testGroupAggregateAvgClient() {
+        try {
+            EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
+            List<GroupbyKeyValue> result = client.aggregate(table, ed, scan, Arrays.asList("cluster", "datacenter"), Arrays.asList(AggregateFunctionType.avg), Arrays.asList("field2")).getKeyValues();
+            if (LOG.isDebugEnabled()) LOG.debug("AVG");
+            logGroupbyKeyValue(result);
+            Assert.assertNotNull(result);
+            Assert.assertTrue(result.size() > 0);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testGroupAggregateMaxClient() {
+        try {
+            EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
+            List<GroupbyKeyValue> result = client.aggregate(table, ed, scan, Arrays.asList("cluster", "datacenter"), Arrays.asList(AggregateFunctionType.max), Arrays.asList("field1")).getKeyValues();
+            if (LOG.isDebugEnabled()) LOG.debug("MAX");
+            logGroupbyKeyValue(result);
+            Assert.assertNotNull(result);
+            Assert.assertTrue(result.size() > 0);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testGroupAggregateSumClient() {
+        try {
+            EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
+            List<GroupbyKeyValue> result = client.aggregate(table, ed, scan, Arrays.asList("cluster", "datacenter"), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("field2")).getKeyValues();
+            if (LOG.isDebugEnabled()) LOG.debug("MAX");
+            logGroupbyKeyValue(result);
+            Assert.assertNotNull(result);
+            Assert.assertTrue(result.size() > 0);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testGroupAggregateMinClient() {
+
+        try {
+            EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
+            List<GroupbyKeyValue> result = client.aggregate(table, ed, scan, Arrays.asList("cluster", "datacenter"), Arrays.asList(AggregateFunctionType.min), Arrays.asList("field2")).getKeyValues();
+            if (LOG.isDebugEnabled()) LOG.debug("MIN");
+            logGroupbyKeyValue(result);
+            Assert.assertNotNull(result);
+            Assert.assertTrue(result.size() > 0);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testGroupAggregateMultipleClient() {
+        try {
+            EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
+            List<GroupbyKeyValue> result = client.aggregate(table, ed, scan, Arrays.asList("cluster", "datacenter"),
+                    Arrays.asList(AggregateFunctionType.min,
+                            AggregateFunctionType.max,
+                            AggregateFunctionType.avg,
+                            AggregateFunctionType.count,
+                            AggregateFunctionType.sum),
+                    Arrays.asList("field2", "field2", "field2", "field2", "field2")).getKeyValues();
+            logGroupbyKeyValue(result);
+            Assert.assertNotNull(result);
+            Assert.assertTrue(result.size() > 0);
+            Assert.assertEquals("test4UT", new String(result.get(0).getKey().getValue().get(0).copyBytes()));
+            Assert.assertEquals("dc1", new String(result.get(0).getKey().getValue().get(1).copyBytes()));
+            Assert.assertEquals(2.0, result.get(0).getValue().get(0).get(), 0.00001);
+            Assert.assertEquals(2.0, result.get(0).getValue().get(1).get(), 0.00001);
+            Assert.assertEquals(2.0, result.get(0).getValue().get(2).get(), 0.00001);
+            Assert.assertTrue(num <= result.get(0).getValue().get(3).get());
+            Assert.assertTrue(2.0 * num <= result.get(0).getValue().get(4).get());
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    private void logGroupbyKeyValue(List<GroupbyKeyValue> keyValues) {
+        for (GroupbyKeyValue keyValue : keyValues) {
+            GroupbyKey key = keyValue.getKey();
+            List<String> keys = new ArrayList<String>();
+            for (BytesWritable bytes : key.getValue()) {
+                keys.add(new String(bytes.copyBytes()));
+            }
+            List<Double> vals = new ArrayList<Double>();
+            GroupbyValue val = keyValue.getValue();
+            for (DoubleWritable dw : val.getValue()) {
+                vals.add(dw.get());
+            }
+            if (LOG.isDebugEnabled()) LOG.debug("KEY: " + keys + ", VALUE: " + vals);
+        }
+    }
 }
\ No newline at end of file


Mime
View raw message