eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [07/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"
Date Thu, 19 Nov 2015 10:47:14 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyValue.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyValue.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyValue.java
new file mode 100755
index 0000000..b7f2c43
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyValue.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.raw;
+
+import org.apache.eagle.common.ByteUtil;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * <h3>Strucutre</h3>
+ * <pre>
+ * {
+ *   value: List[byte[],...],
+ *   meta : List[byte[],...] // byte[] may be serialized value
+ *                           // of any type of value like:
+ *                           // Integer,Double or Object
+ *                           // and so on
+ * }
+ * </pre>
+ *
+ * TODO: Add self-described serializer or deserializer for meta bytes array, so that any side of the RPC will know how to read/write meta information
+ *
+ * @since : 11/4/14,2014
+ */
+public class GroupbyValue implements Writable{
+	private final WritableList<DoubleWritable> value;
+	private WritableList<BytesWritable> meta;
+	private int initialCapacity=1;
+	public GroupbyValue(){
+		this(1);
+	}
+	/**
+	 * Constructs an empty list with the specified initial capacity.
+	 *
+	 * @param   initialCapacity   the initial capacity of the list
+	 * @exception IllegalArgumentException if the specified initial capacity
+	 *            is negative
+	 */
+	public GroupbyValue(int initialCapacity ){
+		this.initialCapacity = initialCapacity;
+		this.value = new WritableList<DoubleWritable>(DoubleWritable.class,this.initialCapacity);
+		this.meta = new WritableList<BytesWritable>(BytesWritable.class,this.initialCapacity);
+	}
+
+	public WritableList<DoubleWritable> getValue(){
+		return this.value;
+	}
+
+	public WritableList<BytesWritable> getMeta(){
+		return this.meta;
+	}
+
+	public DoubleWritable get(int index){
+		return this.value.get(index);
+	}
+
+	public BytesWritable getMeta(int index){
+		if(this.meta==null) return null;
+		return this.meta.get(index);
+	}
+
+	// Values
+	public void add(DoubleWritable value){
+		this.value.add(value);
+	}
+	public void add(Double value){
+		this.value.add(new DoubleWritable(value));
+	}
+
+	public void set(int index,DoubleWritable value){
+		this.value.set(index, value);
+	}
+
+	//////////////
+	// Meta
+	/////////////
+	public void addMeta(BytesWritable meta){
+		this.meta.add(meta);
+	}
+
+	public void addMeta(int meta){
+		this.meta.add(new BytesWritable(ByteUtil.intToBytes(meta)));
+	}
+
+	public void setMeta(int index,BytesWritable meta){
+		this.meta.set(index,meta);
+	}
+	public void setMeta(int index,int meta){
+		this.meta.set(index, new BytesWritable(ByteUtil.intToBytes(meta)));
+	}
+
+	/**
+	 * Serialize the fields of this object to <code>out</code>.
+	 *
+	 * @param out <code>DataOuput</code> to serialize this object into.
+	 * @throws java.io.IOException
+	 */
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeInt(this.initialCapacity);
+		this.value.write(out);
+		this.meta.write(out);
+	}
+
+	/**
+	 * Deserialize the fields of this object from <code>in</code>.
+	 * <p/>
+	 * <p>For efficiency, implementations should attempt to re-use storage in the
+	 * existing object where possible.</p>
+	 *
+	 * @param in <code>DataInput</code> to deseriablize this object from.
+	 * @throws java.io.IOException
+	 */
+	@Override
+	public void readFields(DataInput in) throws IOException {
+		this.initialCapacity = in.readInt();
+		this.value.readFields(in);
+		this.meta.readFields(in);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawAggregator.java
new file mode 100755
index 0000000..0468074
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawAggregator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.raw;
+
+import org.apache.eagle.log.entity.QualifierCreationListener;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.query.aggregate.AggregateFunctionType;
+
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+
+public class RawAggregator implements QualifierCreationListener,GroupbyKeyAggregatable {
+	private List<String> groupbyFields;
+	private GroupbyKey key;
+	private static final byte[] UNASSIGNED = "unassigned".getBytes();
+	private RawGroupbyBucket bucket;
+
+	public RawAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFunctionTypes, List<String> aggregatedFields, EntityDefinition ed){
+		this.groupbyFields = groupbyFields;
+		key = new GroupbyKey();
+		bucket = new RawGroupbyBucket(aggregateFunctionTypes, aggregatedFields, ed);
+	}
+
+	@Override
+	public void qualifierCreated(Map<String, byte[]> qualifiers){
+		key.clear();
+		ListIterator<String> it = groupbyFields.listIterator();
+		while(it.hasNext()){
+			byte[] groupbyFieldValue = qualifiers.get(it.next());
+			if(groupbyFieldValue == null){
+				key.addValue(UNASSIGNED);
+			}else{
+				key.addValue(groupbyFieldValue);
+			}
+		}
+		GroupbyKey newKey = null;
+		if(bucket.exists(key)){
+			newKey = key;
+		}else{
+			newKey = new GroupbyKey(key);
+		}
+		
+		bucket.addDatapoint(newKey, qualifiers);
+	}
+
+	/**
+	 * @return
+	 */
+	public Map<List<String>, List<Double>> result(){
+		return bucket.result();
+	}
+
+	public List<GroupbyKeyValue> getGroupbyKeyValues(){
+		return bucket.groupbyKeyValues();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawGroupbyBucket.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawGroupbyBucket.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawGroupbyBucket.java
new file mode 100755
index 0000000..47b84a0
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawGroupbyBucket.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.raw;
+
+import org.apache.eagle.log.entity.EntityQualifierUtils;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.apache.eagle.log.expression.ExpressionParser;
+import org.apache.eagle.query.aggregate.AggregateFunctionType;
+import org.apache.eagle.query.parser.TokenConstant;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class RawGroupbyBucket {
+	private final static Logger LOG = LoggerFactory.getLogger(RawGroupbyBucket.class);
+
+	private List<String> aggregatedFields;
+	private EntityDefinition entityDefinition;
+
+	
+	private List<AggregateFunctionType> types;
+	private SortedMap<GroupbyKey, List<Function>> group2FunctionMap =
+			new TreeMap<GroupbyKey, List<Function>>(new GroupbyKeyComparator());
+
+	public RawGroupbyBucket(List<AggregateFunctionType> types, List<String> aggregatedFields, EntityDefinition ed){
+		this.types = types;
+		this.aggregatedFields = aggregatedFields;
+		this.entityDefinition = ed;
+	}
+
+	public boolean exists(GroupbyKey key){
+		return group2FunctionMap.containsKey(key);
+	}
+
+	public void addDatapoint(GroupbyKey groupbyKey, Map<String, byte[]> values){
+		// locate groupby bucket
+		List<Function> functions = group2FunctionMap.get(groupbyKey);
+		if(functions == null){
+			functions = new ArrayList<Function>();
+			for(AggregateFunctionType type : types){
+				FunctionFactory ff = FunctionFactory.locateFunctionFactory(type);
+				if(ff == null){
+					LOG.error("FunctionFactory of AggregationFunctionType:"+type+" is null");
+				}else{
+					functions.add(ff.createFunction());
+				}
+			}
+			group2FunctionMap.put(groupbyKey, functions);
+		}
+		ListIterator<Function> e1 = functions.listIterator();
+		ListIterator<String> e2 = aggregatedFields.listIterator();
+		while(e1.hasNext() && e2.hasNext()){
+			Function f = e1.next();
+			String aggregatedField = e2.next();
+			byte[] v = values.get(aggregatedField);
+			if(f instanceof Function.Count){ // handle count
+				if(entityDefinition.getMetricDefinition()==null) {
+					f.run(1.0);
+					continue;
+				}else if(v == null){
+					aggregatedField = GenericMetricEntity.VALUE_FIELD;
+					v = values.get(aggregatedField);
+				}
+			}
+			if(v != null){
+				Qualifier q = entityDefinition.getDisplayNameMap().get(aggregatedField);
+				EntitySerDeser<?> serDeser = q.getSerDeser();
+				// double d = 0.0;
+				if(serDeser instanceof IntSerDeser){
+					double d= (Integer)serDeser.deserialize(v);
+					f.run(d);
+				}else if(serDeser instanceof LongSerDeser){
+					double d = (Long)serDeser.deserialize(v);
+					f.run(d);
+				}else if(serDeser instanceof DoubleSerDeser){
+					double d = (Double)serDeser.deserialize(v);
+					f.run(d);
+				// TODO: support numeric array type that is not metric
+				}else if(serDeser instanceof DoubleArraySerDeser){
+					double[] d = ((DoubleArraySerDeser) serDeser).deserialize(v);
+					if(f instanceof Function.Count){
+						f.run(d.length);
+					} else {
+						for(double i:d) f.run(i);
+					}
+				}else if(serDeser instanceof IntArraySerDeser){
+					int[] d = ((IntArraySerDeser) serDeser).deserialize(v);
+					if(f instanceof Function.Count){
+						f.run(d.length);
+					}else{
+						for(int i:d) f.run(i);
+					}
+				}else{
+					if(LOG.isDebugEnabled()) LOG.debug("EntitySerDeser of field "+aggregatedField+" is not IntSerDeser or LongSerDeser or DoubleSerDeser or IntArraySerDeser or DoubleArraySerDeser, default as 0.0");
+				}
+			}else if(TokenConstant.isExpression(aggregatedField)){
+				String expression = TokenConstant.parseExpressionContent(aggregatedField);
+				try {
+					Map<String,Double> doubleMap = EntityQualifierUtils.bytesMapToDoubleMap(values, entityDefinition);
+					if(entityDefinition.getMetricDefinition() == null) {
+						double value = ExpressionParser.eval(expression,doubleMap);
+						// LOG.info("DEBUG: Eval "+expression +" = "+value);
+						f.run(value);
+					}else{
+						Qualifier qualifier = entityDefinition.getDisplayNameMap().get(GenericMetricEntity.VALUE_FIELD);
+						EntitySerDeser _serDeser = qualifier.getSerDeser();
+						byte[] valueBytes = values.get(GenericMetricEntity.VALUE_FIELD);
+						if( _serDeser instanceof DoubleArraySerDeser){
+							double[] d = (double[]) _serDeser.deserialize(valueBytes);
+							if(f instanceof Function.Count) {
+								f.run(d.length);
+							}else{
+								for(double i:d){
+									doubleMap.put(GenericMetricEntity.VALUE_FIELD,i);
+									f.run(ExpressionParser.eval(expression, doubleMap));
+								}
+							}
+						}else if(_serDeser instanceof IntArraySerDeser){
+							int[] d = (int[]) _serDeser.deserialize(valueBytes);
+							if(f instanceof Function.Count) {
+								f.run(d.length);
+							}else {
+								for (double i : d) {
+									doubleMap.put(GenericMetricEntity.VALUE_FIELD, i);
+									f.run(ExpressionParser.eval(expression, doubleMap));
+								}
+							}
+						}else{
+							double value = ExpressionParser.eval(expression,doubleMap);
+							f.run(value);
+						}
+					}
+				} catch (Exception e) {
+					LOG.error("Got exception to evaluate expression: "+expression+", exception: "+e.getMessage(),e);
+				}
+			}
+		}
+	}
+
+	/**
+	 * expensive operation - create objects and format the result
+	 * @return
+	 */
+	public List<GroupbyKeyValue> groupbyKeyValues(){
+		List<GroupbyKeyValue> results = new ArrayList<GroupbyKeyValue>();
+		for(Map.Entry<GroupbyKey, List<Function>> entry : this.group2FunctionMap.entrySet()){
+			GroupbyValue value = new GroupbyValue();
+			for(Function f : entry.getValue()){
+				value.add(new DoubleWritable(f.result()));
+				value.addMeta(f.count());
+			}
+			results.add(new GroupbyKeyValue(entry.getKey(),value));
+		}
+		return results;
+	}
+
+	/**
+	 * expensive operation - create objects and format the result
+	 * @return
+	 */
+	public Map<List<String>, List<Double>> result(){
+		Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>();
+		for(Map.Entry<GroupbyKey, List<Function>> entry : this.group2FunctionMap.entrySet()){
+			List<Double> values = new ArrayList<Double>();
+			for(Function f : entry.getValue()){
+				values.add(f.result());
+			}
+			GroupbyKey key = entry.getKey();
+			List<BytesWritable> list1 = key.getValue();
+			List<String> list2 = new ArrayList<String>();
+			for(BytesWritable e : list1){
+				list2.add(new String(e.copyBytes()));
+			}
+			result.put(list2, values);
+		}
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/WritableList.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/WritableList.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/WritableList.java
new file mode 100755
index 0000000..f9932a5
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/WritableList.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.raw;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.ParameterizedType;
+import java.util.ArrayList;
+
+/**
+ * @since : 11/6/14,2014
+ */
+public class WritableList<E extends Writable> extends ArrayList<E> implements Writable{
+	private Class<E> itemTypeClass;
+
+	public WritableList(Class<E> typeClass){
+		this.itemTypeClass = typeClass;
+	}
+
+	public WritableList(Class<E> typeClass,int initialCapacity){
+		super(initialCapacity);
+		this.itemTypeClass = typeClass;
+	}
+
+
+	/**
+	 * <h3> Get item class by </h3>
+	 * <pre>
+	 * (Class<E>) ((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0];
+	 * </pre>
+	 */
+	@Deprecated
+	public WritableList(){
+		this.itemTypeClass = (Class<E>) ((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0];
+	}
+
+	private void check() throws IOException{
+		if(this.itemTypeClass == null){
+			throw new IOException("Class Type of WritableArrayList<E extends Writable> is null");
+		}
+	}
+
+	public Class<E> getItemClass(){
+		return itemTypeClass;
+	}
+
+	/**
+	 * Serialize the fields of this object to <code>out</code>.
+	 *
+	 * @param out <code>DataOuput</code> to serialize this object into.
+	 * @throws java.io.IOException
+	 */
+	@Override
+	public void write(DataOutput out) throws IOException {
+		this.check();
+		out.writeInt(this.size());
+		for(Writable item: this){
+			item.write(out);
+		}
+	}
+
+	/**
+	 * Deserialize the fields of this object from <code>in</code>.
+	 * <p/>
+	 * <p>For efficiency, implementations should attempt to re-use storage in the
+	 * existing object where possible.</p>
+	 *
+	 * @param in <code>DataInput</code> to deseriablize this object from.
+	 * @throws java.io.IOException
+	 */
+	@Override
+	public void readFields(DataInput in) throws IOException {
+		this.check();
+		int size = in.readInt();
+		for(int i=0;i<size;i++){
+			try {
+				E item = itemTypeClass.newInstance();
+				item.readFields(in);
+				this.add(item);
+			} catch (InstantiationException e) {
+				throw new IOException("Got exception to create instance for class: "+itemTypeClass+": "+e.getMessage(),e);
+			} catch (IllegalAccessException e) {
+				throw new IOException("Got exception to create instance for class: "+itemTypeClass+": "+e.getMessage(),e);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/AbstractAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/AbstractAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/AbstractAggregator.java
new file mode 100755
index 0000000..0837382
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/AbstractAggregator.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.timeseries;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.EntityCreationListener;
+import org.apache.eagle.log.expression.ExpressionParser;
+import org.apache.eagle.query.aggregate.AggregateFunctionType;
+import org.apache.eagle.query.aggregate.IllegalAggregateFieldTypeException;
+import org.apache.eagle.query.parser.TokenConstant;
+import org.apache.commons.beanutils.PropertyUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.beans.PropertyDescriptor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class AbstractAggregator implements Aggregator, EntityCreationListener{
+	private final static Logger LOG = LoggerFactory.getLogger(AbstractAggregator.class);
+
+	private static final String UNASSIGNED = "unassigned";
+	protected List<String> groupbyFields;
+	protected List<AggregateFunctionType> aggregateFunctionTypes;
+	protected List<String> aggregatedFields;
+	// a cache to know immediately if groupby field should come from tags(true) or qualifiers(false)
+	private Boolean[] _groupbyFieldPlacementCache;
+	private Method[] _aggregateFieldReflectedMethodCache;
+
+	public AbstractAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){
+		this.groupbyFields = groupbyFields;
+		this.aggregateFunctionTypes = aggregateFuntionTypes;
+		this.aggregatedFields = aggregatedFields;
+		_aggregateFieldReflectedMethodCache = new Method[this.aggregatedFields.size()];
+		_groupbyFieldPlacementCache = new Boolean[this.groupbyFields.size()];
+	}
+	
+	@Override
+	public void entityCreated(TaggedLogAPIEntity entity) throws Exception{
+		accumulate(entity);
+	}
+	
+	public abstract Object result();
+	
+	protected String createGroupFromTags(TaggedLogAPIEntity entity, String groupbyField, int i){
+		String groupbyFieldValue = entity.getTags().get(groupbyField);
+		if(groupbyFieldValue != null){
+			_groupbyFieldPlacementCache[i] = true;
+			return groupbyFieldValue;
+		}
+		return null;
+	}
+	
+	protected String createGroupFromQualifiers(TaggedLogAPIEntity entity, String groupbyField, int i){
+		try{
+			PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(entity, groupbyField);
+			if(pd == null)
+				return null;
+//			_groupbyFieldPlacementCache.put(groupbyField, false);
+			_groupbyFieldPlacementCache[i] = false;
+			return (String)(pd.getReadMethod().invoke(entity));
+		}catch(NoSuchMethodException ex){
+			return null;
+		}catch(InvocationTargetException ex){
+			return null;
+		}catch(IllegalAccessException ex){
+			return null;
+		}
+	}
+	
+	protected String determineGroupbyFieldValue(TaggedLogAPIEntity entity, String groupbyField, int i){
+		Boolean placement = _groupbyFieldPlacementCache[i];
+		String groupbyFieldValue = null; 
+		if(placement != null){
+			groupbyFieldValue = placement.booleanValue() ? createGroupFromTags(entity, groupbyField, i) : createGroupFromQualifiers(entity, groupbyField, i); 
+		}else{
+			groupbyFieldValue = createGroupFromTags(entity, groupbyField, i);
+			if(groupbyFieldValue == null){
+				groupbyFieldValue = createGroupFromQualifiers(entity, groupbyField, i);
+			}
+		}
+		groupbyFieldValue = (groupbyFieldValue == null ? UNASSIGNED : groupbyFieldValue);
+		return groupbyFieldValue;
+	}
+	
+	/**
+	 * TODO For count aggregation, special treatment is the value is always 0 unless we support count(*) or count(<fieldname>) which counts number of rows or 
+	 * number of non-null field
+	 * For other aggregation, like sum,min,max,avg, we should resort to qualifiers
+	 * @param entity
+	 * @return
+	 */
+	protected List<Double> createPreAggregatedValues(TaggedLogAPIEntity entity) throws Exception{
+		List<Double> values = new ArrayList<Double>();
+		int functionIndex = 0;
+		for(AggregateFunctionType type : aggregateFunctionTypes){
+			if(type.name().equals(AggregateFunctionType.count.name())){
+				values.add(new Double(1));
+			}else{
+				// find value in qualifier by checking java bean
+				String aggregatedField = aggregatedFields.get(functionIndex);
+				if(TokenConstant.isExpression(aggregatedField)){
+					try {
+						String expr = TokenConstant.parseExpressionContent(aggregatedField);
+						values.add(ExpressionParser.eval(expr, entity));
+					}catch (Exception ex){
+						LOG.error("Failed to evaluate expression-based aggregation: " + aggregatedField, ex);
+						throw ex;
+					}
+				}else {
+					try {
+						Method m = _aggregateFieldReflectedMethodCache[functionIndex];
+						if (m == null) {
+//						pd = PropertyUtils.getPropertyDescriptor(entity, aggregatedField);
+//						if (pd == null) {
+//							final String errMsg = "Field/tag " + aggregatedField + " is not defined for entity " + entity.getClass().getSimpleName();
+//							logger.error(errMsg);
+//							throw new Exception(errMsg);
+//						}
+//						Object obj = pd.getReadMethod().invoke(entity);
+							String tmp = aggregatedField.substring(0, 1).toUpperCase() + aggregatedField.substring(1);
+							m = entity.getClass().getMethod("get" + tmp);
+							_aggregateFieldReflectedMethodCache[functionIndex] = m;
+						}
+						Object obj = m.invoke(entity);
+						values.add(numberToDouble(obj));
+					} catch (Exception ex) {
+						LOG.error("Cannot do aggregation for field " + aggregatedField, ex);
+						throw ex;
+					}
+				}
+			}
+			functionIndex++;
+		}
+		return values;
+	}
+	
+	/**
+	 * TODO this is a hack, we need elegant way to convert type to a broad precision
+     *
+	 * @param obj
+	 * @return
+	 */
+	protected Double numberToDouble(Object obj){
+		if(obj instanceof Double)
+			return (Double)obj;
+		if(obj instanceof Integer){
+			return new Double(((Integer)obj).doubleValue());
+		}
+		if(obj instanceof Long){
+			return new Double(((Long)obj).doubleValue());
+		}
+		// TODO hack to support string field for demo purpose, should be removed
+		if(obj == null){
+			return new Double(0.0);
+		}
+		if(obj instanceof String){
+			try{
+				return new Double((String)obj);
+			}catch(Exception ex){
+				LOG.warn("Datapoint ignored because it can not be converted to correct number for " + obj, ex);
+				return new Double(0.0);
+			}
+		}
+		
+		throw new IllegalAggregateFieldTypeException(obj.getClass().toString() + " type is not support. The aggregated field must be numeric type, int, long or double");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/Aggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/Aggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/Aggregator.java
new file mode 100755
index 0000000..1e70e91
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/Aggregator.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.timeseries;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+
+public interface Aggregator {
+    /**
+     * Accumulate callback
+     *
+     * @param entity accumulated entity instance
+     * @throws Exception
+     */
+	public void accumulate(TaggedLogAPIEntity entity) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java
new file mode 100644
index 0000000..7e35bec
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.timeseries;
+
+import org.apache.eagle.log.entity.EntityCreationListener;
+
+public class EntityCreationListenerFactory {
+	public static EntityCreationListener synchronizedEntityCreationListener(EntityCreationListener listener){
+		return new SynchronizedEntityCreationListener(listener);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java
new file mode 100755
index 0000000..e12fea3
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.timeseries;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.query.aggregate.AggregateFunctionType;
+
+/**
+ * Not thread safe
+ */
+public class FlatAggregator extends AbstractAggregator{
+	protected GroupbyBucket bucket;
+
+    /**
+     * @param groupbyFields
+     * @param aggregateFuntionTypes
+     * @param aggregatedFields
+     */
+	public FlatAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){
+		super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
+		bucket = new GroupbyBucket(this.aggregateFunctionTypes);
+	}
+	
+	public void accumulate(TaggedLogAPIEntity entity) throws Exception{
+		List<String> groupbyFieldValues = createGroup(entity);
+		List<Double> preAggregatedValues = createPreAggregatedValues(entity);
+		bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
+	}
+	
+	public Map<List<String>, List<Double>> result(){
+		return bucket.result(); 
+	}
+	
+	protected List<String> createGroup(TaggedLogAPIEntity entity){
+		List<String> groupbyFieldValues = new ArrayList<String>();
+		int i = 0;
+		for(String groupbyField : groupbyFields){
+			String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i++);
+			groupbyFieldValues.add(groupbyFieldValue);
+		}
+		return groupbyFieldValues;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java
new file mode 100755
index 0000000..ea57edb
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.timeseries;
+
+import org.apache.eagle.query.QueryConstants;
+import org.apache.eagle.query.aggregate.AggregateFunctionType;
+import org.apache.eagle.query.aggregate.raw.GroupbyKey;
+import org.apache.eagle.query.aggregate.raw.GroupbyKeyValue;
+import org.apache.eagle.query.aggregate.raw.GroupbyValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class GroupbyBucket {
+	private final static Logger LOG = LoggerFactory.getLogger(GroupbyBucket.class);
+	
+	public static Map<String, FunctionFactory> _functionFactories = 
+			new HashMap<>();
+    
+	// TODO put this logic to AggregatorFunctionType
+	static{
+		_functionFactories.put(AggregateFunctionType.count.name(), new CountFactory());
+		_functionFactories.put(AggregateFunctionType.sum.name(), new SumFactory());
+		_functionFactories.put(AggregateFunctionType.min.name(), new MinFactory());
+		_functionFactories.put(AggregateFunctionType.max.name(), new MaxFactory());
+		_functionFactories.put(AggregateFunctionType.avg.name(), new AvgFactory());
+	}
+	
+	private List<AggregateFunctionType> types;
+//	private SortedMap<List<String>, List<Function>> group2FunctionMap = 
+//			new TreeMap<List<String>, List<Function>>(new GroupbyFieldsComparator());
+	
+	private Map<List<String>, List<Function>> group2FunctionMap = new HashMap<>(); //new GroupbyFieldsComparator());
+	
+	public GroupbyBucket(List<AggregateFunctionType> types){
+		this.types = types;
+	}
+	
+	public void addDatapoint(List<String> groupbyFieldValues, List<Double> values){
+		// LOG.info("DEBUG: addDatapoint: groupby=["+StringUtils.join(groupbyFieldValues,",")+"], values=["+StringUtils.join(values, ",")+"]");
+		
+		// locate groupby bucket
+		List<Function> functions = group2FunctionMap.get(groupbyFieldValues);
+		if(functions == null){
+			functions = new ArrayList<Function>();
+			for(AggregateFunctionType type : types){
+				functions.add(_functionFactories.get(type.name()).createFunction());
+			}
+			group2FunctionMap.put(groupbyFieldValues, functions);
+		}
+		int functionIndex = 0;
+		for(Double v : values){
+			functions.get(functionIndex).run(v);
+			functionIndex++;
+		}
+	}
+	
+	public Map<List<String>, List<Double>> result(){
+		Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>();
+		for(Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()){
+			List<Double> values = new ArrayList<Double>();
+			for(Function f : entry.getValue()){
+				values.add(f.result());
+			}
+			result.put(entry.getKey(), values);
+		}
+		return result;
+	}
+
+	public List<GroupbyKeyValue> getGroupbyKeyValue(){
+		List<GroupbyKeyValue>  results = new ArrayList<GroupbyKeyValue>();
+		
+		for(Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()){
+			GroupbyKey key = new GroupbyKey();
+			for(String keyStr:entry.getKey()){
+				try {
+					key.addValue(keyStr.getBytes(QueryConstants.CHARSET));
+				} catch (UnsupportedEncodingException e) {
+					LOG.error(e.getMessage(),e);
+				}
+			}
+			GroupbyValue value = new GroupbyValue();
+			for(Function f : entry.getValue()){
+				value.add(f.result());
+				value.addMeta(f.count());
+			}
+			results.add(new GroupbyKeyValue(key,value));
+		}
+		
+		return results;
+	}
+	
+	public static interface FunctionFactory{
+		public Function createFunction();
+	}
+	
+	public static abstract class Function{
+		protected int count;
+
+		public abstract void run(double v);
+		public abstract double result();
+		public int count(){
+			return count;
+		}
+		public void incrCount(){
+			count ++;
+		}
+	}
+
+	private static class CountFactory implements FunctionFactory{
+		@Override
+		public Function createFunction(){
+			return new Count();
+		}
+	}
+	
+	
+	private static class Count extends Sum{
+		public Count(){
+			super();
+		}
+	}
+	
+	private static class SumFactory implements FunctionFactory{
+		@Override
+		public Function createFunction(){
+			return new Sum();
+		}
+	}
+	
+	private static class Sum extends Function{
+		private double summary;
+		public Sum(){
+			this.summary = 0.0;
+		}
+		@Override
+		public void run(double v){
+			this.incrCount();
+			this.summary += v;
+		}
+		
+		@Override
+		public double result(){
+			return this.summary;
+		}
+	}
+	
+	private static class MinFactory implements FunctionFactory{
+		@Override
+		public Function createFunction(){
+			return new Min();
+		}
+	}
+	public static class Min extends Function{
+		private double minimum;
+		public Min(){
+			// TODO is this a bug, or only positive numeric calculation is supported
+			this.minimum = Double.MAX_VALUE;
+		}
+
+		@Override
+		public void run(double v){
+			if(v < minimum){
+				minimum = v;
+			}
+			this.incrCount();
+		}
+		
+		@Override
+		public double result(){
+			return minimum;
+		}
+	}
+	
+	private static class MaxFactory implements FunctionFactory{
+		@Override
+		public Function createFunction(){
+			return new Max();
+		}
+	}
+	public static class Max extends Function{
+		private double maximum;
+		public Max(){
+			// TODO is this a bug, or only positive numeric calculation is supported
+			this.maximum = 0.0;
+		}
+		@Override
+		public void run(double v){
+			if(v > maximum){
+				maximum = v;
+			}
+			this.incrCount();
+		}
+		
+		@Override
+		public double result(){
+			return maximum;
+		}
+	}
+	
+	private static class AvgFactory implements FunctionFactory{
+		@Override
+		public Function createFunction(){
+			return new Avg();
+		}
+	}
+	public static class Avg extends Function{
+		private double total;
+		public Avg(){
+			this.total = 0.0;
+		}
+		@Override
+		public void run(double v){
+			total += v;
+			this.incrCount();
+		}
+		@Override
+		public double result(){
+			return this.total/this.count;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
new file mode 100644
index 0000000..6635483
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.timeseries;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * this is default comparator for aggregation. The behavior is to sort by groupby fields ascendantly
+ */
+public class GroupbyFieldsComparator implements Comparator<List<String>>{
+	@Override 
+    public int compare(List<String> list1, List<String> list2){
+		if(list1 == null || list2 == null || list1.size() != list2.size())
+			throw new IllegalArgumentException("2 list of groupby fields must be non-null and have the same size");
+		int r = 0;
+		int index = 0;
+		for(String s1 : list1){
+			r = s1.compareTo(list2.get(index++));
+			if(r != 0)
+				return r;
+		}
+		return r;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
new file mode 100644
index 0000000..9e78233
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.timeseries;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+public class HierarchicalAggregateEntity {
+	private String key;
+	private List<GroupbyBucket.Function> tmpValues = new ArrayList<GroupbyBucket.Function>();
+	private List<Double> values = new ArrayList<Double>();
+	private SortedMap<String, HierarchicalAggregateEntity> children = new TreeMap<String, HierarchicalAggregateEntity>();
+	private SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList = null;
+
+	public SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> getSortedList() {
+		return sortedList;
+	}
+	public void setSortedList(
+			SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList) {
+		this.sortedList = sortedList;
+	}
+	public List<GroupbyBucket.Function> getTmpValues() {
+		return tmpValues;
+	}
+	public void setTmpValues(List<GroupbyBucket.Function> tmpValues) {
+		this.tmpValues = tmpValues;
+	}
+	public String getKey() {
+		return key;
+	}
+	public void setKey(String key) {
+		this.key = key;
+	}
+	public List<Double> getValues() {
+		return values;
+	}
+	public void setValues(List<Double> values) {
+		this.values = values;
+	}
+	public SortedMap<String, HierarchicalAggregateEntity> getChildren() {
+		return children;
+	}
+	public void setChildren(SortedMap<String, HierarchicalAggregateEntity> children) {
+		this.children = children;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
new file mode 100755
index 0000000..ecb80ac
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.timeseries;
+
+import java.util.List;
+import java.util.SortedMap;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.query.aggregate.AggregateFunctionType;
+
+public class HierarchicalAggregator extends AbstractAggregator{
+	private HierarchicalAggregateEntity root = new HierarchicalAggregateEntity();
+
+	public HierarchicalAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){
+		super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
+	}
+
+	public void accumulate(TaggedLogAPIEntity entity) throws Exception{
+		List<Double> preAggregatedValues = createPreAggregatedValues(entity);
+		// aggregate to root first
+		addDatapoint(root, preAggregatedValues);
+		// go through hierarchical tree
+		HierarchicalAggregateEntity current = root;
+		int i = 0;
+		for(String groupbyField : groupbyFields){
+			// determine groupbyFieldValue from tag or fields
+			String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i);
+			SortedMap<String, HierarchicalAggregateEntity> children = current.getChildren();
+			if(children.get(groupbyFieldValue) == null){
+				HierarchicalAggregateEntity tmp = new HierarchicalAggregateEntity();
+				children.put(groupbyFieldValue, tmp);
+			}
+			children.get(groupbyFieldValue).setKey(groupbyFieldValue);
+			addDatapoint(children.get(groupbyFieldValue), preAggregatedValues);
+			current = children.get(groupbyFieldValue);
+		}
+	}
+
+	private void addDatapoint(HierarchicalAggregateEntity entity, List<Double> values){
+		List<GroupbyBucket.Function> functions = entity.getTmpValues();
+		// initialize list of function
+		if(functions.isEmpty()){
+			for(AggregateFunctionType type : aggregateFunctionTypes){
+				functions.add(GroupbyBucket._functionFactories.get(type.name()).createFunction());
+			}
+		}
+		int functionIndex = 0;
+		for(Double v : values){
+			functions.get(functionIndex).run(v);
+			functionIndex++;
+		}
+	}
+
+	private void finalizeHierarchicalAggregateEntity(HierarchicalAggregateEntity entity){
+		for(GroupbyBucket.Function f : entity.getTmpValues()){
+			entity.getValues().add(f.result());
+		}
+		for(HierarchicalAggregateEntity child : entity.getChildren().values()){
+			finalizeHierarchicalAggregateEntity(child);
+		}
+		entity.setTmpValues(null);
+	}
+
+	public HierarchicalAggregateEntity result(){
+		finalizeHierarchicalAggregateEntity(root);
+		return this.root;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
new file mode 100644
index 0000000..f62d2c2
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.timeseries;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public class PostFlatAggregateSort {
+	private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(Map<List<String>, List<Double>> map, List<SortOption> sortOptions) {
+	    SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>(new MapEntryComparator(sortOptions));
+	    sortedEntries.addAll(map.entrySet());
+	    return sortedEntries;
+	}
+
+	/**
+	 * sort aggregated results with sort options
+	 * @param aggregatedResult aggregated result set, but it is not sorted
+	 * @sortOptions sorting options
+	 * @topN top N results will be returned if topN is specified. If it's not specified (as default value 0), all results will be returned
+	 */
+	public static List<Map.Entry<List<String>, List<Double>>> sort(Map<List<String>, List<Double>> aggregatedResult, List<SortOption> sortOptions, int topN){
+		SortedSet<Map.Entry<List<String>, List<Double>>> allList = sortByValue(aggregatedResult, sortOptions);
+		List<Map.Entry<List<String>, List<Double>>> result = new ArrayList<Map.Entry<List<String>, List<Double>>>();
+		for (Map.Entry<List<String>, List<Double>> entry : allList) {
+			result.add(entry);
+			if (topN > 0 && result.size() >= topN) {
+				break;
+			}
+		}
+		return result;
+	}
+
+	private static class MapEntryComparator implements Comparator<Map.Entry<List<String>, List<Double>>>{
+		private List<SortOption> sortOptions;
+		public MapEntryComparator(List<SortOption> sortOptions){
+			this.sortOptions = sortOptions;
+		}
+		/**
+		 * default to sort by all groupby fields
+		 */
+		@Override
+        public int compare(Map.Entry<List<String>, List<Double>> e1, Map.Entry<List<String>, List<Double>> e2){
+			int r = 0;
+			List<String> keyList1 = e1.getKey();
+			List<Double> valueList1 = e1.getValue();
+			List<String> keyList2 = e2.getKey();
+			List<Double> valueList2 = e2.getValue();
+			for(SortOption so : sortOptions){
+				int index = so.getIndex();
+				if (index == -1) {
+					continue;
+				}
+				if(!so.isInGroupby()){  // sort fields come from functions
+					Double value1 = valueList1.get(index);
+					Double value2 = valueList2.get(index);
+					r = value1.compareTo(value2);
+				}else{  // sort fields come from groupby fields
+					String key1 = keyList1.get(index);
+					String key2 = keyList2.get(index);
+					r = key1.compareTo(key2);
+				}
+				if(r == 0) continue;
+				if(!so.isAscendant()){
+					r = -r;
+				}
+				return r;
+			}
+			// default to sort by groupby fields ascendently
+			if(r ==0){ // TODO is this check necessary
+				return new GroupbyFieldsComparator().compare(keyList1, keyList2);
+			}
+			return r;
+        }
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
new file mode 100644
index 0000000..7b0997b
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.timeseries;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public class PostHierarchicalAggregateSort {
+
+	private static SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortByValue(HierarchicalAggregateEntity entity, List<SortOption> sortOptions) {
+	    SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedEntries = new TreeSet<Map.Entry<String, HierarchicalAggregateEntity>>(new MapEntryComparator(sortOptions));
+	    sortedEntries.addAll(entity.getChildren().entrySet());
+	    return sortedEntries;
+	}
+
+	/**
+	 * sort aggregated results with sort options
+     *
+     * @param result
+     * @param sortOptions
+     * @return
+     */
+	public static HierarchicalAggregateEntity sort(HierarchicalAggregateEntity result, List<SortOption> sortOptions){
+		SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> tmp = sortByValue(result, sortOptions);
+		result.setSortedList(tmp);
+		result.setChildren(null);
+		for(Map.Entry<String, HierarchicalAggregateEntity> entry : tmp){
+			sort(entry.getValue(), sortOptions);
+		}
+		return result;
+	}
+
+	private static class MapEntryComparator implements Comparator<Map.Entry<String, HierarchicalAggregateEntity>>{
+		private List<SortOption> sortOptions;
+
+		public MapEntryComparator(List<SortOption> sortOptions){
+			this.sortOptions = sortOptions;
+		}
+
+		/**
+		 * default to sort by all groupby fields
+		 */
+		@Override
+        public int compare(Map.Entry<String, HierarchicalAggregateEntity> e1, Map.Entry<String, HierarchicalAggregateEntity> e2){
+			int r = 0;
+			String key1 = e1.getKey();
+			List<Double> valueList1 = e1.getValue().getValues();
+			String key2 = e2.getKey();
+			List<Double> valueList2 = e2.getValue().getValues();
+			for(SortOption so : sortOptions){
+				int index = so.getIndex();
+				if (index == -1) {
+					continue;
+				}
+				if(!so.isInGroupby()){  // sort fields come from functions
+					Double value1 = valueList1.get(index);
+					Double value2 = valueList2.get(index);
+					r = value1.compareTo(value2);
+				}  
+				// sort fields come from groupby fields, then silently ignored
+				
+				if(r == 0) continue;
+				if(!so.isAscendant()){
+					r = -r;
+				}
+				return r;
+			}
+			// default to sort by groupby fields ascendently
+			if(r ==0){
+				return key1.compareTo(key2);
+			}
+			return r;
+        }
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java
new file mode 100644
index 0000000..d1578ac
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.timeseries;
+
+/**
+ * sum(field1), max(field2) groupby(field3, field4) sort by field1 asc, field3 desc
+ * There are 2 SortOption object, then
+ * the 1st one is inGroupby=false, index=0, ascendent=true
+ * the 2nd one is inGroupby=true, index=1, ascendent=false
+ *
+ */
+public class SortOption {
+	private boolean inGroupby; // sort field defaultly is not from groupby fields 
+	private int index; // index relative to list of groupby fields or list of functions
+	private boolean ascendant; //asc or desc
+
+	public boolean isInGroupby() {
+		return inGroupby;
+	}
+	public void setInGroupby(boolean inGroupby) {
+		this.inGroupby = inGroupby;
+	}
+	public int getIndex() {
+		return index;
+	}
+	public void setIndex(int index) {
+		this.index = index;
+	}
+	public boolean isAscendant() {
+		return ascendant;
+	}
+	public void setAscendant(boolean ascendant) {
+		this.ascendant = ascendant;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java
new file mode 100644
index 0000000..1360e0c
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.timeseries;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SortOptionsParser {
+	private static final Logger LOG = LoggerFactory.getLogger(SortOptionsParser.class);
+	private static Pattern pattern = Pattern.compile("^(.+)\\s+(asc|desc)$");
+		
+	public static List<SortOption> parse(List<String> groupbyFields, List<String> aggregatedFields, List<String> sortOptions, List<String> sortFields){
+		List<SortOption> list = new ArrayList<SortOption>();
+		for(String sortOption : sortOptions){
+			Matcher m = pattern.matcher(sortOption);
+			if(!m.find()){
+				throw new IllegalArgumentException("sort option must have the format of <groupbyfield|function> asc|desc");
+			}
+			String field = m.group(1);
+			if (sortFields != null) {
+				sortFields.add(field);
+			}
+			SortOption so = new SortOption();
+			list.add(so);
+			so.setAscendant(m.group(2).equals("asc") ? true : false);
+			int index = aggregatedFields.indexOf(field); 
+			if(index > -1){
+				so.setInGroupby(false);
+				so.setIndex(index);
+				continue;
+			}
+			if(groupbyFields != null){  // if groupbyFields is not provided, ignore this sort field
+				index = groupbyFields.indexOf(field);
+				if(index > -1){
+					so.setInGroupby(true);
+					so.setIndex(index);
+					continue;
+				}
+			}
+			logNonExistingSortByField(field);
+			so.setInGroupby(false);
+			so.setIndex(-1);
+		}
+		return list;
+	}
+	
+	private static void logNonExistingSortByField(String sortByField){
+		LOG.warn("Sortby field is neither in aggregated fields or groupby fields, ignore " + sortByField);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
new file mode 100755
index 0000000..d8b781e
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.timeseries;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+
+public class SynchronizedAggregator implements Aggregator{
+	private Object mutex = new Object();
+	private Aggregator agg;
+	
+	public SynchronizedAggregator(Aggregator agg){
+		this.agg = agg;
+	}
+	
+	@Override
+	public void accumulate(TaggedLogAPIEntity entity) throws Exception{
+		synchronized(mutex){
+			agg.accumulate(entity);
+		}
+	}
+}	

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
new file mode 100644
index 0000000..7c1412e
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.timeseries;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.EntityCreationListener;
+
+public class SynchronizedEntityCreationListener implements EntityCreationListener{
+	private Object mutex = new Object();
+	private EntityCreationListener listener;
+	
+	public SynchronizedEntityCreationListener(EntityCreationListener listener){
+		this.listener = listener;
+	}
+	
+	@Override
+	public void entityCreated(TaggedLogAPIEntity entity) throws Exception{
+		synchronized(mutex){
+			listener.entityCreated(entity);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
new file mode 100755
index 0000000..5bebe13
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.timeseries;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.query.aggregate.AggregateFunctionType;
+import org.apache.eagle.query.aggregate.raw.GroupbyKeyAggregatable;
+import org.apache.eagle.query.aggregate.raw.GroupbyKeyValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * TODO Assuming that data point comes in the sequence of occurrence time desc or asc would 
+ * save memory for holding all the data in the memory
+ *
+ * <h3>Aggregate Bucket Structure</h3>
+ * <pre>
+ * {
+ *  ["key<SUB>1</SUB>","key<SUB>2</SUB>",...,(entity.getTimestamp() - startTime)/intervalms]:[value<SUB>1</SUB>,value<SUB>2</SUB>,...,value<SUB>n</SUB>]
+ * }
+ * </pre>
+ *
+ */
+public class TimeSeriesAggregator extends FlatAggregator implements GroupbyKeyAggregatable {
+	private final static Logger LOG = LoggerFactory.getLogger(TimeSeriesAggregator.class);
+	private static final int DEFAULT_DATAPOINT_MAX_COUNT = 1000;
+	private long startTime;
+	private long endTime;
+	private long intervalms;
+	private int numFunctions;
+	private int ignoredEntityCounter = 0;
+	
+	public TimeSeriesAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields,
+			long startTime, long endTime, long intervalms){
+		super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
+		// guard to avoid too many data points returned
+//		validateTimeRange(startTime, endTime, intervalms);
+		this.startTime = startTime;
+		this.endTime = endTime;
+		this.intervalms = intervalms;
+		this.numFunctions = aggregateFuntionTypes.size();
+	}
+
+//	@Deprecated
+//	public static void validateTimeRange(long startTime, long endTime, long intervalms){
+//		if(startTime >= endTime || intervalms <= 0){
+//			throw new IllegalArgumentException("invalid argument, startTime should be less than endTime and interval must be greater than 0, starTime is " + startTime + " and endTime is " + endTime + ", interval is " + intervalms);
+//		}
+//		if((endTime-startTime)/intervalms > DEFAULT_DATAPOINT_MAX_COUNT){
+//			throw new IllegalArgumentException("invalid argument, # of datapoints should be less than " + DEFAULT_DATAPOINT_MAX_COUNT + ", current # of datapoints is " + (endTime-startTime)/intervalms);
+//		}
+//	}
+	
+	public void accumulate(TaggedLogAPIEntity entity) throws Exception{
+		List<String> groupbyFieldValues = createGroup(entity);
+		// TODO: make sure timestamp be in range of this.startTime to this.endTime in outer side
+		// guard the time range to avoid to accumulate entities whose timestamp is bigger than endTime
+		if(entity.getTimestamp() >= this.endTime || entity.getTimestamp() < this.startTime){
+			if(LOG.isDebugEnabled()) LOG.debug("Ignore in-coming entity whose timestamp > endTime or < startTime, timestamp: " + entity.getTimestamp() + ", startTime:" + startTime + ", endTime:" + endTime);
+			this.ignoredEntityCounter ++;
+			return;
+		}
+		// time series bucket index
+		long located =(entity.getTimestamp() - startTime)/intervalms; 
+		groupbyFieldValues.add(String.valueOf(located));
+		List<Double> preAggregatedValues = createPreAggregatedValues(entity);
+		bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
+	}
+	
+	public Map<List<String>, List<Double>> result(){
+		if(this.ignoredEntityCounter > 0)
+			LOG.warn("Ignored "+this.ignoredEntityCounter+" entities for reason: timestamp > "+this.endTime+" or < "+this.startTime);
+		return bucket.result();
+	}
+
+	/**
+	 * Support new aggregate result
+	 *
+	 * @return
+	 */
+	@Override
+	public List<GroupbyKeyValue> getGroupbyKeyValues(){
+		if(this.ignoredEntityCounter > 0)
+			LOG.warn("Ignored "+this.ignoredEntityCounter+" entities for reason: timestamp > "+this.endTime+" or < "+this.startTime);
+		return bucket.getGroupbyKeyValue();
+	}
+	
+	public Map<List<String>, List<double[]>> getMetric(){
+		// groupbyfields+timeseriesbucket --> aggregatedvalues for different function
+		Map<List<String>, List<Double>> result = bucket.result();
+//		Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>();
+//		/**
+//		 * bug fix: startTime is inclusive and endTime is exclusive
+//		 */
+////		int numDatapoints =(int)((endTime-startTime)/intervalms + 1);
+//		int numDatapoints =(int)((endTime-1-startTime)/intervalms + 1);
+//		for(Map.Entry<List<String>, List<Double>> entry : result.entrySet()){
+//			// get groups
+//			List<String> groupbyFields = entry.getKey();
+//			List<String> copy = new ArrayList<String>(groupbyFields);
+//			String strTimeseriesIndex = copy.remove(copy.size()-1);
+//			List<double[]> functionValues = timeseriesDatapoints.get(copy);
+//			if(functionValues == null){
+//				functionValues = new ArrayList<double[]>();
+//				timeseriesDatapoints.put(copy, functionValues);
+//				for(int i=0; i<numFunctions; i++){
+//					functionValues.add(new double[numDatapoints]);
+//				}
+//			}
+//			int timeseriesIndex = Integer.valueOf(strTimeseriesIndex);
+//			int functionIndex = 0;
+//			for(double[] values : functionValues){
+//				values[timeseriesIndex] = entry.getValue().get(functionIndex);
+//				functionIndex++;
+//			}
+//		}
+//		return timeseriesDatapoints;
+		return toMetric(result,(int)((endTime-1-startTime)/intervalms + 1),this.numFunctions);
+	}
+
+	public static Map<List<String>, List<double[]>> toMetric(Map<List<String>, List<Double>> result,int numDatapoints,int numFunctions){
+		Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>();
+		/**
+		 * bug fix: startTime is inclusive and endTime is exclusive
+		 */
+//		int numDatapoints =(int)((endTime-startTime)/intervalms + 1);
+//		int numDatapoints =(int)((endTime-1-startTime)/intervalms + 1);
+		for(Map.Entry<List<String>, List<Double>> entry : result.entrySet()){
+			// get groups
+			List<String> groupbyFields = entry.getKey();
+			List<String> copy = new ArrayList<String>(groupbyFields);
+			String strTimeseriesIndex = copy.remove(copy.size()-1);
+			List<double[]> functionValues = timeseriesDatapoints.get(copy);
+			if(functionValues == null){
+				functionValues = new ArrayList<double[]>();
+				timeseriesDatapoints.put(copy, functionValues);
+				for(int i=0; i<numFunctions; i++){
+					functionValues.add(new double[numDatapoints]);
+				}
+			}
+			int timeseriesIndex = Integer.valueOf(strTimeseriesIndex);
+			int functionIndex = 0;
+			for(double[] values : functionValues){
+				values[timeseriesIndex] = entry.getValue().get(functionIndex);
+				functionIndex++;
+			}
+		}
+		return timeseriesDatapoints;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
new file mode 100644
index 0000000..d662658
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.timeseries;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * only numeric aggregation is supported and number type supported is double
+ */
+public class TimeSeriesBucket {
+	private final static Logger LOG = LoggerFactory.getLogger(TimeSeriesBucket.class);
+	private long startTime;
+	private long endTime;
+	private long interval;
+	
+	// map of aggregation function to aggregated values 
+	List<double[]> aggregatedValues = new ArrayList<double[]>();
+	
+	// align from the startTime
+	/**
+	 * 
+	 * @param startTime milliseconds
+	 * @param endTime milliseconds
+	 * @param intervalMillseconds
+	 * @param aggFunctions
+	 */
+	public TimeSeriesBucket(long startTime, long endTime, long intervalms, int numAggFunctions){
+		int count =(int)((endTime-startTime)/intervalms);
+		for(int i=0; i<numAggFunctions; i++){
+			aggregatedValues.add(new double[count]);
+		}
+	}
+	
+	/**
+	 * add datapoint which has a list of values for different aggregate functions
+	 * for example, sum(numHosts), count(*), avg(timespan) etc
+	 * @param timestamp
+	 * @param values
+	 */
+	public void addDataPoint(long timestamp, List<Double> values){
+		// locate timeseries bucket
+		if(timestamp < startTime || timestamp > endTime){
+			LOG.warn("timestamp<startTime or timestamp>endTime, ignore this datapoint." + timestamp + "," + startTime + ":" + endTime);
+			return;
+		}
+		int located =(int)((timestamp - startTime)/interval);
+		int index = 0;
+		for(Double src : values){
+			double[] timeSeriesValues = aggregatedValues.get(index);
+			timeSeriesValues[located] += src;
+			index++;
+		}
+	}
+	
+	public List<double[]> aggregatedValues(){
+		return this.aggregatedValues;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
new file mode 100644
index 0000000..c0a6e06
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.timeseries;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public class TimeSeriesPostFlatAggregateSort {
+	// private static final Logger logger =
+	// LoggerFactory.getLogger(PostFlatAggregateSort.class);
+
+	private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(
+			Map<List<String>, List<Double>> mapForSort,
+			List<SortOption> sortOptions) {
+		SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>(
+				new MapEntryComparator(sortOptions));
+		sortedEntries.addAll(mapForSort.entrySet());
+		return sortedEntries;
+	}
+
+	/**
+	 * sort aggregated results with sort options
+	 * 
+	 * @param entity
+	 */
+	public static List<Map.Entry<List<String>, List<double[]>>> sort(
+			Map<List<String>, List<Double>> mapForSort,
+			Map<List<String>, List<double[]>> valueMap,
+			List<SortOption> sortOptions, int topN) {
+
+		processIndex(sortOptions);
+		List<Map.Entry<List<String>, List<double[]>>> result = new ArrayList<Map.Entry<List<String>, List<double[]>>>();
+		SortedSet<Map.Entry<List<String>, List<Double>>> sortedSet = sortByValue(
+				mapForSort, sortOptions);
+		for (Map.Entry<List<String>, List<Double>> entry : sortedSet) {
+			List<String> key = entry.getKey();
+			List<double[]> value = valueMap.get(key);
+			if (value != null) {
+				Map.Entry<List<String>, List<double[]>> newEntry = new ImmutableEntry<List<String>, List<double[]>>(key, value);
+				result.add(newEntry);
+				if (topN > 0 && result.size() >= topN) {
+					break;
+				}
+			}
+		}
+		return result;
+	}
+
+	private static void processIndex(List<SortOption> sortOptions) {
+		for (int i = 0; i < sortOptions.size(); ++i) {
+			SortOption so = sortOptions.get(i);
+			so.setIndex(i);
+		}
+	}
+
+	private static class MapEntryComparator implements
+			Comparator<Map.Entry<List<String>, List<Double>>> {
+		private List<SortOption> sortOptions;
+
+		public MapEntryComparator(List<SortOption> sortOptions) {
+			this.sortOptions = sortOptions;
+		}
+
+		/**
+		 * default to sort by all groupby fields
+		 */
+		@Override
+		public int compare(Map.Entry<List<String>, List<Double>> e1,
+				Map.Entry<List<String>, List<Double>> e2) {
+			int r = 0;
+			List<String> keyList1 = e1.getKey();
+			List<Double> valueList1 = e1.getValue();
+			List<String> keyList2 = e2.getKey();
+			List<Double> valueList2 = e2.getValue();
+			for (SortOption so : sortOptions) {
+				int index = so.getIndex();
+				if (index == -1) {
+					continue;
+				}
+				if (!so.isInGroupby()) { // sort fields come from functions
+					Double value1 = valueList1.get(index);
+					Double value2 = valueList2.get(index);
+					r = value1.compareTo(value2);
+				} else { // sort fields come from groupby fields
+					String key1 = keyList1.get(index);
+					String key2 = keyList2.get(index);
+					r = key1.compareTo(key2);
+				}
+				if (r == 0)
+					continue;
+				if (!so.isAscendant()) {
+					r = -r;
+				}
+				return r;
+			}
+			// default to sort by groupby fields ascendently
+			if (r == 0) { // TODO is this check necessary
+				return new GroupbyFieldsComparator()
+						.compare(keyList1, keyList2);
+			}
+			return r;
+		}
+	}
+
+	static class ImmutableEntry<K, V> implements Map.Entry<K, V>, Serializable {
+		private final K key;
+		private final V value;
+
+		ImmutableEntry(K key, V value) {
+			this.key = key;
+			this.value = value;
+		}
+
+		@Override
+		public K getKey() {
+			return key;
+		}
+
+		@Override
+		public V getValue() {
+			return value;
+		}
+
+		@Override
+		public final V setValue(V value) {
+			throw new UnsupportedOperationException();
+		}
+
+		private static final long serialVersionUID = 0;
+	}
+
+}


Mime
View raw message