From: yonzhang2012@apache.org
To: commits@eagle.incubator.apache.org
Reply-To: dev@eagle.incubator.apache.org
Date: Wed, 07 Sep 2016 17:42:45 -0000
Message-Id: <6827094f32b94b2993e098c9145ea0ff@git.apache.org>
In-Reply-To: <7201ad3b09274d489e31c344a4940ff4@git.apache.org>
References: <7201ad3b09274d489e31c344a4940ff4@git.apache.org>
Subject: [49/52] [abbrv] incubator-eagle git commit: [EAGLE-520] Fix and decouple co-processor from eagle aggregation query service
archived-at: Wed, 07 Sep 2016 17:42:27 -0000

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java
b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java index 53d27de..7ef8b80 100755 --- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java +++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java @@ -16,6 +16,7 @@ */ package org.apache.eagle.storage.hbase.query.coprocessor; +import org.apache.eagle.common.DateTimeUtil; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.*; import org.apache.eagle.log.entity.meta.EntityDefinition; @@ -25,7 +26,7 @@ import org.apache.eagle.query.aggregate.raw.GroupbyKeyValue; import org.apache.eagle.query.aggregate.raw.RawAggregator; import org.apache.eagle.query.aggregate.timeseries.TimeSeriesAggregator; import org.apache.eagle.storage.hbase.query.coprocessor.generated.AggregateProtos; -import org.apache.eagle.common.DateTimeUtil; + import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.Service; @@ -49,8 +50,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -//public abstract class AbstractAggregateEndPoint extends BaseEndpointCoprocessor{ public class AggregateProtocolEndPoint extends AggregateProtos.AggregateProtocol implements AggregateProtocol, Coprocessor, CoprocessorService { + private static final Logger LOG = LoggerFactory.getLogger(AggregateProtocolEndPoint.class); private RegionCoprocessorEnvironment env; @@ -59,16 +60,19 @@ public class AggregateProtocolEndPoint extends AggregateProtos.AggregateProtocol return this; } - public AggregateProtocolEndPoint() {} + public AggregateProtocolEndPoint() { + } - protected void checkNotNull(Object obj,String name) { - if(obj==null) throw new NullPointerException(name+" is null"); - } + protected void checkNotNull(Object obj, String name) { + if (obj == null) { + throw new NullPointerException(name + " is null"); + } + } @Override public void start(CoprocessorEnvironment env) throws IOException { if (env instanceof RegionCoprocessorEnvironment) { - this.env = (RegionCoprocessorEnvironment)env; + this.env = (RegionCoprocessorEnvironment) env; } else { throw new CoprocessorException("Must be loaded on a table region!"); } @@ -79,302 +83,296 @@ public class AggregateProtocolEndPoint extends AggregateProtos.AggregateProtocol // do nothing } -// @Override -// public ProtocolSignature getProtocolSignature(String protocol, long version, int clientMethodsHashCode) throws IOException { -// if (AggregateProtocol.class.getName().equals(protocol)) { -//// return new ProtocolSignature(AggregateProtocol.VERSION, null); -// return new ProtocolSignature(98l, null); -// } -// throw new IOException("Unknown protocol: " + protocol); -// } - - protected HRegion getCurrentRegion(){ - return this.env.getRegion(); - } - - /** - *
-	 * Region-unittest,\x82\xB4\x85\xC2\x7F\xFF\xFE\xB6\xC9jNG\xEE!\x5C3\xBB\xAE\xA1:\x05\xA5\xA9x\xB0\xA1"8\x05\xFB(\xD2VY\xDB\x9A\x06\x09\xA9\x98\xC2\xE3\x8D=,1413960230654.aaf2a6c9f2c87c196f43497243bb2424.
-	 * RegionID-unittest,1413960230654
-	 * 
- */ - protected String getLogHeader(){ - HRegion region = this.getCurrentRegion(); - return LOG.isDebugEnabled() ? String.format("Region-%s",region.getRegionNameAsString()): - String.format("Region-%s,%d",region.getTableDesc().getNameAsString(),region.getRegionId()); - } - - protected class InternalReadReport { - public InternalReadReport(long counter,long startTimestamp,long stopTimestamp){ - this.counter = counter; - this.startTimestamp = startTimestamp; - this.stopTimestamp = stopTimestamp; - } - public long getCounter() { - return counter; - } - public void setCounter(long counter) { - this.counter = counter; - } - - public long getStartTimestamp() { - return startTimestamp; - } - - public void setStartTimestamp(long startTimestamp) { - this.startTimestamp = startTimestamp; - } - - public long getStopTimestamp() { - return stopTimestamp; - } - - public void setStopTimestamp(long stopTimestamp) { - this.stopTimestamp = stopTimestamp; - } - - private long counter; - private long startTimestamp; - private long stopTimestamp; - } - - /** - * Asynchronous HBase scan read as entity - * - * @param scan - * @throws java.io.IOException - */ - protected InternalReadReport asyncStreamRead(EntityDefinition ed, Scan scan, EntityCreationListener listener) throws IOException { -// _init(); - long counter = 0; - long startTimestamp = 0; - long stopTimestamp = 0; - - InternalScanner scanner = this.getCurrentRegion().getScanner(scan); - List results = new ArrayList(); - try{ - boolean hasMoreRows; - GenericMetricShadowEntity singleMetricEntity = null; - do{ - hasMoreRows = scanner.next(results); - Map kvMap = new HashMap(); - if(!results.isEmpty()){ - counter ++; - byte[] row = results.get(0).getRow(); - long timestamp = RowkeyBuilder.getTimestamp(row, ed); - - // Min - if(startTimestamp == 0 || startTimestamp > timestamp ){ - startTimestamp = timestamp; - } - - // Max - if(stopTimestamp == 0 || stopTimestamp < timestamp ){ - stopTimestamp = timestamp; - } - - for(Cell kv:results){ - String qualifierName = Bytes.toString(kv.getQualifier()); -// Qualifier qualifier = null; -// if(!ed.isTag(qualifierName)){ -// qualifier = ed.getQualifierNameMap().get(qualifierName); -// if(qualifier == null){ -// LOG.error("qualifier for " + qualifierName + " not exist"); -// throw new NullPointerException("qualifier for field "+qualifierName+" not exist"); -// } -// } - if(kv.getValue()!=null) kvMap.put(qualifierName ,kv.getValue()); - } - - // LOG.info("DEBUG: timestamp="+timestamp+", keys=["+StringUtils.join(kvMap.keySet(),",")+"]"); - - InternalLog internalLog = HBaseInternalLogHelper.buildObject(ed, row, timestamp, kvMap); - if(internalLog!=null){ - TaggedLogAPIEntity logAPIEntity = null; - try { - logAPIEntity = HBaseInternalLogHelper.buildEntity(internalLog, ed); - if(logAPIEntity instanceof GenericMetricEntity){ - if(singleMetricEntity == null) singleMetricEntity = new GenericMetricShadowEntity(); - GenericMetricEntity e = (GenericMetricEntity)logAPIEntity; - if(e.getValue()!=null) { - int count = e.getValue().length; - @SuppressWarnings("unused") - Class cls = ed.getMetricDefinition().getSingleTimestampEntityClass(); - for (int i = 0; i < count; i++) { - long ts = logAPIEntity.getTimestamp() + i * ed.getMetricDefinition().getInterval(); - // exclude those entity which is not within the time range in search condition. 
[start, end) - singleMetricEntity.setTimestamp(ts); - singleMetricEntity.setTags(e.getTags()); - singleMetricEntity.setValue(e.getValue()[i]); - // Min - if (startTimestamp == 0 || startTimestamp > ts) startTimestamp = ts; - // Max - if (stopTimestamp == 0 || stopTimestamp < ts) stopTimestamp = ts; - listener.entityCreated(singleMetricEntity); - } - } - }else { - // LOG.info("DEBUG: rowKey="+logAPIEntity.getEncodedRowkey()); - listener.entityCreated(logAPIEntity); - } - } catch (Exception e) { - if(internalLog!=null) { - LOG.error("Got exception to handle " + internalLog.toString() + ": " + e.getMessage(), e); - } - throw new IOException(e); - } - }else{ - LOG.error("Got null to parse internal log for row: " + row.length + " with fields: " + kvMap); - } - results.clear(); - }else{ - if(LOG.isDebugEnabled()) LOG.warn("Empty batch of KeyValue"); - } - } while(hasMoreRows); - }catch(IOException ex){ - LOG.error(ex.getMessage(),ex); - throw ex; - } finally { - if(scanner != null) { + protected HRegion getCurrentRegion() { + return this.env.getRegion(); + } + + /** + *
+     * Region-unittest
+     *  ,\x82\xB4\x85\xC2\x7F\xFF\xFE\xB6\xC9jNG\xEE!\x5C3\xBB\xAE\xA1
+     *  :\x05\xA5\xA9x\xB0\xA1"8\x05\xFB(\xD2VY\xDB\x9A\x06\x09\xA9\x98\xC2\xE3\x8D=,1413960230654.aaf2a6c9f2c87c196f43497243bb2424.
+     *
+     * RegionID-unittest,1413960230654
+     * 
+ */ + protected String getLogHeader() { + HRegion region = this.getCurrentRegion(); + return LOG.isDebugEnabled() ? String.format("Region-%s", region.getRegionNameAsString()) : + String.format("Region-%s,%d", region.getTableDesc().getNameAsString(), region.getRegionId()); + } + + protected class InternalReadReport { + public InternalReadReport(long counter, long startTimestamp, long stopTimestamp) { + this.counter = counter; + this.startTimestamp = startTimestamp; + this.stopTimestamp = stopTimestamp; + } + + public long getCounter() { + return counter; + } + + public void setCounter(long counter) { + this.counter = counter; + } + + public long getStartTimestamp() { + return startTimestamp; + } + + public void setStartTimestamp(long startTimestamp) { + this.startTimestamp = startTimestamp; + } + + public long getStopTimestamp() { + return stopTimestamp; + } + + public void setStopTimestamp(long stopTimestamp) { + this.stopTimestamp = stopTimestamp; + } + + private long counter; + private long startTimestamp; + private long stopTimestamp; + } + + /** + * Asynchronous HBase scan read as entity. + */ + protected InternalReadReport asyncStreamRead(EntityDefinition ed, Scan scan, EntityCreationListener listener) throws IOException { + long counter = 0; + long startTimestamp = 0; + long stopTimestamp = 0; + + InternalScanner scanner = this.getCurrentRegion().getScanner(scan); + List results = new ArrayList(); + try { + boolean hasMoreRows; + GenericMetricShadowEntity singleMetricEntity = null; + do { + hasMoreRows = scanner.next(results); + Map kvMap = new HashMap(); + if (!results.isEmpty()) { + counter++; + byte[] row = results.get(0).getRow(); + long timestamp = RowkeyBuilder.getTimestamp(row, ed); + + // Min + if (startTimestamp == 0 || startTimestamp > timestamp) { + startTimestamp = timestamp; + } + + // Max + if (stopTimestamp == 0 || stopTimestamp < timestamp) { + stopTimestamp = timestamp; + } + + for (Cell kv : results) { + String qualifierName = Bytes.toString(kv.getQualifier()); + // Qualifier qualifier = null; + // if(!ed.isTag(qualifierName)){ + // qualifier = ed.getQualifierNameMap().get(qualifierName); + // if(qualifier == null){ + // LOG.error("qualifier for " + qualifierName + " not exist"); + // throw new NullPointerException("qualifier for field "+qualifierName+" not exist"); + // } + // } + if (kv.getValue() != null) { + kvMap.put(qualifierName, kv.getValue()); + } + } + + // LOG.info("DEBUG: timestamp="+timestamp+", keys=["+StringUtils.join(kvMap.keySet(),",")+"]"); + + InternalLog internalLog = HBaseInternalLogHelper.buildObject(ed, row, timestamp, kvMap); + if (internalLog != null) { + TaggedLogAPIEntity logAPIEntity = null; + try { + logAPIEntity = HBaseInternalLogHelper.buildEntity(internalLog, ed); + if (logAPIEntity instanceof GenericMetricEntity) { + if (singleMetricEntity == null) { + singleMetricEntity = new GenericMetricShadowEntity(); + } + GenericMetricEntity e = (GenericMetricEntity) logAPIEntity; + if (e.getValue() != null) { + int count = e.getValue().length; + @SuppressWarnings("unused") + Class cls = ed.getMetricDefinition().getSingleTimestampEntityClass(); + for (int i = 0; i < count; i++) { + long ts = logAPIEntity.getTimestamp() + i * ed.getMetricDefinition().getInterval(); + // exclude those entity which is not within the time range in search condition. 
[start, end) + singleMetricEntity.setTimestamp(ts); + singleMetricEntity.setTags(e.getTags()); + singleMetricEntity.setValue(e.getValue()[i]); + // Min + if (startTimestamp == 0 || startTimestamp > ts) { + startTimestamp = ts; + } + // Max + if (stopTimestamp == 0 || stopTimestamp < ts) { + stopTimestamp = ts; + } + listener.entityCreated(singleMetricEntity); + } + } + } else { + // LOG.info("DEBUG: rowKey="+logAPIEntity.getEncodedRowkey()); + listener.entityCreated(logAPIEntity); + } + } catch (Exception e) { + if (internalLog != null) { + LOG.error("Got exception to handle " + internalLog.toString() + ": " + e.getMessage(), e); + } + throw new IOException(e); + } + } else { + LOG.error("Got null to parse internal log for row: " + row.length + " with fields: " + kvMap); + } + results.clear(); + } else { + if (LOG.isDebugEnabled()) { + LOG.warn("Empty batch of KeyValue"); + } + } + } + while (hasMoreRows); + } catch (IOException ex) { + LOG.error(ex.getMessage(), ex); + throw ex; + } finally { + if (scanner != null) { scanner.close(); } - } - return new InternalReadReport(counter,startTimestamp,stopTimestamp); - } - - /** - * Asynchronous HBase scan read as RAW qualifier - * - * @param scan - * @param listener - * @throws Exception - */ - protected InternalReadReport asyncStreamRead(EntityDefinition ed, Scan scan, QualifierCreationListener listener) throws IOException { -// _init(); - long counter = 0; - long startTimestamp = 0; - long stopTimestamp = 0; - InternalScanner scanner = this.getCurrentRegion().getScanner(scan); - List results = new ArrayList(); - try{ - boolean hasMoreRows;//false by default - do{ - hasMoreRows = scanner.next(results); - Map kvMap = new HashMap(); - if(!results.isEmpty()){ - counter ++; - byte[] row = results.get(0).getRow(); -// if(ed.isTimeSeries()){ - long timestamp = RowkeyBuilder.getTimestamp(row,ed); - // Min - if(startTimestamp == 0 || startTimestamp > timestamp ){ - startTimestamp = timestamp; - } - // Max - if(stopTimestamp == 0 || stopTimestamp < timestamp ){ - stopTimestamp = timestamp; - } -// } - - for(Cell kv:results){ - String qualifierName = Bytes.toString(kv.getQualifier()); - Qualifier qualifier = null; - if(!ed.isTag(qualifierName)){ - qualifier = ed.getQualifierNameMap().get(qualifierName); - if(qualifier == null){ - LOG.error("qualifier for field " + qualifierName + " not exist"); - throw new IOException(new NullPointerException("qualifier for field "+qualifierName+" is null")); - } - qualifierName = qualifier.getDisplayName(); - } - if(kv.getValue()!=null) kvMap.put(qualifierName,kv.getValue()); - } - -// LOG.info("DEBUG: timestamp="+timestamp+", keys=["+StringUtils.join(kvMap.keySet(),",")+"]"); - - if(!kvMap.isEmpty()) listener.qualifierCreated(kvMap); - results.clear(); - }else{ - if(LOG.isDebugEnabled()) LOG.warn("Empty batch of KeyValue"); - } - } while(hasMoreRows); - } catch(IOException ex){ - LOG.error(ex.getMessage(),ex); - throw ex; - } finally { - if(scanner != null) { + } + return new InternalReadReport(counter, startTimestamp, stopTimestamp); + } + + /** + * Asynchronous HBase scan read as RAW qualifier. 
+ */ + protected InternalReadReport asyncStreamRead(EntityDefinition ed, Scan scan, QualifierCreationListener listener) throws IOException { + long counter = 0; + long startTimestamp = 0; + long stopTimestamp = 0; + InternalScanner scanner = this.getCurrentRegion().getScanner(scan); + List results = new ArrayList(); + try { + boolean hasMoreRows;//false by default + do { + hasMoreRows = scanner.next(results); + Map kvMap = new HashMap<>(); + if (!results.isEmpty()) { + counter++; + byte[] row = results.get(0).getRow(); + long timestamp = RowkeyBuilder.getTimestamp(row, ed); + // Min + if (startTimestamp == 0 || startTimestamp > timestamp) { + startTimestamp = timestamp; + } + // Max + if (stopTimestamp == 0 || stopTimestamp < timestamp) { + stopTimestamp = timestamp; + } + + for (Cell kv : results) { + String qualifierName = Bytes.toString(kv.getQualifier()); + Qualifier qualifier = null; + if (!ed.isTag(qualifierName)) { + qualifier = ed.getQualifierNameMap().get(qualifierName); + if (qualifier == null) { + LOG.error("qualifier for field " + qualifierName + " not exist"); + throw new IOException(new NullPointerException("qualifier for field " + qualifierName + " is null")); + } + qualifierName = qualifier.getDisplayName(); + } + if (kv.getValue() != null) { + kvMap.put(qualifierName, kv.getValue()); + } + } + + if (!kvMap.isEmpty()) { + listener.qualifierCreated(kvMap); + } + results.clear(); + } else { + if (LOG.isDebugEnabled()) { + LOG.warn("Empty batch of KeyValue"); + } + } + } + while (hasMoreRows); + } catch (IOException ex) { + LOG.error(ex.getMessage(), ex); + throw ex; + } finally { + if (scanner != null) { scanner.close(); } - } + } - return new InternalReadReport(counter,startTimestamp,stopTimestamp); - } + return new InternalReadReport(counter, startTimestamp, stopTimestamp); + } @Override - public void aggregate(RpcController controller, AggregateProtos.AggregateRequest request, RpcCallback done) { + public void timeseriesAggregate(RpcController controller, AggregateProtos.TimeSeriesAggregateRequest request, RpcCallback done) { AggregateResult result = null; try { result = this.aggregate(ProtoBufConverter.fromPBEntityDefinition(request.getEntityDefinition()), - ProtoBufConverter.fromPBScan(request.getScan()), - ProtoBufConverter.fromPBStringList(request.getGroupbyFieldsList()), - ProtoBufConverter.fromPBByteArrayList(request.getAggregateFuncTypesList()), - ProtoBufConverter.fromPBStringList(request.getAggregatedFieldsList()) + ProtoBufConverter.fromPBScan(request.getScan()), + ProtoBufConverter.fromPBStringList(request.getGroupbyFieldsList()), + ProtoBufConverter.fromPBByteArrayList(request.getAggregateFuncTypesList()), + ProtoBufConverter.fromPBStringList(request.getAggregatedFieldsList()), + request.getStartTime(), + request.getEndTime(), + request.getIntervalMin() ); } catch (IOException e) { + LOG.error("Failed to convert result to PB-based message", e); ResponseConverter.setControllerException(controller, e); } try { done.run(ProtoBufConverter.toPBAggregateResult(result)); } catch (IOException e) { - throw new RuntimeException("Failed to convert result to PB-based message",e); + LOG.error("Failed to convert result to PB-based message", e); + ResponseConverter.setControllerException(controller, e); } } @Override - public void timeseriesAggregate(RpcController controller, AggregateProtos.TimeSeriesAggregateRequest request, RpcCallback done) { + public void aggregate(RpcController controller, AggregateProtos.AggregateRequest request, RpcCallback done) { AggregateResult 
result = null; try { result = this.aggregate(ProtoBufConverter.fromPBEntityDefinition(request.getEntityDefinition()), - ProtoBufConverter.fromPBScan(request.getScan()), - ProtoBufConverter.fromPBStringList(request.getGroupbyFieldsList()), - ProtoBufConverter.fromPBByteArrayList(request.getAggregateFuncTypesList()), - ProtoBufConverter.fromPBStringList(request.getAggregatedFieldsList()), - request.getStartTime(), - request.getEndTime(), - request.getIntervalMin() + ProtoBufConverter.fromPBScan(request.getScan()), + ProtoBufConverter.fromPBStringList(request.getGroupbyFieldsList()), + ProtoBufConverter.fromPBByteArrayList(request.getAggregateFuncTypesList()), + ProtoBufConverter.fromPBStringList(request.getAggregatedFieldsList()) ); } catch (IOException e) { - LOG.error("Failed to convert result to PB-based message",e); ResponseConverter.setControllerException(controller, e); } try { done.run(ProtoBufConverter.toPBAggregateResult(result)); } catch (IOException e) { - LOG.error("Failed to convert result to PB-based message",e); - ResponseConverter.setControllerException(controller, e); + throw new RuntimeException("Failed to convert result to PB-based message", e); } } - private final static Logger LOG = LoggerFactory.getLogger(AggregateProtocolEndPoint.class); - /** - * - * @param entityDefinition - * @param scan - * @param groupbyFields - * @param aggregateFuncTypes - * @param aggregatedFields - * @return - * @throws Exception - */ @Override - public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List groupbyFields, List aggregateFuncTypes, List aggregatedFields) throws IOException { -// LOG.info("Using coprocessor instance: "+this); + public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List groupbyFields, + List aggregateFuncTypes, List aggregatedFields) throws IOException { checkNotNull(entityDefinition, "entityDefinition"); String serviceName = entityDefinition.getService(); - LOG.info(this.getLogHeader() +" raw group aggregate on service: " + serviceName + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields); - if(LOG.isDebugEnabled()) LOG.debug("SCAN: "+scan.toJSON()); - long _start = System.currentTimeMillis(); - final RawAggregator aggregator = new RawAggregator(groupbyFields,AggregateFunctionType.fromBytesList(aggregateFuncTypes),aggregatedFields,entityDefinition); + LOG.info(this.getLogHeader() + " raw group aggregate on service: " + serviceName + + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields); + if (LOG.isDebugEnabled()) { + LOG.debug("SCAN: " + scan.toJSON()); + } + final long startTimestamp = System.currentTimeMillis(); + final RawAggregator aggregator = new RawAggregator(groupbyFields, + AggregateFunctionType.fromBytesList(aggregateFuncTypes), aggregatedFields, entityDefinition); InternalReadReport report = this.asyncStreamRead(entityDefinition, scan, aggregator); List keyValues = aggregator.getGroupbyKeyValues(); @@ -384,34 +382,31 @@ public class AggregateProtocolEndPoint extends AggregateProtos.AggregateProtocol result.setStopTimestamp(report.getStopTimestamp()); long _stop = System.currentTimeMillis(); - LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", this.getLogHeader(),report.getCounter(),keyValues.size(),report.getStartTimestamp(),report.getStopTimestamp(),(_stop - _start))); + LOG.info(String.format("%s: 
scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", + this.getLogHeader(), report.getCounter(), keyValues.size(), report.getStartTimestamp(), + report.getStopTimestamp(), (_stop - startTimestamp))); return result; } - /** - * TODO: refactor time series aggregator to remove dependency of business logic entity class - * - * @param entityDefinition - * @param scan - * @param groupbyFields - * @param aggregateFuncTypes - * @param aggregatedFields - * @param intervalMin - * @return - * @throws Exception - */ @Override - public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List groupbyFields, List aggregateFuncTypes, List aggregatedFields, long startTime,long endTime,long intervalMin) throws IOException { -// LOG.info("Using coprocessor instance: "+this); + public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List groupbyFields, + List aggregateFuncTypes, List aggregatedFields, long startTime, long endTime, long intervalMin) throws IOException { checkNotNull(entityDefinition, "entityDefinition"); String serviceName = entityDefinition.getService(); - LOG.info(this.getLogHeader() + " time series group aggregate on service: " + serviceName + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields + " intervalMin: " + intervalMin + - " from: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(startTime) + " to: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(endTime)); - if(LOG.isDebugEnabled()) LOG.debug("SCAN: "+scan.toJSON()); - long _start = System.currentTimeMillis(); - final TimeSeriesAggregator aggregator = new TimeSeriesAggregator(groupbyFields,AggregateFunctionType.fromBytesList(aggregateFuncTypes),aggregatedFields,startTime,endTime,intervalMin); - InternalReadReport report = this.asyncStreamRead(entityDefinition, scan,aggregator); + LOG.info(this.getLogHeader() + " time series group aggregate on service: " + serviceName + + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + + " fields: " + aggregatedFields + " intervalMin: " + intervalMin + + " from: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(startTime) + + " to: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(endTime)); + if (LOG.isDebugEnabled()) { + LOG.debug("SCAN: " + scan.toJSON()); + } + + final long startTimestamp = System.currentTimeMillis(); + final TimeSeriesAggregator aggregator = new TimeSeriesAggregator(groupbyFields, + AggregateFunctionType.fromBytesList(aggregateFuncTypes), aggregatedFields, startTime, endTime, intervalMin); + InternalReadReport report = this.asyncStreamRead(entityDefinition, scan, aggregator); List keyValues = aggregator.getGroupbyKeyValues(); AggregateResult result = new AggregateResult(); @@ -420,28 +415,9 @@ public class AggregateProtocolEndPoint extends AggregateProtos.AggregateProtocol result.setStopTimestamp(report.getStopTimestamp()); long _stop = System.currentTimeMillis(); - LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", this.getLogHeader(),report.getCounter(),keyValues.size(),report.getStartTimestamp(),report.getStopTimestamp(),(_stop - _start))); + LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", + this.getLogHeader(), report.getCounter(), keyValues.size(), report.getStartTimestamp(), report.getStopTimestamp(), (_stop - startTimestamp))); return 
result; } - -// /** -// * Initialization per aggregate RPC call -// */ -// private void _init(){ -// this.startTimestamp = 0; -// this.stopTimestamp = 0; -// } -// -// // Min -// private long startTimestamp; -// // Max -// private long stopTimestamp; -// -// public long getStartTimestamp() { -// return startTimestamp; -// } -// public long getStopTimestamp() { -// return stopTimestamp; -// } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java index 84380eb..a49ad57 100755 --- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java +++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java @@ -31,8 +31,7 @@ import java.io.Serializable; import java.util.List; /** - * Aggregated writable result consist of group-by key-values list and additional meta information - * + * Aggregated writable result consist of group-by key-values list and additional meta information. *

Schema

*
  * {
@@ -42,88 +41,88 @@ import java.util.List;
  * }
  * 
*/ -public class AggregateResult implements Writable,Serializable{ - - private final static Logger LOG = LoggerFactory.getLogger(AggregateResult.class); - - /** - * Automatically generated default serialVersionUID - */ - private static final long serialVersionUID = 1L; - - private final WritableList keyValues; - - private long startTimestamp = 0; - - public long getStartTimestamp() { - return startTimestamp; - } - - public void setStartTimestamp(long startTimestamp) { - this.startTimestamp = startTimestamp; - } - - public long getStopTimestamp() { - return stopTimestamp; - } - - public void setStopTimestamp(long stopTimestamp) { - this.stopTimestamp = stopTimestamp; - } - - public WritableList getKeyValues() { - return keyValues; - } - - public void setKeyValues(List keyValues){ - this.keyValues.addAll(keyValues); - } - - private long stopTimestamp; - - public AggregateResult(){ - this.keyValues = new WritableList(GroupbyKeyValue.class); - } - - @Override - public void readFields(DataInput in) throws IOException { - this.startTimestamp = in.readLong(); - this.stopTimestamp = in.readLong(); - keyValues.readFields(in); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeLong(this.startTimestamp); - out.writeLong(this.stopTimestamp); - keyValues.write(out); - } - - - public static AggregateResult build(List keys,List values,List counts,long startTimestamp,long stopTimestamp){ - if(keys.size() > values.size()){ - throw new IllegalArgumentException("keys' size: "+keys.size()+" not equal with values' size: "+values.size()); - } - AggregateResult result = new AggregateResult(); - result.setStartTimestamp(startTimestamp); - result.setStopTimestamp(stopTimestamp); - WritableList keyValues = new WritableList(GroupbyKeyValue.class,keys.size()); - - for(int i=0;i keyValues; + + private long startTimestamp = 0; + + public long getStartTimestamp() { + return startTimestamp; + } + + public void setStartTimestamp(long startTimestamp) { + this.startTimestamp = startTimestamp; + } + + public long getStopTimestamp() { + return stopTimestamp; + } + + public void setStopTimestamp(long stopTimestamp) { + this.stopTimestamp = stopTimestamp; + } + + public WritableList getKeyValues() { + return keyValues; + } + + public void setKeyValues(List keyValues) { + this.keyValues.addAll(keyValues); + } + + private long stopTimestamp; + + public AggregateResult() { + this.keyValues = new WritableList(GroupbyKeyValue.class); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.startTimestamp = in.readLong(); + this.stopTimestamp = in.readLong(); + keyValues.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeLong(this.startTimestamp); + out.writeLong(this.stopTimestamp); + keyValues.write(out); + } + + + public static AggregateResult build(List keys, List values, List counts, long startTimestamp, long stopTimestamp) { + if (keys.size() > values.size()) { + throw new IllegalArgumentException("keys' size: " + keys.size() + " not equal with values' size: " + values.size()); + } + AggregateResult result = new AggregateResult(); + result.setStartTimestamp(startTimestamp); + result.setStopTimestamp(stopTimestamp); + WritableList keyValues = new WritableList(GroupbyKeyValue.class, keys.size()); + + for (int i = 0; i < keys.size(); i++) { + String[] key = keys.get(i); + GroupbyKey gkey = new GroupbyKey(); + for (String k : key) { + gkey.addValue(k.getBytes()); + } + GroupbyValue gvalue = new GroupbyValue(); + 
double[] value = values.get(i); + for (double v : value) { + gvalue.add(v); + gvalue.addMeta(counts.get(i)); + } + keyValues.add(new GroupbyKeyValue(gkey, gvalue)); + } + result.setKeyValues(keyValues); + return result; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java index a68a592..306a6d1 100755 --- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java +++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java @@ -21,28 +21,20 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch; /** *

AggregateResultCallback Interface

- * * Merge coprocessor results from different regions and generate final aggregate result *
- * - * @see org.apache.hadoop.hbase.client.HTableInterface - * coprocessorExec(Class protocol, byte[] startKey, byte[] endKey, Batch.Call callable) throws IOException, Throwable; - * + * @see org.apache.hadoop.hbase.client.HTableInterface#coprocessorService(Class, byte[], byte[], Batch.Call, Batch.Callback) */ -public interface AggregateResultCallback extends Batch.Callback{ - /** - * Generate final result after callback from region servers - * - * @return AggregateResult - */ +public interface AggregateResultCallback extends Batch.Callback { + /** + * Generate final result after callback from region servers. + * + * @return AggregateResult + */ AggregateResult result(); /** - * Compatible for older callback interface in 0.94 or older - * - * @param region - * @param row - * @param result + * Compatible for older callback interface in 0.94 or older. */ void update(byte[] region, byte[] row, AggregateResult result); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java index 060d3ba..c3c57ed 100644 --- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java +++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java @@ -32,44 +32,63 @@ import java.util.ArrayList; import java.util.List; /** - * The protocol adapter for migrating from hbase-0.94 to hbase-0.96+ - * - * @since 6/3/15 + * The protocol adapter for hbase-0.98 and protobuffer-2.5 */ public final class ProtoBufConverter { public static AggregateResult fromPBResult(AggregateProtos.AggregateResult pbResult) throws IOException { - ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(pbResult.getByteArray().toByteArray());; + ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(pbResult.getByteArray().toByteArray()); + ; AggregateResult result = new AggregateResult(); result.readFields(byteArrayDataInput); return result; } - public static AggregateProtos.AggregateRequest toPBRequest(EntityDefinition entityDefinition, Scan scan, List groupbyFields, List aggregateFuncTypesBytes, List aggregatedFields) throws IOException { + public static AggregateProtos.AggregateRequest toPBRequest( + EntityDefinition entityDefinition, Scan scan, List groupbyFields, + List aggregateFuncTypesBytes, List aggregatedFields) throws IOException { AggregateProtos.AggregateRequest.Builder builder = AggregateProtos.AggregateRequest.newBuilder() .setEntityDefinition(AggregateProtos.EntityDefinition.newBuilder().setByteArray(writableToByteString(entityDefinition))) .setScan(toPBScan(scan)); - for(String groupbyField:groupbyFields) builder.addGroupbyFields(groupbyField); - for(byte[] funcTypeBytes:aggregateFuncTypesBytes) builder.addAggregateFuncTypes(ByteString.copyFrom(funcTypeBytes)); - for(String aggField:aggregatedFields) builder.addAggregatedFields(aggField); + for (String groupbyField : groupbyFields) { + builder.addGroupbyFields(groupbyField); + } + + for (byte[] funcTypeBytes : 
aggregateFuncTypesBytes) { + builder.addAggregateFuncTypes(ByteString.copyFrom(funcTypeBytes)); + } + + for (String aggField : aggregatedFields) { + builder.addAggregatedFields(aggField); + } return builder.build(); } public static ByteString writableToByteString(Writable writable) throws IOException { - ByteArrayDataOutput dataOutput = ByteStreams.newDataOutput();; + ByteArrayDataOutput dataOutput = ByteStreams.newDataOutput(); writable.write(dataOutput); return ByteString.copyFrom(dataOutput.toByteArray()); } - public static AggregateProtos.TimeSeriesAggregateRequest toPBTimeSeriesRequest(EntityDefinition entityDefinition, Scan scan, List groupbyFields, List aggregateFuncTypesBytes, List aggregatedFields, long startTime, long endTime, long intervalMin) throws IOException { - AggregateProtos.TimeSeriesAggregateRequest.Builder builder = AggregateProtos.TimeSeriesAggregateRequest.newBuilder() + public static AggregateProtos.TimeSeriesAggregateRequest toPBTimeSeriesRequest( + EntityDefinition entityDefinition, Scan scan, List groupbyFields, + List aggregateFuncTypesBytes, List aggregatedFields, + long startTime, long endTime, long intervalMin) throws IOException { + AggregateProtos.TimeSeriesAggregateRequest.Builder builder = + AggregateProtos.TimeSeriesAggregateRequest.newBuilder() .setEntityDefinition(AggregateProtos.EntityDefinition.newBuilder().setByteArray(writableToByteString(entityDefinition))) .setScan(toPBScan(scan)); - for(String groupbyField:groupbyFields) builder.addGroupbyFields(groupbyField); - for(byte[] funcTypeBytes:aggregateFuncTypesBytes) builder.addAggregateFuncTypes(ByteString.copyFrom(funcTypeBytes)); - for(String aggField:aggregatedFields) builder.addAggregatedFields(aggField); + for (String groupbyField : groupbyFields) { + builder.addGroupbyFields(groupbyField); + } + for (byte[] funcTypeBytes : aggregateFuncTypesBytes) { + builder.addAggregateFuncTypes(ByteString.copyFrom(funcTypeBytes)); + } + for (String aggField : aggregatedFields) { + builder.addAggregatedFields(aggField); + } builder.setStartTime(startTime); builder.setEndTime(endTime); @@ -79,33 +98,36 @@ public final class ProtoBufConverter { } public static EntityDefinition fromPBEntityDefinition(AggregateProtos.EntityDefinition entityDefinition) throws IOException { - ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(entityDefinition.getByteArray().toByteArray());; + ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(entityDefinition.getByteArray().toByteArray()); + ; EntityDefinition result = new EntityDefinition(); result.readFields(byteArrayDataInput); return result; } - public static List fromPBStringList(com.google.protobuf.ProtocolStringList groupbyFieldsList) { - List result = new ArrayList<>(groupbyFieldsList.size()); - for(ByteString byteString:groupbyFieldsList.asByteStringList()){ - result.add(byteString.toStringUtf8()); - } - return result; + // /** + // * For protobuffer-2.6 + // */ + // public static List fromPBStringList(com.google.protobuf.ProtocolStringList groupbyFieldsList) { + // List result = new ArrayList<>(groupbyFieldsList.size()); + // for(ByteString byteString:groupbyFieldsList.asByteStringList()){ + // result.add(byteString.toStringUtf8()); + // } + // return result; + // } + + public static List fromPBStringList(List groupbyFieldsList) { + return groupbyFieldsList; } public static List fromPBByteArrayList(List aggregateFuncTypesList) { List bytesArrayList = new ArrayList<>(aggregateFuncTypesList.size()); - for(ByteString 
byteString:aggregateFuncTypesList){ + for (ByteString byteString : aggregateFuncTypesList) { bytesArrayList.add(byteString.toByteArray()); } return bytesArrayList; } - /** - * - * @param scan - * @return - */ public static Scan fromPBScan(ClientProtos.Scan scan) throws IOException { return ProtobufUtil.toScan(scan); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/_AggregateProtocolEndPoint.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/_AggregateProtocolEndPoint.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/_AggregateProtocolEndPoint.java deleted file mode 100755 index af213e9..0000000 --- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/_AggregateProtocolEndPoint.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -//package eagle.storage.hbase.query.coprocessor; -// -//import eagle.log.entity.meta.EntityDefinition; -//import eagle.query.aggregate.AggregateFunctionType; -//import eagle.query.aggregate.raw.GroupbyKeyValue; -//import eagle.query.aggregate.raw.RawAggregator; -//import eagle.query.aggregate.timeseries.TimeSeriesAggregator; -//import eagle.storage.hbase.query.coprocessor.generated.AggregateProtos; -//import eagle.storage.hbase.query.coprocessor.impl.AbstractAggregateEndPoint; -//import hadoop.eagle.common.DateTimeUtil; -//import com.google.protobuf.RpcCallback; -//import com.google.protobuf.RpcController; -//import org.apache.hadoop.hbase.client.Scan; -//import org.apache.hadoop.hbase.protobuf.ResponseConverter; -//import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -//import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import java.io.IOException; -//import java.util.List; -// -///** -// * Coprocessor EndPoint of protocol AggregateProtocol -// * -// *
-// *

Deployment:

-// * -// * Firstly deploy jar files to cluster on local file system or HDFS.

-// * Secondly configure in hbase-site.xml as following: -// *
-// * <property>
-// *   <name>hbase.coprocessor.region.classes</name>
-// *   <value>AggregateProtocolEndPoint</value>
-// * </property>
-// * 
-// * Or register on related hbase tables -// *
-// * hbase(main):005:0>  alter 't1', METHOD => 'table_att', 'coprocessor'=>'hdfs:///foo.jar|AggregateProtocolEndPoint|1001|'
-// * 
-// * -// *

Reference:

-// * -// * Coprocessor Introduction -// * (Authors: Trend Micro Hadoop Group: Mingjie Lai, Eugene Koontz, Andrew Purtell) -// *

-// * -// * @see AggregateProtocol -// * -//// * @since : 10/31/14,2014 -// */ -//@SuppressWarnings("unused") -//public class AggregateProtocolEndPoint extends AbstractAggregateEndPoint { -// private final static Logger LOG = LoggerFactory.getLogger(AggregateProtocolEndPoint.class); -// /** -// * -// * @param entityDefinition -// * @param scan -// * @param groupbyFields -// * @param aggregateFuncTypes -// * @param aggregatedFields -// * @return -// * @throws Exception -// */ -// @Override -// public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List groupbyFields, List aggregateFuncTypes, List aggregatedFields) throws IOException { -//// LOG.info("Using coprocessor instance: "+this); -// checkNotNull(entityDefinition, "entityDefinition"); -// String serviceName = entityDefinition.getService(); -// LOG.info(this.getLogHeader() +" raw group aggregate on service: " + serviceName + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields); -// if(LOG.isDebugEnabled()) LOG.debug("SCAN: "+scan.toJSON()); -// long _start = System.currentTimeMillis(); -// final RawAggregator aggregator = new RawAggregator(groupbyFields,AggregateFunctionType.fromBytesList(aggregateFuncTypes),aggregatedFields,entityDefinition); -// InternalReadReport report = this.asyncStreamRead(entityDefinition, scan, aggregator); -// -// List keyValues = aggregator.getGroupbyKeyValues(); -// AggregateResult result = new AggregateResult(); -// result.setKeyValues(keyValues); -// result.setStartTimestamp(report.getStartTimestamp()); -// result.setStopTimestamp(report.getStopTimestamp()); -// -// long _stop = System.currentTimeMillis(); -// LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", this.getLogHeader(),report.getCounter(),keyValues.size(),report.getStartTimestamp(),report.getStopTimestamp(),(_stop - _start))); -// -// return result; -// } -// -// /** -// * TODO: refactor time series aggregator to remove dependency of business logic entity class -// * -// * @param entityDefinition -// * @param scan -// * @param groupbyFields -// * @param aggregateFuncTypes -// * @param aggregatedFields -// * @param intervalMin -// * @return -// * @throws Exception -// */ -// @Override -// public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List groupbyFields, List aggregateFuncTypes, List aggregatedFields, long startTime,long endTime,long intervalMin) throws IOException { -//// LOG.info("Using coprocessor instance: "+this); -// checkNotNull(entityDefinition, "entityDefinition"); -// String serviceName = entityDefinition.getService(); -// LOG.info(this.getLogHeader() + " time series group aggregate on service: " + serviceName + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields + " intervalMin: " + intervalMin + -// " from: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(startTime) + " to: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(endTime)); -// if(LOG.isDebugEnabled()) LOG.debug("SCAN: "+scan.toJSON()); -// long _start = System.currentTimeMillis(); -// final TimeSeriesAggregator aggregator = new TimeSeriesAggregator(groupbyFields,AggregateFunctionType.fromBytesList(aggregateFuncTypes),aggregatedFields,startTime,endTime,intervalMin); -// InternalReadReport report = this.asyncStreamRead(entityDefinition, scan,aggregator); -// List keyValues = 
aggregator.getGroupbyKeyValues(); -// -// AggregateResult result = new AggregateResult(); -// result.setKeyValues(keyValues); -// result.setStartTimestamp(report.getStartTimestamp()); -// result.setStopTimestamp(report.getStopTimestamp()); -// -// long _stop = System.currentTimeMillis(); -// LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", this.getLogHeader(),report.getCounter(),keyValues.size(),report.getStartTimestamp(),report.getStopTimestamp(),(_stop - _start))); -// -// return result; -// } -//} \ No newline at end of file
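
----------------------------------------------------------------------
Editor's note: for readers following this refactoring, below is a minimal client-side sketch of how the reworked protobuf endpoint is typically driven through HTableInterface#coprocessorService, the method referenced in the new AggregateResultCallback javadoc above. The names AggregateProtos.AggregateProtocol, ProtoBufConverter.toPBRequest, AggregateResult and AggregateResultCallback come from this diff; the wrapper class, its name, and the assumption that AggregateResultCallback is parameterized over AggregateProtos.AggregateResult are illustrative only and not part of the commit.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;

import org.apache.eagle.log.entity.meta.EntityDefinition;
import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResult;
import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResultCallback;
import org.apache.eagle.storage.hbase.query.coprocessor.ProtoBufConverter;
import org.apache.eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;

// Hypothetical client-side helper, not part of EAGLE-520.
public class AggregateClientSketch {

    public AggregateResult rawAggregate(HTableInterface table, EntityDefinition ed, Scan scan,
                                        List<String> groupbyFields, List<byte[]> funcTypes,
                                        List<String> aggregatedFields,
                                        AggregateResultCallback callback) throws Throwable {
        // Build the protobuf request once; the same request is sent to every region.
        final AggregateProtos.AggregateRequest request =
            ProtoBufConverter.toPBRequest(ed, scan, groupbyFields, funcTypes, aggregatedFields);

        // Fan the call out to all regions covered by the scan range and let the
        // AggregateResultCallback merge the per-region results.
        table.coprocessorService(AggregateProtos.AggregateProtocol.class,
            scan.getStartRow(), scan.getStopRow(),
            new Batch.Call<AggregateProtos.AggregateProtocol, AggregateProtos.AggregateResult>() {
                @Override
                public AggregateProtos.AggregateResult call(AggregateProtos.AggregateProtocol instance) throws IOException {
                    ServerRpcController controller = new ServerRpcController();
                    BlockingRpcCallback<AggregateProtos.AggregateResult> rpcCallback =
                        new BlockingRpcCallback<AggregateProtos.AggregateResult>();
                    // Invoke the endpoint's aggregate(...) RPC defined in this commit.
                    instance.aggregate(controller, request, rpcCallback);
                    if (controller.failedOnException()) {
                        throw controller.getFailedOn();
                    }
                    return rpcCallback.get();
                }
            }, callback);

        // The callback has merged each region's AggregateProtos.AggregateResult
        // into one client-side AggregateResult.
        return callback.result();
    }
}

In Eagle this fan-out normally sits behind a client facade; the sketch only shows the call shape the endpoint's aggregate(...) RPC expects and how per-region results reach the callback before result() produces the merged answer.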