eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [04/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"
Date Thu, 19 Nov 2015 10:47:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestBucketQuery2.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestBucketQuery2.java b/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestBucketQuery2.java
new file mode 100644
index 0000000..b416319
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestBucketQuery2.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.test;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.query.aggregate.BucketQuery;
+import junit.framework.Assert;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// Exercises BucketQuery: groups TaggedLogAPIEntity objects into buckets keyed
+// by tag values, applies a per-bucket size limit, nests buckets when multiple
+// bucket fields are given, and files entities missing a bucket tag under
+// "unassigned". (Bucket/limit semantics are inferred from the assertions
+// below; BucketQuery itself is not visible here.)
+public class TestBucketQuery2 {
+	// Minimal entity subtype with one extra bean property so the Jackson
+	// serialization checks below emit a non-tag field.
+	private static class SampleTaggedLogAPIEntity extends TaggedLogAPIEntity{
+		private String description;
+
+		@SuppressWarnings("unused")
+		public String getDescription() {
+			return description;
+		}
+
+		public void setDescription(String description) {
+			this.description = description;
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	// NOTE(review): @Test is commented out, so this scenario is currently
+	// disabled and never runs under JUnit — confirm that is intentional.
+//	@Test
+	public void testBucketQuery(){
+		SampleTaggedLogAPIEntity e1 = new SampleTaggedLogAPIEntity();
+		e1.setTags(new HashMap<String, String>());
+		e1.getTags().put("cluster", "cluster1");
+		e1.getTags().put("rack", "rack123");
+		e1.setDescription("this is description 1");
+		
+		SampleTaggedLogAPIEntity e2 = new SampleTaggedLogAPIEntity();
+		e2.setTags(new HashMap<String, String>());
+		e2.getTags().put("cluster", "cluster1");
+		e2.getTags().put("rack", "rack123");
+		e2.setDescription("this is description 2");
+		
+		List<String> bucketFields = new ArrayList<String>();
+		bucketFields.add("cluster");
+		int limit = 1;
+		
+		// Single bucket field with limit=1: both entities map to bucket
+		// "cluster1" but only one entity is retained.
+		BucketQuery query1 = new BucketQuery(bucketFields, limit);
+		query1.put(e1);
+		query1.put(e2);
+		
+		Map<String, Object> map = query1.get();
+		
+		List<TaggedLogAPIEntity> o = (List<TaggedLogAPIEntity>)map.get("cluster1");
+		Assert.assertEquals(limit, o.size());
+		
+		// Sanity check: the bucket result must be JSON-serializable.
+		JsonFactory factory = new JsonFactory();
+		ObjectMapper mapper = new ObjectMapper(factory);
+		try{
+			String result = mapper.writeValueAsString(map);
+			System.out.println(result);
+		}catch(Exception ex){
+			ex.printStackTrace();
+			Assert.fail("can not serialize bucket query result");
+		}
+		
+		// Same bucketing with limit=2: both entities are retained.
+		limit = 2;
+		BucketQuery query2 = new BucketQuery(bucketFields, limit);
+		query2.put(e1);
+		query2.put(e2);
+		Map<String, Object> map2 = query2.get();
+		o = (List<TaggedLogAPIEntity>)map2.get("cluster1");
+		try{
+			String result = mapper.writeValueAsString(map2);
+			System.out.println(result);
+		}catch(Exception ex){
+			ex.printStackTrace();
+			Assert.fail("can not serialize bucket query result");
+		}
+		Assert.assertEquals(limit, o.size());
+		
+		
+		// Two bucket fields ("cluster" then "rack") produce nested maps:
+		// cluster value -> rack value -> list of entities.
+		SampleTaggedLogAPIEntity e3 = new SampleTaggedLogAPIEntity();
+		e3.setTags(new HashMap<String, String>());
+		e3.getTags().put("cluster", "cluster1");
+		e3.getTags().put("rack", "rack124");
+		e3.setDescription("this is description 3");
+		bucketFields.add("rack");
+		limit = 2;
+		BucketQuery query3 = new BucketQuery(bucketFields, limit);
+		query3.put(e1);
+		query3.put(e2);
+		query3.put(e3);
+		Map<String, Object> map3 = query3.get();
+		Map<String, Object> o3 = (Map<String, Object>)map3.get("cluster1");
+		List<TaggedLogAPIEntity> o4 = (List<TaggedLogAPIEntity>)o3.get("rack124");
+		Assert.assertEquals(1, o4.size());
+		List<TaggedLogAPIEntity> o5 = (List<TaggedLogAPIEntity>)o3.get("rack123");
+		Assert.assertEquals(o5.size(), 2);
+		
+		try{
+			String result = mapper.writeValueAsString(map3);
+			System.out.println(result);
+		}catch(Exception ex){
+			ex.printStackTrace();
+			Assert.fail("can not serialize bucket query result");
+		}
+		
+		
+		// Entity without a "rack" tag should land in the "unassigned"
+		// sub-bucket of its cluster.
+		SampleTaggedLogAPIEntity e4 = new SampleTaggedLogAPIEntity();
+		e4.setTags(new HashMap<String, String>());
+		e4.getTags().put("cluster", "cluster1");
+		// rack is set to null
+//		e4.getTags().put("rack", "rack124");
+		e4.setDescription("this is description 3");
+		limit = 2;
+		BucketQuery query4 = new BucketQuery(bucketFields, limit);
+		query4.put(e1);
+		query4.put(e2);
+		query4.put(e3);
+		query4.put(e4);
+		Map<String, Object> map4 = query4.get();
+		Map<String, Object> o6 = (Map<String, Object>)map4.get("cluster1");
+		List<TaggedLogAPIEntity> o7 = (List<TaggedLogAPIEntity>)o6.get("rack124");
+		Assert.assertEquals(1, o7.size());
+		List<TaggedLogAPIEntity> o8 = (List<TaggedLogAPIEntity>)o6.get("rack123");
+		Assert.assertEquals(o8.size(), 2);
+		List<TaggedLogAPIEntity> o9 = (List<TaggedLogAPIEntity>)o6.get("unassigned");
+		Assert.assertEquals(o9.size(), 1);
+		
+		try{
+			String result = mapper.writeValueAsString(map4);
+			System.out.println(result);
+		}catch(Exception ex){
+			ex.printStackTrace();
+			Assert.fail("can not serialize bucket query result");
+		}
+	}
+
+	// NOTE(review): empty placeholder; it keeps the class runnable under JUnit
+	// while the real scenario above is disabled, but it verifies nothing.
+	@Test
+	public void test() {
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestFlatAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestFlatAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestFlatAggregator.java
new file mode 100755
index 0000000..3fdf322
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestFlatAggregator.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.query.aggregate.timeseries.FlatAggregator;
+import junit.framework.Assert;
+
+import org.apache.eagle.query.aggregate.AggregateFunctionType;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.log.entity.test.TestEntity;
+
+// Tests FlatAggregator: accumulates TestEntity objects and aggregates numeric
+// fields (sum/count) grouped by zero or more tag fields. Expected values in
+// the assertions are recomputed inline from the same entity fixtures.
+// NOTE(review): assertions use junit.framework.Assert (JUnit 3) while @Test
+// comes from JUnit 4 — consider org.junit.Assert. Also, most assertEquals
+// calls pass (actual, expected), reversed from the documented
+// (expected, actual) convention; this only affects failure messages.
+public class TestFlatAggregator {
+	private static final Logger LOG = LoggerFactory.getLogger(TestFlatAggregator.class);
+	// NOTE(review): the four @Test methods below are empty placeholders —
+	// they always pass and cover nothing. Confirm whether they should be
+	// implemented or removed.
+	@Test
+	public void testCounting(){
+		
+	}
+	
+	@Test
+	public void testSummary(){
+		
+	}
+	
+	@Test
+	public void testAverage(){
+		
+	}
+	
+	@Test
+	public void testIterativeAggregation(){
+		
+	}
+	
+	// Builds a TestEntity carrying the given tags and numeric fields; used as
+	// the aggregation fixture throughout this class. A null tag value is
+	// allowed and exercises the "unassigned" grouping below.
+	@SuppressWarnings("serial")
+	private TestEntity createEntity(final String cluster, final String datacenter, final String rack, int numHosts, long numClusters){
+		TestEntity entity = new TestEntity();
+		Map<String, String> tags = new HashMap<String, String>(){{
+			put("cluster", cluster);
+			put("datacenter", datacenter);
+			put("rack", rack);
+		}}; 
+		entity.setTags(tags);
+		entity.setNumHosts(numHosts);
+		entity.setNumClusters(numClusters);
+		return entity;
+	}
+
+	// No group-by fields: the result has a single row keyed by the empty
+	// list, holding the global sum (or count) over all entities.
+	@Test
+	public void testZeroGroupbyFieldSingleFunctionForSummary(){
+		TestEntity[] entities = new TestEntity[5];
+		entities[0] = createEntity("cluster1", "dc1", "rack123", 12, 2);
+		entities[1] = createEntity("cluster1", "dc1", "rack123", 20, 1);
+		entities[2] = createEntity("cluster1", "dc1", "rack128", 10, 0);
+		entities[3] = createEntity("cluster2", "dc1", "rack125", 9, 2);
+		entities[4] = createEntity("cluster2", "dc1", "rack126", 15, 2);
+		
+		FlatAggregator agg = new FlatAggregator(new ArrayList<String>(), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("numHosts"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = agg.result();
+			Assert.assertEquals(result.size(), 1);
+			Assert.assertEquals(result.get(new ArrayList<String>()).get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+
+					entities[2].getNumHosts()+entities[3].getNumHosts()+entities[4].getNumHosts()));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+		
+		agg = new FlatAggregator(new ArrayList<String>(), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("numClusters"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = agg.result();
+			Assert.assertEquals(result.size(), 1);
+			Assert.assertEquals(result.get(new ArrayList<String>()).get(0), (double)(entities[0].getNumClusters()+entities[1].getNumClusters()+
+					entities[2].getNumClusters()+entities[3].getNumClusters()+entities[4].getNumClusters()));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+		
+		agg = new FlatAggregator(new ArrayList<String>(), Arrays.asList(AggregateFunctionType.count), Arrays.asList("*"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = agg.result();
+			Assert.assertEquals(result.size(), 1);
+			Assert.assertEquals(result.get(new ArrayList<String>()).get(0), (double)(5));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+	}
+	
+	// One group-by field: sums are partitioned per distinct tag value, and
+	// rows are keyed by single-element lists of that value.
+	@Test
+	public void testSingleGroupbyFieldSingleFunctionForSummary(){
+		TestEntity[] entities = new TestEntity[5];
+		entities[0] = createEntity("cluster1", "dc1", "rack123", 12, 2);
+		entities[1] = createEntity("cluster1", "dc1", "rack123", 20, 1);
+		entities[2] = createEntity("cluster1", "dc1", "rack128", 10, 0);
+		entities[3] = createEntity("cluster2", "dc1", "rack125", 9, 2);
+		entities[4] = createEntity("cluster2", "dc2", "rack126", 15, 2);
+		
+		FlatAggregator agg = new FlatAggregator(Arrays.asList("cluster"), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("numHosts"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = agg.result();
+			Assert.assertEquals(result.size(), 2);
+			Assert.assertEquals(result.get(Arrays.asList("cluster1")).get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+entities[2].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2")).get(0), (double)(entities[3].getNumHosts()+entities[4].getNumHosts()));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+		
+		agg = new FlatAggregator(Arrays.asList("datacenter"), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("numHosts"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = agg.result();
+			Assert.assertEquals(result.size(), 2);
+			// NOTE(review): the (double) cast covers only the first three terms;
+			// the trailing "+ entities[3]..." still promotes to double, so the
+			// value is correct, but the grouping probably wasn't intended.
+			Assert.assertEquals(result.get(Arrays.asList("dc1")).get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+entities[2].getNumHosts())+entities[3].getNumHosts());
+			Assert.assertEquals(result.get(Arrays.asList("dc2")).get(0), (double)(entities[4].getNumHosts()));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+		
+		agg = new FlatAggregator(Arrays.asList("cluster"), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("numClusters"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = agg.result();
+			Assert.assertEquals(result.size(), 2);
+			Assert.assertEquals(result.get(Arrays.asList("cluster1")).get(0), (double)(entities[0].getNumClusters()+entities[1].getNumClusters()+entities[2].getNumClusters()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2")).get(0), (double)(entities[3].getNumClusters()+entities[4].getNumClusters()));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+		
+		agg = new FlatAggregator(Arrays.asList("datacenter"), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("numClusters"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = agg.result();
+			Assert.assertEquals(result.size(), 2);
+			Assert.assertEquals(result.get(Arrays.asList("dc1")).get(0), (double)(entities[0].getNumClusters()+entities[1].getNumClusters()+entities[2].getNumClusters())+entities[3].getNumClusters());
+			Assert.assertEquals(result.get(Arrays.asList("dc2")).get(0), (double)(entities[4].getNumClusters()));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+	}
+	
+	
+	// count(*) grouped by one tag field: each row is the number of entities
+	// sharing that tag value.
+	@Test
+	public void testSingleGroupbyFieldSingleFunctionForCount(){
+		TestEntity[] entities = new TestEntity[5];
+		entities[0] = createEntity("cluster1", "dc1", "rack123", 12, 2);
+		entities[1] = createEntity("cluster1", "dc1", "rack123", 20, 1);
+		entities[2] = createEntity("cluster1", "dc1", "rack128", 10, 0);
+		entities[3] = createEntity("cluster2", "dc1", "rack125", 9, 2);
+		entities[4] = createEntity("cluster2", "dc2", "rack126", 15, 2);
+		
+		FlatAggregator agg = new FlatAggregator(Arrays.asList("cluster"), Arrays.asList(AggregateFunctionType.count), Arrays.asList("*"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = agg.result();
+			Assert.assertEquals(result.size(), 2);
+			Assert.assertEquals(result.get(Arrays.asList("cluster1")).get(0), (double)(3));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2")).get(0), (double)(2));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+		
+		agg = new FlatAggregator(Arrays.asList("datacenter"), Arrays.asList(AggregateFunctionType.count), Arrays.asList("*"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = agg.result();
+			Assert.assertEquals(result.size(), 2);
+			Assert.assertEquals(result.get(Arrays.asList("dc1")).get(0), (double)(4));
+			Assert.assertEquals(result.get(Arrays.asList("dc2")).get(0), (double)(1));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+	}
+	
+	// Multiple group-by fields: rows are keyed by value tuples, and an entity
+	// whose tag is null groups under "unassigned" (see entities[5]).
+	@Test
+	public void testMultipleFieldsSingleFunctionForSummary(){
+		TestEntity[] entities = new TestEntity[6];
+		entities[0] = createEntity("cluster1", "dc1", "rack123", 12, 2);
+		entities[1] = createEntity("cluster1", "dc1", "rack123", 20, 1);
+		entities[2] = createEntity("cluster1", "dc1", "rack128", 10, 0);
+		entities[3] = createEntity("cluster2", "dc1", "rack125", 9, 2);
+		entities[4] = createEntity("cluster2", "dc1", "rack126", 15, 2);
+		entities[5] = createEntity("cluster2", null, "rack126", 1, 3);
+		
+		FlatAggregator agg = new FlatAggregator(Arrays.asList("cluster", "datacenter"), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("numHosts"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = agg.result();
+			Assert.assertEquals(3, result.size());
+			Assert.assertEquals(result.get(Arrays.asList("cluster1", "dc1")).get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+entities[2].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "dc1")).get(0), (double)(entities[3].getNumHosts()+entities[4].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "unassigned")).get(0), (double)(entities[5].getNumHosts()));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+		
+		agg = new FlatAggregator(Arrays.asList("cluster", "datacenter", "rack"), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("numHosts"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = agg.result();
+			Assert.assertEquals(5, result.size());
+			Assert.assertEquals(result.get(Arrays.asList("cluster1", "dc1", "rack123")).get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster1", "dc1", "rack128")).get(0), (double)(entities[2].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "dc1", "rack125")).get(0), (double)(entities[3].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "dc1", "rack126")).get(0), (double)(entities[4].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "unassigned", "rack126")).get(0), (double)(entities[5].getNumHosts()));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+	}
+	
+	// count(*) over multiple group-by fields, including the "unassigned"
+	// grouping for the entity with a null datacenter tag.
+	@Test
+	public void testMultipleFieldsSingleFunctionForCount(){
+		TestEntity[] entities = new TestEntity[6];
+		entities[0] = createEntity("cluster1", "dc1", "rack123", 12, 2);
+		entities[1] = createEntity("cluster1", "dc1", "rack123", 20, 1);
+		entities[2] = createEntity("cluster1", "dc1", "rack128", 10, 0);
+		entities[3] = createEntity("cluster2", "dc1", "rack125", 9, 2);
+		entities[4] = createEntity("cluster2", "dc1", "rack126", 15, 2);
+		entities[5] = createEntity("cluster2", null, "rack126", 1, 3);
+		
+		FlatAggregator agg = new FlatAggregator(Arrays.asList("cluster", "datacenter"), Arrays.asList(AggregateFunctionType.count), Arrays.asList("*"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = agg.result();
+			Assert.assertEquals(3, result.size());
+			Assert.assertEquals(result.get(Arrays.asList("cluster1", "dc1")).get(0), (double)(3));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "dc1")).get(0), (double)(2));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "unassigned")).get(0), (double)(1));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+		
+		agg = new FlatAggregator(Arrays.asList("cluster", "datacenter", "rack"), Arrays.asList(AggregateFunctionType.count), Arrays.asList("*"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = agg.result();
+			Assert.assertEquals(5, result.size());
+			Assert.assertEquals(result.get(Arrays.asList("cluster1", "dc1", "rack123")).get(0), (double)(2));
+			Assert.assertEquals(result.get(Arrays.asList("cluster1", "dc1", "rack128")).get(0), (double)(1));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "dc1", "rack125")).get(0), (double)(1));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "dc1", "rack126")).get(0), (double)(1));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "unassigned", "rack126")).get(0), (double)(1));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+	}
+	
+	// Several aggregate functions evaluated together; result values appear in
+	// the same order the function/field pairs were declared.
+	@Test
+	public void testSingleGroupbyFieldMultipleFunctions(){
+		TestEntity[] entities = new TestEntity[5];
+		entities[0] = createEntity("cluster1", "dc1", "rack123", 12, 2);
+		entities[1] = createEntity("cluster1", "dc1", "rack123", 20, 1);
+		entities[2] = createEntity("cluster1", "dc1", "rack128", 10, 0);
+		entities[3] = createEntity("cluster2", "dc1", "rack125", 9, 2);
+		entities[4] = createEntity("cluster2", "dc1", "rack126", 15, 2);
+		
+		FlatAggregator agg = new FlatAggregator(Arrays.asList("cluster"), Arrays.asList(AggregateFunctionType.sum, AggregateFunctionType.count), 
+				Arrays.asList("numHosts", "*"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = agg.result();
+			Assert.assertEquals(result.size(), 2);
+			Assert.assertEquals(result.get(Arrays.asList("cluster1")).get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+entities[2].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster1")).get(1), (double)(3));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2")).get(0), (double)(entities[3].getNumHosts()+entities[4].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2")).get(1), (double)(2));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+		
+		agg = new FlatAggregator(Arrays.asList("datacenter"), Arrays.asList(AggregateFunctionType.count, AggregateFunctionType.sum), Arrays.asList("*", "numHosts"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = agg.result();
+			Assert.assertEquals(result.size(), 1);
+			Assert.assertEquals(result.get(Arrays.asList("dc1")).get(0), (double)(5));
+			Assert.assertEquals(result.get(Arrays.asList("dc1")).get(1), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+entities[2].getNumHosts()+entities[3].getNumHosts())+entities[4].getNumHosts());
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+		
+		agg = new FlatAggregator(Arrays.asList("datacenter"), Arrays.asList(AggregateFunctionType.count, AggregateFunctionType.sum, AggregateFunctionType.sum), 
+				Arrays.asList("*", "numHosts", "numClusters"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = agg.result();
+			Assert.assertEquals(result.size(), 1);
+			Assert.assertEquals(result.get(Arrays.asList("dc1")).get(0), (double)(5));
+			Assert.assertEquals(result.get(Arrays.asList("dc1")).get(1), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+entities[2].getNumHosts()+entities[3].getNumHosts())+entities[4].getNumHosts());
+			Assert.assertEquals(result.get(Arrays.asList("dc1")).get(2), (double)(entities[0].getNumClusters()+entities[1].getNumClusters()+entities[2].getNumClusters()+entities[3].getNumClusters())+entities[4].getNumClusters());
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+	}
+	
+	// Multiple group-by fields combined with multiple functions: each
+	// (cluster, rack) pair gets its own sum and count.
+	@Test
+	public void testMultipleGroupbyFieldsMultipleFunctions(){
+		TestEntity[] entities = new TestEntity[5];
+		entities[0] = createEntity("cluster1", "dc1", "rack123", 12, 2);
+		entities[1] = createEntity("cluster1", "dc1", "rack123", 20, 1);
+		entities[2] = createEntity("cluster1", "dc1", "rack128", 10, 0);
+		entities[3] = createEntity("cluster2", "dc1", "rack125", 9, 2);
+		entities[4] = createEntity("cluster2", "dc1", "rack126", 15, 2);
+		
+		FlatAggregator agg = new FlatAggregator(Arrays.asList("cluster", "rack"), Arrays.asList(AggregateFunctionType.sum, AggregateFunctionType.count), 
+				Arrays.asList("numHosts", "*"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = agg.result();
+			Assert.assertEquals(result.size(), 4);
+			Assert.assertEquals(result.get(Arrays.asList("cluster1", "rack123")).get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster1", "rack123")).get(1), (double)(2));
+			Assert.assertEquals(result.get(Arrays.asList("cluster1", "rack128")).get(0), (double)(entities[2].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster1", "rack128")).get(1), (double)(1));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "rack125")).get(0), (double)(entities[3].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "rack125")).get(1), (double)(1));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "rack126")).get(0), (double)(entities[4].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "rack126")).get(1), (double)(1));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestGroupbyFieldComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestGroupbyFieldComparator.java b/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestGroupbyFieldComparator.java
new file mode 100644
index 0000000..5fec950
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestGroupbyFieldComparator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.test;
+
+import java.util.Arrays;
+
+import org.apache.eagle.query.aggregate.timeseries.GroupbyFieldsComparator;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+// Verifies that GroupbyFieldsComparator orders group-by keys (lists of
+// strings) lexicographically, comparing element by element.
+public class TestGroupbyFieldComparator {
+	@Test
+	public void testStringListCompare(){
+		GroupbyFieldsComparator c = new GroupbyFieldsComparator();
+		// Single-element lists compare like their only string.
+		Assert.assertTrue(c.compare(Arrays.asList("ab"), Arrays.asList("ac"))<0);
+		Assert.assertTrue(c.compare(Arrays.asList("xy"), Arrays.asList("cd"))>0);
+		Assert.assertTrue(c.compare(Arrays.asList("xy"), Arrays.asList("xy"))==0);
+		// Equal first elements fall through to comparing the second element.
+		Assert.assertTrue(c.compare(Arrays.asList("xy", "ab"), Arrays.asList("xy", "ac"))<0);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestHierarchicalAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestHierarchicalAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestHierarchicalAggregator.java
new file mode 100755
index 0000000..cbcab0f
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestHierarchicalAggregator.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.query.aggregate.timeseries.PostHierarchicalAggregateSort;
+import org.apache.eagle.query.aggregate.timeseries.HierarchicalAggregateEntity;
+import org.apache.eagle.query.aggregate.timeseries.HierarchicalAggregator;
+import org.apache.eagle.query.aggregate.AggregateFunctionType;
+import junit.framework.Assert;
+
+import org.apache.eagle.query.aggregate.timeseries.SortOption;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.log.entity.test.TestEntity;
+
+
+/**
+ * Tests for {@link HierarchicalAggregator}: small fixtures of {@link TestEntity}
+ * objects are aggregated hierarchically over one or more tag fields, and both
+ * the aggregated tree and the result of {@link PostHierarchicalAggregateSort}
+ * are verified. Entities missing a group-by tag land in an "unassigned" bucket.
+ */
+public class TestHierarchicalAggregator {
+	private final static Logger LOG = LoggerFactory.getLogger(TestHierarchicalAggregator.class);
+
+	/** Creates a TestEntity tagged with cluster/datacenter/rack and the given metric values. */
+	@SuppressWarnings("serial")
+	private TestEntity createEntity(final String cluster, final String datacenter, final String rack, int numHosts, long numClusters){
+		TestEntity entity = new TestEntity();
+		Map<String, String> tags = new HashMap<String, String>(){{
+			put("cluster", cluster);
+			put("datacenter", datacenter);
+			put("rack", rack);
+		}};
+		entity.setTags(tags);
+		entity.setNumHosts(numHosts);
+		entity.setNumClusters(numClusters);
+		return entity;
+	}
+
+	/** Like createEntity but deliberately omits the "datacenter" tag, to exercise the "unassigned" bucket. */
+	@SuppressWarnings("serial")
+	private TestEntity createEntityWithoutDatacenter(final String cluster, final String rack, int numHosts, long numClusters){
+		TestEntity entity = new TestEntity();
+		Map<String, String> tags = new HashMap<String, String>(){{
+			put("cluster", cluster);
+			put("rack", rack);
+		}};
+		entity.setTags(tags);
+		entity.setNumHosts(numHosts);
+		entity.setNumClusters(numClusters);
+		return entity;
+	}
+
+	/** Serializes obj to JSON for the log; fails the test if serialization breaks. */
+	private void writeToJson(String message, Object obj){
+		JsonFactory factory = new JsonFactory();
+		ObjectMapper mapper = new ObjectMapper(factory);
+		try{
+			String result = mapper.writeValueAsString(obj);
+			LOG.info(message + ":\n" + result);
+		}catch(Exception ex){
+			LOG.error("Can not write json", ex);
+			Assert.fail("Can not write json");
+		}
+	}
+
+	/** With no group-by fields the aggregate is a single root node holding the grand total. */
+	@Test
+	public void testZeroGropubyFieldHierarchicalAggregator(){ 
+		TestEntity[] entities = new TestEntity[5];
+		entities[0] = createEntity("cluster1", "dc1", "rack123", 12, 2);
+		entities[1] = createEntity("cluster1", "dc1", "rack123", 20, 1);
+		entities[2] = createEntity("cluster1", "dc1", "rack128", 10, 0);
+		entities[3] = createEntity("cluster2", "dc1", "rack125", 9, 2);
+		entities[4] = createEntity("cluster2", "dc1", "rack126", 15, 2);
+		HierarchicalAggregator agg = new HierarchicalAggregator(new ArrayList<String>(), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("numHosts"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			HierarchicalAggregateEntity result = agg.result();
+			writeToJson("After aggregate", result);
+			Assert.assertEquals(result.getChildren().size(), 0);
+			Assert.assertEquals(result.getValues().get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+entities[2].getNumHosts()+entities[3].getNumHosts()+entities[4].getNumHosts()));
+
+			// test sort by function1; sorting must not disturb the root's aggregate value
+			SortOption so = new SortOption();
+			so.setIndex(0);
+			so.setAscendant(true);
+			List<SortOption> sortOptions = Arrays.asList(so);
+			PostHierarchicalAggregateSort.sort(result, sortOptions);
+			writeToJson("After sort" ,result);
+			Assert.assertEquals(null, result.getChildren());
+			Assert.assertEquals(0, result.getSortedList().size());
+			Assert.assertEquals(result.getValues().get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+entities[2].getNumHosts()+entities[3].getNumHosts()+entities[4].getNumHosts()));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+	}
+
+	/** One group-by field: children keyed by the tag value; sorted lists ordered by the chosen function. */
+	@Test
+	public void testSingleGropubyFieldHierarchicalAggregator(){ 
+		TestEntity[] entities = new TestEntity[5];
+		entities[0] = createEntity("cluster1", "dc1", "rack123", 12, 2);
+		entities[1] = createEntity("cluster1", "dc1", "rack123", 20, 1);
+		entities[2] = createEntity("cluster1", "dc2", "rack128", 10, 0);
+		entities[3] = createEntity("cluster2", "dc1", "rack125", 9, 2);
+		entities[4] = createEntity("cluster2", "dc1", "rack126", 15, 2);
+		HierarchicalAggregator agg = new HierarchicalAggregator(Arrays.asList("cluster"), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("numHosts"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			HierarchicalAggregateEntity result = agg.result();
+			writeToJson("After aggregate" ,result);
+			Assert.assertEquals(result.getChildren().size(), 2);
+			Assert.assertEquals(result.getChildren().get("cluster1").getValues().get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+entities[2].getNumHosts()));
+			Assert.assertEquals(result.getChildren().get("cluster2").getValues().get(0), (double)(entities[3].getNumHosts()+entities[4].getNumHosts()));
+
+			// test sort by function 1
+			SortOption so = new SortOption();
+			so.setIndex(0);
+			so.setAscendant(true);
+			List<SortOption> sortOptions = Arrays.asList(so);
+			PostHierarchicalAggregateSort.sort(result, sortOptions);
+			writeToJson("After sort" ,result);
+			Assert.assertEquals(null, result.getChildren());
+			// fixed: a stray third argument (2) previously selected the double-delta
+			// overload of assertEquals, accepting any size from 0 to 4
+			Assert.assertEquals(2, result.getSortedList().size());
+			Iterator<Map.Entry<String, HierarchicalAggregateEntity>> it = result.getSortedList().iterator();
+			Assert.assertEquals(true, it.hasNext());
+			Map.Entry<String, HierarchicalAggregateEntity> entry = it.next();
+			Assert.assertEquals("cluster2", entry.getKey());
+			Assert.assertEquals(entry.getValue().getValues().get(0), (double)(entities[3].getNumHosts()+entities[4].getNumHosts()));
+
+			Assert.assertEquals(true, it.hasNext());
+			entry = it.next();
+			Assert.assertEquals("cluster1", entry.getKey());
+			Assert.assertEquals(entry.getValue().getValues().get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+entities[2].getNumHosts()));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+
+		agg = new HierarchicalAggregator(Arrays.asList("datacenter"), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("numHosts"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			HierarchicalAggregateEntity result = agg.result();
+			writeToJson("After aggregate" , result);
+			Assert.assertEquals(result.getChildren().size(), 2);
+			Assert.assertEquals(result.getChildren().get("dc1").getValues().get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+entities[3].getNumHosts()+entities[4].getNumHosts()));
+			Assert.assertEquals(result.getChildren().get("dc2").getValues().get(0), (double)(entities[2].getNumHosts()));
+
+			// test sort by function 1
+			SortOption so = new SortOption();
+			so.setIndex(0);
+			so.setAscendant(true);
+			List<SortOption> sortOptions = Arrays.asList(so);
+			PostHierarchicalAggregateSort.sort(result, sortOptions);
+			writeToJson("After sort" ,result);
+			Assert.assertEquals(null, result.getChildren());
+			// fixed: vacuous delta overload, see above
+			Assert.assertEquals(2, result.getSortedList().size());
+			Iterator<Map.Entry<String, HierarchicalAggregateEntity>> it = result.getSortedList().iterator();
+			Assert.assertEquals(true, it.hasNext());
+			Map.Entry<String, HierarchicalAggregateEntity> entry = it.next();
+			Assert.assertEquals("dc2", entry.getKey());
+			Assert.assertEquals(entry.getValue().getValues().get(0), (double)(entities[2].getNumHosts()));
+
+			Assert.assertEquals(true, it.hasNext());
+			entry = it.next();
+			Assert.assertEquals("dc1", entry.getKey());
+			Assert.assertEquals(entry.getValue().getValues().get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+entities[3].getNumHosts()+entities[4].getNumHosts()));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+
+		agg = new HierarchicalAggregator(Arrays.asList("cluster"), Arrays.asList(AggregateFunctionType.sum, AggregateFunctionType.sum), Arrays.asList("numHosts", "numClusters"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			HierarchicalAggregateEntity result = agg.result();
+			writeToJson("After aggregate" , result);
+			Assert.assertEquals(result.getChildren().size(), 2);
+			Assert.assertEquals(2, result.getChildren().get("cluster1").getValues().size());
+			Assert.assertEquals(result.getChildren().get("cluster1").getValues().get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+entities[2].getNumHosts()));
+			Assert.assertEquals(result.getChildren().get("cluster1").getValues().get(1), (double)(entities[0].getNumClusters()+entities[1].getNumClusters()+entities[2].getNumClusters()));
+			Assert.assertEquals(2, result.getChildren().get("cluster2").getValues().size());
+			Assert.assertEquals(result.getChildren().get("cluster2").getValues().get(0), (double)(entities[3].getNumHosts()+entities[4].getNumHosts()));
+			Assert.assertEquals(result.getChildren().get("cluster2").getValues().get(1), (double)(entities[3].getNumClusters()+entities[4].getNumClusters()));
+
+			// test sort by function 2
+			SortOption so = new SortOption();
+			so.setIndex(1);
+			so.setAscendant(true);
+			List<SortOption> sortOptions = Arrays.asList(so);
+			PostHierarchicalAggregateSort.sort(result, sortOptions);
+			writeToJson("After sort" ,result);
+			Assert.assertEquals(null, result.getChildren());
+			// fixed: vacuous delta overload, see above
+			Assert.assertEquals(2, result.getSortedList().size());
+			Iterator<Map.Entry<String, HierarchicalAggregateEntity>> it = result.getSortedList().iterator();
+			Assert.assertEquals(true, it.hasNext());
+			Map.Entry<String, HierarchicalAggregateEntity> entry = it.next();
+			Assert.assertEquals("cluster1", entry.getKey());
+			Assert.assertEquals(entry.getValue().getValues().get(1), (double)(entities[0].getNumClusters()+entities[1].getNumClusters()+entities[2].getNumClusters()));
+
+			Assert.assertEquals(true, it.hasNext());
+			entry = it.next();
+			Assert.assertEquals("cluster2", entry.getKey());
+			Assert.assertEquals(entry.getValue().getValues().get(1), (double)(entities[3].getNumClusters()+entities[4].getNumClusters()));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+	}
+
+	/** Two group-by fields: a two-level tree (cluster -> datacenter), sorted at the top level. */
+	@Test
+	public void testMultipleGropubyFieldsHierarchicalAggregator(){ 
+		TestEntity[] entities = new TestEntity[5];
+		entities[0] = createEntity("cluster1", "dc1", "rack123", 12, 2);
+		entities[1] = createEntity("cluster1", "dc1", "rack123", 20, 1);
+		entities[2] = createEntity("cluster1", "dc2", "rack128", 10, 0);
+		entities[3] = createEntity("cluster2", "dc1", "rack125", 9, 2);
+		entities[4] = createEntity("cluster2", "dc1", "rack126", 15, 2);
+		HierarchicalAggregator agg = new HierarchicalAggregator(Arrays.asList("cluster", "datacenter"), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("numHosts"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			HierarchicalAggregateEntity result = agg.result();
+			writeToJson("After aggregate", result);
+			Assert.assertEquals(2, result.getChildren().size());
+			// fixture sanity check: total hosts across all entities
+			Assert.assertEquals(66.0, (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+entities[2].getNumHosts()+entities[3].getNumHosts()+entities[4].getNumHosts()));
+			Assert.assertEquals(result.getChildren().get("cluster1").getValues().get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+entities[2].getNumHosts()));
+			Assert.assertEquals(2, result.getChildren().get("cluster1").getChildren().size());
+			Assert.assertEquals(result.getChildren().get("cluster1").getChildren().get("dc1").getValues().get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()));
+			Assert.assertEquals(result.getChildren().get("cluster1").getChildren().get("dc2").getValues().get(0), (double)(entities[2].getNumHosts()));
+
+			Assert.assertEquals(result.getChildren().get("cluster2").getValues().get(0), (double)(entities[3].getNumHosts()+entities[4].getNumHosts()));
+			Assert.assertEquals(1, result.getChildren().get("cluster2").getChildren().size());
+			Assert.assertEquals(result.getChildren().get("cluster2").getChildren().get("dc1").getValues().get(0), (double)(entities[3].getNumHosts()+entities[4].getNumHosts()));
+
+			// test sort by function 1 (fixed comment: index 0 selects the first function)
+			SortOption so = new SortOption();
+			so.setIndex(0);
+			so.setAscendant(true);
+			List<SortOption> sortOptions = Arrays.asList(so);
+			PostHierarchicalAggregateSort.sort(result, sortOptions);
+			writeToJson("After sort" ,result);
+			Assert.assertEquals(null, result.getChildren());
+			Assert.assertEquals(2, result.getSortedList().size());
+			Iterator<Map.Entry<String, HierarchicalAggregateEntity>> it = result.getSortedList().iterator();
+			Assert.assertEquals(true, it.hasNext());
+			Map.Entry<String, HierarchicalAggregateEntity> entry = it.next();
+			Assert.assertEquals("cluster2", entry.getKey());
+			Assert.assertEquals(entry.getValue().getValues().get(0), (double)(entities[3].getNumHosts()+entities[4].getNumHosts()));
+
+			Assert.assertEquals(true, it.hasNext());
+			entry = it.next();
+			Assert.assertEquals("cluster1", entry.getKey());
+			Assert.assertEquals(entry.getValue().getValues().get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+entities[2].getNumHosts()));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+	}
+
+	/** Entities missing the group-by tag must be aggregated under "unassigned". */
+	@Test
+	public void testUnassigned(){ 
+		TestEntity[] entities = new TestEntity[5];
+		entities[0] = createEntityWithoutDatacenter("cluster1", "rack123", 12, 2);
+		entities[1] = createEntity("cluster1", "dc1", "rack123", 20, 1);
+		entities[2] = createEntity("cluster1", "dc1", "rack128", 10, 0);
+		entities[3] = createEntityWithoutDatacenter("cluster2", "rack125", 9, 2);
+		entities[4] = createEntity("cluster2", "dc1", "rack126", 15, 2);
+		HierarchicalAggregator agg = new HierarchicalAggregator(Arrays.asList("datacenter"), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("numHosts"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			HierarchicalAggregateEntity result = agg.result();
+			writeToJson("After aggregate", result);
+			Assert.assertEquals(result.getChildren().size(), 2);
+			// fixed: cast moved to cover the whole sum (value unchanged, intent clearer)
+			Assert.assertEquals(result.getChildren().get("dc1").getValues().get(0), (double)(entities[1].getNumHosts()+entities[2].getNumHosts()+entities[4].getNumHosts()));
+			Assert.assertEquals(result.getChildren().get("unassigned").getValues().get(0), (double)(entities[0].getNumHosts()+entities[3].getNumHosts()));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+
+		agg = new HierarchicalAggregator(Arrays.asList("cluster", "datacenter"), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("numHosts"));
+		try{
+			for(TestEntity e : entities){
+				agg.accumulate(e);
+			}
+			HierarchicalAggregateEntity result = agg.result();
+			writeToJson("After aggregate", result);
+			Assert.assertEquals(result.getChildren().size(), 2);
+			Assert.assertEquals(result.getChildren().get("cluster1").getValues().get(0), (double)(entities[0].getNumHosts()+entities[1].getNumHosts()+entities[2].getNumHosts()));
+			Assert.assertEquals(2, result.getChildren().get("cluster1").getChildren().size());
+			Assert.assertEquals(result.getChildren().get("cluster1").getChildren().get("dc1").getValues().get(0), (double)(entities[1].getNumHosts()+entities[2].getNumHosts()));
+			Assert.assertEquals(result.getChildren().get("cluster1").getChildren().get("unassigned").getValues().get(0), (double)(entities[0].getNumHosts()));
+
+			Assert.assertEquals(result.getChildren().get("cluster2").getValues().get(0), (double)(entities[3].getNumHosts()+entities[4].getNumHosts()));
+			Assert.assertEquals(result.getChildren().get("cluster2").getChildren().get("dc1").getValues().get(0), (double)(entities[4].getNumHosts()));
+			Assert.assertEquals(result.getChildren().get("cluster2").getChildren().get("unassigned").getValues().get(0), (double)(entities[3].getNumHosts()));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestPostFlatAggregateSort.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestPostFlatAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestPostFlatAggregateSort.java
new file mode 100644
index 0000000..9751e27
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestPostFlatAggregateSort.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.query.aggregate.timeseries.PostFlatAggregateSort;
+import junit.framework.Assert;
+
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+
+import org.apache.eagle.query.aggregate.timeseries.SortOption;
+
+/**
+ * Tests for {@link PostFlatAggregateSort}: sorts flat aggregate results
+ * (group-key list -> function-value list) by a selected function index.
+ */
+public class TestPostFlatAggregateSort {
+	private static final Logger logger = Logger.getLogger(TestPostFlatAggregateSort.class);
+
+	/** Two groups with distinct values: sorting by function 1 and by function 2 yields opposite orders. */
+	@Test
+	public void testSort(){
+		final String aggField1Value1 = "field1value1";
+		final String aggField1Value2 = "field1value2";
+		final String aggField2Value1 = "field2value1";
+		final String aggField2Value2 = "field2value2";
+		// Double.valueOf instead of the deprecated new Double(double) constructor
+		final Double d1 = Double.valueOf(1);
+		final Double d2 = Double.valueOf(2);
+		final Double d3 = Double.valueOf(3);
+		final Double d4 = Double.valueOf(4);
+		@SuppressWarnings("serial")
+		Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>(){{
+			put(Arrays.asList(aggField1Value1, aggField2Value1), Arrays.asList(d2, d3));
+			put(Arrays.asList(aggField1Value2, aggField2Value2), Arrays.asList(d1, d4));
+		}};
+
+		// sort by function1 (ascending): group2 (value 1) precedes group1 (value 2)
+		SortOption so = new SortOption();
+		so.setIndex(0);
+		so.setAscendant(true);
+		List<SortOption> sortOptions = Arrays.asList(so);
+		List<Map.Entry<List<String>, List<Double>>> set = 
+				PostFlatAggregateSort.sort(result, sortOptions, 0);
+		JsonFactory factory = new JsonFactory();
+		ObjectMapper mapper = new ObjectMapper(factory);
+		Assert.assertEquals(2, set.size());
+		Iterator<Map.Entry<List<String>, List<Double>>> it = set.iterator();
+		Map.Entry<List<String>, List<Double>> e = it.next();
+		Assert.assertTrue(e.getKey().get(0).equals(aggField1Value2));
+		Assert.assertTrue(e.getValue().get(0).equals(d1));
+		e = it.next();
+		Assert.assertTrue(e.getKey().get(0).equals(aggField1Value1));
+		Assert.assertTrue(e.getValue().get(0).equals(d2));
+		try{
+			String value = mapper.writeValueAsString(set);
+			logger.info(value);
+		}catch(Exception ex){
+			logger.error("fail with mapping", ex);
+			Assert.fail("fail with mapping");
+		}
+
+		// sort by function2 (ascending): group1 (value 3) now precedes group2 (value 4)
+		so = new SortOption();
+		so.setIndex(1);
+		so.setAscendant(true);
+		sortOptions = Arrays.asList(so);
+		set = PostFlatAggregateSort.sort(result, sortOptions, 0);
+		// reuse the mapper created above; recreating JsonFactory/ObjectMapper added nothing
+		Assert.assertEquals(2, set.size());
+		it = set.iterator();
+		e = it.next();
+		Assert.assertTrue(e.getKey().get(0).equals(aggField1Value1));
+		Assert.assertTrue(e.getValue().get(0).equals(d2));
+		e = it.next();
+		Assert.assertTrue(e.getKey().get(0).equals(aggField1Value2));
+		Assert.assertTrue(e.getValue().get(0).equals(d1));
+		try{
+			String value = mapper.writeValueAsString(set);
+			logger.info(value);
+		}catch(Exception ex){
+			logger.error("fail with mapping", ex);
+			Assert.fail("fail with mapping");
+		}
+	}
+
+	/** Equal sort values: both entries must still appear after the sort. */
+	@Test
+	public void testDefaultSort(){
+		final String aggField1Value1 = "xyz";
+		final String aggField1Value2 = "xyz";
+		final String aggField2Value1 = "abd";
+		final String aggField2Value2 = "abc";
+		final Double d1 = Double.valueOf(1);
+		final Double d2 = Double.valueOf(1);
+		@SuppressWarnings("serial")
+		Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>(){{
+			put(Arrays.asList(aggField1Value1, aggField2Value1), Arrays.asList(d2));
+			put(Arrays.asList(aggField1Value2, aggField2Value2), Arrays.asList(d1));
+		}};
+
+		// sort by function1
+		SortOption so = new SortOption();
+		so.setIndex(0);
+		so.setAscendant(true);
+		List<SortOption> sortOptions = Arrays.asList(so);
+		List<Map.Entry<List<String>, List<Double>>> set = 
+				PostFlatAggregateSort.sort(result, sortOptions, 0);
+		JsonFactory factory = new JsonFactory();
+		ObjectMapper mapper = new ObjectMapper(factory);
+		Assert.assertEquals(2, set.size());
+		Iterator<Map.Entry<List<String>, List<Double>>> it = set.iterator();
+		Map.Entry<List<String>, List<Double>> e = it.next();
+		// NOTE(review): d1 equals d2 and both keys share the same first element,
+		// so the assertions below cannot detect the tie-break order; they only
+		// verify that both entries survive the sort. TODO: assert on
+		// e.getKey().get(1) ("abc" vs "abd") once the intended tie-break order
+		// (presumably ascending by group key) is confirmed.
+		Assert.assertTrue(e.getKey().get(0).equals(aggField1Value2));
+		Assert.assertTrue(e.getValue().get(0).equals(d1));
+		e = it.next();
+		Assert.assertTrue(e.getKey().get(0).equals(aggField1Value1));
+		Assert.assertTrue(e.getValue().get(0).equals(d2));
+		try{
+			String value = mapper.writeValueAsString(set);
+			logger.info(value);
+		}catch(Exception ex){
+			logger.error("fail with mapping", ex);
+			Assert.fail("fail with mapping");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestTimeSeriesAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestTimeSeriesAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestTimeSeriesAggregator.java
new file mode 100755
index 0000000..d953cfa
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestTimeSeriesAggregator.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query.aggregate.test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.query.aggregate.timeseries.TimeSeriesAggregator;
+import junit.framework.Assert;
+
+import org.apache.eagle.query.aggregate.AggregateFunctionType;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.log.entity.test.TestEntity;
+
+public class TestTimeSeriesAggregator {
+	private static final Logger LOG = LoggerFactory.getLogger(TestFlatAggregator.class);
+	@SuppressWarnings("serial")
+	private TestEntity createEntity(final String cluster, final String datacenter, final String rack, int numHosts, long numClusters, long timestamp){
+		TestEntity entity = new TestEntity();
+		Map<String, String> tags = new HashMap<String, String>(){{
+			put("cluster", cluster);
+			put("datacenter", datacenter);
+			put("rack", rack);
+		}}; 
+		entity.setTags(tags);
+		entity.setNumHosts(numHosts);
+		entity.setNumClusters(numClusters);
+		entity.setTimestamp(timestamp);
+		return entity;
+	}
+	
+	@Test
+	public void testTimeSeriesAggregator(){
+		TestEntity[] entities = new TestEntity[8];
+		entities[0] = createEntity("cluster1", "dc1", "rack123", 12, 2, 1386120000*1000); // bucket 0
+		entities[1] = createEntity("cluster1", "dc1", "rack123", 20, 1, 1386121060*1000); // bucket 17
+		entities[2] = createEntity("cluster1", "dc1", "rack128", 10, 0, 1386121070*1000); // bucket 17
+		entities[3] = createEntity("cluster2", "dc1", "rack125", 9,   2, 1386122122*1000); // bucket 35
+		entities[4] = createEntity("cluster2", "dc1", "rack126", 15,  5, 1386123210*1000); // bucket 53
+		entities[5] = createEntity("cluster2", "dc1", "rack234", 25,  1, 1386123480*1000); // bucket 58
+		entities[6] = createEntity("cluster2", "dc1", "rack234", 12,  0, 1386123481*1000); // bucket 58
+		entities[7] = createEntity("cluster1", "dc1", "rack123", 3,    2, 1386123482*1000); // bucket 58
+		
+		TimeSeriesAggregator tsAgg = new TimeSeriesAggregator(Arrays.asList("cluster"), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("numHosts"),
+				1386120000*1000, 1386123600*1000, 60*1000);
+		try{
+			for(TestEntity e : entities){
+				tsAgg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = tsAgg.result();
+			Assert.assertEquals(result.size(), 6);
+			Assert.assertEquals(result.get(Arrays.asList("cluster1", "0")).get(0), (double)(entities[0].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster1", "17")).get(0), (double)(entities[1].getNumHosts()+entities[2].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "35")).get(0), (double)(entities[3].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "53")).get(0), (double)(entities[4].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "58")).get(0), (double)(entities[5].getNumHosts()+entities[6].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("cluster1", "58")).get(0), (double)(entities[7].getNumHosts()));
+			
+			Map<List<String>, List<double[]>> tsResult = tsAgg.getMetric();
+			Assert.assertEquals(tsResult.size(), 2);
+			Assert.assertEquals(tsResult.get(Arrays.asList("cluster1")).get(0).length, 60);
+			Assert.assertEquals(tsResult.get(Arrays.asList("cluster1")).get(0)[0], (double)(entities[0].getNumHosts()));
+			Assert.assertEquals(tsResult.get(Arrays.asList("cluster1")).get(0)[17], (double)(entities[1].getNumHosts()+entities[2].getNumHosts()));
+			Assert.assertEquals(tsResult.get(Arrays.asList("cluster2")).get(0)[35], (double)(entities[3].getNumHosts()));
+			Assert.assertEquals(tsResult.get(Arrays.asList("cluster2")).get(0)[53], (double)(entities[4].getNumHosts()));
+			Assert.assertEquals(tsResult.get(Arrays.asList("cluster2")).get(0)[58], (double)(entities[5].getNumHosts()+entities[6].getNumHosts()));
+			Assert.assertEquals(tsResult.get(Arrays.asList("cluster1")).get(0)[58], (double)(entities[7].getNumHosts()));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+		
+		tsAgg = new TimeSeriesAggregator(new ArrayList<String>(), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("numHosts"), 
+				1386120000*1000, 1386123600*1000, 60*1000);
+		try{
+			for(TestEntity e : entities){
+				tsAgg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = tsAgg.result();
+			Assert.assertEquals(result.size(), 5);
+			Assert.assertEquals(result.get(Arrays.asList("0")).get(0), (double)(entities[0].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("17")).get(0), (double)(entities[1].getNumHosts()+entities[2].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("35")).get(0), (double)(entities[3].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("53")).get(0), (double)(entities[4].getNumHosts()));
+			Assert.assertEquals(result.get(Arrays.asList("58")).get(0), (double)(entities[5].getNumHosts()+entities[6].getNumHosts()+entities[7].getNumHosts()));
+			
+			Map<List<String>, List<double[]>> tsResult = tsAgg.getMetric();
+			Assert.assertEquals(tsResult.size(), 1);
+			Assert.assertEquals(tsResult.get(new ArrayList<String>()).get(0).length, 60);
+			Assert.assertEquals(tsResult.get(new ArrayList<String>()).get(0)[0], (double)(entities[0].getNumHosts()));
+			Assert.assertEquals(tsResult.get(new ArrayList<String>()).get(0)[17], (double)(entities[1].getNumHosts()+entities[2].getNumHosts()));
+			Assert.assertEquals(tsResult.get(new ArrayList<String>()).get(0)[35], (double)(entities[3].getNumHosts()));
+			Assert.assertEquals(tsResult.get(new ArrayList<String>()).get(0)[53], (double)(entities[4].getNumHosts()));
+			Assert.assertEquals(tsResult.get(new ArrayList<String>()).get(0)[58], (double)(entities[5].getNumHosts()+entities[6].getNumHosts()+entities[7].getNumHosts()));		
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+		
+		tsAgg = new TimeSeriesAggregator(Arrays.asList("cluster"), Arrays.asList(AggregateFunctionType.count), Arrays.asList("*"), 
+				1386120000*1000, 1386123600*1000, 60*1000);
+		try{
+			for(TestEntity e : entities){
+				tsAgg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = tsAgg.result();
+			Assert.assertEquals(result.size(), 6);
+			Assert.assertEquals(result.get(Arrays.asList("cluster1", "0")).get(0), (double)(1));
+			Assert.assertEquals(result.get(Arrays.asList("cluster1", "17")).get(0), (double)(2));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "35")).get(0), (double)(1));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "53")).get(0), (double)(1));
+			Assert.assertEquals(result.get(Arrays.asList("cluster2", "58")).get(0), (double)(2));
+			Assert.assertEquals(result.get(Arrays.asList("cluster1", "58")).get(0), (double)(1));
+			
+			Map<List<String>, List<double[]>> tsResult = tsAgg.getMetric();
+			Assert.assertEquals(tsResult.size(), 2);
+			Assert.assertEquals(tsResult.get(Arrays.asList("cluster1")).get(0).length, 60);
+			Assert.assertEquals(tsResult.get(Arrays.asList("cluster1")).get(0)[0], (double)(1));
+			Assert.assertEquals(tsResult.get(Arrays.asList("cluster1")).get(0)[17], (double)(2));
+			Assert.assertEquals(tsResult.get(Arrays.asList("cluster2")).get(0)[35], (double)(1));
+			Assert.assertEquals(tsResult.get(Arrays.asList("cluster2")).get(0)[53], (double)(1));
+			Assert.assertEquals(tsResult.get(Arrays.asList("cluster2")).get(0)[58], (double)(2));
+			Assert.assertEquals(tsResult.get(Arrays.asList("cluster1")).get(0)[58], (double)(1));
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+		
+		tsAgg = new TimeSeriesAggregator(new ArrayList<String>(), Arrays.asList(AggregateFunctionType.count), Arrays.asList("*"), 
+				1386120000*1000, 1386123600*1000, 60*1000);
+		try{
+			for(TestEntity e : entities){
+				tsAgg.accumulate(e);
+			}
+			Map<List<String>, List<Double>> result = tsAgg.result();
+			Assert.assertEquals(result.size(), 5);
+			Assert.assertEquals(result.get(Arrays.asList("0")).get(0), (double)(1));
+			Assert.assertEquals(result.get(Arrays.asList("17")).get(0), (double)(2));
+			Assert.assertEquals(result.get(Arrays.asList("35")).get(0), (double)(1));
+			Assert.assertEquals(result.get(Arrays.asList("53")).get(0), (double)(1));
+			Assert.assertEquals(result.get(Arrays.asList("58")).get(0), (double)(3));
+			
+			Map<List<String>, List<double[]>> tsResult = tsAgg.getMetric();
+			Assert.assertEquals(tsResult.size(), 1);
+			Assert.assertEquals(tsResult.get(new ArrayList<String>()).get(0).length, 60);
+			Assert.assertEquals(tsResult.get(new ArrayList<String>()).get(0)[0], (double)(1));
+			Assert.assertEquals(tsResult.get(new ArrayList<String>()).get(0)[17], (double)(2));
+			Assert.assertEquals(tsResult.get(new ArrayList<String>()).get(0)[35], (double)(1));
+			Assert.assertEquals(tsResult.get(new ArrayList<String>()).get(0)[53], (double)(1));
+			Assert.assertEquals(tsResult.get(new ArrayList<String>()).get(0)[58], (double)(3));		
+		}catch(Exception ex){
+			LOG.error("Can not aggregate", ex);
+			Assert.fail("Can not aggregate");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/eagle/service/common/EagleExceptionWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/eagle/service/common/EagleExceptionWrapper.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/eagle/service/common/EagleExceptionWrapper.java
deleted file mode 100644
index 936970f..0000000
--- a/eagle-core/eagle-query/eagle-service-base/src/main/java/eagle/service/common/EagleExceptionWrapper.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.service.common;
-
/**
 * Utility for rendering an exception together with a bounded number of its
 * stack frames into a single string, suitable for embedding in service
 * error responses.
 */
public class EagleExceptionWrapper {
	private final static int MAX_DEPTH = 10;

	// Utility class; not meant to be instantiated.
	private EagleExceptionWrapper() {
	}

	/**
	 * Wraps the exception using the default maximum stack depth.
	 *
	 * @param ex exception to render
	 * @return the exception's {@code toString()} followed by up to
	 *         {@code MAX_DEPTH} stack frames, one per line
	 */
	public static String wrap(Exception ex) {
		return wrap(ex, EagleExceptionWrapper.MAX_DEPTH);
	}

	/**
	 * Wraps the exception, rendering at most {@code maxdepth} stack frames.
	 * A non-positive {@code maxdepth} falls back to the default depth.
	 *
	 * @param ex exception to render
	 * @param maxdepth maximum number of stack frames to include
	 * @return the exception's {@code toString()} followed by the truncated
	 *         stack trace, one frame per line
	 */
	public static String wrap(Exception ex, int maxdepth) {
		int depth = maxdepth;
		if (depth <= 0) {
			depth = EagleExceptionWrapper.MAX_DEPTH;
		}
		// StringBuilder instead of StringBuffer: single-threaded use, no
		// synchronization needed. System.lineSeparator() avoids the repeated
		// property lookup.
		StringBuilder sb = new StringBuilder();
		sb.append(ex);
		sb.append(System.lineSeparator());
		int index = 0;
		for (StackTraceElement element : ex.getStackTrace()) {
			sb.append(element.toString());
			sb.append(System.lineSeparator());
			if (++index >= depth) {
				break;
			}
		}
		return sb.toString();
	}
}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/eagle/service/common/SplitFullScanEntityReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/eagle/service/common/SplitFullScanEntityReader.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/eagle/service/common/SplitFullScanEntityReader.java
deleted file mode 100755
index 14b323c..0000000
--- a/eagle-core/eagle-query/eagle-service-base/src/main/java/eagle/service/common/SplitFullScanEntityReader.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.service.common;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import eagle.log.entity.GenericEntityBatchReader;
-import eagle.log.entity.RowkeyBuilder;
-import eagle.log.entity.SearchCondition;
-import eagle.log.entity.meta.EntityDefinition;
-import eagle.log.entity.meta.EntityDefinitionManager;
-import eagle.query.ListQueryCompiler;
-import eagle.common.DateTimeUtil;
-import eagle.common.EagleBase64Wrapper;
-
-/**
- * Support stream based entity read. Internally it splits entity fetching to multiple threads to improve 
- * the performance. However, it doesn't support multi-threading for client to read entities from result set.
- * 
- */
-public class SplitFullScanEntityReader<ENTITY extends TaggedLogAPIEntity> {
-
-	// class members
-	public static final int DEFAULT_BUFFER_SIZE = 10 * 1000;
-	public static final int MAX_WRITE_TIME_OUT_IN_SECONDS = 60;
-	private static final Logger LOG = LoggerFactory.getLogger(SplitFullScanEntityReader.class);
-	private static final TaggedLogAPIEntity COMPLETED_ENTITY = new TaggedLogAPIEntity();
-
-	// instance members
-	private final int splits;
-	private final String query;
-	private final long startTime;
-	private final long endTime;
-	private final String startRowkey;
-	private final int pageSize;
-	private final int bufferSize;
-	
-	public SplitFullScanEntityReader(String query,
-			String startTime, String endTime,
-			int splits, String startRowkey, int pageSize) {
-		this(
-				query, 
-				DateTimeUtil.humanDateToSecondsWithoutException(startTime) * 1000,
-				DateTimeUtil.humanDateToSecondsWithoutException(endTime) * 1000,
-				splits,
-				startRowkey,
-				pageSize
-			);
-	}
-	
-	public SplitFullScanEntityReader(String query, long startTime, long endTime,
-			int splits, String startRowkey, int pageSize) {
-		this(query, startTime, endTime, splits, startRowkey, pageSize, 
-				DEFAULT_BUFFER_SIZE);
-	}
-	
-	public SplitFullScanEntityReader(String query, long startTime, long endTime,
-			int splits, String startRowkey, int pageSize, int bufferSize) {
-		this.query = query;
-		this.startTime = startTime;
-		this.endTime = endTime;
-		this.splits = splits;
-		this.startRowkey = startRowkey;
-		this.pageSize = pageSize;
-		this.bufferSize = bufferSize;
-	}
-	
-	public EntityResultSet<ENTITY> read() throws Exception {
-		final EntityResultSet<ENTITY> resultSet = new EntityResultSet<ENTITY>(new ArrayBlockingQueue<TaggedLogAPIEntity>(bufferSize));
-		final List<GenericEntityBatchReader> readers = createSplitThreads();
-		
-		final int size = readers.size();
-		if (size > 0) {
-			final AtomicInteger threadCount = new AtomicInteger(size);
-			final AtomicInteger entityCount = new AtomicInteger(0);
-			for (GenericEntityBatchReader reader : readers) {
-				final EntityFetchThread<ENTITY> thread = new EntityFetchThread<ENTITY>(reader, threadCount, entityCount, resultSet);
-				thread.start();
-			}
-		} else {
-			resultSet.getQueue().add(COMPLETED_ENTITY);
-		}
-		return resultSet;
-	}
-
-	protected List<GenericEntityBatchReader> createSplitThreads() throws Exception {
-		
-		final List<GenericEntityBatchReader> readers = new ArrayList<GenericEntityBatchReader>();
-		final ListQueryCompiler comp = new ListQueryCompiler(query);
-		final EntityDefinition entityDef = EntityDefinitionManager.getEntityByServiceName(comp.serviceName());
-		if (entityDef == null) {
-			throw new IllegalArgumentException("Invalid entity name: " + comp.serviceName());
-		}
-		
-		// TODO: For now we don't support one query to query multiple partitions. In future 
-		// if partition is defined for the entity, internally We need to spawn multiple
-		// queries and send one query for each search condition for each partition
-		final List<String[]> partitionValues = comp.getQueryPartitionValues();
-		partitionConstraintValidate(partitionValues, query);
-		
-		long lastTimestamp = Long.MAX_VALUE;
-		if (startRowkey != null) {
-			final byte[] lastRowkey = EagleBase64Wrapper.decode(startRowkey);
-			lastTimestamp = RowkeyBuilder.getTimestamp(lastRowkey, entityDef);
-		}
-		
-		final long duration = (endTime - startTime) / splits;
-		for (int i = 0; i < splits; ++i) {
-			
-			final long slotStartTime = startTime + (i * duration);
-			if (slotStartTime > lastTimestamp) {
-				// ignore this slot
-				continue;
-			}
-			final long slotEndTime = startTime + ((i + 1) * duration);
-			final SearchCondition condition = new SearchCondition();
-			final String slotStartTimeString = DateTimeUtil.secondsToHumanDate(slotStartTime / 1000);
-			final String slotEndTimeString = DateTimeUtil.secondsToHumanDate(slotEndTime / 1000);
-			condition.setStartTime(slotStartTimeString);
-			condition.setEndTime(slotEndTimeString);
-			
-			condition.setFilter(comp.filter());
-			condition.setQueryExpression(comp.getQueryExpression());
-			if (partitionValues != null) {
-				condition.setPartitionValues(Arrays.asList(partitionValues.get(0)));
-			}
-			// Should be careful to the startRowkey setting. Only set startRowkey when 
-			// lastTimestamp is within the slot time range.
-			if (startRowkey != null && lastTimestamp >= startTime && lastTimestamp < endTime) {
-				condition.setStartRowkey(startRowkey);
-			}
-			condition.setPageSize(pageSize);
-			
-			if (comp.hasAgg()) {
-				List<String> groupbyFields = comp.groupbyFields();
-				List<String> outputFields = new ArrayList<String>();
-				if(groupbyFields != null){
-					outputFields.addAll(groupbyFields);
-				}
-				outputFields.addAll(comp.aggregateFields());
-				condition.setOutputFields(outputFields);
-			} else {
-				condition.setOutputFields(comp.outputFields());
-			}
-			readers.add(new GenericEntityBatchReader(comp.serviceName(), condition));
-		}
-		return readers;
-	}
-	
-
-	private static void partitionConstraintValidate(List<String[]> partitionValues, String query) {
-		if (partitionValues != null && partitionValues.size() > 1) {
-			final String[] values = partitionValues.get(0);
-			for (int i = 1; i < partitionValues.size(); ++i) {
-				final String[] tmpValues = partitionValues.get(i);
-				for (int j = 0; j < values.length; ++j) {
-					if (values[j] == null || (!values[j].equals(tmpValues[j]))) {
-						final String errMsg = "One query for multiple partitions is NOT allowed for now! Query: " + query;
-						LOG.error(errMsg);
-						throw new IllegalArgumentException(errMsg);
-					}
-				}
-			}
-		}
-	}
-	
-	
-	@SuppressWarnings("unchecked")
-	public static class EntityResultSet<ENTITY extends TaggedLogAPIEntity> {
-		private static final long DEFAULT_TIMEOUT_IN_MS = 1000;
-
-		private boolean fetchCompleted = false;
-		private final BlockingQueue<TaggedLogAPIEntity> queue;
-		private volatile Exception ex = null;
-
-		public EntityResultSet(BlockingQueue<TaggedLogAPIEntity> queue) {
-			this.queue = queue;
-		}
-		
-		public boolean hasMoreData() {
-			return queue.size() > 0 || (!fetchCompleted);
-		}
-		
-		public ENTITY next(long timeout, TimeUnit unit) throws InterruptedException {
-			if (fetchCompleted) {
-				return null;
-			}
-			final TaggedLogAPIEntity entity = queue.poll(timeout, unit);
-			if (COMPLETED_ENTITY.equals(entity)) {
-				fetchCompleted = true;
-				return null;
-			}
-			return (ENTITY)entity;
-		}
-		
-		public ENTITY next() throws Exception {
-			TaggedLogAPIEntity entity = null;
-			while (!fetchCompleted) {
-				try {
-					entity = queue.poll(DEFAULT_TIMEOUT_IN_MS, TimeUnit.MILLISECONDS);
-					if (COMPLETED_ENTITY.equals(entity)) {
-						fetchCompleted = true;
-						if (ex != null) {
-							throw ex;
-						}
-						return null;
-					}
-					if (entity != null) {
-						return (ENTITY)entity;
-					}
-				} catch (InterruptedException ex) {
-					// Just ignore
-				}
-			}
-			return null;
-		}
-		
-		void setException(Exception ex) {
-			this.ex = ex;
-		}
-		
-		BlockingQueue<TaggedLogAPIEntity> getQueue() {
-			return queue;
-		}
-	}
-	
-	private static class EntityFetchThread<ENTITY extends TaggedLogAPIEntity> extends Thread {
-		
-		private final GenericEntityBatchReader reader;
-		private final AtomicInteger threadCount;
-		private final AtomicInteger entityCount;
-		private final EntityResultSet<ENTITY> resultSet;
-		
-		private EntityFetchThread(GenericEntityBatchReader reader, AtomicInteger threadCount, AtomicInteger entityCount, EntityResultSet<ENTITY> resultSet) {
-			this.reader = reader;
-			this.threadCount = threadCount;
-			this.entityCount = entityCount;
-			this.resultSet = resultSet;
-		}
-		
-	    @Override
-	    public void run() {
-	    	try {
-	    		final List<ENTITY> entities = reader.read();
-	    		entityCount.addAndGet(entities.size());
-	    		for (ENTITY entity : entities) {
-	    			if (!resultSet.getQueue().offer(entity, MAX_WRITE_TIME_OUT_IN_SECONDS, TimeUnit.SECONDS)) {
-	    				resultSet.setException(new IOException("Write entity to queue timeout"));
-		    			resultSet.getQueue().add(COMPLETED_ENTITY);
-	    			}
-	    		}
-	    	} catch (Exception ex) {
-				resultSet.setException(ex);
-    			resultSet.getQueue().add(COMPLETED_ENTITY);
-	    	} finally {
-	    		final int count = threadCount.decrementAndGet();
-	    		if (count == 0) {
-	    			resultSet.getQueue().add(COMPLETED_ENTITY);
-	    			LOG.info("Total fetched " + entityCount.get() + " entities");
-	    		}
-	    	}
-	    }
-	}
-}


Mime
View raw message