eagle-commits mailing list archives

From h..@apache.org
Subject [09/51] [partial] incubator-eagle git commit: EAGLE-184 Migrate eagle website from https://github.com/eaglemonitoring/eaglemonitoring.github.io to document branch
Date Thu, 03 Mar 2016 18:09:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateClient.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateClient.java
deleted file mode 100755
index cd6a5b9..0000000
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateClient.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.storage.hbase.query.coprocessor;
-
-import org.apache.eagle.log.entity.meta.EntityDefinition;
-import org.apache.eagle.query.aggregate.AggregateFunctionType;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Scan;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Universal client interface for coprocessor-based aggregation
- *
- * <h2>Flat or RAW Aggregation:</h2>
- * <pre>
- * AggregateResult aggregate( HTableInterface table, String serviceName, Scan scan, List<String> groupbyFields, List<AggregateFunctionType> aggregateFuncTypes, List<String> aggregatedFields) throws IOException
- * </pre>
- *
- * <h2>Time Series Aggregation:</h2>
- * <pre>
- * AggregateResult aggregate(HTableInterface table, String serviceName, Scan scan, List<String> groupbyFields, List<AggregateFunctionType> aggregateFuncTypes, List<String> aggregatedFields, boolean timeSeries, long startTime, long endTime, long intervalMin) throws IOException
- * </pre>
- * @since 11/3/2014
- */
-public interface AggregateClient
-{
-
-	/**
-	 * Flat aggregation
-	 *
-	 * @param table                   HTable connection
-	 * @param entityDefinition        Eagle EntityDefinition
-	 * @param scan                    HBase Scan
-	 * @param groupbyFields           Group-by field names
-	 * @param aggregateFuncTypes      Aggregate function types
-	 * @param aggregatedFields        Aggregated field names
-	 * @return                        the AggregateResult
-	 * @throws IOException            if the aggregation fails
-	 */
-	AggregateResult aggregate(final HTableInterface table,                            // HTable connection
-                              final EntityDefinition entityDefinition,                // Eagle entity definition
-                              final Scan scan,                                        // HBase Scan
-                              final List<String> groupbyFields,                       // Group-by field names
-                              final List<AggregateFunctionType> aggregateFuncTypes,   // Aggregate function types
-                              final List<String> aggregatedFields                     // Aggregated field names
-    ) throws IOException;
-
-	/**
-	 * Time series aggregation
-	 *
-	 * @param table                   HTable connection
-	 * @param entityDefinition        Eagle EntityDefinition
-	 * @param scan                    HBase Scan
-	 * @param groupbyFields           Group-by field names
-	 * @param aggregateFuncTypes      Aggregate function types
-	 * @param aggregatedFields        Aggregated field names
-	 * @param timeSeries              Whether this is a time series aggregation
-	 * @param startTime               Start of the time range
-	 * @param endTime                 End of the time range
-	 * @param intervalMin             Interval in minutes for time series aggregation
-	 * @return                        the AggregateResult
-	 * @throws IOException            if the aggregation fails
-	 */
-	AggregateResult aggregate(final HTableInterface table,                            // HTable connection
-                              final EntityDefinition entityDefinition,                // Eagle entity definition
-                              final Scan scan,                                        // HBase Scan
-                              final List<String> groupbyFields,                       // Group-by field names
-                              final List<AggregateFunctionType> aggregateFuncTypes,   // Aggregate function types
-                              final List<String> aggregatedFields,                    // Aggregated field names
-                              final boolean timeSeries,                               // Whether this is a time series aggregation
-                              final long startTime,                                   // Start of the time range
-                              final long endTime,                                     // End of the time range
-                              final long intervalMin                                  // Interval in minutes for time series aggregation
-    ) throws IOException;
-}
\ No newline at end of file

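For reference, a minimal client-side sketch of how this interface was typically called. This is a hedged illustration: the AggregateClientImpl class name, the "count" constant on AggregateFunctionType, and the "*" field spec are assumptions not shown in this diff.

    // A minimal sketch, assuming an implementation class named AggregateClientImpl
    // and a "count" constant on AggregateFunctionType; neither appears in this diff.
    AggregateClient client = new AggregateClientImpl();
    AggregateResult result = client.aggregate(
            table,                                       // an open HTableInterface
            entityDefinition,                            // Eagle EntityDefinition for the service
            new Scan(),                                  // full-table scan, for illustration only
            Arrays.asList("host"),                       // group by the "host" tag
            Arrays.asList(AggregateFunctionType.count),  // assumed enum constant
            Arrays.asList("*"));                         // assumed field spec for count
    System.out.println("groups: " + result.getKeyValues().size());
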
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocol.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocol.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocol.java
deleted file mode 100755
index fe2c414..0000000
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocol.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.storage.hbase.query.coprocessor;
-
-import org.apache.eagle.log.entity.meta.EntityDefinition;
-import org.apache.hadoop.hbase.client.Scan;
-//import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
-
-import java.io.IOException;
-import java.util.List;
-
-
-/**
- * TODO: replace with google-protobuf RPC when HBase is upgraded to version 0.96+
- */
-public interface AggregateProtocol {
-
-	/**
-	 * Flat aggregation
-	 *
-	 * @param entityDefinition    Eagle EntityDefinition
-	 * @param scan                HBase Scan
-	 * @param groupbyFields       Group-by field names
-	 * @param aggregateFuncTypes  Serialized aggregate function types
-	 * @param aggregatedFields    Aggregated field names
-	 * @return AggregateResult
-	 * @throws java.io.IOException
-	 */
-	AggregateResult aggregate(EntityDefinition entityDefinition,
-                              Scan scan,
-                              List<String> groupbyFields,
-                              List<byte[]> aggregateFuncTypes,
-                              List<String> aggregatedFields) throws IOException;
-
-	/**
-	 * Time series aggregation
-	 *
-	 * @param entityDefinition    Eagle EntityDefinition
-	 * @param scan                HBase Scan
-	 * @param groupbyFields       Group-by field names
-	 * @param aggregateFuncTypes  Serialized aggregate function types
-	 * @param aggregatedFields    Aggregated field names
-	 * @param startTime           Start of the time range
-	 * @param endTime             End of the time range
-	 * @param intervalMin         Interval in minutes
-	 * @return AggregateResult
-	 * @throws java.io.IOException
-	 */
-	AggregateResult aggregate(EntityDefinition entityDefinition,
-                              Scan scan,
-                              List<String> groupbyFields,
-                              List<byte[]> aggregateFuncTypes,
-                              List<String> aggregatedFields,
-                              long startTime,
-                              long endTime,
-                              long intervalMin) throws IOException;
-}
\ No newline at end of file

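Before the protobuf migration flagged in the TODO above, an endpoint implementing this interface would be reached through the HBase 0.94-era dynamic RPC that AggregateResultCallback's Javadoc below also cites. A hedged sketch of that call, with illustrative variable names:

    // HBase 0.94-style invocation; coprocessorExec declares "throws IOException, Throwable".
    Map<byte[], AggregateResult> perRegion = table.coprocessorExec(
            AggregateProtocol.class,
            scan.getStartRow(), scan.getStopRow(),
            new Batch.Call<AggregateProtocol, AggregateResult>() {
                public AggregateResult call(AggregateProtocol instance) throws IOException {
                    return instance.aggregate(entityDefinition, scan,
                            groupbyFields, aggregateFuncTypes, aggregatedFields);
                }
            });
    // The per-region results must then be merged client-side, e.g. via an AggregateResultCallback.
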
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java
deleted file mode 100755
index 53d27de..0000000
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java
+++ /dev/null
@@ -1,447 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.storage.hbase.query.coprocessor;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.*;
-import org.apache.eagle.log.entity.meta.EntityDefinition;
-import org.apache.eagle.log.entity.meta.Qualifier;
-import org.apache.eagle.query.aggregate.AggregateFunctionType;
-import org.apache.eagle.query.aggregate.raw.GroupbyKeyValue;
-import org.apache.eagle.query.aggregate.raw.RawAggregator;
-import org.apache.eagle.query.aggregate.timeseries.TimeSeriesAggregator;
-import org.apache.eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;
-import org.apache.eagle.common.DateTimeUtil;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-//public abstract class AbstractAggregateEndPoint extends BaseEndpointCoprocessor{
-public class AggregateProtocolEndPoint extends AggregateProtos.AggregateProtocol implements AggregateProtocol, Coprocessor, CoprocessorService {
-
-    private RegionCoprocessorEnvironment env;
-
-    @Override
-    public Service getService() {
-        return this;
-    }
-
-    public AggregateProtocolEndPoint() {}
-
-    protected void checkNotNull(Object obj,String name) {
-		if(obj==null) throw new NullPointerException(name+" is null");
-	}
-
-    @Override
-    public void start(CoprocessorEnvironment env) throws IOException {
-        if (env instanceof RegionCoprocessorEnvironment) {
-            this.env = (RegionCoprocessorEnvironment)env;
-        } else {
-            throw new CoprocessorException("Must be loaded on a table region!");
-        }
-    }
-
-    @Override
-    public void stop(CoprocessorEnvironment env) throws IOException {
-        // do nothing
-    }
-
-//    @Override
-//	public ProtocolSignature getProtocolSignature(String protocol, long version, int clientMethodsHashCode) throws IOException {
-//		if (AggregateProtocol.class.getName().equals(protocol)) {
-////			return new ProtocolSignature(AggregateProtocol.VERSION, null);
-//			return new ProtocolSignature(98l, null);
-//		}
-//		throw new IOException("Unknown protocol: " + protocol);
-//	}
-
-	protected HRegion getCurrentRegion(){
-		return this.env.getRegion();
-	}
-
-	/**
-	 * <pre>
-	 * Region-unittest,\x82\xB4\x85\xC2\x7F\xFF\xFE\xB6\xC9jNG\xEE!\x5C3\xBB\xAE\xA1:\x05\xA5\xA9x\xB0\xA1"8\x05\xFB(\xD2VY\xDB\x9A\x06\x09\xA9\x98\xC2\xE3\x8D=,1413960230654.aaf2a6c9f2c87c196f43497243bb2424.
-	 * RegionID-unittest,1413960230654
-	 * </pre>
-	 */
-	protected String getLogHeader(){
-		HRegion region = this.getCurrentRegion();
-		return LOG.isDebugEnabled() ? String.format("Region-%s",region.getRegionNameAsString()):
-				String.format("Region-%s,%d",region.getTableDesc().getNameAsString(),region.getRegionId());
-	}
-
-	protected class InternalReadReport {
-		public InternalReadReport(long counter,long startTimestamp,long stopTimestamp){
-			this.counter = counter;
-			this.startTimestamp = startTimestamp;
-			this.stopTimestamp = stopTimestamp;
-		}
-		public long getCounter() {
-			return counter;
-		}
-		public void setCounter(long counter) {
-			this.counter = counter;
-		}
-
-		public long getStartTimestamp() {
-			return startTimestamp;
-		}
-
-		public void setStartTimestamp(long startTimestamp) {
-			this.startTimestamp = startTimestamp;
-		}
-
-		public long getStopTimestamp() {
-			return stopTimestamp;
-		}
-
-		public void setStopTimestamp(long stopTimestamp) {
-			this.stopTimestamp = stopTimestamp;
-		}
-
-		private long counter;
-		private long startTimestamp;
-		private long stopTimestamp;
-	}
-
-	/**
-	 * Asynchronously reads an HBase scan as entities
-	 *
-	 * @param ed       Eagle EntityDefinition
-	 * @param scan     HBase Scan
-	 * @param listener callback invoked for each created entity
-	 * @throws java.io.IOException
-	 */
-	protected InternalReadReport asyncStreamRead(EntityDefinition ed, Scan scan, EntityCreationListener listener) throws IOException {
-//		_init();
-		long counter = 0;
-		long startTimestamp = 0;
-		long stopTimestamp = 0;
-
-		InternalScanner scanner = this.getCurrentRegion().getScanner(scan);
-		List<Cell> results = new ArrayList<Cell>();
-		try{
-			boolean hasMoreRows;
-			GenericMetricShadowEntity singleMetricEntity = null;
-			do{
-				hasMoreRows = scanner.next(results);
-				Map<String, byte[]> kvMap = new HashMap<String, byte[]>();
-				if(!results.isEmpty()){
-					counter ++;
-					byte[] row = results.get(0).getRow();
-					long timestamp = RowkeyBuilder.getTimestamp(row, ed);
-					
-					// Min
-					if(startTimestamp == 0 || startTimestamp > timestamp ){
-						startTimestamp = timestamp;
-					}
-					
-					// Max
-					if(stopTimestamp == 0 || stopTimestamp < timestamp ){
-						stopTimestamp = timestamp;
-					}
-					
-					for(Cell kv:results){
-						String qualifierName = Bytes.toString(kv.getQualifier());
-//						Qualifier qualifier = null;
-//						if(!ed.isTag(qualifierName)){
-//							qualifier = ed.getQualifierNameMap().get(qualifierName);
-//							if(qualifier == null){
-//								LOG.error("qualifier for   " + qualifierName + " not exist");
-//								throw new NullPointerException("qualifier for field "+qualifierName+" not exist");
-//							}
-//						}
-						if(kv.getValue()!=null) kvMap.put(qualifierName ,kv.getValue());
-					}
-					
-					// LOG.info("DEBUG: timestamp="+timestamp+", keys=["+StringUtils.join(kvMap.keySet(),",")+"]");
-					
-					InternalLog internalLog = HBaseInternalLogHelper.buildObject(ed, row, timestamp, kvMap);
-					if(internalLog!=null){
-						TaggedLogAPIEntity logAPIEntity = null;
-						try {
-							logAPIEntity = HBaseInternalLogHelper.buildEntity(internalLog, ed);
-							if(logAPIEntity instanceof GenericMetricEntity){
-								if(singleMetricEntity == null) singleMetricEntity = new GenericMetricShadowEntity();
-								GenericMetricEntity e = (GenericMetricEntity)logAPIEntity;
-								if(e.getValue()!=null) {
-									int count = e.getValue().length;
-									@SuppressWarnings("unused")
-									Class<?> cls = ed.getMetricDefinition().getSingleTimestampEntityClass();
-									for (int i = 0; i < count; i++) {
-										long ts = logAPIEntity.getTimestamp() + i * ed.getMetricDefinition().getInterval();
-										// Exclude entities outside the search condition's time range [start, end)
-										singleMetricEntity.setTimestamp(ts);
-										singleMetricEntity.setTags(e.getTags());
-										singleMetricEntity.setValue(e.getValue()[i]);
-										// Min
-										if (startTimestamp == 0 || startTimestamp > ts) startTimestamp = ts;
-										// Max
-										if (stopTimestamp == 0 || stopTimestamp < ts) stopTimestamp = ts;
-										listener.entityCreated(singleMetricEntity);
-									}
-								}
-							}else {
-								// LOG.info("DEBUG: rowKey="+logAPIEntity.getEncodedRowkey());
-								listener.entityCreated(logAPIEntity);
-							}
-						} catch (Exception e) {
-							if(internalLog!=null) {
-								LOG.error("Got exception to handle " + internalLog.toString() + ": " + e.getMessage(), e);
-							}
-							throw new IOException(e);
-						}
-					}else{
-						LOG.error("Failed to build internal log for row (length: " + row.length + ") with fields: " + kvMap);
-					}
-					results.clear();
-				}else{
-					if(LOG.isDebugEnabled()) LOG.debug("Empty batch of KeyValue");
-				}
-			} while(hasMoreRows);
-		}catch(IOException ex){
-			LOG.error(ex.getMessage(),ex);
-			throw ex;
-		} finally {
-            if(scanner != null) {
-                scanner.close();
-            }
-		}
-		return new InternalReadReport(counter,startTimestamp,stopTimestamp);
-	}
-
-	/**
-	 * Asynchronously reads an HBase scan as raw qualifiers
-	 *
-	 * @param ed       Eagle EntityDefinition
-	 * @param scan     HBase Scan
-	 * @param listener callback invoked with each row's qualifier map
-	 * @throws IOException
-	 */
-	protected InternalReadReport asyncStreamRead(EntityDefinition ed, Scan scan, QualifierCreationListener listener) throws IOException {
-//		_init();
-		long counter = 0;
-		long startTimestamp = 0;
-		long stopTimestamp = 0;
-		InternalScanner scanner = this.getCurrentRegion().getScanner(scan);
-		List<Cell> results = new ArrayList<Cell>();
-		try{
-			boolean hasMoreRows;
-			do{
-				hasMoreRows = scanner.next(results);
-				Map<String, byte[]> kvMap = new HashMap<String, byte[]>();
-				if(!results.isEmpty()){
-					counter ++;
-					byte[] row = results.get(0).getRow();
-//					if(ed.isTimeSeries()){
-					long timestamp = RowkeyBuilder.getTimestamp(row,ed);
-					// Min
-					if(startTimestamp == 0 || startTimestamp > timestamp ){
-						startTimestamp = timestamp;
-					}
-					// Max
-					if(stopTimestamp == 0 || stopTimestamp < timestamp ){
-						stopTimestamp = timestamp;
-					}
-//					}
-					
-					for(Cell kv:results){
-						String qualifierName = Bytes.toString(kv.getQualifier());
-						Qualifier qualifier = null;
-						if(!ed.isTag(qualifierName)){
-							qualifier = ed.getQualifierNameMap().get(qualifierName);
-							if(qualifier == null){
-								LOG.error("qualifier for field " + qualifierName + " does not exist");
-								throw new IOException(new NullPointerException("qualifier for field "+qualifierName+" does not exist"));
-							}
-							qualifierName = qualifier.getDisplayName();
-						}
-						if(kv.getValue()!=null) kvMap.put(qualifierName,kv.getValue());
-					}
-					
-//					LOG.info("DEBUG: timestamp="+timestamp+", keys=["+StringUtils.join(kvMap.keySet(),",")+"]");
-
-					if(!kvMap.isEmpty()) listener.qualifierCreated(kvMap);
-					results.clear();
-				}else{
-					if(LOG.isDebugEnabled()) LOG.debug("Empty batch of KeyValue");
-				}
-			} while(hasMoreRows);
-		} catch(IOException ex){
-			LOG.error(ex.getMessage(),ex);
-			throw ex;
-		} finally {
-            if(scanner != null) {
-                scanner.close();
-            }
-		}
-
-		return new InternalReadReport(counter,startTimestamp,stopTimestamp);
-	}
-
-    @Override
-    public void aggregate(RpcController controller, AggregateProtos.AggregateRequest request, RpcCallback<AggregateProtos.AggregateResult> done) {
-        AggregateResult result = null;
-        try {
-            result = this.aggregate(ProtoBufConverter.fromPBEntityDefinition(request.getEntityDefinition()),
-                    ProtoBufConverter.fromPBScan(request.getScan()),
-                    ProtoBufConverter.fromPBStringList(request.getGroupbyFieldsList()),
-                    ProtoBufConverter.fromPBByteArrayList(request.getAggregateFuncTypesList()),
-                    ProtoBufConverter.fromPBStringList(request.getAggregatedFieldsList())
-            );
-        } catch (IOException e) {
-            ResponseConverter.setControllerException(controller, e);
-        }
-        try {
-            done.run(ProtoBufConverter.toPBAggregateResult(result));
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to convert result to PB-based message",e);
-        }
-    }
-
-    @Override
-    public void timeseriesAggregate(RpcController controller, AggregateProtos.TimeSeriesAggregateRequest request, RpcCallback<AggregateProtos.AggregateResult> done) {
-        AggregateResult result = null;
-        try {
-            result = this.aggregate(ProtoBufConverter.fromPBEntityDefinition(request.getEntityDefinition()),
-                    ProtoBufConverter.fromPBScan(request.getScan()),
-                    ProtoBufConverter.fromPBStringList(request.getGroupbyFieldsList()),
-                    ProtoBufConverter.fromPBByteArrayList(request.getAggregateFuncTypesList()),
-                    ProtoBufConverter.fromPBStringList(request.getAggregatedFieldsList()),
-                    request.getStartTime(),
-                    request.getEndTime(),
-                    request.getIntervalMin()
-            );
-        } catch (IOException e) {
-            LOG.error("Failed to perform time series aggregation",e);
-            ResponseConverter.setControllerException(controller, e);
-        }
-        try {
-            done.run(ProtoBufConverter.toPBAggregateResult(result));
-        } catch (IOException e) {
-            LOG.error("Failed to convert result to PB-based message",e);
-            ResponseConverter.setControllerException(controller, e);
-        }
-    }
-
-    private final static Logger LOG = LoggerFactory.getLogger(AggregateProtocolEndPoint.class);
-    /**
-     * Flat aggregation implementation
-     *
-     * @param entityDefinition    Eagle EntityDefinition
-     * @param scan                HBase Scan
-     * @param groupbyFields       Group-by field names
-     * @param aggregateFuncTypes  Serialized aggregate function types
-     * @param aggregatedFields    Aggregated field names
-     * @return the AggregateResult
-     * @throws IOException if the scan or aggregation fails
-     */
-    @Override
-    public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypes, List<String> aggregatedFields) throws IOException {
-//		LOG.info("Using coprocessor instance: "+this);
-        checkNotNull(entityDefinition, "entityDefinition");
-        String serviceName = entityDefinition.getService();
-        LOG.info(this.getLogHeader() +" raw group aggregate on service: " + serviceName + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields);
-        if(LOG.isDebugEnabled()) LOG.debug("SCAN: "+scan.toJSON());
-        long _start = System.currentTimeMillis();
-        final RawAggregator aggregator = new RawAggregator(groupbyFields,AggregateFunctionType.fromBytesList(aggregateFuncTypes),aggregatedFields,entityDefinition);
-        InternalReadReport report = this.asyncStreamRead(entityDefinition, scan, aggregator);
-
-        List<GroupbyKeyValue> keyValues = aggregator.getGroupbyKeyValues();
-        AggregateResult result = new AggregateResult();
-        result.setKeyValues(keyValues);
-        result.setStartTimestamp(report.getStartTimestamp());
-        result.setStopTimestamp(report.getStopTimestamp());
-
-        long _stop = System.currentTimeMillis();
-        LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", this.getLogHeader(),report.getCounter(),keyValues.size(),report.getStartTimestamp(),report.getStopTimestamp(),(_stop - _start)));
-
-        return result;
-    }
-
-    /**
-     * Time series aggregation implementation.
-     * TODO: refactor the time series aggregator to remove its dependency on business-logic entity classes
-     *
-     * @param entityDefinition    Eagle EntityDefinition
-     * @param scan                HBase Scan
-     * @param groupbyFields       Group-by field names
-     * @param aggregateFuncTypes  Serialized aggregate function types
-     * @param aggregatedFields    Aggregated field names
-     * @param startTime           Start of the time range
-     * @param endTime             End of the time range
-     * @param intervalMin         Interval in minutes
-     * @return the AggregateResult
-     * @throws IOException if the scan or aggregation fails
-     */
-    @Override
-    public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypes, List<String> aggregatedFields, long startTime,long endTime,long intervalMin) throws IOException {
-//		LOG.info("Using coprocessor instance: "+this);
-        checkNotNull(entityDefinition, "entityDefinition");
-        String serviceName = entityDefinition.getService();
-        LOG.info(this.getLogHeader() + " time series group aggregate on service: " + serviceName + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields + " intervalMin: " + intervalMin +
-                " from: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(startTime) + " to: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(endTime));
-        if(LOG.isDebugEnabled()) LOG.debug("SCAN: "+scan.toJSON());
-        long _start = System.currentTimeMillis();
-        final TimeSeriesAggregator aggregator = new TimeSeriesAggregator(groupbyFields,AggregateFunctionType.fromBytesList(aggregateFuncTypes),aggregatedFields,startTime,endTime,intervalMin);
-        InternalReadReport report = this.asyncStreamRead(entityDefinition, scan,aggregator);
-        List<GroupbyKeyValue> keyValues = aggregator.getGroupbyKeyValues();
-
-        AggregateResult result = new AggregateResult();
-        result.setKeyValues(keyValues);
-        result.setStartTimestamp(report.getStartTimestamp());
-        result.setStopTimestamp(report.getStopTimestamp());
-
-        long _stop = System.currentTimeMillis();
-        LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", this.getLogHeader(),report.getCounter(),keyValues.size(),report.getStartTimestamp(),report.getStopTimestamp(),(_stop - _start)));
-
-        return result;
-    }
-
-//	/**
-//	 * Initialization per aggregate RPC call
-//	 */
-//	private void _init(){
-//		this.startTimestamp = 0;
-//		this.stopTimestamp = 0;
-//	}
-//
-//	// Min
-//	private long startTimestamp;
-//	// Max
-//	private long stopTimestamp;
-//
-//	public long getStartTimestamp() {
-//		return startTimestamp;
-//	}
-//	public long getStopTimestamp() {
-//		return stopTimestamp;
-//	}
-}
\ No newline at end of file

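The deployment notes preserved in the commented-out legacy class at the end of this message describe how this endpoint was registered: ship the coprocessor jar to the cluster, then declare it in hbase-site.xml. A sketch using the fully-qualified class name (the legacy notes show only the simple name, so the package prefix here is inferred from this file's package declaration):

    <property>
      <name>hbase.coprocessor.region.classes</name>
      <value>org.apache.eagle.storage.hbase.query.coprocessor.AggregateProtocolEndPoint</value>
    </property>
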
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java
deleted file mode 100755
index 84380eb..0000000
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.storage.hbase.query.coprocessor;
-
-import org.apache.eagle.query.aggregate.raw.GroupbyKey;
-import org.apache.eagle.query.aggregate.raw.GroupbyKeyValue;
-import org.apache.eagle.query.aggregate.raw.GroupbyValue;
-import org.apache.eagle.query.aggregate.raw.WritableList;
-import org.apache.hadoop.io.Writable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Aggregated writable result consisting of a group-by key-value list and additional meta information
- * 
- * <h2>Schema</h2>
- * <pre>
- * {
- *  keyValues: WritableList&lt;GroupbyKeyValue&gt;,
- *  startTimestamp: long,
- *  stopTimestamp: long
- * }
- * </pre>
- */
-public class AggregateResult implements Writable,Serializable{
-
-	private final static Logger LOG = LoggerFactory.getLogger(AggregateResult.class);
-
-	/**
-	 * Automatically generated default serialVersionUID
-	 */
-	private static final long serialVersionUID = 1L;
-	
-	private final WritableList<GroupbyKeyValue> keyValues;
-	
-	private long startTimestamp = 0;
-	
-	public long getStartTimestamp() {
-		return startTimestamp;
-	}
-
-	public void setStartTimestamp(long startTimestamp) {
-		this.startTimestamp = startTimestamp;
-	}
-
-	public long getStopTimestamp() {
-		return stopTimestamp;
-	}
-
-	public void setStopTimestamp(long stopTimestamp) {
-		this.stopTimestamp = stopTimestamp;
-	}
-
-	public WritableList<GroupbyKeyValue> getKeyValues() {
-		return keyValues;
-	}
-	
-	public void setKeyValues(List<GroupbyKeyValue> keyValues){
-		this.keyValues.addAll(keyValues);
-	}
-	
-	private long stopTimestamp;
-	
-	public AggregateResult(){
-		this.keyValues = new WritableList<GroupbyKeyValue>(GroupbyKeyValue.class);
-	}
-	
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		this.startTimestamp = in.readLong();
-		this.stopTimestamp = in.readLong();
-		keyValues.readFields(in);
-	}
-	
-	@Override
-	public void write(DataOutput out) throws IOException {
-		out.writeLong(this.startTimestamp);
-		out.writeLong(this.stopTimestamp);
-		keyValues.write(out);
-	}
-
-
-	public static AggregateResult build(List<String[]> keys,List<double[]> values,List<Integer> counts,long startTimestamp,long stopTimestamp){
-		if(keys.size() != values.size()){
-			throw new IllegalArgumentException("keys' size: "+keys.size()+" does not equal values' size: "+values.size());
-		}
-		AggregateResult result = new AggregateResult();
-		result.setStartTimestamp(startTimestamp);
-		result.setStopTimestamp(stopTimestamp);
-		WritableList<GroupbyKeyValue> keyValues = new WritableList<GroupbyKeyValue>(GroupbyKeyValue.class,keys.size());
-
-		for(int i=0;i<keys.size();i++) {
-			String[] key  = keys.get(i);
-			GroupbyKey gkey = new GroupbyKey();
-			for (String k : key) {
-				gkey.addValue(k.getBytes());
-			}
-			GroupbyValue gvalue = new GroupbyValue();
-			double[] value = values.get(i);
-			for(double v:value){
-				gvalue.add(v);
-				gvalue.addMeta(counts.get(i));
-			}
-			keyValues.add(new GroupbyKeyValue(gkey, gvalue));
-		}
-		result.setKeyValues(keyValues);
-		return result;
-	}
-}
\ No newline at end of file

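Because AggregateResult implements Writable, it round-trips through plain DataOutput/DataInput streams. A minimal sketch of the serialization contract defined by write/readFields above:

    // Build a result with one group ("host1"), one value, one count.
    AggregateResult original = AggregateResult.build(
            Arrays.asList(new String[][]{{"host1"}}),
            Arrays.asList(new double[][]{{42.0}}),
            Arrays.asList(1),
            1000L, 2000L);

    // Serialize: write() emits startTimestamp, stopTimestamp, then the key-value list.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bos));

    // Deserialize in the same field order via readFields().
    AggregateResult copy = new AggregateResult();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    assert copy.getStartTimestamp() == 1000L && copy.getStopTimestamp() == 2000L;
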
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java
deleted file mode 100755
index a68a592..0000000
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.storage.hbase.query.coprocessor;
-
-import org.apache.eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
-
-/**
- * <h1>AggregateResultCallback Interface</h1>
- *
- * Merges coprocessor results from different regions and generates the final aggregate result
- * <br/>
- *
- * @see org.apache.hadoop.hbase.client.HTableInterface
- * 		coprocessorExec(Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> callable) throws IOException, Throwable;
- *
- */
-public interface AggregateResultCallback extends Batch.Callback<AggregateProtos.AggregateResult>{
-	/**
-	 * Generates the final result after callbacks from all region servers
-	 *
-	 * @return AggregateResult
-	 */
-    AggregateResult result();
-
-    /**
-     * Compatibility overload for the older callback interface in HBase 0.94 and earlier
-     *
-     * @param region region name bytes
-     * @param row    row key bytes
-     * @param result decoded per-region aggregate result
-     */
-    void update(byte[] region, byte[] row, AggregateResult result);
-}
\ No newline at end of file

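A minimal sketch of an implementation, simplified for illustration: it concatenates per-region key-values and tracks the overall time range, whereas a production merger would re-aggregate values per group-by key. The protobuf payload is decoded with ProtoBufConverter.fromPBResult from the next file.

    public class SimpleAggregateResultCallback implements AggregateResultCallback {
        private final AggregateResult merged = new AggregateResult();

        @Override // Batch.Callback: raw protobuf payload arriving from one region server
        public void update(byte[] region, byte[] row, AggregateProtos.AggregateResult pbResult) {
            try {
                update(region, row, ProtoBufConverter.fromPBResult(pbResult));
            } catch (IOException e) {
                throw new RuntimeException("Failed to decode per-region result", e);
            }
        }

        @Override // 0.94-compatible overload declared by this interface
        public void update(byte[] region, byte[] row, AggregateResult result) {
            merged.getKeyValues().addAll(result.getKeyValues());
            if (merged.getStartTimestamp() == 0 || result.getStartTimestamp() < merged.getStartTimestamp()) {
                merged.setStartTimestamp(result.getStartTimestamp());
            }
            if (result.getStopTimestamp() > merged.getStopTimestamp()) {
                merged.setStopTimestamp(result.getStopTimestamp());
            }
        }

        @Override
        public AggregateResult result() {
            return merged;
        }
    }
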
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java
deleted file mode 100644
index 060d3ba..0000000
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.storage.hbase.query.coprocessor;
-
-import org.apache.eagle.log.entity.meta.EntityDefinition;
-import org.apache.eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;
-import com.google.common.io.ByteArrayDataInput;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-import com.google.protobuf.ByteString;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.io.Writable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * The protocol adapter for migrating from hbase-0.94 to hbase-0.96+
- *
- * @since 6/3/15
- */
-public final class ProtoBufConverter {
-    public static AggregateResult fromPBResult(AggregateProtos.AggregateResult pbResult) throws IOException {
-        ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(pbResult.getByteArray().toByteArray());
-        AggregateResult result = new AggregateResult();
-        result.readFields(byteArrayDataInput);
-        return result;
-    }
-
-    public static AggregateProtos.AggregateRequest toPBRequest(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypesBytes, List<String> aggregatedFields) throws IOException {
-        AggregateProtos.AggregateRequest.Builder builder = AggregateProtos.AggregateRequest.newBuilder()
-                .setEntityDefinition(AggregateProtos.EntityDefinition.newBuilder().setByteArray(writableToByteString(entityDefinition)))
-                .setScan(toPBScan(scan));
-
-        for(String groupbyField:groupbyFields) builder.addGroupbyFields(groupbyField);
-        for(byte[] funcTypeBytes:aggregateFuncTypesBytes) builder.addAggregateFuncTypes(ByteString.copyFrom(funcTypeBytes));
-        for(String aggField:aggregatedFields) builder.addAggregatedFields(aggField);
-
-        return builder.build();
-    }
-
-    public static ByteString writableToByteString(Writable writable) throws IOException {
-        ByteArrayDataOutput dataOutput = ByteStreams.newDataOutput();
-        writable.write(dataOutput);
-        return ByteString.copyFrom(dataOutput.toByteArray());
-    }
-
-    public static AggregateProtos.TimeSeriesAggregateRequest toPBTimeSeriesRequest(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypesBytes, List<String> aggregatedFields, long startTime, long endTime, long intervalMin) throws IOException {
-        AggregateProtos.TimeSeriesAggregateRequest.Builder builder = AggregateProtos.TimeSeriesAggregateRequest.newBuilder()
-                .setEntityDefinition(AggregateProtos.EntityDefinition.newBuilder().setByteArray(writableToByteString(entityDefinition)))
-                .setScan(toPBScan(scan));
-
-        for(String groupbyField:groupbyFields) builder.addGroupbyFields(groupbyField);
-        for(byte[] funcTypeBytes:aggregateFuncTypesBytes) builder.addAggregateFuncTypes(ByteString.copyFrom(funcTypeBytes));
-        for(String aggField:aggregatedFields) builder.addAggregatedFields(aggField);
-
-        builder.setStartTime(startTime);
-        builder.setEndTime(endTime);
-        builder.setIntervalMin(intervalMin);
-
-        return builder.build();
-    }
-
-    public static EntityDefinition fromPBEntityDefinition(AggregateProtos.EntityDefinition entityDefinition) throws IOException {
-        ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(entityDefinition.getByteArray().toByteArray());
-        EntityDefinition result = new EntityDefinition();
-        result.readFields(byteArrayDataInput);
-        return result;
-    }
-
-    public static List<String> fromPBStringList(com.google.protobuf.ProtocolStringList groupbyFieldsList) {
-        List<String> result = new ArrayList<>(groupbyFieldsList.size());
-        for(ByteString byteString:groupbyFieldsList.asByteStringList()){
-            result.add(byteString.toStringUtf8());
-        }
-        return result;
-    }
-
-    public static List<byte[]> fromPBByteArrayList(List<ByteString> aggregateFuncTypesList) {
-        List<byte[]> bytesArrayList = new ArrayList<>(aggregateFuncTypesList.size());
-        for(ByteString byteString:aggregateFuncTypesList){
-            bytesArrayList.add(byteString.toByteArray());
-        }
-        return bytesArrayList;
-    }
-
-    /**
-     * Converts a protobuf Scan message back into an HBase client Scan
-     *
-     * @param scan protobuf Scan message
-     * @return the HBase client Scan
-     */
-    public static Scan fromPBScan(ClientProtos.Scan scan) throws IOException {
-        return ProtobufUtil.toScan(scan);
-    }
-
-    public static ClientProtos.Scan toPBScan(Scan scan) throws IOException {
-        return ProtobufUtil.toScan(scan);
-    }
-
-    public static AggregateProtos.AggregateResult toPBAggregateResult(AggregateResult result) throws IOException {
-        ByteArrayDataOutput output = ByteStreams.newDataOutput();
-        result.write(output);
-        return AggregateProtos.AggregateResult.newBuilder()
-                .setByteArray(ByteString.copyFrom(output.toByteArray()))
-                .build();
-    }
-}
\ No newline at end of file

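A short usage sketch of the toPBAggregateResult/fromPBResult pair, which is how the endpoint above ships results back to the client over the wire:

    // Writable -> protobuf envelope -> Writable round trip.
    AggregateResult local = new AggregateResult();
    local.setStartTimestamp(1000L);
    local.setStopTimestamp(2000L);

    AggregateProtos.AggregateResult onTheWire = ProtoBufConverter.toPBAggregateResult(local);
    AggregateResult decoded = ProtoBufConverter.fromPBResult(onTheWire);
    assert decoded.getStartTimestamp() == 1000L && decoded.getStopTimestamp() == 2000L;
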
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/_AggregateProtocolEndPoint.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/_AggregateProtocolEndPoint.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/_AggregateProtocolEndPoint.java
deleted file mode 100755
index af213e9..0000000
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/_AggregateProtocolEndPoint.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-//package eagle.storage.hbase.query.coprocessor;
-//
-//import eagle.log.entity.meta.EntityDefinition;
-//import eagle.query.aggregate.AggregateFunctionType;
-//import eagle.query.aggregate.raw.GroupbyKeyValue;
-//import eagle.query.aggregate.raw.RawAggregator;
-//import eagle.query.aggregate.timeseries.TimeSeriesAggregator;
-//import eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;
-//import eagle.storage.hbase.query.coprocessor.impl.AbstractAggregateEndPoint;
-//import hadoop.eagle.common.DateTimeUtil;
-//import com.google.protobuf.RpcCallback;
-//import com.google.protobuf.RpcController;
-//import org.apache.hadoop.hbase.client.Scan;
-//import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-//import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-//import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import java.io.IOException;
-//import java.util.List;
-//
-///**
-// * Coprocessor EndPoint of protocol <code>AggregateProtocol</code>
-// *
-// * <br/>
-// * <h2>Deployment:</h2>
-// *
-// * Firstly deploy jar files to cluster on local file system or HDFS.<br/><br/>
-// * Secondly configure in <code>hbase-site.xml</code> as following:
-// * <pre>
-// * &lt;property&gt;
-// *   &lt;name>hbase.coprocessor.region.classes&lt;/name&gt;
-// *   &lt;value>AggregateProtocolEndPoint&lt;/value&gt;
-// * &lt;/property&gt;
-// * </pre>
-// * Or register on related hbase tables
-// * <pre>
-// * hbase(main):005:0>  alter 't1', METHOD => 'table_att', 'coprocessor'=>'hdfs:///foo.jar|AggregateProtocolEndPoint|1001|'
-// * </pre>
-// *
-// * <h2>Reference:</h2>
-// * <a href="https://blogs.apache.org/hbase/entry/coprocessor_introduction">
-// * 	Coprocessor Introduction
-// * 	(Authors: Trend Micro Hadoop Group: Mingjie Lai, Eugene Koontz, Andrew Purtell)
-// * </a> <br/><br/>
-// *
-// * @see AggregateProtocol
-// *
-//// * @since : 10/31/14,2014
-// */
-//@SuppressWarnings("unused")
-//public class AggregateProtocolEndPoint extends AbstractAggregateEndPoint {
-//	private final static Logger LOG = LoggerFactory.getLogger(AggregateProtocolEndPoint.class);
-//	/**
-//	 *
-//	 * @param entityDefinition
-//	 * @param scan
-//	 * @param groupbyFields
-//	 * @param aggregateFuncTypes
-//	 * @param aggregatedFields
-//	 * @return
-//	 * @throws Exception
-//	 */
-//	@Override
-//	public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypes, List<String> aggregatedFields) throws IOException {
-////		LOG.info("Using coprocessor instance: "+this);
-//		checkNotNull(entityDefinition, "entityDefinition");
-//		String serviceName = entityDefinition.getService();
-//		LOG.info(this.getLogHeader() +" raw group aggregate on service: " + serviceName + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields);
-//		if(LOG.isDebugEnabled()) LOG.debug("SCAN: "+scan.toJSON());
-//		long _start = System.currentTimeMillis();
-//		final RawAggregator aggregator = new RawAggregator(groupbyFields,AggregateFunctionType.fromBytesList(aggregateFuncTypes),aggregatedFields,entityDefinition);
-//		InternalReadReport report = this.asyncStreamRead(entityDefinition, scan, aggregator);
-//
-//		List<GroupbyKeyValue> keyValues = aggregator.getGroupbyKeyValues();
-//		AggregateResult result = new AggregateResult();
-//		result.setKeyValues(keyValues);
-//		result.setStartTimestamp(report.getStartTimestamp());
-//		result.setStopTimestamp(report.getStopTimestamp());
-//
-//		long _stop = System.currentTimeMillis();
-//		LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", this.getLogHeader(),report.getCounter(),keyValues.size(),report.getStartTimestamp(),report.getStopTimestamp(),(_stop - _start)));
-//
-//		return result;
-//	}
-//
-//	/**
-//	 * TODO: refactor time series aggregator to remove dependency of business logic entity class
-//	 *
-//	 * @param entityDefinition
-//	 * @param scan
-//	 * @param groupbyFields
-//	 * @param aggregateFuncTypes
-//	 * @param aggregatedFields
-//	 * @param intervalMin
-//	 * @return
-//	 * @throws Exception
-//	 */
-//	@Override
-//	public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypes, List<String> aggregatedFields, long startTime,long endTime,long intervalMin) throws IOException {
-////		LOG.info("Using coprocessor instance: "+this);
-//		checkNotNull(entityDefinition, "entityDefinition");
-//		String serviceName = entityDefinition.getService();
-//		LOG.info(this.getLogHeader() + " time series group aggregate on service: " + serviceName + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields + " intervalMin: " + intervalMin +
-//				" from: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(startTime) + " to: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(endTime));
-//		if(LOG.isDebugEnabled()) LOG.debug("SCAN: "+scan.toJSON());
-//		long _start = System.currentTimeMillis();
-//		final TimeSeriesAggregator aggregator = new TimeSeriesAggregator(groupbyFields,AggregateFunctionType.fromBytesList(aggregateFuncTypes),aggregatedFields,startTime,endTime,intervalMin);
-//		InternalReadReport report = this.asyncStreamRead(entityDefinition, scan,aggregator);
-//		List<GroupbyKeyValue> keyValues = aggregator.getGroupbyKeyValues();
-//
-//		AggregateResult result = new AggregateResult();
-//		result.setKeyValues(keyValues);
-//		result.setStartTimestamp(report.getStartTimestamp());
-//		result.setStopTimestamp(report.getStopTimestamp());
-//
-//		long _stop = System.currentTimeMillis();
-//		LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", this.getLogHeader(),report.getCounter(),keyValues.size(),report.getStartTimestamp(),report.getStopTimestamp(),(_stop - _start)));
-//
-//		return result;
-//	}
-//}
\ No newline at end of file

