eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [08/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"
Date Thu, 19 Nov 2015 10:47:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/ListQueryCompiler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/ListQueryCompiler.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/ListQueryCompiler.java
new file mode 100755
index 0000000..c630c41
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/ListQueryCompiler.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query;
+
+import org.apache.eagle.log.entity.filter.HBaseFilterBuilder;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.log.expression.ExpressionParser;
+import org.apache.eagle.query.aggregate.timeseries.SortOption;
+import org.apache.eagle.query.aggregate.timeseries.SortOptionsParser;
+import org.apache.eagle.query.parser.EagleQueryParseException;
+import org.apache.eagle.query.parser.EagleQueryParser;
+import org.apache.eagle.query.parser.ORExpression;
+import org.apache.eagle.query.parser.TokenConstant;
+import org.apache.eagle.query.aggregate.AggregateFunctionType;
+import org.apache.eagle.query.aggregate.AggregateFunctionTypeMatcher;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ListQueryCompiler {
+	private final static Logger LOG = LoggerFactory.getLogger(ListQueryCompiler.class);
+	/**
+	 * List query syntax is <EntityName>[<Filter>]{<Projection>}
+	 */
+	private final static String listRegex = "^([^\\[]+)\\[([^\\]]*)\\]\\{(.+)\\}$";
+	private final static Pattern _listPattern = Pattern.compile(listRegex);
+
+	/**
+	 * Projection field syntax is @<fieldname>, or * to output all fields
+	 */
+	private final static String _fnAnyPattern = "*";
+	private final static Pattern _fnPattern = TokenConstant.ID_PATTERN;
+
+	/**
+	 * Projection expression syntax is EXP{<expression>}, optionally followed by [AS] <alias> (matched case-insensitively)
+	 */
+	private final static String expRegex = "^(EXP\\{.*\\})(\\s+AS)?(\\s+.*)?$";
+	private final static Pattern _expPattern = Pattern.compile(expRegex,Pattern.CASE_INSENSITIVE);
+
+	/**
+	 * Aggregate query syntax is <EntityName>[<Filter>]<GroupbyFields>{<AggregateFunctions>}
+	 */
+
+	/** The regular expression from before EXP{<Expression>} was added to queries **/
+	private final static String aggRegex = "^([^\\[]+)\\[([^\\]]*)\\]<([^>]*)>\\{(.+)\\}$";
+	private final static Pattern _aggPattern = Pattern.compile(aggRegex);
+
+	private final static String sortRegex = "^([^\\[]+)\\[([^\\]]*)\\]<([^>]*)>\\{(.+)\\}\\.\\{(.+)\\}$"; // aggregate syntax followed by .{<SortOptions>}
+	private final static Pattern _sortPattern = Pattern.compile(sortRegex);
+	
+	private String _serviceName; // <EntityName> part of the query (regex group 1)
+	private Filter _filter; // result of compileQy for the <Filter> part
+	private List<String> _outputFields; // projected fields, aggregate fields and fields that expressions depend on
+	private List<String> _groupbyFields; // only assigned by compileAggregateQuery
+	private List<AggregateFunctionType> _aggregateFunctionTypes; // one entry per aggregate function, in query order
+	private List<String> _aggregateFields; // field or expression of each aggregate function, '@' removed
+	private List<AggregateFunctionType> _sortFunctionTypes; // only assigned when sort options are present
+	private List<String> _sortFields; // only assigned when sort options are present
+	private Map<String,String> _outputAlias; // expression content -> alias; created lazily on the first alias
+
+	/**
+	 * Returns the filter fields reported by HBaseFilterBuilder while the filter was compiled.
+	 *
+	 * @return the filter fields; null when the filter part of the query was empty
+	 */
+	public Set<String> getFilterFields() {
+		return _filterFields;
+	}
+
+	private Set<String> _filterFields; // taken from HBaseFilterBuilder.getFilterFields()
+	private List<SortOption> _sortOptions; // result of SortOptionsParser.parse, only for sort queries
+	private boolean _hasAgg; // set by the constructor: false for list queries, true for aggregate queries
+	private List<String[]> _partitionValues; // taken from HBaseFilterBuilder.getPartitionValues()
+	private boolean _filterIfMissing; // constructor argument, handed to HBaseFilterBuilder
+	private ORExpression _queryExpression; // parse result of the filter text
+	private boolean _outputAll = false; // becomes true when the projection contains *
+
+	/** Compiles {@code query} with {@code filterIfMissing} set to false. */
+	public ListQueryCompiler(String query) throws Exception {
+		this(query, false);
+	}
+	
+	/**
+	 * Compiles a query, trying the list syntax first, then the aggregate syntax with sort
+	 * options, then the plain aggregate syntax. After a successful compile the partition
+	 * constraint is validated.
+	 *
+	 * @param query query text; the supported syntaxes are spelled out in the final error message
+	 * @param filterIfMissing flag passed on to HBaseFilterBuilder when the filter is built
+	 * @throws IllegalArgumentException if the query matches none of the supported syntaxes,
+	 *         or if it addresses more than one partition
+	 * @throws Exception declared for compatibility; the compile steps throw EagleQueryParseException
+	 */
+	public ListQueryCompiler(String query, boolean filterIfMissing) throws Exception{
+		this._filterIfMissing = filterIfMissing;
+
+		// No Matcher.groupCount() checks: the count is fixed by the pattern, so such a check can never fail.
+		Matcher m = _listPattern.matcher(query);
+		if(m.find()){
+			compileCollectionQuery(m);
+			_hasAgg = false;
+			partitionConstraintValidate(query);
+			return;
+		}
+		
+		// Match the sort pattern first, otherwise a sort query is mistaken for a plain aggregate query.
+		m = _sortPattern.matcher(query);
+		if(!m.find()){
+			m = _aggPattern.matcher(query);
+			if(!m.find()){
+				throw new IllegalArgumentException("List query syntax is <EntityName>[<Filter>]{<Projection>} \n Aggregate query syntax is <EntityName>[<Filter>]<GroupbyFields>{<AggregateFunctions>}.{<SortOptions>}");
+			}
+		}
+		compileAggregateQuery(m);
+		_hasAgg = true;
+		partitionConstraintValidate(query);
+	}
+	
+	/**
+	 * Rejects a query whose partition values differ between the entries of _partitionValues.
+	 * TODO: one query over several partitions is not supported yet. Once partitions are
+	 * supported, spawn one internal query per search condition and partition.
+	 * @param query input query to compile (only used in the error message)
+	 */
+	private void partitionConstraintValidate(String query) {
+		if (_partitionValues == null || _partitionValues.size() <= 1) {
+			return;
+		}
+		final String[] reference = _partitionValues.get(0);
+		for (final String[] candidate : _partitionValues.subList(1, _partitionValues.size())) {
+			for (int col = 0; col < reference.length; col++) {
+				final boolean samePartition = reference[col] != null && reference[col].equals(candidate[col]);
+				if (!samePartition) {
+					final String errMsg = "One query for multiple partitions is NOT allowed for now! Query: " + query;
+					LOG.error(errMsg);
+					throw new IllegalArgumentException(errMsg);
+				}
+			}
+		}
+	}
+
+	public boolean hasAgg(){ // true when the query was compiled as an aggregate query (with or without sort options)
+		return _hasAgg;
+	}
+	
+	public List<String[]> getQueryPartitionValues() { // values reported by HBaseFilterBuilder; null when the filter part was empty
+		return _partitionValues;
+	}
+	
+	public ORExpression getQueryExpression() { // parsed filter expression; null when the filter part was empty
+		return _queryExpression;
+	}
+	
+	/** Throws EagleQueryParseException unless EntityDefinitionManager returns a definition for the given name. */
+	private void checkEntityExistence(String entityName) throws EagleQueryParseException {
+		final EntityDefinition definition;
+		try {
+			definition = EntityDefinitionManager.getEntityByServiceName(entityName);
+		} catch (InstantiationException e) {
+			throw new EagleQueryParseException(entityName + " entity does not exist! " + "Got an InstantiationException: " + e.getMessage());
+		} catch (IllegalAccessException e) {
+			throw new EagleQueryParseException(entityName + " entity does not exist! " + "Got an IllegalAccessException: " + e.getMessage());
+		}
+		if (definition == null) throw new EagleQueryParseException(entityName + " entity does not exist!");
+	}
+	
+	public String deleteAtSign(String expression) { // returns the text with every '@' removed, wherever it occurs
+		return expression.replace("@", "");
+	}
+	
+	private void compileCollectionQuery(Matcher m) throws EagleQueryParseException{ // compiles a list query: entity name, filter, projection
+		_serviceName = m.group(1); // <EntityName>
+		checkEntityExistence(_serviceName);
+		if(_outputFields==null) _outputFields = new ArrayList<String>();
+		String qy = m.group(2); // <Filter>, may be empty
+		_filter = compileQy(qy);
+		String prjFields = m.group(3); // <Projection>
+		String[] tmp = prjFields.split(","); // NOTE(review): an expression containing a comma would be split here too - confirm
+		for(String str : tmp){
+			str = str.trim();
+			Matcher fnMatcher = _fnPattern.matcher(str);
+			Matcher expMatcher = _expPattern.matcher(str);
+			if(fnMatcher.find()) { // field reference is tried first; NOTE(review): assumes TokenConstant.ID_PATTERN does not match inside EXP{...} - confirm
+				if (fnMatcher.groupCount() == 1)
+					_outputFields.add(fnMatcher.group(1)); // captured field name
+			}else if(_fnAnyPattern.equals(str)){
+				if(LOG.isDebugEnabled()) LOG.debug("Output all fields");
+				// _outputFields.add(_fnAnyPattern);
+				this._outputAll = true;
+			}else if (expMatcher.find()) {
+				String expr = deleteAtSign(expMatcher.group(1)); // EXP{...} with every '@' removed
+				String alias = expMatcher.group(3); // text after the expression (and the optional AS); null when absent
+				try {
+					String exprContent = TokenConstant.parseExpressionContent(expr);
+					_outputFields.addAll(ExpressionParser.parse(exprContent).getDependentFields());
+					if(alias!=null) {
+						if(_outputAlias == null) _outputAlias = new HashMap<String, String>();
+						_outputAlias.put(exprContent,alias.trim());
+					}
+				} catch (Exception ex){ // best effort: a parse failure is logged and not rethrown
+					LOG.error("Failed to parse expression: " + expr + ", exception: " + ex.getMessage(), ex);
+				} finally {
+					_outputFields.add(expr); // the expression itself is always projected, even when parsing failed
+				}
+			} else {
+				throw new IllegalArgumentException("Field name syntax must be @<FieldName> or * or Expression in syntax EXP{<Expression>}");
+			}
+		}
+	}
+	
+	private void compileAggregateQuery(Matcher m) throws EagleQueryParseException{ // compiles an aggregate query, with or without sort options
+		_serviceName = m.group(1); // <EntityName>
+		checkEntityExistence(_serviceName);
+		String qy = m.group(2); // <Filter>, may be empty
+		_filter = compileQy(qy);
+		String groupbyFields = m.group(3); // <GroupbyFields>
+		// groupbyFields could be empty
+		List<String> groupbyFieldList = null; // stays null when there is no group-by field
+		_groupbyFields = new ArrayList<String>();
+		if(!groupbyFields.isEmpty()){
+			groupbyFieldList = Arrays.asList(groupbyFields.split(","));
+			for(String str : groupbyFieldList){
+				Matcher fnMatcher = _fnPattern.matcher(str.trim());
+				if(!fnMatcher.find() || fnMatcher.groupCount() != 1)
+					throw new IllegalArgumentException("Field name syntax must be @<FieldName>");
+				_groupbyFields.add(fnMatcher.group(1));
+			}
+		}
+		String functions = m.group(4); // <AggregateFunctions>
+		// functions
+		List<String> functionList = Arrays.asList(functions.split(",")); // raw, untrimmed function texts
+		_aggregateFunctionTypes = new ArrayList<AggregateFunctionType>();
+		_aggregateFields = new ArrayList<String>();
+		for(String function : functionList){
+			AggregateFunctionTypeMatcher matcher = AggregateFunctionType.matchAll(function.trim());
+			if(!matcher.find()){
+				throw new IllegalArgumentException("Aggregate function must have format of count|sum|avg|max|min(<fieldname|expression>)");
+			}
+			_aggregateFunctionTypes.add(matcher.type());
+			String aggField = deleteAtSign(matcher.field().trim());
+			try {
+				if(_outputFields == null) _outputFields = new ArrayList<String>();
+				if(TokenConstant.isExpression(aggField)) { // expressions contribute the fields they depend on
+					_outputFields.addAll(ExpressionParser.parse(TokenConstant.parseExpressionContent(aggField)).getDependentFields());
+				}else{
+					_outputFields.add(aggField);
+				}
+			} catch (Exception ex){ // best effort: a parse failure is logged and not rethrown
+				LOG.error("Failed to parse expression: " + aggField + ", exception: " + ex.getMessage(), ex);
+			} finally {
+				_aggregateFields.add(aggField); // kept index-aligned with _aggregateFunctionTypes even when parsing failed
+			}
+		}
+		
+		// sort options
+		if(m.groupCount() < 5 || m.group(5) == null) // no sort options (the plain aggregate pattern has only 4 groups)
+			return;
+		String sortOptions = m.group(5);
+		if(sortOptions != null){ // always true here: the null case returned above
+			LOG.info("SortOptions: " + sortOptions);
+			List<String> sortOptionList = Arrays.asList(sortOptions.split(","));
+			List<String> rawSortFields = new ArrayList<String>(); // handed to the parser; read back in the loop below
+			this._sortOptions = SortOptionsParser.parse(groupbyFieldList, functionList, sortOptionList, rawSortFields); // NOTE(review): groupbyFieldList may be null here - confirm the parser accepts that
+			this._sortFunctionTypes = new ArrayList<>();
+			this._sortFields = new ArrayList<>();
+			for (String sortField : rawSortFields) {
+				AggregateFunctionTypeMatcher matcher = AggregateFunctionType.matchAll(sortField);
+				if(matcher.find()) { // raw sort fields that are not aggregate functions are skipped
+					_sortFunctionTypes.add(matcher.type());
+					_sortFields.add(deleteAtSign(matcher.field().trim()));
+				}
+			}
+		}
+	}
+	
+	/**
+	 * Compiles the filter text: EagleQueryParser checks syntax, HBaseFilterBuilder builds the filter (it cannot tell tags from qualifiers).
+	 * @param qy filter text taken from between [ and ]; null or empty means "no filter"
+	 * @return the HBase filter list, or null when an entity without partitions is queried without a filter
+	 * @throws EagleQueryParseException if the entity cannot be loaded (IllegalArgumentException if a partitioned entity has no filter)
+	 */
+	private Filter compileQy(String qy) throws EagleQueryParseException{
+		try {
+			EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(_serviceName);
+			if(qy == null || qy.isEmpty()){
+				if (ed.getPartitions() == null) {
+					if(LOG.isDebugEnabled()) LOG.debug("Query string is empty, full table scan query: " + qy); // level now matches its guard
+					// For hbase 0.98+, empty FilterList() will filter all rows, so we need return null instead
+					return null;
+				} else {
+					final String errMsg = "Entity " + ed.getEntityClass().getSimpleName() + " defined partition, "
+							+ "but query doesn't provide partition condition! Query: " + qy;
+					LOG.error(errMsg);
+					throw new IllegalArgumentException(errMsg);
+				}
+			}
+			EagleQueryParser parser = new EagleQueryParser(qy);
+			_queryExpression = parser.parse();
+			
+			//TODO: build customize filter for EXP{<Expression>}
+			HBaseFilterBuilder builder = new HBaseFilterBuilder(ed, _queryExpression, _filterIfMissing);
+			FilterList flist = builder.buildFilters();
+			_partitionValues = builder.getPartitionValues(); // checked later by partitionConstraintValidate
+			_filterFields = builder.getFilterFields();
+			return flist;
+		} catch (InstantiationException e) {
+			final String errMsg = "Got an InstantiationException: " + e.getMessage();
+			throw new EagleQueryParseException(_serviceName + " entity does not exist! " + errMsg);
+		} catch (IllegalAccessException e) {
+			final String errMsg = "Got an IllegalAccessException: " + e.getMessage();
+			throw new EagleQueryParseException(_serviceName + " entity does not exist! " + errMsg);
+		}
+	}
+	
+	public String serviceName(){ // <EntityName> part of the compiled query
+		return _serviceName;
+	}
+	
+	public List<String> outputFields(){ // projected/aggregated fields plus the fields that expressions depend on
+		return _outputFields;
+	}
+
+	public Filter filter(){ // compiled filter; null when an unpartitioned entity is queried without a filter
+		return _filter;
+	}
+	
+	public List<String> groupbyFields(){ // null for list queries; possibly empty for aggregate queries
+		return _groupbyFields;
+	}
+	
+	public List<AggregateFunctionType> aggregateFunctionTypes(){ // null for list queries
+		return _aggregateFunctionTypes;
+	}
+	
+	public List<String> aggregateFields(){ // null for list queries
+		return _aggregateFields;
+	}
+	
+	public List<SortOption> sortOptions(){ // null unless the query carries sort options
+		return _sortOptions;
+	}
+
+	public List<AggregateFunctionType> sortFunctions() { // null unless the query carries sort options
+		return _sortFunctionTypes;
+	}
+	
+	public List<String> sortFields() { // null unless the query carries sort options
+		return _sortFields;
+	}
+
+	/**
+	 * Tells whether all fields are to be output, i.e. the projection contains *.
+	 *
+	 * @return true when * was found among the projected fields
+	 */
+	public boolean isOutputAll(){ return _outputAll;}
+	public Map<String,String> getOutputAlias(){ // expression content -> alias; null when no alias was declared
+		return _outputAlias;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/QueryConstants.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/QueryConstants.java
new file mode 100644
index 0000000..231cc99
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/QueryConstants.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query;
+
+/**
+ * Constants of the org.apache.eagle.query package.
+ * @since 3/25/15
+ */
+public class QueryConstants {
+    public final static String CHARSET ="UTF-8"; // charset name; NOTE(review): presumably used for query text encoding - confirm against callers
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateAPIEntity.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateAPIEntity.java
new file mode 100644
index 0000000..1f3214f
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateAPIEntity.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) // null-valued properties are left out of the serialized form
+public class AggregateAPIEntity { // bean holding a key, two descendant counters and nested AggregateAPIEntity children
+	private long numDirectDescendants;
+	private long numTotalDescendants;
+	private String key;
+	private SortedMap<String, AggregateAPIEntity> entityList = new TreeMap<String, AggregateAPIEntity>(); // children ordered by their String key
+	private List<AggregateAPIEntity> sortedList = new ArrayList<AggregateAPIEntity>(); // children as a list; starts empty
+
+	public String getKey() {
+		return key;
+	}
+	public void setKey(String key) {
+		this.key = key;
+	}
+	@JsonProperty("sL") // serialized under the short name "sL"
+	public List<AggregateAPIEntity> getSortedList() {
+		return sortedList;
+	}
+	public void setSortedList(List<AggregateAPIEntity> sortedList) {
+		this.sortedList = sortedList;
+	}
+	@JsonProperty("eL") // serialized under the short name "eL"
+	public SortedMap<String, AggregateAPIEntity> getEntityList() {
+		return entityList;
+	}
+	public void setEntityList(SortedMap<String, AggregateAPIEntity> entityList) {
+		this.entityList = entityList;
+	}
+	@JsonProperty("nDD") // serialized under the short name "nDD"
+	public long getNumDirectDescendants() {
+		return numDirectDescendants;
+	}
+	public void setNumDirectDescendants(long numDirectDescendants) {
+		this.numDirectDescendants = numDirectDescendants;
+	}
+	@JsonProperty("nTD") // serialized under the short name "nTD"
+	public long getNumTotalDescendants() {
+		return numTotalDescendants;
+	}
+	public void setNumTotalDescendants(long numTotalDescendants) {
+		this.numTotalDescendants = numTotalDescendants;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateAPIEntityFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateAPIEntityFactory.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateAPIEntityFactory.java
new file mode 100644
index 0000000..8e18b39
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateAPIEntityFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate;
+
+public interface AggregateAPIEntityFactory {
+	AggregateAPIEntity create(); // interface methods are implicitly public; returns an AggregateAPIEntity
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateCondition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateCondition.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateCondition.java
new file mode 100755
index 0000000..5555cfd
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateCondition.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable bean holding group-by fields, aggregate function types and fields, and time-series settings.
+ * @since : 11/7/14,2014
+ */
+public class AggregateCondition implements Serializable{
+	private static final long serialVersionUID = 1L;
+	private List<String> groupbyFields;
+	private List<AggregateFunctionType> aggregateFunctionTypes;
+	private List<String> aggregateFields; // NOTE(review): presumably index-aligned with aggregateFunctionTypes - confirm against callers
+	private boolean timeSeries;
+	private long intervalMS; // NOTE(review): presumably the time-series interval in milliseconds - confirm
+
+	public List<String> getGroupbyFields() {
+		return groupbyFields;
+	}
+
+	public void setGroupbyFields(List<String> groupbyFields) {
+		this.groupbyFields = groupbyFields;
+	}
+
+	public List<AggregateFunctionType> getAggregateFunctionTypes() {
+		return aggregateFunctionTypes;
+	}
+
+	public void setAggregateFunctionTypes(List<AggregateFunctionType> aggregateFunctionTypes) {
+		this.aggregateFunctionTypes = aggregateFunctionTypes;
+	}
+
+	public List<String> getAggregateFields() {
+		return aggregateFields;
+	}
+
+	public void setAggregateFields(List<String> aggregateFields) {
+		this.aggregateFields = aggregateFields;
+	}
+
+	public boolean isTimeSeries() {
+		return timeSeries;
+	}
+
+	public void setTimeSeries(boolean timeSeries) {
+		this.timeSeries = timeSeries;
+	}
+
+	public long getIntervalMS() {
+		return intervalMS;
+	}
+
+	public void setIntervalMS(long intervalMS) {
+		this.intervalMS = intervalMS;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateFunctionNotSupportedException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateFunctionNotSupportedException.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateFunctionNotSupportedException.java
new file mode 100644
index 0000000..df35c8b
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateFunctionNotSupportedException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate;
+
+public class AggregateFunctionNotSupportedException extends RuntimeException{ // unchecked, so callers need not declare it
+	static final long serialVersionUID = -4548788354899625887L;
+	public AggregateFunctionNotSupportedException(){ // no detail message
+		super();
+	}
+	
+	public AggregateFunctionNotSupportedException(String message){ // message is passed to RuntimeException unchanged
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateFunctionType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateFunctionType.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateFunctionType.java
new file mode 100755
index 0000000..8ac3b8c
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateFunctionType.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public enum AggregateFunctionType{
+	count("^(count)$"), // bare "count"; the captured field is the text "count" itself
+	sum("^sum\\((.*)\\)$"), // for the remaining types the captured field is the text between the parentheses
+	avg("^avg\\((.*)\\)$"),
+	max("^max\\((.*)\\)$"),
+	min("^min\\((.*)\\)$");
+	
+	private final Pattern pattern;
+	private AggregateFunctionType(String patternString){
+		this.pattern = Pattern.compile(patternString);
+	}
+
+	/**
+	 * Matches the function text against this type's pattern. Thread safe: each call creates its own Matcher.
+	 * For count the returned field is the text "count"; for the other types it is the text inside the parentheses.
+	 * @param function aggregate function text such as count or sum(...); callers trim it beforehand
+	 * @return a result carrying this type; find() is false and field() is null when nothing matched
+	 */
+	public AggregateFunctionTypeMatcher matcher(String function){
+		Matcher m = pattern.matcher(function);
+		// group(1) is the single capturing group every pattern declares
+		if(m.find()){
+			return new AggregateFunctionTypeMatcher(this, true, m.group(1));
+		}else{
+			return new AggregateFunctionTypeMatcher(this, false, null);
+		}
+	}
+
+	/** Tries every type in declaration order and returns the first match, else a result with a null type. */
+	public static AggregateFunctionTypeMatcher matchAll(String function){
+		for(AggregateFunctionType type : values()){
+			AggregateFunctionTypeMatcher candidate = type.matcher(function);
+			if(candidate.find())
+				return candidate;
+		}
+		return new AggregateFunctionTypeMatcher(null, false, null);
+	}
+
+	public static byte[] serialize(AggregateFunctionType type){ // enum name as UTF-8, independent of the platform charset
+		return type.name().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+	}
+
+	public static AggregateFunctionType deserialize(byte[] type){ // inverse of serialize; valueOf rejects unknown names
+		return valueOf(new String(type, java.nio.charset.StandardCharsets.UTF_8));
+	}
+
+	public static List<byte[]> toBytesList(List<AggregateFunctionType> types){ // serialize() applied element by element
+		List<byte[]> result = new ArrayList<byte[]>(types.size());
+		for(AggregateFunctionType type:types){
+			result.add(serialize(type));
+		}
+		return result;
+	}
+
+	public static List<AggregateFunctionType> fromBytesList(List<byte[]> types){ // deserialize() applied element by element
+		List<AggregateFunctionType> result = new ArrayList<AggregateFunctionType>(types.size());
+		for(byte[] bs:types){
+			result.add(deserialize(bs));
+		}
+		return result;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateFunctionTypeMatcher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateFunctionTypeMatcher.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateFunctionTypeMatcher.java
new file mode 100644
index 0000000..6b2bc13
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateFunctionTypeMatcher.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate;
+
+public class AggregateFunctionTypeMatcher { // immutable result of matching a function text against an AggregateFunctionType
+	private final AggregateFunctionType type;
+	private final boolean matched;
+	private final String field;
+
+	public AggregateFunctionTypeMatcher(AggregateFunctionType type, boolean matched, String field){ // values are stored as given, no validation
+		this.type = type;
+		this.matched = matched;
+		this.field = field;
+	}
+	
+	public boolean find(){ // returns the stored flag; no matching is performed here
+		return this.matched;
+	}
+	
+	public String field(){ // field text supplied at construction; may be null
+		return this.field;
+	}
+	
+	public AggregateFunctionType type(){ // type supplied at construction; may be null
+		return this.type;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateParams.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateParams.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateParams.java
new file mode 100644
index 0000000..616184d
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateParams.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Mutable holder for compiled aggregation parameters: group-by fields, the
+ * counting flag, the fields to sum and the requested sort orders.
+ */
+public class AggregateParams{
+	/** Group-by field names; stays {@code null} until {@link #setGroupbyFields(List)} is called. */
+	List<String> groupbyFields;
+	/** Whether counting was requested. */
+	boolean counting;
+	/** Field names to sum; starts out as an empty list. */
+	List<String> sumFunctionFields = new ArrayList<String>();
+	/** Sort orders to apply; starts out as an empty list. */
+	List<SortFieldOrder> sortFieldOrders = new ArrayList<SortFieldOrder>();
+
+	/** @return the live (not copied) list of sort orders */
+	public List<SortFieldOrder> getSortFieldOrders() {
+		return this.sortFieldOrders;
+	}
+	/** @param sortFieldOrders list that replaces the current sort orders */
+	public void setSortFieldOrders(List<SortFieldOrder> sortFieldOrders) {
+		this.sortFieldOrders = sortFieldOrders;
+	}
+	/** @return the group-by field list, or {@code null} if none was set */
+	public List<String> getGroupbyFields() {
+		return this.groupbyFields;
+	}
+	/** @param groupbyFields list that replaces the current group-by fields */
+	public void setGroupbyFields(List<String> groupbyFields) {
+		this.groupbyFields = groupbyFields;
+	}
+	/** @return the counting flag */
+	public boolean isCounting() {
+		return this.counting;
+	}
+	/** @param counting new value of the counting flag */
+	public void setCounting(boolean counting) {
+		this.counting = counting;
+	}
+	/** @return the live (not copied) list of fields to sum */
+	public List<String> getSumFunctionFields() {
+		return this.sumFunctionFields;
+	}
+	/** @param sumFunctionFields list that replaces the current sum fields */
+	public void setSumFunctionFields(List<String> sumFunctionFields) {
+		this.sumFunctionFields = sumFunctionFields;
+	}
+
+	/**
+	 * A field name paired with a direction flag (ascending when {@code true}).
+	 */
+	public static class SortFieldOrder{
+		/** Reserved field name: sort by the aggregate key. */
+		public static final String SORT_BY_AGGREGATE_KEY = "key";
+		/** Reserved field name: sort by the count. */
+		public static final String SORT_BY_COUNT = "count";
+		private String field;
+		private boolean ascendant;
+
+		/**
+		 * @param sortField name of the field to sort by
+		 * @param ascending {@code true} for ascending order
+		 */
+		public SortFieldOrder(String sortField, boolean ascending) {
+			field = sortField;
+			ascendant = ascending;
+		}
+		/** @return the field name to sort by */
+		public String getField() {
+			return this.field;
+		}
+		/** @param field new field name to sort by */
+		public void setField(String field) {
+			this.field = field;
+		}
+		/** @return {@code true} when the order is ascending */
+		public boolean isAscendant() {
+			return this.ascendant;
+		}
+		/** @param ascendant {@code true} for ascending order */
+		public void setAscendant(boolean ascendant) {
+			this.ascendant = ascendant;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateParamsValidator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateParamsValidator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateParamsValidator.java
new file mode 100644
index 0000000..9500574
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateParamsValidator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class AggregateParamsValidator {
+	/**
+	 * This method handle the following sytle syntax
+	 * sum(numConfiguredMapSlots), count group by cluster, rack 
+	 * 1. ensure that all gb fields must occur in outputField or outputTag
+	 * 2. ensure that all summarized fields must occur in outputField, 
+	 *    for example, for function=sum(numConfiguredMapSlots), numConfiguredMapSlots must occur in outputField  
+	 * 3. groupby should be pre-appended with a root groupby field  
+	 * @param outputTags
+	 * @param outputFields
+	 * @param groupbys
+	 * @param functions
+	 * @throws IllegalArgumentException
+	 */
+	public static AggregateParams compileAggregateParams(List<String> outputTags, List<String> outputFields, List<String> groupbys, List<String> functions, List<String> sortFieldOrders)
+			throws IllegalArgumentException, AggregateFunctionNotSupportedException{
+		AggregateParams aggParams = new AggregateParams();
+		// ensure that all gb fields must occur in outputField or outputTag
+		for(String groupby : groupbys){
+			if(!outputTags.contains(groupby) && !outputFields.contains(groupby)){
+				throw new IllegalArgumentException(groupby + ", All gb fields should appear in outputField list or outputTag list");
+			}
+		}
+		
+		// parse functions and ensure that all summarized fields must occur in outputField
+		for(String function : functions){
+			AggregateFunctionTypeMatcher m = AggregateFunctionType.count.matcher(function);
+			if(m.find()){
+				aggParams.setCounting(true);
+				continue;
+			}
+
+			m = AggregateFunctionType.sum.matcher(function);
+			if(m.find()){
+				if(!outputFields.contains(m.field())){
+					throw new IllegalArgumentException(m.field() + ", All summary function fields should appear in outputField list");
+				}
+				aggParams.getSumFunctionFields().add(m.field());
+				continue;
+			}
+			
+			throw new AggregateFunctionNotSupportedException("function " + function + " is not supported, only count, sum aggregate functions are now supported");
+		}
+		
+		//  groupby should be pre-appended with a root groupby field
+		List<String> groupbyFields = new ArrayList<String>();
+		groupbyFields.add(Aggregator.GROUPBY_ROOT_FIELD_NAME);
+		groupbyFields.addAll(groupbys);
+		aggParams.setGroupbyFields(groupbyFields);
+
+		// check sort field orders
+		boolean byKeySorting = false;
+		for(String sortFieldOrder : sortFieldOrders){
+			AggregateParams.SortFieldOrder sfo = SortFieldOrderType.matchAll(sortFieldOrder);
+			if(sfo == null){
+				throw new IllegalArgumentException(sortFieldOrder + ", All sort field order should be <field>=(asc|desc)");
+			}
+			if(sfo.getField().equals(AggregateParams.SortFieldOrder.SORT_BY_AGGREGATE_KEY)){
+				byKeySorting =  true;
+			}else if(!sfo.getField().equals(AggregateParams.SortFieldOrder.SORT_BY_COUNT)){
+				if(!groupbys.contains(sfo.getField()) && !aggParams.getSumFunctionFields().contains(sfo.getField())){
+					throw new IllegalArgumentException(sortFieldOrder + ", All sort field order should appear in gb or function fields");
+				}
+			}
+			aggParams.getSortFieldOrders().add(sfo);
+		}
+		// always add key ascendant to the last aggregation key if not specified
+		if(!byKeySorting){
+			aggParams.getSortFieldOrders().add(new AggregateParams.SortFieldOrder(AggregateParams.SortFieldOrder.SORT_BY_AGGREGATE_KEY, true));
+		}
+		return aggParams;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateResultAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateResultAPIEntity.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateResultAPIEntity.java
new file mode 100644
index 0000000..c1c87d3
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateResultAPIEntity.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Response bean wrapping an aggregate result: a success flag, an exception
+ * text, an elapsed-time value and the {@link AggregateAPIEntity} itself.
+ * Properties whose value is null are left out of the serialized output
+ * (Jackson {@code NON_NULL} inclusion).
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+public class AggregateResultAPIEntity {
+	// whether the request succeeded
+	private boolean success;
+	// exception text; presumably only set on failure - confirm against callers
+	private String exception;
+	// presumably elapsed time in milliseconds, judging by the name - confirm against callers
+	private long elapsedms;
+	// the aggregate result payload
+	private AggregateAPIEntity entity;
+
+	/** @return the stored elapsed-time value */
+	public long getElapsedms() {
+		return elapsedms;
+	}
+	/** @param elapsedms the elapsed-time value to store */
+	public void setElapsedms(long elapsedms) {
+		this.elapsedms = elapsedms;
+	}
+	/** @return the aggregate result payload, possibly null */
+	public AggregateAPIEntity getEntity() {
+		return entity;
+	}
+	/** @param entity the aggregate result payload */
+	public void setEntity(AggregateAPIEntity entity) {
+		this.entity = entity;
+	}
+	/** @return the success flag */
+	public boolean isSuccess() {
+		return success;
+	}
+	/** @param success the success flag */
+	public void setSuccess(boolean success) {
+		this.success = success;
+	}
+	/** @return the exception text, possibly null */
+	public String getException() {
+		return exception;
+	}
+	/** @param exception the exception text */
+	public void setException(String exception) {
+		this.exception = exception;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/Aggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/Aggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/Aggregator.java
new file mode 100644
index 0000000..de911e5
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/Aggregator.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+
+/**
+ * Accumulates {@link TaggedLogAPIEntity} objects into a tree of
+ * {@link AggregateAPIEntity} nodes, one tree level per group-by field.
+ * At every level the matching child is counted (when counting is enabled)
+ * and the configured sum fields are added up via reflective bean accessors.
+ */
+public class Aggregator {
+	private static final Logger LOG = LoggerFactory.getLogger(Aggregator.class);
+	/** Reserved group-by field that every entity resolves to {@link #GROUPBY_ROOT_FIELD_VALUE}. */
+	public static final String GROUPBY_ROOT_FIELD_NAME = "site";
+	/** Value returned for the reserved root group-by field. */
+	public static final String GROUPBY_ROOT_FIELD_VALUE = "xyz";
+	/** Bucket name used when a group-by value is missing or empty. */
+	public static final String UNASSIGNED_GROUPBY_ROOT_FIELD_NAME = "unassigned";
+
+	private final AggregateAPIEntityFactory factory;
+	private final AggregateAPIEntity root;
+	private final List<String> groupbys;
+	private final List<String> sumFunctionFields;
+	private final boolean counting;
+
+	/**
+	 * @param factory creates a new node whenever an unseen group-by value is met
+	 * @param root node under which the first group-by level is built
+	 * @param groupbys group-by field names, outermost level first
+	 * @param counting whether each visited node is counted
+	 * @param sumFunctionFields bean property names to sum into each visited node
+	 */
+	public Aggregator(AggregateAPIEntityFactory factory, AggregateAPIEntity root, List<String> groupbys, boolean counting, List<String> sumFunctionFields){
+		this.factory = factory;
+		this.root = root;
+		this.groupbys = groupbys;
+		this.sumFunctionFields = sumFunctionFields;
+		this.counting = counting;
+	}
+
+	/**
+	 * Upper-cases the first character of a property name so it can be
+	 * appended to "get"/"set". A fixed locale is used because the default
+	 * locale may map characters differently (e.g. Turkish dotted i), which
+	 * would yield an accessor name that does not exist.
+	 */
+	private static String capitalize(String name){
+		return name.substring(0,1).toUpperCase(java.util.Locale.ROOT)+name.substring(1);
+	}
+
+	/**
+	 * Resolves the value of one group-by field for an entity.
+	 * The reserved root field yields a constant; otherwise the entity's tags
+	 * are checked first and then a bean getter named after the field.
+	 * A non-String getter result is converted with {@code toString()} rather
+	 * than cast, so non-String fields can be grouped on.
+	 * @param groupby group-by field name
+	 * @param obj entity to read from
+	 * @return the value, or null when it is absent or cannot be read
+	 */
+	private String locateGroupbyField(String groupby, TaggedLogAPIEntity obj){
+		if(groupby.equals(GROUPBY_ROOT_FIELD_NAME)){
+			return GROUPBY_ROOT_FIELD_VALUE;
+		}
+		// check tag first
+		String tagv = obj.getTags().get(groupby);
+		if(tagv != null)
+			return tagv;
+		// check against pojo, or qualifierValues
+		String fn = capitalize(groupby);
+		try{
+			Method getM = obj.getClass().getMethod("get"+fn);
+			Object value = getM.invoke(obj);
+			return (value == null) ? null : value.toString();
+		}catch(Exception ex){
+			LOG.warn(groupby + " field is in neither tags nor fields, " + ex.getMessage());
+			return null;
+		}
+	}
+
+	/**
+	 * Accumulates a list of entities, one by one.
+	 * @param entities entities to accumulate
+	 * @throws Exception when summing a field fails for any entity
+	 */
+	public void accumulateAll(List<TaggedLogAPIEntity> entities) throws Exception{
+		for(TaggedLogAPIEntity entity : entities){
+			accumulate(entity);
+		}
+	}
+
+	/**
+	 * Walks the group-by fields from the root downwards, creating child nodes
+	 * on demand and applying count/sum to every node on the path.
+	 * A missing or empty group-by value is filed under
+	 * {@link #UNASSIGNED_GROUPBY_ROOT_FIELD_NAME}.
+	 * @param entity entity to accumulate
+	 * @throws Exception when summing a field fails
+	 */
+	public void accumulate(TaggedLogAPIEntity entity) throws Exception{
+		AggregateAPIEntity current = root;
+		for(String groupby : groupbys){
+			String tagv = locateGroupbyField(groupby, entity);
+			if(tagv == null || tagv.isEmpty()){
+				tagv = UNASSIGNED_GROUPBY_ROOT_FIELD_NAME;
+			}
+			Map<String, AggregateAPIEntity> children = current.getEntityList();
+			AggregateAPIEntity child = children.get(tagv);
+			if(child == null){
+				child = factory.create();
+				children.put(tagv, child);
+				current.setNumDirectDescendants(current.getNumDirectDescendants()+1);
+			}
+			// go through all aggregate functions including count, summary etc.
+			if(counting)
+				count(child);
+			for(String sumFunctionField : sumFunctionFields){
+				sum(child, entity, sumFunctionField);
+			}
+
+			current = child;
+		}
+	}
+
+	/**
+	 * Adds the source object's value of {@code fieldName} to the same
+	 * property of the target object, using reflective getter/setter calls.
+	 * A null source value is skipped silently. The target property must be
+	 * a long or a double. For a long target the source must be a Long or a
+	 * parseable String; for a double target it may be any Number or a
+	 * parseable String.
+	 * @param targetObj node receiving the sum
+	 * @param srcObj entity providing the value
+	 * @param fieldName bean property name
+	 * @throws Exception any reflection, parsing or type failure, after logging it
+	 */
+	private void sum(Object targetObj, TaggedLogAPIEntity srcObj, String fieldName) throws Exception{
+		try{
+			String fn = capitalize(fieldName);
+			Method srcGetMethod = srcObj.getClass().getMethod("get"+fn);
+			Object srcValue = srcGetMethod.invoke(srcObj);
+			if(srcValue == null){
+				return;  // silently don't count this source object
+			}
+			Method targetGetMethod = targetObj.getClass().getMethod("get"+fn);
+			Object targetValue = targetGetMethod.invoke(targetObj);
+			if(targetValue instanceof Long){
+				Method setM = targetObj.getClass().getMethod("set"+fn, long.class);
+				long addend;
+				if(srcValue instanceof String){
+					addend = Long.parseLong((String)srcValue);
+				}else if(srcValue instanceof Long){
+					addend = ((Long)srcValue).longValue();
+				}else{
+					throw new IllegalAggregateFieldTypeException(srcValue.getClass().toString() + " type is not support. The source type must be Long or String");
+				}
+				setM.invoke(targetObj, ((Long)targetValue).longValue()+addend);
+			}else if(targetValue instanceof Double){
+				Method setM = targetObj.getClass().getMethod("set"+fn, double.class);
+				double addend;
+				if(srcValue instanceof String){
+					addend = Double.parseDouble((String)srcValue);
+				}else if(srcValue instanceof Number){
+					// numeric sources are accepted directly instead of being cast to String
+					addend = ((Number)srcValue).doubleValue();
+				}else{
+					throw new IllegalAggregateFieldTypeException(srcValue.getClass().toString() + " type is not support. The source type must be Number or String");
+				}
+				setM.invoke(targetObj, ((Double)targetValue).doubleValue()+addend);
+			}else{
+				// targetValue may be null here, so do not dereference it blindly
+				String targetType = (targetValue == null) ? "null" : targetValue.getClass().toString();
+				throw new IllegalAggregateFieldTypeException(targetType + " type is not support. The target type must be long or double");
+			}
+		}catch(Exception ex){
+			LOG.error("Cannot do sum aggregation for field " + fieldName, ex);
+			throw ex;
+		}
+	}
+
+	/**
+	 * Increments the total-descendants counter of the given node by one.
+	 * @param targetObj node to count on
+	 * @throws Exception declared for symmetry with {@code sum}; not thrown by the visible code
+	 */
+	private void count(AggregateAPIEntity targetObj) throws Exception{
+		targetObj.setNumTotalDescendants(targetObj.getNumTotalDescendants()+1);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/BucketQuery.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/BucketQuery.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/BucketQuery.java
new file mode 100644
index 0000000..a00c5ad
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/BucketQuery.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+
+/**
+ * Files entities into nested buckets keyed by the tag values of the
+ * configured bucket fields. Every level except the last is a
+ * {@code Map<String, Object>}; the last level maps to a
+ * {@code List<TaggedLogAPIEntity>} holding at most {@code limit} entities.
+ */
+public class BucketQuery {
+	public final static String UNASSIGNED_BUCKET = "unassigned"; 
+	private List<String> bucketFields;
+	private int limit;
+	private Map<String, Object> root = new HashMap<String, Object>();
+
+	/**
+	 * @param bucketFields tag names defining the bucket levels, outermost first
+	 * @param limit maximum number of entities kept per innermost bucket
+	 */
+	public BucketQuery(List<String> bucketFields, int limit){
+		this.bucketFields = bucketFields;
+		this.limit = limit;
+	}
+
+	/**
+	 * Returns the entity's tag value for a bucket field, or
+	 * {@link #UNASSIGNED_BUCKET} when the tag is missing or empty.
+	 */
+	private static String bucketKey(TaggedLogAPIEntity entity, String bucketField){
+		String value = entity.getTags().get(bucketField);
+		return (value == null || value.isEmpty()) ? UNASSIGNED_BUCKET : value;
+	}
+
+	/**
+	 * Adds one entity to the bucket selected by its tag values. Does nothing
+	 * when no bucket fields are configured or the target bucket is full.
+	 * @param entity entity to file
+	 */
+	@SuppressWarnings("unchecked")
+	public void put(TaggedLogAPIEntity entity){
+		int levels = bucketFields.size();
+		if(levels <= 0){
+			return; // silently return
+		}
+		int leafLevel = levels - 1;
+		Map<String, Object> node = root;
+		// descend through every level but the last, creating maps on demand
+		for(int level = 0; level < leafLevel; level++){
+			String key = bucketKey(entity, bucketFields.get(level));
+			Object next = node.get(key);
+			if(next == null){
+				next = new HashMap<String, Object>();
+				node.put(key, next);
+			}
+			node = (Map<String, Object>)next;
+		}
+		// the last level holds the entity list rather than another map
+		String leafKey = bucketKey(entity, bucketFields.get(leafLevel));
+		List<TaggedLogAPIEntity> bucket = (List<TaggedLogAPIEntity>)node.get(leafKey);
+		if(bucket == null){
+			bucket = new ArrayList<TaggedLogAPIEntity>();
+			node.put(leafKey, bucket);
+		}
+		if(bucket.size() < limit){
+			bucket.add(entity);
+		}
+	}
+
+	/**
+	 * Files every entity of the list via {@link #put(TaggedLogAPIEntity)}.
+	 * @param entities entities to file
+	 */
+	public void batchPut(List<TaggedLogAPIEntity> entities){
+		for(TaggedLogAPIEntity each : entities){
+			put(each);
+		}
+	}
+
+	/**
+	 * @return the live root map of the bucket tree
+	 */
+	public Map<String, Object> get(){
+		return root;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/IllegalAggregateFieldTypeException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/IllegalAggregateFieldTypeException.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/IllegalAggregateFieldTypeException.java
new file mode 100644
index 0000000..3e3e739
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/IllegalAggregateFieldTypeException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate;
+
+/**
+ * Unchecked exception raised when a field taking part in an aggregation has
+ * a type the aggregation cannot work with.
+ */
+public class IllegalAggregateFieldTypeException extends RuntimeException{
+	static final long serialVersionUID = -4548788354899625887L;
+
+	/**
+	 * Creates the exception without a message.
+	 */
+	public IllegalAggregateFieldTypeException(){
+	}
+
+	/**
+	 * Creates the exception with the given text followed by a fixed suffix.
+	 * @param message description of the offending type
+	 */
+	public IllegalAggregateFieldTypeException(String message){
+		super(message + ", only count and sum are support");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/PostAggregateSorting.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/PostAggregateSorting.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/PostAggregateSorting.java
new file mode 100644
index 0000000..b801255
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/PostAggregateSorting.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate;
+
+import java.lang.reflect.Method;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Turns the child map of an {@link AggregateAPIEntity} tree into sorted
+ * child lists, ordered by a list of {@link AggregateParams.SortFieldOrder}.
+ */
+public class PostAggregateSorting {
+	private static final Logger LOG = LoggerFactory.getLogger(PostAggregateSorting.class);
+
+	/**
+	 * Returns the map's entries ordered by the given sort field orders.
+	 */
+	private static SortedSet<Map.Entry<String, AggregateAPIEntity>> sortByValue(Map<String, AggregateAPIEntity> map, List<AggregateParams.SortFieldOrder> sortedFields) {
+		SortedSet<Map.Entry<String, AggregateAPIEntity>> sortedEntries = new TreeSet<Map.Entry<String, AggregateAPIEntity>>(new MapKeyValueComparator(sortedFields));
+		sortedEntries.addAll(map.entrySet());
+		return sortedEntries;
+	}
+
+	/**
+	 * Recursively fills each node's sorted list from its child map and then
+	 * clears the child map (sets it to null). A node whose child map is
+	 * already null is left untouched, so sorting a tree twice is harmless.
+	 * @param entity root of the (sub)tree to sort
+	 * @param sortFieldOrders sort criteria, most significant first
+	 */
+	public static void sort(AggregateAPIEntity entity, List<AggregateParams.SortFieldOrder> sortFieldOrders){
+		Map<String, AggregateAPIEntity> children = entity.getEntityList();
+		if(children == null){
+			return;
+		}
+		// the map key is copied onto each child first because "key" is a legal sort field
+		for(Map.Entry<String, AggregateAPIEntity> e : children.entrySet()){
+			e.getValue().setKey(e.getKey());
+		}
+		for(Map.Entry<String, AggregateAPIEntity> entry : sortByValue(children, sortFieldOrders)){
+			entity.getSortedList().add(entry.getValue());
+		}
+		for(AggregateAPIEntity child : children.values()){
+			sort(child, sortFieldOrders);
+		}
+		entity.setEntityList(null);
+	}
+
+	/**
+	 * Compares map entries by the configured sort fields. Entries that tie on
+	 * every configured field are ordered by their map key, so that the
+	 * TreeSet used for sorting never discards an entry as a duplicate.
+	 */
+	private static class MapKeyValueComparator implements Comparator<Map.Entry<String, AggregateAPIEntity>>{
+		private final List<AggregateParams.SortFieldOrder> sortedFieldOrders;
+
+		public MapKeyValueComparator(List<AggregateParams.SortFieldOrder> sortedFields){
+			this.sortedFieldOrders = sortedFields;
+		}
+
+		@Override
+		public int compare(Map.Entry<String, AggregateAPIEntity> e1, Map.Entry<String, AggregateAPIEntity> e2){
+			AggregateAPIEntity entity1 = e1.getValue();
+			AggregateAPIEntity entity2 = e2.getValue();
+			for(AggregateParams.SortFieldOrder sortFieldOrder : sortedFieldOrders){
+				int r = compareByField(sortFieldOrder.getField(), entity1, entity2);
+				if(r == 0) continue;
+				return sortFieldOrder.isAscendant() ? r : -r;
+			}
+			// tie-break on the (unique) map key; a 0 here would make TreeSet drop the entry
+			String k1 = e1.getKey();
+			String k2 = e2.getKey();
+			if(k1 == null){
+				return (k2 == null) ? 0 : -1;
+			}
+			if(k2 == null){
+				return 1;
+			}
+			return k1.compareTo(k2);
+		}
+
+		/**
+		 * Compares two entities on one sort field: by total descendants for
+		 * "count", by key for "key", otherwise by the numeric value of the
+		 * bean getter named after the field. A field that cannot be read or
+		 * compared is logged and treated as a tie.
+		 */
+		private static int compareByField(String sortedField, AggregateAPIEntity entity1, AggregateAPIEntity entity2){
+			// TODO count should not be literal, compare numTotalDescendants
+			if(sortedField.equals(AggregateParams.SortFieldOrder.SORT_BY_COUNT)){
+				long tmp = entity1.getNumTotalDescendants() - entity2.getNumTotalDescendants();
+				return (tmp == 0) ? 0 : ((tmp > 0) ? 1 : -1);
+			}
+			if(sortedField.equals(AggregateParams.SortFieldOrder.SORT_BY_AGGREGATE_KEY)){
+				return entity1.getKey().compareTo(entity2.getKey());
+			}
+			try{
+				// fixed locale: the default locale could upper-case to a non-existent getter name
+				String getter = "get"+sortedField.substring(0, 1).toUpperCase(java.util.Locale.ROOT)+sortedField.substring(1);
+				Object v1 = entity1.getClass().getMethod(getter).invoke(entity1);
+				Object v2 = entity2.getClass().getMethod(getter).invoke(entity2);
+				return compareNumbers(v1, v2);
+			}catch(Exception ex){
+				LOG.error("Can not get corresponding field for sorting", ex);
+				return 0;
+			}
+		}
+
+		/**
+		 * Compares two numeric values: exactly for two Longs, via double
+		 * otherwise, so that double-valued sums can be sorted as well.
+		 */
+		private static int compareNumbers(Object v1, Object v2){
+			if(v1 instanceof Long && v2 instanceof Long){
+				return ((Long)v1).compareTo((Long)v2);
+			}
+			if(v1 instanceof Number && v2 instanceof Number){
+				return Double.compare(((Number)v1).doubleValue(), ((Number)v2).doubleValue());
+			}
+			throw new IllegalArgumentException("sort field values must be numeric but were " + v1 + " and " + v2);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderType.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderType.java
new file mode 100644
index 0000000..6d47c7f
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderType.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Recognised forms of a sort expression, {@code <field>=(asc|desc)}.
+ * Each constant owns a pattern with two groups: the field to sort by and
+ * the direction. For {@code avg}, {@code max} and {@code min} the
+ * {@code =} is optional: it is accepted so that these agree with the other
+ * constants, while the form without it keeps matching as it did before.
+ */
+public enum SortFieldOrderType {
+	key("^(key)=(asc|desc)$"),
+	count("^(count)=(asc|desc)$"),
+	sum("^sum\\((.*)\\)=(asc|desc)$"),
+	avg("^avg\\((.*)\\)=?(asc|desc)$"),
+	max("^max\\((.*)\\)=?(asc|desc)$"),
+	min("^min\\((.*)\\)=?(asc|desc)$");
+
+	private final Pattern pattern;
+	private SortFieldOrderType(String patternString){
+		this.pattern = Pattern.compile(patternString);
+	}
+
+	/**
+	 * Tests a sort expression against this constant's pattern.
+	 * Each call creates its own {@link Matcher} on the compiled pattern.
+	 * @param sortFieldOrder sort expression, e.g. {@code sum(x)=desc}
+	 * @return a matcher result carrying the field and direction on success,
+	 *         or an unmatched result otherwise
+	 */
+	public SortFieldOrderTypeMatcher matcher(String sortFieldOrder){
+		Matcher m = pattern.matcher(sortFieldOrder);
+
+		if(m.find()){
+			return new SortFieldOrderTypeMatcher(true, m.group(1), m.group(2));
+		}else{
+			return new SortFieldOrderTypeMatcher(false, null, null);
+		}
+	}
+
+	/**
+	 * Tries every constant in declaration order and returns the sort order
+	 * of the first one that matches.
+	 * @param sortFieldOrder sort expression
+	 * @return the parsed sort order, or null when no constant matches
+	 */
+	public static AggregateParams.SortFieldOrder matchAll(String sortFieldOrder){
+		for(SortFieldOrderType type : SortFieldOrderType.values()){
+			SortFieldOrderTypeMatcher m = type.matcher(sortFieldOrder);
+			if(m.find())
+				return m.sortFieldOrder();
+		}
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderTypeMatcher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderTypeMatcher.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderTypeMatcher.java
new file mode 100644
index 0000000..0b4d408
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderTypeMatcher.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate;
+
+
+/**
+ * Holds the outcome of matching a sort specification: whether it matched and,
+ * if so, the resulting {@link AggregateParams.SortFieldOrder}.
+ */
+public class SortFieldOrderTypeMatcher {
+	private boolean matched;
+	private AggregateParams.SortFieldOrder sortFieldOrder;
+
+	/**
+	 * @param matched whether the specification matched
+	 * @param field the captured field; only used when {@code matched} is true
+	 * @param order the captured order; only used when {@code matched} is true,
+	 *              in which case it is compared against {@code "asc"}
+	 */
+	public SortFieldOrderTypeMatcher(boolean matched, String field, String order){
+		this.matched = matched;
+		if(!matched){
+			// nothing to build; sortFieldOrder stays null
+			return;
+		}
+		final boolean isAsc = order.equals("asc");
+		this.sortFieldOrder = new AggregateParams.SortFieldOrder(field, isAsc);
+	}
+	
+	/**
+	 * @return true when the specification matched
+	 */
+	public boolean find(){
+		return matched;
+	}
+	
+	/**
+	 * @return the sort field/order built on a match, or {@code null} when unmatched
+	 */
+	public AggregateParams.SortFieldOrder sortFieldOrder(){
+		return sortFieldOrder;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/Function.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/Function.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/Function.java
new file mode 100755
index 0000000..83c683c
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/Function.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.raw;
+
+/**
+ * Base class for incremental numeric aggregations. Each call to
+ * {@link #run(double, int)} folds one value into the aggregate and adds the
+ * supplied number to a running counter; {@link #result()} returns the current
+ * aggregate value.
+ */
+public abstract class Function{
+	private int count = 0;
+
+	/** Adds {@code num} to the running counter. */
+	protected void incrCount(int num){ count += num; }
+
+	/** @return the sum of all counts passed to {@code run} so far */
+	public int count(){ return count; }
+
+	/**
+	 * Folds a value into the aggregate.
+	 *
+	 * @param v the value to fold in
+	 * @param count the amount to add to the running counter for this value
+	 */
+	public abstract void run(double v,int count);
+
+	/** Folds a value into the aggregate with a count of 1. */
+	public void run(double v){ run(v,1); }
+
+	/** @return the current aggregate value */
+	public abstract double result();
+
+	/**
+	 * Average: the total of all values divided by the running counter.
+	 * NOTE(review): {@code v} is added as-is and not multiplied by {@code count},
+	 * so presumably callers pass a pre-summed value for {@code count} samples - confirm.
+	 */
+	public static class Avg extends Function {
+		private double total;
+		public Avg(){
+			this.total = 0.0;
+		}
+		@Override
+		public void run(double v,int count){
+			this.incrCount(count);
+			total += v;
+		}
+		/** @return total divided by count; NaN when nothing has been counted (0.0/0) */
+		@Override
+		public double result(){
+			return this.total/this.count();
+		}
+	}
+
+	/**
+	 * Maximum of all values seen.
+	 */
+	public static class Max extends Function {
+		private double maximum;
+		public Max(){
+			// Start from negative infinity so that negative inputs are handled
+			// correctly; starting from 0.0 made the maximum of all-negative
+			// input come out as 0.0.
+			this.maximum = Double.NEGATIVE_INFINITY;
+		}
+
+		@Override
+		public void run(double v,int count){
+			this.incrCount(count);
+			if(v > maximum){
+				maximum = v;
+			}
+		}
+
+		/**
+		 * @return the largest value seen, or 0.0 when no value has raised the
+		 *         maximum yet (kept for compatibility with the previous
+		 *         behaviour on empty input)
+		 */
+		@Override
+		public double result(){
+			return maximum == Double.NEGATIVE_INFINITY ? 0.0 : maximum;
+		}
+	}
+
+	/**
+	 * Minimum of all values seen.
+	 */
+	public static class Min extends Function {
+		private double minimum;
+		public Min(){
+			// Double.MAX_VALUE is the largest finite double, so any finite
+			// input, negative ones included, replaces it on the first run.
+			this.minimum = Double.MAX_VALUE;
+		}
+		@Override
+		public void run(double v,int count){
+			this.incrCount(count);
+			if(v < minimum){
+				minimum = v;
+			}
+		}
+
+		/** @return the smallest value seen, or Double.MAX_VALUE when none was lower */
+		@Override
+		public double result(){
+			return minimum;
+		}
+	}
+
+	/**
+	 * Sum of all values seen.
+	 */
+	public static class Sum extends Function {
+		private double summary;
+		public Sum(){
+			this.summary = 0.0;
+		}
+		@Override
+		public void run(double v,int count){
+			this.incrCount(count);
+			this.summary += v;
+		}
+
+		/** @return the sum of all values passed to {@code run} */
+		@Override
+		public double result(){
+			return this.summary;
+		}
+	}
+
+	/**
+	 * Count, implemented as a plain {@link Sum}: the result is the sum of the
+	 * values passed to {@code run}.
+	 */
+	public static class Count extends Sum{
+		public Count(){
+			super();
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/FunctionFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/FunctionFactory.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/FunctionFactory.java
new file mode 100755
index 0000000..c6d1861
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/FunctionFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.raw;
+
+import org.apache.eagle.query.aggregate.AggregateFunctionType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Factory of {@link Function} instances. Concrete factories are registered in
+ * a static table keyed by the name of an {@link AggregateFunctionType}.
+ */
+public abstract class FunctionFactory{
+	private static final Map<String, FunctionFactory> _functionFactories = new HashMap<String, FunctionFactory>();
+	static{
+		register(AggregateFunctionType.count, new CountFactory());
+		register(AggregateFunctionType.sum, new SumFactory());
+		register(AggregateFunctionType.min, new MinFactory());
+		register(AggregateFunctionType.max, new MaxFactory());
+		register(AggregateFunctionType.avg, new AvgFactory());
+	}
+
+	/** Stores the factory under the name of the given function type. */
+	private static void register(AggregateFunctionType funcType, FunctionFactory factory){
+		_functionFactories.put(funcType.name(), factory);
+	}
+
+	/**
+	 * Looks up the factory registered for a function type.
+	 *
+	 * @param funcType the aggregate function type
+	 * @return the registered factory, or {@code null} when none is registered
+	 *         under that type's name
+	 */
+	public static FunctionFactory locateFunctionFactory(AggregateFunctionType funcType){
+		final String typeName = funcType.name();
+		return _functionFactories.get(typeName);
+	}
+
+	/**
+	 * @return a new {@link Function} instance
+	 */
+	public abstract Function createFunction();
+
+	/** Creates {@link Function.Avg} instances. */
+	public static class AvgFactory extends FunctionFactory {
+		@Override
+		public Function createFunction(){
+			return new Function.Avg();
+		}
+	}
+
+	/** Creates {@link Function.Max} instances. */
+	public static class MaxFactory extends FunctionFactory {
+		@Override
+		public Function createFunction(){
+			return new Function.Max();
+		}
+	}
+
+	/** Creates {@link Function.Min} instances. */
+	public static class MinFactory extends FunctionFactory {
+		@Override
+		public Function createFunction(){
+			return new Function.Min();
+		}
+	}
+
+	/** Creates {@link Function.Count} instances. */
+	public static class CountFactory extends FunctionFactory {
+		@Override
+		public Function createFunction(){
+			return new Function.Count();
+		}
+	}
+
+	/** Creates {@link Function.Sum} instances. */
+	public static class SumFactory extends FunctionFactory {
+		@Override
+		public Function createFunction(){
+			return new Function.Sum();
+		}
+	}
+}
+	
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKey.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKey.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKey.java
new file mode 100755
index 0000000..3a54d70
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKey.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.raw;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * An ordered list of byte-array values, serializable as a {@link Writable}.
+ * <h3>Structure</h3>
+ * <pre>
+ * {
+ *   List[byte[],...]
+ * }
+ * </pre>
+ * Two keys are equal when they hold the same number of elements and every
+ * pair of elements at the same position has identical content.
+ */
+public class GroupbyKey implements Writable {
+	private final WritableList<BytesWritable> value;
+
+	/**
+	 * Appends one value, wrapped in a new {@link BytesWritable}.
+	 *
+	 * @param value the bytes to append
+	 */
+	public void addValue(byte[] value){
+		this.value.add(new BytesWritable(value));
+	}
+
+	/**
+	 * Appends all elements of the given list.
+	 *
+	 * @param list the elements to append
+	 */
+	public void addAll(List<BytesWritable> list){
+		this.value.addAll(list);
+	}
+
+	/**
+	 * @return the backing list itself (not a copy)
+	 */
+	public List<BytesWritable> getValue(){
+		return value;
+	}
+
+	/**
+	 * empty constructor
+	 */
+	public GroupbyKey(){
+		this.value = new WritableList<BytesWritable>(BytesWritable.class);
+	}
+
+	/**
+	 * clear for reuse
+	 */
+	public void clear(){
+		value.clear();
+	}
+
+	/**
+	 * copy constructor; the new key gets its own list but shares the
+	 * {@link BytesWritable} element instances with {@code key}
+	 * @param key the key to copy
+	 */
+	public GroupbyKey(GroupbyKey key){
+		this();
+		ListIterator<BytesWritable> it = key.value.listIterator();
+		while(it.hasNext()){
+			this.value.add(it.next());
+		}
+	}
+
+	/**
+	 * Builds a key from raw byte arrays, in list order.
+	 *
+	 * @param bytes the values to add
+	 */
+	public GroupbyKey(List<byte[]> bytes){
+		this();
+		for(byte[] bt:bytes){
+			this.addValue(bt);
+		}
+	}
+
+	@Override
+	public boolean equals(Object obj){
+		if(obj == this)
+			return true;
+		if(!(obj instanceof GroupbyKey)){
+			return false;
+		}
+		GroupbyKey that = (GroupbyKey)obj;
+		ListIterator<BytesWritable> e1 = this.value.listIterator();
+		ListIterator<BytesWritable> e2 = that.value.listIterator();
+		while(e1.hasNext() && e2.hasNext()){
+			// BytesWritable.getBytes() exposes the whole backing buffer, which
+			// may be longer than the valid data; BytesWritable.equals compares
+			// only the valid bytes.
+			if(!e1.next().equals(e2.next()))
+				return false;
+		}
+		// equal only when both lists are exhausted together
+		return !(e1.hasNext() || e2.hasNext());
+	}
+
+	@Override
+	public int hashCode(){
+		// Combine element hashes position-sensitively, consistent with equals.
+		// BytesWritable.hashCode covers only the valid bytes of each element.
+		int hash = 1;
+		ListIterator<BytesWritable> e1 = this.value.listIterator();
+		while(e1.hasNext()){
+			hash = 31 * hash + e1.next().hashCode();
+		}
+		return hash;
+	}
+
+	/**
+	 * Serialize the fields of this object to <code>out</code>.
+	 *
+	 * @param out <code>DataOutput</code> to serialize this object into.
+	 * @throws java.io.IOException
+	 */
+	@Override
+	public void write(DataOutput out) throws IOException {
+		this.value.write(out);
+	}
+
+	/**
+	 * Deserialize the fields of this object from <code>in</code>.
+	 * <p/>
+	 * <p>For efficiency, implementations should attempt to re-use storage in the
+	 * existing object where possible.</p>
+	 *
+	 * @param in <code>DataInput</code> to deserialize this object from.
+	 * @throws java.io.IOException
+	 */
+	@Override
+	public void readFields(DataInput in) throws IOException {
+		this.value.readFields(in);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyAggregatable.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyAggregatable.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyAggregatable.java
new file mode 100755
index 0000000..7e20029
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyAggregatable.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.raw;
+
+import java.util.List;
+
+/**
+ * The generic interface to unify the GroupbyKeyValue-based results of different
+ * business logic aggregates like RawAggregator or TimeSeriesAggregator.
+ * It declares a single accessor that exposes those results as a list.
+ *
+ * @see org.apache.eagle.query.aggregate.timeseries.TimeSeriesAggregator
+ * @see RawAggregator
+ *
+ * @since : 11/3/14,2014
+ *
+ */
+public interface GroupbyKeyAggregatable {
+	/**
+	 * Returns the aggregate results as group-by key/value pairs.
+	 *
+	 * @see RawAggregator#getGroupbyKeyValues()
+	 * @see org.apache.eagle.query.aggregate.timeseries.TimeSeriesAggregator#getGroupbyKeyValues()
+	 *
+	 * @return the list of {@link GroupbyKeyValue} results
+	 */
+	public List<GroupbyKeyValue> getGroupbyKeyValues();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyComparator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyComparator.java
new file mode 100755
index 0000000..f976c8c
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyComparator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.raw;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.ListIterator;
+
+public class GroupbyKeyComparator implements Comparator<GroupbyKey>{
+	@Override 
+    public int compare(GroupbyKey key1, GroupbyKey key2){
+		List<BytesWritable> list1 = key1.getValue();
+		List<BytesWritable> list2 = key2.getValue();
+		
+		if(list1 == null || list2 == null || list1.size() != list2.size())
+			throw new IllegalArgumentException("2 list of groupby fields must be non-null and have the same size");
+		ListIterator<BytesWritable> e1 = list1.listIterator();
+		ListIterator<BytesWritable> e2 = list2.listIterator();
+		while(e1.hasNext() && e2.hasNext()){
+			int r = Bytes.compareTo(e1.next().copyBytes(), e2.next().copyBytes());
+			if(r != 0)
+				return r;
+		}
+		return 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValue.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValue.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValue.java
new file mode 100755
index 0000000..2256761
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValue.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.raw;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * <h3>Groupby KeyValue Structure</h3>
+ * A {@link Writable} pair of one group-by key and one group-by value:
+ * <pre>
+ * {
+ *  key: GroupbyKey
+ *  value: GroupbyValue
+ * }
+ * </pre>
+ * The key is serialized first, followed by the value.
+ *
+ * @see GroupbyKey
+ * @see GroupbyValue
+ *
+ * @since : 11/4/14,2014
+ */
+public class GroupbyKeyValue implements Writable {
+	private GroupbyKey key;
+	private GroupbyValue value;
+
+	/**
+	 * Creates a pair holding a new empty key and a new empty value.
+	 */
+	public GroupbyKeyValue(){
+		this(new GroupbyKey(), new GroupbyValue());
+	}
+
+	/**
+	 * Creates a pair from the given key and value; both references are stored as-is.
+	 *
+	 * @param key the group-by key
+	 * @param value the group-by value
+	 */
+	public GroupbyKeyValue(GroupbyKey key,GroupbyValue value){
+		this.key = key;
+		this.value = value;
+	}
+
+	/** @return the current key */
+	public GroupbyKey getKey() {
+		return this.key;
+	}
+
+	/** @param key the key to store */
+	public void setKey(GroupbyKey key) {
+		this.key = key;
+	}
+
+	/** @return the current value */
+	public GroupbyValue getValue() {
+		return this.value;
+	}
+
+	/** @param value the value to store */
+	public void setValue(GroupbyValue value) {
+		this.value = value;
+	}
+
+	/**
+	 * Writes the key and then the value to <code>out</code>.
+	 *
+	 * @param out <code>DataOutput</code> receiving the serialized form.
+	 * @throws java.io.IOException
+	 */
+	@Override
+	public void write(DataOutput out) throws IOException {
+		key.write(out);
+		value.write(out);
+	}
+
+	/**
+	 * Reads the key and then the value from <code>in</code>, into the key and
+	 * value objects this pair currently holds.
+	 *
+	 * @param in <code>DataInput</code> supplying the serialized form.
+	 * @throws java.io.IOException
+	 */
+	@Override
+	public void readFields(DataInput in) throws IOException {
+		key.readFields(in);
+		value.readFields(in);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValueCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValueCreationListener.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValueCreationListener.java
new file mode 100755
index 0000000..6ca4bec
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValueCreationListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.raw;
+
+/**
+ * Callback interface with a single notification method that receives a
+ * {@link GroupbyKeyValue}.
+ * NOTE(review): presumably invoked by an aggregator whenever it creates a new
+ * key/value pair - confirm against the implementations and callers.
+ *
+ * @since : 11/11/14,2014
+ */
+public interface GroupbyKeyValueCreationListener {
+	/**
+	 * Receives a group-by key/value pair.
+	 *
+	 * @param kv the key/value pair being reported
+	 */
+	void keyValueCreated(GroupbyKeyValue kv);
+}


Mime
View raw message