eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [02/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"
Date Thu, 19 Nov 2015 10:47:09 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/common/SplitFullScanEntityReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/common/SplitFullScanEntityReader.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/common/SplitFullScanEntityReader.java
new file mode 100755
index 0000000..ae9ecef
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/common/SplitFullScanEntityReader.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.common;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericEntityBatchReader;
+import org.apache.eagle.log.entity.RowkeyBuilder;
+import org.apache.eagle.log.entity.SearchCondition;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.query.ListQueryCompiler;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.EagleBase64Wrapper;
+
+/**
+ * Stream based entity reader. The query time range is cut into {@code splits} consecutive time
+ * slots and every slot is fetched by its own thread; fetched entities are handed over to the
+ * client through a bounded queue wrapped by {@link EntityResultSet}.
+ * However, it doesn't support multi-threading for client to read entities from result set.
+ */
+public class SplitFullScanEntityReader<ENTITY extends TaggedLogAPIEntity> {
+
+	// class members
+	public static final int DEFAULT_BUFFER_SIZE = 10 * 1000;
+	public static final int MAX_WRITE_TIME_OUT_IN_SECONDS = 60;
+	private static final Logger LOG = LoggerFactory.getLogger(SplitFullScanEntityReader.class);
+	// Sentinel placed on the result queue to tell the consumer that no more entities will follow.
+	private static final TaggedLogAPIEntity COMPLETED_ENTITY = new TaggedLogAPIEntity();
+
+	// instance members
+	private final int splits;
+	private final String query;
+	private final long startTime;
+	private final long endTime;
+	private final String startRowkey;
+	private final int pageSize;
+	private final int bufferSize;
+	
+	/**
+	 * Creates a reader whose time range is given as human readable dates; both dates are converted
+	 * to epoch milliseconds and the default buffer size is used.
+	 */
+	public SplitFullScanEntityReader(String query,
+			String startTime, String endTime,
+			int splits, String startRowkey, int pageSize) {
+		this(
+				query, 
+				DateTimeUtil.humanDateToSecondsWithoutException(startTime) * 1000,
+				DateTimeUtil.humanDateToSecondsWithoutException(endTime) * 1000,
+				splits,
+				startRowkey,
+				pageSize
+			);
+	}
+	
+	/**
+	 * Creates a reader for the given range (milliseconds) with {@link #DEFAULT_BUFFER_SIZE}.
+	 */
+	public SplitFullScanEntityReader(String query, long startTime, long endTime,
+			int splits, String startRowkey, int pageSize) {
+		this(query, startTime, endTime, splits, startRowkey, pageSize, 
+				DEFAULT_BUFFER_SIZE);
+	}
+	
+	/**
+	 * @param query       list query to execute
+	 * @param startTime   range start in milliseconds
+	 * @param endTime     range end in milliseconds
+	 * @param splits      number of time slots the range is divided into; must be at least 1,
+	 *                    which is checked when the split readers are created
+	 * @param startRowkey Base64 encoded rowkey to resume from, or {@code null}
+	 * @param pageSize    page size set on every slot's search condition
+	 * @param bufferSize  capacity of the queue between fetch threads and the consumer
+	 */
+	public SplitFullScanEntityReader(String query, long startTime, long endTime,
+			int splits, String startRowkey, int pageSize, int bufferSize) {
+		this.query = query;
+		this.startTime = startTime;
+		this.endTime = endTime;
+		this.splits = splits;
+		this.startRowkey = startRowkey;
+		this.pageSize = pageSize;
+		this.bufferSize = bufferSize;
+	}
+	
+	/**
+	 * Starts one fetch thread per slot reader and returns immediately with the result set the
+	 * threads write to. When no slot has to be read the result set is completed right away.
+	 *
+	 * @return result set fed asynchronously by the fetch threads
+	 * @throws Exception if the split readers cannot be created
+	 */
+	public EntityResultSet<ENTITY> read() throws Exception {
+		final EntityResultSet<ENTITY> resultSet = new EntityResultSet<ENTITY>(new ArrayBlockingQueue<TaggedLogAPIEntity>(bufferSize));
+		final List<GenericEntityBatchReader> readers = createSplitThreads();
+		
+		final int size = readers.size();
+		if (size > 0) {
+			final AtomicInteger threadCount = new AtomicInteger(size);
+			final AtomicInteger entityCount = new AtomicInteger(0);
+			for (GenericEntityBatchReader reader : readers) {
+				final EntityFetchThread<ENTITY> thread = new EntityFetchThread<ENTITY>(reader, threadCount, entityCount, resultSet);
+				thread.start();
+			}
+		} else {
+			resultSet.getQueue().add(COMPLETED_ENTITY);
+		}
+		return resultSet;
+	}
+
+	/**
+	 * Compiles the query and builds one batch reader per time slot that still has to be read.
+	 *
+	 * @return readers for all slots that are not skipped; may be empty
+	 * @throws IllegalArgumentException if the entity is unknown, the query spans several
+	 *         partitions, or {@code splits} is smaller than 1
+	 */
+	protected List<GenericEntityBatchReader> createSplitThreads() throws Exception {
+		
+		final List<GenericEntityBatchReader> readers = new ArrayList<GenericEntityBatchReader>();
+		final ListQueryCompiler comp = new ListQueryCompiler(query);
+		final EntityDefinition entityDef = EntityDefinitionManager.getEntityByServiceName(comp.serviceName());
+		if (entityDef == null) {
+			throw new IllegalArgumentException("Invalid entity name: " + comp.serviceName());
+		}
+		
+		// TODO: For now we don't support one query to query multiple partitions. In future 
+		// if partition is defined for the entity, internally We need to spawn multiple
+		// queries and send one query for each search condition for each partition
+		final List<String[]> partitionValues = comp.getQueryPartitionValues();
+		partitionConstraintValidate(partitionValues, query);
+		
+		long lastTimestamp = Long.MAX_VALUE;
+		if (startRowkey != null) {
+			final byte[] lastRowkey = EagleBase64Wrapper.decode(startRowkey);
+			lastTimestamp = RowkeyBuilder.getTimestamp(lastRowkey, entityDef);
+		}
+		
+		// Fail with a clear message instead of an ArithmeticException from the division below.
+		if (splits < 1) {
+			throw new IllegalArgumentException("splits must be at least 1 but was " + splits);
+		}
+		final long duration = (endTime - startTime) / splits;
+		for (int i = 0; i < splits; ++i) {
+			
+			final long slotStartTime = startTime + (i * duration);
+			if (slotStartTime > lastTimestamp) {
+				// ignore this slot
+				continue;
+			}
+			// The last slot absorbs the remainder of the integer division so that the slots
+			// together cover the whole [startTime, endTime) range.
+			final long slotEndTime = (i == splits - 1) ? endTime : startTime + ((i + 1) * duration);
+			final SearchCondition condition = new SearchCondition();
+			final String slotStartTimeString = DateTimeUtil.secondsToHumanDate(slotStartTime / 1000);
+			final String slotEndTimeString = DateTimeUtil.secondsToHumanDate(slotEndTime / 1000);
+			condition.setStartTime(slotStartTimeString);
+			condition.setEndTime(slotEndTimeString);
+			
+			condition.setFilter(comp.filter());
+			condition.setQueryExpression(comp.getQueryExpression());
+			if (partitionValues != null) {
+				condition.setPartitionValues(Arrays.asList(partitionValues.get(0)));
+			}
+			// Should be careful to the startRowkey setting. Only set startRowkey when 
+			// lastTimestamp is within the time range of this particular slot; the other
+			// slots have to be read completely.
+			if (startRowkey != null && lastTimestamp >= slotStartTime && lastTimestamp < slotEndTime) {
+				condition.setStartRowkey(startRowkey);
+			}
+			condition.setPageSize(pageSize);
+			
+			if (comp.hasAgg()) {
+				List<String> groupbyFields = comp.groupbyFields();
+				List<String> outputFields = new ArrayList<String>();
+				if(groupbyFields != null){
+					outputFields.addAll(groupbyFields);
+				}
+				outputFields.addAll(comp.aggregateFields());
+				condition.setOutputFields(outputFields);
+			} else {
+				condition.setOutputFields(comp.outputFields());
+			}
+			readers.add(new GenericEntityBatchReader(comp.serviceName(), condition));
+		}
+		return readers;
+	}
+	
+
+	/**
+	 * Verifies that all partition value arrays of the query are equal to the first one.
+	 *
+	 * @throws IllegalArgumentException if a value of the first array is null or differs from the
+	 *         value at the same position of any other array
+	 */
+	private static void partitionConstraintValidate(List<String[]> partitionValues, String query) {
+		if (partitionValues != null && partitionValues.size() > 1) {
+			final String[] values = partitionValues.get(0);
+			for (int i = 1; i < partitionValues.size(); ++i) {
+				final String[] tmpValues = partitionValues.get(i);
+				for (int j = 0; j < values.length; ++j) {
+					if (values[j] == null || (!values[j].equals(tmpValues[j]))) {
+						final String errMsg = "One query for multiple partitions is NOT allowed for now! Query: " + query;
+						LOG.error(errMsg);
+						throw new IllegalArgumentException(errMsg);
+					}
+				}
+			}
+		}
+	}
+	
+	
+	/**
+	 * Consumer side view of the queue filled by the fetch threads. Reading stops once the
+	 * completion sentinel has been taken from the queue.
+	 */
+	@SuppressWarnings("unchecked")
+	public static class EntityResultSet<ENTITY extends TaggedLogAPIEntity> {
+		private static final long DEFAULT_TIMEOUT_IN_MS = 1000;
+
+		private boolean fetchCompleted = false;
+		private final BlockingQueue<TaggedLogAPIEntity> queue;
+		// Written by fetch threads, read by the consumer.
+		private volatile Exception ex = null;
+
+		public EntityResultSet(BlockingQueue<TaggedLogAPIEntity> queue) {
+			this.queue = queue;
+		}
+		
+		/**
+		 * @return true while entities are queued or the completion sentinel has not been read yet
+		 */
+		public boolean hasMoreData() {
+			return queue.size() > 0 || (!fetchCompleted);
+		}
+		
+		/**
+		 * Waits at most the given time for the next entity. Unlike {@link #next()} this method
+		 * does not rethrow an exception recorded by a fetch thread.
+		 *
+		 * @return the next entity, or null on timeout or when fetching has completed
+		 */
+		public ENTITY next(long timeout, TimeUnit unit) throws InterruptedException {
+			if (fetchCompleted) {
+				return null;
+			}
+			final TaggedLogAPIEntity entity = queue.poll(timeout, unit);
+			if (COMPLETED_ENTITY.equals(entity)) {
+				fetchCompleted = true;
+				return null;
+			}
+			return (ENTITY)entity;
+		}
+		
+		/**
+		 * Blocks until the next entity is available or fetching has completed.
+		 *
+		 * @return the next entity, or null when fetching has completed
+		 * @throws Exception the exception recorded by a fetch thread, if any, once the
+		 *         completion sentinel is reached
+		 */
+		public ENTITY next() throws Exception {
+			// An interrupt does not abort the wait, but it is remembered and restored on exit
+			// so that the caller can still observe it.
+			boolean interrupted = false;
+			try {
+				while (!fetchCompleted) {
+					final TaggedLogAPIEntity entity;
+					try {
+						entity = queue.poll(DEFAULT_TIMEOUT_IN_MS, TimeUnit.MILLISECONDS);
+					} catch (InterruptedException ignored) {
+						interrupted = true;
+						continue;
+					}
+					// Handled outside of the try above so that an InterruptedException recorded
+					// by a fetch thread is propagated instead of being swallowed.
+					if (COMPLETED_ENTITY.equals(entity)) {
+						fetchCompleted = true;
+						if (ex != null) {
+							throw ex;
+						}
+						return null;
+					}
+					if (entity != null) {
+						return (ENTITY)entity;
+					}
+				}
+				return null;
+			} finally {
+				if (interrupted) {
+					Thread.currentThread().interrupt();
+				}
+			}
+		}
+		
+		void setException(Exception ex) {
+			this.ex = ex;
+		}
+		
+		BlockingQueue<TaggedLogAPIEntity> getQueue() {
+			return queue;
+		}
+	}
+	
+	/**
+	 * Reads all entities of one slot and writes them to the shared result set. The last thread
+	 * to finish, or any thread that fails, puts the completion sentinel on the queue.
+	 */
+	private static class EntityFetchThread<ENTITY extends TaggedLogAPIEntity> extends Thread {
+		
+		private final GenericEntityBatchReader reader;
+		private final AtomicInteger threadCount;
+		private final AtomicInteger entityCount;
+		private final EntityResultSet<ENTITY> resultSet;
+		
+		private EntityFetchThread(GenericEntityBatchReader reader, AtomicInteger threadCount, AtomicInteger entityCount, EntityResultSet<ENTITY> resultSet) {
+			this.reader = reader;
+			this.threadCount = threadCount;
+			this.entityCount = entityCount;
+			this.resultSet = resultSet;
+		}
+		
+		@Override
+		public void run() {
+			boolean completionSignalled = false;
+			try {
+				final List<ENTITY> entities = reader.read();
+				entityCount.addAndGet(entities.size());
+				for (ENTITY entity : entities) {
+					if (!resultSet.getQueue().offer(entity, MAX_WRITE_TIME_OUT_IN_SECONDS, TimeUnit.SECONDS)) {
+						// Stop writing: the consumer did not make room within the timeout.
+						throw new IOException("Write entity to queue timeout");
+					}
+				}
+			} catch (Exception ex) {
+				resultSet.setException(ex);
+				signalCompleted();
+				completionSignalled = true;
+			} finally {
+				final int count = threadCount.decrementAndGet();
+				if (count == 0) {
+					if (!completionSignalled) {
+						signalCompleted();
+					}
+					LOG.info("Total fetched " + entityCount.get() + " entities");
+				}
+			}
+		}
+		
+		/**
+		 * Puts the completion sentinel on the queue. A timed offer is used because
+		 * {@code add} throws IllegalStateException when the bounded queue is full.
+		 */
+		private void signalCompleted() {
+			final BlockingQueue<TaggedLogAPIEntity> queue = resultSet.getQueue();
+			try {
+				if (!queue.offer(COMPLETED_ENTITY, MAX_WRITE_TIME_OUT_IN_SECONDS, TimeUnit.SECONDS)) {
+					LOG.error("Unable to write completion marker: queue still full after "
+							+ MAX_WRITE_TIME_OUT_IN_SECONDS + " seconds");
+				}
+			} catch (InterruptedException ie) {
+				Thread.currentThread().interrupt();
+				// Last attempt without blocking, since waiting is no longer possible.
+				if (!queue.offer(COMPLETED_ENTITY)) {
+					LOG.error("Unable to write completion marker: interrupted while queue is full");
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
new file mode 100644
index 0000000..43302c8
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
@@ -0,0 +1,589 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.generic;
+
+import com.sun.jersey.core.header.FormDataContentDisposition;
+import com.sun.jersey.multipart.FormDataParam;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.storage.DataStorage;
+import org.apache.eagle.storage.DataStorageManager;
+import org.apache.eagle.storage.exception.IllegalDataStorageException;
+import org.apache.eagle.storage.operation.*;
+import org.apache.eagle.storage.result.ModifyResult;
+import org.apache.eagle.storage.result.QueryResult;
+import com.sun.jersey.api.json.JSONWithPadding;
+import org.apache.commons.lang.time.StopWatch;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.type.TypeFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.GenericEntity;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @since 3/18/15
+ */
+@Path(GenericEntityServiceResource.ROOT_PATH)
+@SuppressWarnings("unchecked")
+public class GenericEntityServiceResource {
+    public final static String ROOT_PATH = "/entities";
+    public final static String JSONP_PATH = "jsonp";
+    public final static String DELETE_ENTITIES_PATH = "delete";
+    public final static String ROWKEY_PATH = "rowkey";
+
+    public final static String FIRST_TIMESTAMP = "firstTimestamp";
+    public final static String LAST_TIMESTAMP = "lastTimestamp";
+    public final static String ELAPSEDMS = "elapsedms";
+    public final static String TOTAL_RESULTS = "totalResults";
+
+    private final static Logger LOG = LoggerFactory.getLogger(GenericEntityServiceResource.class);
+
+    private List<? extends TaggedLogAPIEntity> unmarshalEntitiesByServie(InputStream inputStream,EntityDefinition entityDefinition) throws IllegalAccessException, InstantiationException, IOException {
+        ObjectMapper objectMapper = new ObjectMapper();
+        return objectMapper.readValue(inputStream, TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, entityDefinition.getEntityClass()));
+    }
+
+    private List<String> unmarshalAsStringlist(InputStream inputStream) throws IllegalAccessException, InstantiationException, IOException {
+        ObjectMapper objectMapper = new ObjectMapper();
+        return objectMapper.readValue(inputStream, TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, String.class));
+    }
+
+    /**
+     * Creates the entities contained in the JSON request body for the given service.
+     *
+     * @param inputStream request body holding a JSON list of entities
+     * @param serviceName entity service name used to look up the entity definition
+     * @return response carrying the identifiers of the created entities plus the
+     *         {@code totalResults} and {@code elapsedms} meta values on success, or the
+     *         exception that occurred
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity create(InputStream inputStream,
+                                                 @QueryParam("serviceName") String serviceName){
+        GenericServiceAPIResponseEntity<String> response = new GenericServiceAPIResponseEntity<String>();
+        Map<String,Object> meta = new HashMap<>();
+        StopWatch stopWatch = new StopWatch();
+        try {
+            stopWatch.start();
+            EntityDefinition entityDefinition = EntityDefinitionManager.getEntityByServiceName(serviceName);
+
+            if(entityDefinition == null){
+                throw new IllegalArgumentException("entity definition of service "+serviceName+" not found");
+            }
+
+            List<? extends TaggedLogAPIEntity> entities = unmarshalEntitiesByServie(inputStream, entityDefinition);
+            DataStorage dataStorage = DataStorageManager.getDataStorageByEagleConfig();
+            CreateStatement createStatement = new CreateStatement(entities,entityDefinition);
+            ModifyResult<String> result = createStatement.execute(dataStorage);
+            if(result.isSuccess()) {
+                List<String> keys =result.getIdentifiers();
+                if(keys != null) {
+                    // Set once; the identifiers were previously assigned twice in a row.
+                    response.setObj(keys, String.class);
+                    meta.put(TOTAL_RESULTS,keys.size());
+                }else{
+                    meta.put(TOTAL_RESULTS,0);
+                }
+                meta.put(ELAPSEDMS,stopWatch.getTime());
+                response.setMeta(meta);
+                response.setSuccess(true);
+            }
+        } catch (Exception e) {
+            // Failures are reported through the response instead of being thrown.
+            LOG.error(e.getMessage(), e);
+            response.setException(e);
+        }finally {
+            stopWatch.stop();
+        }
+        return response;
+    }
+
+    /**
+     * Creates the entities contained in an uploaded multipart file for the given service.
+     *
+     * @param fileInputStream content of the {@code file} form part holding a JSON list of entities
+     * @param cdh content disposition of the {@code file} form part (not used by this method)
+     * @param serviceName entity service name used to look up the entity definition
+     * @return response carrying the identifiers of the created entities plus the
+     *         {@code totalResults} and {@code elapsedms} meta values on success, or the
+     *         exception that occurred
+     */
+    @POST
+    @Consumes({MediaType.MULTIPART_FORM_DATA})
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity create(@FormDataParam("file") InputStream fileInputStream,
+                                                  @FormDataParam("file") FormDataContentDisposition cdh,
+                                                  @QueryParam("serviceName") String serviceName) {
+        GenericServiceAPIResponseEntity<String> response = new GenericServiceAPIResponseEntity<String>();
+        Map<String,Object> meta = new HashMap<>();
+        StopWatch stopWatch = new StopWatch();
+        try {
+            stopWatch.start();
+            EntityDefinition entityDefinition = EntityDefinitionManager.getEntityByServiceName(serviceName);
+
+            if(entityDefinition == null){
+                throw new IllegalArgumentException("entity definition of service "+serviceName+" not found");
+            }
+
+            List<? extends TaggedLogAPIEntity> entities = unmarshalEntitiesByServie(fileInputStream, entityDefinition);
+            DataStorage dataStorage = DataStorageManager.getDataStorageByEagleConfig();
+            CreateStatement createStatement = new CreateStatement(entities,entityDefinition);
+            ModifyResult<String> result = createStatement.execute(dataStorage);
+            if(result.isSuccess()) {
+                List<String> keys =result.getIdentifiers();
+                if(keys != null) {
+                    // Set once; the identifiers were previously assigned twice in a row.
+                    response.setObj(keys, String.class);
+                    meta.put(TOTAL_RESULTS,keys.size());
+                }else{
+                    meta.put(TOTAL_RESULTS,0);
+                }
+                meta.put(ELAPSEDMS,stopWatch.getTime());
+                response.setMeta(meta);
+                response.setSuccess(true);
+            }
+        } catch (Exception e) {
+            // Failures are reported through the response instead of being thrown.
+            LOG.error(e.getMessage(), e);
+            response.setException(e);
+        }finally {
+            stopWatch.stop();
+        }
+        return response;
+    }
+
+    /**
+     * Updates the entities contained in the JSON request body for the given service.
+     *
+     * @param inputStream request body holding a JSON list of entities
+     * @param serviceName entity service name used to look up the entity definition
+     * @return response carrying the identifiers of the updated entities and meta values on
+     *         success, or the exception that occurred
+     */
+    @PUT
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity update(InputStream inputStream,
+                                                 @QueryParam("serviceName") String serviceName){
+        final GenericServiceAPIResponseEntity<String> apiResponse = new GenericServiceAPIResponseEntity<String>();
+        final Map<String,Object> metadata = new HashMap<>();
+        final StopWatch timer = new StopWatch();
+        try {
+            timer.start();
+            final EntityDefinition definition = EntityDefinitionManager.getEntityByServiceName(serviceName);
+            if(definition == null){
+                throw new IllegalArgumentException("entity definition of service "+serviceName+" not found");
+            }
+
+            final List<? extends TaggedLogAPIEntity> payload = unmarshalEntitiesByServie(inputStream, definition);
+            final DataStorage storage = DataStorageManager.getDataStorageByEagleConfig();
+            final UpdateStatement statement = new UpdateStatement(payload,definition);
+            final ModifyResult<String> outcome = statement.execute(storage);
+            if(outcome.isSuccess()) {
+                final List<String> identifiers = outcome.getIdentifiers();
+                if(identifiers == null) {
+                    metadata.put(TOTAL_RESULTS,0);
+                }else{
+                    apiResponse.setObj(identifiers, String.class);
+                    metadata.put(TOTAL_RESULTS,identifiers.size());
+                }
+                metadata.put(ELAPSEDMS,timer.getTime());
+                apiResponse.setMeta(metadata);
+                apiResponse.setSuccess(true);
+            }
+        } catch (Exception e) {
+            // Failures are reported through the response instead of being thrown.
+            LOG.error(e.getMessage(), e);
+            apiResponse.setException(e);
+        } finally {
+            timer.stop();
+        }
+        return apiResponse;
+    }
+
+    /**
+     * Updates the entities contained in an uploaded multipart file for the given service.
+     *
+     * @param fileInputStream content of the {@code file} form part holding a JSON list of entities
+     * @param cdh content disposition of the {@code file} form part (not used by this method)
+     * @param serviceName entity service name used to look up the entity definition
+     * @return response carrying the identifiers of the updated entities and meta values on
+     *         success, or the exception that occurred
+     */
+    @PUT
+    @Consumes({MediaType.MULTIPART_FORM_DATA})
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity update(@FormDataParam("file") InputStream fileInputStream,
+                                                  @FormDataParam("file") FormDataContentDisposition cdh,
+                                                  @QueryParam("serviceName") String serviceName){
+        final GenericServiceAPIResponseEntity<String> apiResponse = new GenericServiceAPIResponseEntity<String>();
+        final Map<String,Object> metadata = new HashMap<>();
+        final StopWatch timer = new StopWatch();
+        try {
+            timer.start();
+            final EntityDefinition definition = EntityDefinitionManager.getEntityByServiceName(serviceName);
+            if(definition == null){
+                throw new IllegalArgumentException("entity definition of service "+serviceName+" not found");
+            }
+
+            final List<? extends TaggedLogAPIEntity> payload = unmarshalEntitiesByServie(fileInputStream, definition);
+            final DataStorage storage = DataStorageManager.getDataStorageByEagleConfig();
+            final UpdateStatement statement = new UpdateStatement(payload,definition);
+            final ModifyResult<String> outcome = statement.execute(storage);
+            if(outcome.isSuccess()) {
+                final List<String> identifiers = outcome.getIdentifiers();
+                if(identifiers == null) {
+                    metadata.put(TOTAL_RESULTS,0);
+                }else{
+                    apiResponse.setObj(identifiers, String.class);
+                    metadata.put(TOTAL_RESULTS,identifiers.size());
+                }
+                metadata.put(ELAPSEDMS,timer.getTime());
+                apiResponse.setMeta(metadata);
+                apiResponse.setSuccess(true);
+            }
+        } catch (Exception e) {
+            // Failures are reported through the response instead of being thrown.
+            LOG.error(e.getMessage(), e);
+            apiResponse.setException(e);
+        } finally {
+            timer.stop();
+        }
+        return apiResponse;
+    }
+
+
+
+    /**
+     * Looks up entities of a service by a single rowkey.
+     *
+     * @param value rowkey value
+     * @param serviceName entity service name; a null value is reported as an error in the response
+     * @return GenericServiceAPIResponseEntity holding the query data and meta values on success,
+     *         or the exception that occurred
+     */
+    @GET
+    @Path(ROWKEY_PATH)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity search(@QueryParam("value") String value,@QueryParam("serviceName") String serviceName){
+        final GenericServiceAPIResponseEntity apiResponse = new GenericServiceAPIResponseEntity();
+        final Map<String,Object> metadata = new HashMap<>();
+        // Stays null until the statement has been built, so the finally block must check it.
+        StopWatch timer = null;
+        try {
+            if(serviceName == null) {
+                throw new IllegalArgumentException("serviceName is null");
+            }
+            final RowkeyQueryStatement statement = new RowkeyQueryStatement(value,serviceName);
+            timer = new StopWatch();
+            timer.start();
+            final DataStorage storage = DataStorageManager.getDataStorageByEagleConfig();
+            if(storage==null){
+                LOG.error("Data storage is null");
+                throw new IllegalDataStorageException("data storage is null");
+            }
+            final QueryResult<?> outcome = statement.execute(storage);
+            if(outcome.isSuccess()){
+                metadata.put(FIRST_TIMESTAMP, outcome.getFirstTimestamp());
+                metadata.put(LAST_TIMESTAMP, outcome.getLastTimestamp());
+                metadata.put(TOTAL_RESULTS, outcome.getSize());
+                metadata.put(ELAPSEDMS,timer.getTime());
+                apiResponse.setObj(outcome.getData());
+                apiResponse.setType(outcome.getEntityType());
+                apiResponse.setSuccess(true);
+                apiResponse.setMeta(metadata);
+            }
+        } catch (Exception e) {
+            apiResponse.setException(e);
+            LOG.error(e.getMessage(),e);
+        }finally {
+            if(timer!=null) {
+                timer.stop();
+            }
+        }
+        return apiResponse;
+    }
+
+    /**
+     * Looks up entities of a service by a list of rowkeys given as JSON string list in the
+     * request body.
+     *
+     * @param inputStream request body holding a JSON list of rowkey strings
+     * @param serviceName entity service name; a null value is reported as an error in the response
+     * @return GenericServiceAPIResponseEntity holding the query data and meta values on success,
+     *         or the exception that occurred
+     */
+    @POST
+    @Path(ROWKEY_PATH)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity search(InputStream inputStream,@QueryParam("serviceName") String serviceName){
+        final GenericServiceAPIResponseEntity apiResponse = new GenericServiceAPIResponseEntity();
+        final Map<String,Object> metadata = new HashMap<>();
+        // Stays null until the statement has been built, so the finally block must check it.
+        StopWatch timer = null;
+        try {
+            if(serviceName == null) {
+                throw new IllegalArgumentException("serviceName is null");
+            }
+
+            final List<String> rowkeys = unmarshalAsStringlist(inputStream);
+            final RowkeyQueryStatement statement = new RowkeyQueryStatement(rowkeys,serviceName);
+
+            timer = new StopWatch();
+            timer.start();
+            final DataStorage storage = DataStorageManager.getDataStorageByEagleConfig();
+            if(storage==null){
+                LOG.error("Data storage is null");
+                throw new IllegalDataStorageException("Data storage is null");
+            }
+            final QueryResult<?> outcome = statement.execute(storage);
+            if(outcome.isSuccess()){
+                metadata.put(FIRST_TIMESTAMP, outcome.getFirstTimestamp());
+                metadata.put(LAST_TIMESTAMP, outcome.getLastTimestamp());
+                metadata.put(TOTAL_RESULTS, outcome.getSize());
+                metadata.put(ELAPSEDMS,timer.getTime());
+                apiResponse.setObj(outcome.getData());
+                apiResponse.setType(outcome.getEntityType());
+                apiResponse.setSuccess(true);
+                apiResponse.setMeta(metadata);
+            }
+        } catch (Exception e) {
+            apiResponse.setException(e);
+            LOG.error(e.getMessage(),e);
+        }finally {
+            if(timer!=null) {
+                timer.stop();
+            }
+        }
+        return apiResponse;
+    }
+
+
+    /**
+     * Executes a generic query. All parameters are copied unchanged into a {@code RawQuery}
+     * which is then executed against the configured data storage. Any failure, including one
+     * raised while building the query, is reported through the returned response.
+     *
+     * @param query forwarded to {@code RawQuery.query}
+     * @param startTime forwarded to {@code RawQuery.startTime}
+     * @param endTime forwarded to {@code RawQuery.endTime}
+     * @param pageSize forwarded to {@code RawQuery.pageSize}
+     * @param startRowkey forwarded to {@code RawQuery.startRowkey}
+     * @param treeAgg forwarded to {@code RawQuery.treeAgg}
+     * @param timeSeries forwarded to {@code RawQuery.timeSeries}
+     * @param intervalmin forwarded to {@code RawQuery.intervalMin}
+     * @param top forwarded to {@code RawQuery.top}
+     * @param filterIfMissing forwarded to {@code RawQuery.filerIfMissing}
+     * @param parallel forwarded to {@code RawQuery.parallel}
+     * @param metricName forwarded to {@code RawQuery.metricName}
+     * @param verbose forwarded to {@code RawQuery.verbose}
+     * @return response holding the query data and the {@code firstTimestamp},
+     *         {@code lastTimestamp}, {@code totalResults} and {@code elapsedms} meta values on
+     *         success, or the exception that occurred
+     */
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @SuppressWarnings("unchecked")
+    public GenericServiceAPIResponseEntity search(@QueryParam("query") String query,
+                                                 @QueryParam("startTime") String startTime, @QueryParam("endTime") String endTime,
+                                                 @QueryParam("pageSize") int pageSize, @QueryParam("startRowkey") String startRowkey,
+                                                 @QueryParam("treeAgg") boolean treeAgg, @QueryParam("timeSeries") boolean timeSeries,
+                                                 @QueryParam("intervalmin") long intervalmin, @QueryParam("top") int top,
+                                                 @QueryParam("filterIfMissing") boolean filterIfMissing,
+                                                 @QueryParam("parallel") int parallel,
+                                                 @QueryParam("metricName") String metricName,
+                                                 @QueryParam("verbose") Boolean verbose){
+        GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity();
+        Map<String,Object> meta = new HashMap<>();
+
+        DataStorage dataStorage;
+        StopWatch stopWatch = new StopWatch();
+        try {
+            // Start first so that stop() in the finally block is always legal.
+            stopWatch.start();
+
+            // Built inside the try block so that a failure while assembling the query is
+            // returned in the response, like in the rowkey based search methods.
+            RawQuery rawQuery = RawQuery.build()
+                    .query(query)
+                    .startTime(startTime)
+                    .endTime(endTime)
+                    .pageSize(pageSize)
+                    .startRowkey(startRowkey)
+                    .treeAgg(treeAgg)
+                    .timeSeries(timeSeries)
+                    .intervalMin(intervalmin)
+                    .top(top)
+                    .filerIfMissing(filterIfMissing)
+                    .parallel(parallel)
+                    .metricName(metricName)
+                    .verbose(verbose)
+                    .done();
+            QueryStatement queryStatement = new QueryStatement(rawQuery);
+
+            dataStorage = DataStorageManager.getDataStorageByEagleConfig();
+            if(dataStorage==null){
+                LOG.error("Data storage is null");
+                throw new IllegalDataStorageException("data storage is null");
+            }
+            
+            QueryResult<?> result = queryStatement.execute(dataStorage);
+            if(result.isSuccess()){
+                meta.put(FIRST_TIMESTAMP, result.getFirstTimestamp());
+                meta.put(LAST_TIMESTAMP, result.getLastTimestamp());
+                meta.put(TOTAL_RESULTS, result.getSize());
+                meta.put(ELAPSEDMS,stopWatch.getTime());
+
+                response.setObj(result.getData());
+                response.setType(result.getEntityType());
+                response.setSuccess(true);
+                response.setMeta(meta);
+                return response;
+            }
+        } catch (Exception e) {
+            response.setException(e);
+            LOG.error(e.getMessage(),e);
+        }finally {
+            stopWatch.stop();
+        }
+        return response;
+    }
+
+    /**
+     * JSONP flavour of the generic query: delegates to
+     * {@code search(query, startTime, ..., verbose)} with the same arguments and wraps the
+     * response together with the callback name.
+     *
+     * @param query passed through to {@code search}
+     * @param startTime passed through to {@code search}
+     * @param endTime passed through to {@code search}
+     * @param pageSize passed through to {@code search}
+     * @param startRowkey passed through to {@code search}
+     * @param treeAgg passed through to {@code search}
+     * @param timeSeries passed through to {@code search}
+     * @param intervalmin passed through to {@code search}
+     * @param top passed through to {@code search}
+     * @param filterIfMissing passed through to {@code search}
+     * @param parallel passed through to {@code search}
+     * @param metricName passed through to {@code search}
+     * @param verbose passed through to {@code search}
+     * @param callback callback name handed to {@code JSONWithPadding}
+     * @return the search response wrapped for JSONP
+     */
+    @GET
+    @Path(JSONP_PATH)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public JSONWithPadding searchWithJsonp(@QueryParam("query") String query,
+                                           @QueryParam("startTime") String startTime, @QueryParam("endTime") String endTime,
+                                           @QueryParam("pageSize") int pageSize, @QueryParam("startRowkey") String startRowkey,
+                                           @QueryParam("treeAgg") boolean treeAgg, @QueryParam("timeSeries") boolean timeSeries,
+                                           @QueryParam("intervalmin") long intervalmin, @QueryParam("top") int top,
+                                           @QueryParam("filterIfMissing") boolean filterIfMissing,
+                                           @QueryParam("parallel") int parallel,
+                                           @QueryParam("metricName") String metricName,
+                                           @QueryParam("verbose") Boolean verbose,
+                                           @QueryParam("callback") String callback){
+        final GenericServiceAPIResponseEntity payload = search(
+                query, startTime, endTime, pageSize, startRowkey,
+                treeAgg, timeSeries, intervalmin, top,
+                filterIfMissing, parallel, metricName, verbose);
+        // The anonymous subclass keeps the generic type information of the wrapped entity.
+        final GenericEntity<GenericServiceAPIResponseEntity> wrapped =
+                new GenericEntity<GenericServiceAPIResponseEntity>(payload){};
+        return new JSONWithPadding(wrapped, callback);
+    }
+
+    /**
+     * TODO
+     *
+     * Delete by query.
+     *
+     * <p>The query arguments are packed into a {@link RawQuery}, which a {@link DeleteStatement}
+     * executes against the storage returned by {@code DataStorageManager.getDataStorageByEagleConfig()}.
+     * When the delete result reports success, the response carries the result's identifiers, the
+     * elapsed time under {@code ELAPSEDMS}, and the success flag. Any exception raised inside the
+     * try block, including the one thrown for a missing data storage, is logged and attached to the
+     * response instead of being propagated.</p>
+     *
+     * @return the response entity described above
+     */
+    @DELETE
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity deleteByQuery(@QueryParam("query") String query,
+                                                 @QueryParam("startTime") String startTime, @QueryParam("endTime") String endTime,
+                                                 @QueryParam("pageSize") int pageSize, @QueryParam("startRowkey") String startRowkey,
+                                                 @QueryParam("treeAgg") boolean treeAgg, @QueryParam("timeSeries") boolean timeSeries,
+                                                 @QueryParam("intervalmin") long intervalmin, @QueryParam("top") int top,
+                                                 @QueryParam("filterIfMissing") boolean filterIfMissing,
+                                                 @QueryParam("parallel") int parallel,
+                                                 @QueryParam("metricName") String metricName,
+                                                 @QueryParam("verbose") Boolean verbose){
+        // Built outside the try block, exactly as before: a failure here propagates to the caller.
+        RawQuery rawQuery = RawQuery.build()
+                .query(query)
+                .startTime(startTime)
+                .endTime(endTime)
+                .pageSize(pageSize)
+                .startRowkey(startRowkey)
+                .treeAgg(treeAgg)
+                .timeSeries(timeSeries)
+                .intervalMin(intervalmin)
+                .top(top)
+                .filerIfMissing(filterIfMissing)
+                .parallel(parallel)
+                .metricName(metricName)
+                .verbose(verbose)
+                .done();
+
+        GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity();
+        StopWatch stopWatch = new StopWatch();
+        try {
+            stopWatch.start();
+            DataStorage dataStorage = DataStorageManager.getDataStorageByEagleConfig();
+            if(null == dataStorage){
+                LOG.error("Data storage is null");
+                throw new IllegalDataStorageException("Data storage is null");
+            }
+
+            ModifyResult<String> deleteResult = new DeleteStatement(rawQuery).execute(dataStorage);
+            if(deleteResult.isSuccess()){
+                Map<String,Object> meta = new HashMap<String, Object>();
+                meta.put(ELAPSEDMS, stopWatch.getTime());
+                response.setObj(deleteResult.getIdentifiers(),String.class);
+                response.setSuccess(true);
+                response.setMeta(meta);
+            }
+            // An unsuccessful result leaves the response untouched.
+        } catch (Exception e) {
+            response.setException(e);
+            LOG.error(e.getMessage(),e);
+        } finally {
+            stopWatch.stop();
+        }
+        return response;
+    }
+
+    /**
+     * Delete by entity lists.
+     *
+     * <p>Use "POST /entities/delete" instead of "DELETE /entities" to work around the jersey DELETE issue
+     * for requests with a body.</p>
+     *
+     * <p>The request body is read either as a list of id strings (when {@code deleteById} is true) or as
+     * entities of the entity definition registered for {@code serviceName}. The resulting
+     * {@link DeleteStatement} is executed against the storage returned by
+     * {@code DataStorageManager.getDataStorageByEagleConfig()}. Every exception raised on the way is
+     * logged and attached to the response instead of being propagated.</p>
+     *
+     * @param inputStream request body holding the ids or the entities to delete
+     * @param serviceName service name used to create the delete statement and, when deleting by entity,
+     *                    to look up the entity definition
+     * @param deleteById  {@code true} to read the body as ids; {@code null} is treated as {@code false}
+     * @return response carrying the deleted identifiers (if any), {@code TOTAL_RESULTS} and
+     *         {@code ELAPSEDMS} meta entries on success, or the exception on failure
+     */
+    @POST
+    @Path(DELETE_ENTITIES_PATH)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity deleteEntities(InputStream inputStream,
+                                                 @QueryParam("serviceName") String serviceName,
+                                                 @QueryParam("byId") Boolean deleteById){
+        GenericServiceAPIResponseEntity<String> response = new GenericServiceAPIResponseEntity<String>();
+        DataStorage dataStorage = null;
+        Map<String,Object> meta = new HashMap<String, Object>();
+
+        if(deleteById == null) deleteById = false;
+
+        StopWatch stopWatch = new StopWatch();
+
+        try {
+            stopWatch.start();
+            dataStorage = DataStorageManager.getDataStorageByEagleConfig();
+            // Same guard as the query-based endpoints of this resource: fail with an explicit
+            // message instead of handing a null storage to the statement.
+            if(dataStorage==null){
+                LOG.error("Data storage is null");
+                throw new IllegalDataStorageException("Data storage is null");
+            }
+            DeleteStatement statement = new DeleteStatement(serviceName);
+
+            if(deleteById) {
+                LOG.info("Deleting "+serviceName+" by ids");
+                List<String> deleteIds = unmarshalAsStringlist(inputStream);
+                statement.setIds(deleteIds);
+            }else {
+                LOG.info("Deleting "+serviceName+" by entities");
+                EntityDefinition entityDefinition = EntityDefinitionManager.getEntityByServiceName(serviceName);
+                if (entityDefinition == null) {
+                    throw new IllegalArgumentException("Entity definition of service " + serviceName + " not found");
+                }
+                List<? extends TaggedLogAPIEntity> entities = unmarshalEntitiesByServie(inputStream, entityDefinition);
+                statement.setEntities(entities);
+            }
+
+            ModifyResult<String> result = statement.execute(dataStorage);
+            if (result.isSuccess()) {
+                List<String> keys = result.getIdentifiers();
+                if (keys != null) {
+                    response.setObj(keys, String.class);
+                    meta.put(TOTAL_RESULTS, keys.size());
+                } else {
+                    // No identifiers reported: still a success, with a count of zero.
+                    meta.put(TOTAL_RESULTS, 0);
+                }
+                meta.put(ELAPSEDMS, stopWatch.getTime());
+                response.setMeta(meta);
+                response.setSuccess(true);
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            response.setException(e);
+        }finally {
+            stopWatch.stop();
+        }
+        return response;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericObjectMapperProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericObjectMapperProvider.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericObjectMapperProvider.java
new file mode 100755
index 0000000..c10c28d
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericObjectMapperProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.generic;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ser.FilterProvider;
+
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.ext.ContextResolver;
+import javax.ws.rs.ext.Provider;
+
+/**
+ * {@link ContextResolver} that hands out one shared {@link ObjectMapper}, whatever type is asked for.
+ *
+ * <p>During class initialization the mapper receives the filter provider obtained from
+ * {@link TaggedLogAPIEntity#getFilterProvider()}.</p>
+ */
+@Provider
+@Produces(MediaType.APPLICATION_JSON)
+public class GenericObjectMapperProvider implements ContextResolver<ObjectMapper> {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    static {
+        OBJECT_MAPPER.setFilters(TaggedLogAPIEntity.getFilterProvider());
+        // set more filter here
+    }
+
+    /**
+     * Passes the given filter provider to {@code setFilters} on the shared mapper.
+     *
+     * @param filter filter provider to install
+     */
+    public static void setFilter(FilterProvider filter){
+        OBJECT_MAPPER.setFilters(filter);
+    }
+
+    /**
+     * @param clazz requested type; not consulted
+     * @return the shared mapper instance
+     */
+    @Override
+    public ObjectMapper getContext(Class<?> clazz) {
+        return OBJECT_MAPPER;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/ListQueryResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/ListQueryResource.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/ListQueryResource.java
new file mode 100755
index 0000000..b14dc22
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/ListQueryResource.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.generic;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.*;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.query.GenericQuery;
+import org.apache.eagle.query.ListQueryCompiler;
+import org.apache.eagle.service.common.EagleExceptionWrapper;
+import org.apache.eagle.storage.hbase.query.GenericQueryBuilder;
+import org.apache.eagle.common.DateTimeUtil;
+import com.sun.jersey.api.json.JSONWithPadding;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.eagle.query.aggregate.timeseries.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.GenericEntity;
+import javax.ws.rs.core.MediaType;
+import java.util.*;
+
+@Path("list")
+public class ListQueryResource {
+	private static final Logger LOG = LoggerFactory.getLogger(ListQueryResource.class);
+
+	/**
+	 * Overload kept for the old interface, which has no {@code verbose} argument.
+	 * Delegates to the 13-argument {@code listQuery} with {@code verbose} fixed to {@code true}.
+	 */
+	public ListQueryAPIResponseEntity listQuery(@QueryParam("query") String query,
+												@QueryParam("startTime") String startTime, @QueryParam("endTime") String endTime,
+												@QueryParam("pageSize") int pageSize, @QueryParam("startRowkey") String startRowkey,
+												@QueryParam("treeAgg") boolean treeAgg, @QueryParam("timeSeries") boolean timeSeries,
+												@QueryParam("intervalmin") long intervalmin, @QueryParam("top") int top,
+												@QueryParam("filterIfMissing") boolean filterIfMissing,
+												@QueryParam("parallel") int parallel,
+												@QueryParam("metricName") String metricName){
+		final Boolean verbose = Boolean.TRUE;
+		return listQuery(
+				query, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries,
+				intervalmin, top, filterIfMissing, parallel, metricName, verbose);
+	}
+
+	/**
+	 * Executes a list query and returns the matching entities or aggregation result.
+	 *
+	 * <p>When {@code EagleConfigFactory.load().isCoprocessorEnabled()} is false, the call is handed to
+	 * {@link #listQueryWithoutCoprocessor}. Otherwise the query is compiled, a {@link SearchCondition} is
+	 * filled from the compiled query and the arguments, and a {@link GenericQuery} built by
+	 * {@link GenericQueryBuilder} produces the result. Exceptions are logged, wrapped into the response
+	 * and reported with {@code success == false}; the elapsed time is recorded in every case.</p>
+	 *
+	 * TODO refactor the code structure,  now it's messy
+	 *
+	 * @param query           query string compiled by {@link ListQueryCompiler}
+	 * @param startTime       start of the time range; applied only when the entity is a time series
+	 * @param endTime         end of the time range; applied only when the entity is a time series
+	 * @param pageSize        page size; must be positive
+	 * @param startRowkey     rowkey to start scanning from, may be {@code null}
+	 * @param treeAgg         passed to the query builder's {@code treeAgg}
+	 * @param timeSeries      passed, with {@code intervalmin}, to the query builder's {@code timeSeries}
+	 * @param intervalmin     passed, with {@code timeSeries}, to the query builder's {@code timeSeries}
+	 * @param top             passed to the query builder's {@code top}; a warning is logged when it is
+	 *                        positive but the query has no sort options
+	 * @param filterIfMissing passed to {@link ListQueryCompiler}
+	 * @param parallel        passed to the query builder's {@code parallel}
+	 * @param metricName      passed, with the service name, to the query builder's {@code from}
+	 * @param verbose         verbose-output flag; {@code null} is treated as {@code true}
+	 * @return the populated response entity
+	 */
+	@GET
+	@Produces({MediaType.APPLICATION_JSON})
+	public ListQueryAPIResponseEntity listQuery(@QueryParam("query") String query,
+			@QueryParam("startTime") String startTime, @QueryParam("endTime") String endTime,
+			@QueryParam("pageSize") int pageSize, @QueryParam("startRowkey") String startRowkey,
+			@QueryParam("treeAgg") boolean treeAgg, @QueryParam("timeSeries") boolean timeSeries,
+			@QueryParam("intervalmin") long intervalmin, @QueryParam("top") int top,
+			@QueryParam("filterIfMissing") boolean filterIfMissing,
+			@QueryParam("parallel") int parallel,
+			@QueryParam("metricName") String metricName,
+			@QueryParam("verbose") Boolean verbose) {
+		if(!EagleConfigFactory.load().isCoprocessorEnabled()) {
+			return listQueryWithoutCoprocessor(query,startTime,endTime,pageSize,startRowkey,treeAgg,timeSeries,intervalmin,top,filterIfMissing,parallel,metricName,verbose);
+		}
+
+		StopWatch watch = new StopWatch();
+		watch.start();
+		ListQueryAPIResponseEntity result = new ListQueryAPIResponseEntity();
+		try{
+			validateQueryParameters(startRowkey, pageSize);
+
+			// 1. Compile query to parse parameters and HBase Filter
+			ListQueryCompiler comp = new ListQueryCompiler(query, filterIfMissing);
+			String serviceName = comp.serviceName();
+
+			SearchCondition condition = new SearchCondition();
+			condition.setOutputVerbose(verbose == null || verbose);
+			condition.setOutputAlias(comp.getOutputAlias());
+			condition.setFilter(comp.filter());
+			condition.setQueryExpression(comp.getQueryExpression());
+			if(comp.sortOptions() == null && top > 0) {
+				LOG.warn("Parameter \"top\" is only used for sort query! Ignore top parameter this time since it's not a sort query");
+			}
+
+			// 2. Initialize partition values if set
+			// TODO: For now we don't support one query to query multiple partitions. In future
+			// if partition is defined for the entity, internally We need to spawn multiple
+			// queries and send one query for each search condition for each partition
+			final List<String[]> partitionValues = comp.getQueryPartitionValues();
+			if (partitionValues != null) {
+				condition.setPartitionValues(Arrays.asList(partitionValues.get(0)));
+			}
+
+			// 3. Set time range if it's timeseries service
+			EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
+			if(ed == null){
+				// Report an unknown service by name instead of failing with a bare NullPointerException below.
+				throw new IllegalArgumentException("Entity definition of service " + serviceName + " not found");
+			}
+			if(ed.isTimeSeries()){
+				// TODO check timestamp exists for timeseries or topology data
+				condition.setStartTime(startTime);
+				condition.setEndTime(endTime);
+			}
+
+			// 4. Set HBase start scanning rowkey if given
+			condition.setStartRowkey(startRowkey);
+
+			// 5. Set page size
+			condition.setPageSize(pageSize);
+
+			// 6. Generate output,group-by,aggregated fields
+			List<String> outputFields = comp.outputFields();
+			List<String> groupbyFields = comp.groupbyFields();
+			List<String> aggregateFields = comp.aggregateFields();
+			Set<String> filterFields = comp.getFilterFields();
+
+			// Start to generate output fields list {
+			condition.setOutputAll(comp.isOutputAll());
+			if(outputFields == null) outputFields = new ArrayList<String>();
+			if(comp.hasAgg()){
+				if(groupbyFields != null) outputFields.addAll(groupbyFields);
+				if(aggregateFields != null) outputFields.addAll(aggregateFields);
+				// The generic metric service additionally gets its value field added when missing.
+				if(GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(serviceName) && !outputFields.contains(GenericMetricEntity.VALUE_FIELD)){
+					outputFields.add(GenericMetricEntity.VALUE_FIELD);
+				}
+			}
+			if(filterFields!=null) outputFields.addAll(filterFields);
+			condition.setOutputFields(outputFields);
+			if(comp.isOutputAll()){
+				LOG.info("Output fields: ALL");
+			}else{
+				LOG.info("Output fields: " + StringUtils.join(outputFields, ","));
+			}
+			// } END
+
+			// 7. Build GenericQuery
+			GenericQuery reader = GenericQueryBuilder
+									.select(outputFields)
+									.from(serviceName, metricName).where(condition)
+									.groupBy(
+											comp.hasAgg(),
+											groupbyFields,
+											comp.aggregateFunctionTypes(),
+											aggregateFields)
+									.timeSeries(timeSeries, intervalmin)
+									.treeAgg(treeAgg)
+									.orderBy(comp.sortOptions(),comp.sortFunctions(),comp.sortFields()).top(top)
+									.parallel(parallel)
+									.build();
+
+			// 8. Fill response object
+			List entities = reader.result();
+			result.setObj(entities);
+			result.setTotalResults(entities.size());
+			result.setSuccess(true);
+			result.setLastTimestamp(reader.getLastTimestamp());
+			result.setFirstTimestamp(reader.getFirstTimeStamp());
+		}catch(Exception ex){
+			LOG.error("Fail executing list query", ex);
+			result.setException(EagleExceptionWrapper.wrap(ex));
+			result.setSuccess(false);
+			return result;
+		}finally{
+			// Runs on the failure path too, so the elapsed time is always reported.
+			watch.stop();
+			result.setElapsedms(watch.getTime());
+		}
+		LOG.info("Query done " + watch.getTime() + " ms");
+		return result;
+	}
+	
+	/**
+	 * <b>TODO</b> remove the legacy deprecated implementation of listQueryWithoutCoprocessor
+	 *
+	 * <p>Legacy list query implementation that reads and aggregates through the entity readers directly.
+	 * After compiling the query and filling a {@link SearchCondition}, one of six branches is taken:
+	 * plain list query, metric list query, flat aggregation (single-threaded or parallel), time-series
+	 * aggregation, or hierarchical aggregation. Exceptions are logged, wrapped into the response and
+	 * reported with {@code success == false}; the elapsed time is recorded in every case.</p>
+	 *
+	 * @see #listQuery(String, String, String, int, String, boolean, boolean, long, int, boolean, int, String,Boolean)
+	 *
+	 * @param query           query string compiled by {@link ListQueryCompiler}
+	 * @param startTime       start of the time range; applied only when the entity is a time series
+	 * @param endTime         end of the time range; applied only when the entity is a time series
+	 * @param pageSize        page size; must be positive
+	 * @param startRowkey     rowkey to start scanning from, may be {@code null}
+	 * @param treeAgg         selects hierarchical aggregation for aggregate queries
+	 * @param timeSeries      selects time-series aggregation when {@code treeAgg} is false
+	 * @param intervalmin     time-series interval in minutes (converted to milliseconds for the aggregator)
+	 * @param top             limit handed to the post-aggregation sort; a warning is logged when it is
+	 *                        positive but the query has no sort options
+	 * @param filterIfMissing passed to {@link ListQueryCompiler}
+	 * @param parallel        a positive value selects the multi-threaded stream reader with that argument
+	 * @param metricName      metric to read; required for non-aggregate queries on the generic metric service
+	 * @param verbose         verbose-output flag; {@code null} is treated as {@code true}
+	 * @return the populated response entity
+	 */
+	@GET
+	@Path("/legacy")
+	@Produces({MediaType.APPLICATION_JSON})
+	@Deprecated
+	public ListQueryAPIResponseEntity listQueryWithoutCoprocessor(@QueryParam("query") String query,
+	                                                              @QueryParam("startTime") String startTime, @QueryParam("endTime") String endTime,
+	                                                              @QueryParam("pageSize") int pageSize, @QueryParam("startRowkey") String startRowkey,
+	                                                              @QueryParam("treeAgg") boolean treeAgg, @QueryParam("timeSeries") boolean timeSeries,
+	                                                              @QueryParam("intervalmin") long intervalmin, @QueryParam("top") int top,
+	                                                              @QueryParam("filterIfMissing") boolean filterIfMissing,
+	                                                              @QueryParam("parallel") int parallel,
+	                                                              @QueryParam("metricName") String metricName,
+	                                                              @QueryParam("verbose") Boolean verbose) {
+		StopWatch watch = new StopWatch();
+		watch.start();
+		ListQueryAPIResponseEntity result = new ListQueryAPIResponseEntity();
+		try{
+			validateQueryParameters(startRowkey, pageSize);
+			ListQueryCompiler comp = new ListQueryCompiler(query, filterIfMissing);
+			String serviceName = comp.serviceName();
+
+			SearchCondition condition = new SearchCondition();
+			condition.setFilter(comp.filter());
+			condition.setQueryExpression(comp.getQueryExpression());
+			if(comp.sortOptions() == null && top > 0) {
+				LOG.warn("Parameter \"top\" is only used for sort query! Ignore top parameter this time since it's not a sort query");
+			}
+
+			// TODO: For now we don't support one query to query multiple partitions. In future
+			// if partition is defined for the entity, internally We need to spawn multiple
+			// queries and send one query for each search condition for each partition
+			final List<String[]> partitionValues = comp.getQueryPartitionValues();
+			if (partitionValues != null) {
+				condition.setPartitionValues(Arrays.asList(partitionValues.get(0)));
+			}
+			EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
+			if(ed == null){
+				// Report an unknown service by name instead of failing with a bare NullPointerException below.
+				throw new IllegalArgumentException("Entity definition of service " + serviceName + " not found");
+			}
+			if(ed.isTimeSeries()){
+				// TODO check timestamp exists for timeseries or topology data
+				condition.setStartTime(startTime);
+				condition.setEndTime(endTime);
+			}
+			condition.setOutputVerbose(verbose==null || verbose );
+			condition.setOutputAlias(comp.getOutputAlias());
+			condition.setOutputAll(comp.isOutputAll());
+			condition.setStartRowkey(startRowkey);
+			condition.setPageSize(pageSize);
+
+			// Shared by every branch below; each branch appends the fields it needs.
+			List<String> outputFields = comp.outputFields();
+			if(outputFields == null) outputFields = new ArrayList<String>();
+
+			/**
+			 * TODO ugly logic, waiting for refactoring
+			 */
+			if(!comp.hasAgg() && !serviceName.equals(GenericMetricEntity.GENERIC_METRIC_SERVICE)){ // pure list query
+				Set<String> filterFields = comp.getFilterFields();
+				if(filterFields != null) outputFields.addAll(filterFields);
+				condition.setOutputFields(outputFields);
+				logOutputFields(condition);
+				GenericEntityBatchReader reader = new GenericEntityBatchReader(serviceName, condition);
+				List<? extends TaggedLogAPIEntity> entityList = reader.read();
+				result.setObj(entityList);
+				result.setTotalResults(entityList.size());
+				result.setSuccess(true);
+				result.setLastTimestamp(reader.getLastTimestamp());
+				result.setFirstTimestamp(reader.getFirstTimestamp());
+			}else if(!comp.hasAgg() && serviceName.equals(GenericMetricEntity.GENERIC_METRIC_SERVICE)){ // metric list query
+				// validate metric name
+				if(metricName == null || metricName.isEmpty()){
+					throw new IllegalArgumentException("metricName should not be empty for metric list query");
+				}
+				Set<String> filterFields = comp.getFilterFields();
+				if(filterFields != null) outputFields.addAll(filterFields);
+				condition.setOutputFields(outputFields);
+				logOutputFields(condition);
+				GenericMetricEntityBatchReader reader = new GenericMetricEntityBatchReader(metricName, condition);
+				List<? extends TaggedLogAPIEntity> entityList = reader.read();
+				result.setObj(entityList);
+				result.setTotalResults(entityList.size());
+				result.setSuccess(true);
+				result.setLastTimestamp(reader.getLastTimestamp());
+				result.setFirstTimestamp(reader.getFirstTimestamp());
+			}
+			else if(!treeAgg && !timeSeries && parallel <= 0 ){ // non time-series based aggregate query, not hierarchical
+				List<String> groupbyFields = comp.groupbyFields();
+				List<String> aggregateFields = comp.aggregateFields();
+				Set<String> filterFields = comp.getFilterFields();
+				if(groupbyFields != null) outputFields.addAll(groupbyFields);
+				if(filterFields != null) outputFields.addAll(filterFields);
+				outputFields.addAll(aggregateFields);
+
+				if(GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(serviceName) && !outputFields.contains(GenericMetricEntity.VALUE_FIELD)){
+					outputFields.add(GenericMetricEntity.VALUE_FIELD);
+				}
+
+				FlatAggregator agg = new FlatAggregator(groupbyFields, comp.aggregateFunctionTypes(), comp.aggregateFields());
+				StreamReader reader = null;
+				if(ed.getMetricDefinition() == null){
+					reader = new GenericEntityStreamReader(serviceName, condition);
+				}else{ // metric aggregation need metric reader
+					reader = new GenericMetricEntityDecompactionStreamReader(metricName, condition);
+				}
+				// NOTE(review): here the reader is constructed before the output fields are set on the
+				// condition, unlike the parallel and hierarchical branches - confirm the readers do not
+				// read the output fields at construction time.
+				condition.setOutputFields(outputFields);
+				logOutputFields(condition);
+				reader.register(agg);
+				reader.readAsStream();
+				ArrayList<Map.Entry<List<String>, List<Double>>> obj = new ArrayList<Map.Entry<List<String>, List<Double>>>();
+				obj.addAll(agg.result().entrySet());
+				if(comp.sortOptions() == null){
+					result.setObj(obj);
+				}else{ // has sort options
+					result.setObj(PostFlatAggregateSort.sort(agg.result(), comp.sortOptions(), top));
+				}
+				result.setTotalResults(0);
+				result.setSuccess(true);
+				result.setLastTimestamp(reader.getLastTimestamp());
+				result.setFirstTimestamp(reader.getFirstTimestamp());
+			}else if(!treeAgg && !timeSeries && parallel > 0){ // TODO ugly branch, let us refactor
+				List<String> groupbyFields = comp.groupbyFields();
+				List<String> aggregateFields = comp.aggregateFields();
+				Set<String> filterFields = comp.getFilterFields();
+				if(groupbyFields != null) outputFields.addAll(groupbyFields);
+				if(filterFields != null) outputFields.addAll(filterFields);
+				outputFields.addAll(aggregateFields);
+				if(GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(serviceName) && !outputFields.contains(GenericMetricEntity.VALUE_FIELD)){
+					outputFields.add(GenericMetricEntity.VALUE_FIELD);
+				}
+				condition.setOutputFields(outputFields);
+				logOutputFields(condition);
+				FlatAggregator agg = new FlatAggregator(groupbyFields, comp.aggregateFunctionTypes(), comp.aggregateFields());
+				// The multi-threaded reader gets the aggregator through the factory's synchronized listener.
+				EntityCreationListener listener = EntityCreationListenerFactory.synchronizedEntityCreationListener(agg);
+				StreamReader reader = new GenericEntityStreamReaderMT(serviceName, condition, parallel);
+				reader.register(listener);
+				reader.readAsStream();
+				ArrayList<Map.Entry<List<String>, List<Double>>> obj = new ArrayList<Map.Entry<List<String>, List<Double>>>();
+				obj.addAll(agg.result().entrySet());
+				if(comp.sortOptions() == null){
+					result.setObj(obj);
+				}else{ // has sort options
+					result.setObj(PostFlatAggregateSort.sort(agg.result(), comp.sortOptions(), top));
+				}
+				result.setTotalResults(0);
+				result.setSuccess(true);
+				result.setLastTimestamp(reader.getLastTimestamp());
+				result.setFirstTimestamp(reader.getFirstTimestamp());
+			}else if(!treeAgg && timeSeries){ // time-series based aggregate query, not hierarchical
+				List<String> groupbyFields = comp.groupbyFields();
+				List<String> sortFields = comp.sortFields();
+				List<String> aggregateFields = comp.aggregateFields();
+				Set<String> filterFields = comp.getFilterFields();
+				if(groupbyFields != null) outputFields.addAll(groupbyFields);
+				if(filterFields != null) outputFields.addAll(filterFields);
+				if (sortFields != null) outputFields.addAll(sortFields);
+				outputFields.addAll(aggregateFields);
+				if(GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(serviceName) && !outputFields.contains(GenericMetricEntity.VALUE_FIELD)){
+					outputFields.add(GenericMetricEntity.VALUE_FIELD);
+				}
+				StreamReader reader = null;
+				if(ed.getMetricDefinition() == null){
+					if(parallel <= 0){ // TODO ugly quick win
+						reader = new GenericEntityStreamReader(serviceName, condition);
+					}else{
+						reader = new GenericEntityStreamReaderMT(serviceName, condition, parallel);
+					}
+				}else{ // metric aggregation need metric reader
+					reader = new GenericMetricEntityDecompactionStreamReader(metricName, condition);
+					if(!outputFields.contains(GenericMetricEntity.VALUE_FIELD)){
+						outputFields.add(GenericMetricEntity.VALUE_FIELD);
+					}
+				}
+				// NOTE(review): as in the single-threaded flat branch, the reader exists before the
+				// output fields are set on the condition - confirm this ordering is harmless.
+				condition.setOutputFields(outputFields);
+				logOutputFields(condition);
+				TimeSeriesAggregator tsAgg = new TimeSeriesAggregator(groupbyFields, comp.aggregateFunctionTypes(), aggregateFields,
+						DateTimeUtil.humanDateToDate(condition.getStartTime()).getTime(), DateTimeUtil.humanDateToDate(condition.getEndTime()).getTime(), intervalmin*60*1000);
+				if(parallel <= 0){
+					reader.register(tsAgg);
+				}else{
+					EntityCreationListener listener = EntityCreationListenerFactory.synchronizedEntityCreationListener(tsAgg);
+					reader.register(listener);
+				}
+				// for sorting
+				FlatAggregator sortAgg = null;
+				if (comp.sortOptions() != null) {
+					sortAgg = new FlatAggregator(groupbyFields, comp.sortFunctions(), comp.sortFields());
+					if(parallel <= 0){
+						reader.register(sortAgg);
+					}else{
+						EntityCreationListener listener = EntityCreationListenerFactory.synchronizedEntityCreationListener(sortAgg);
+						reader.register(listener);
+					}
+				}
+				reader.readAsStream();
+				ArrayList<Map.Entry<List<String>, List<double[]>>> obj = new ArrayList<Map.Entry<List<String>, List<double[]>>>();
+				obj.addAll(tsAgg.getMetric().entrySet());
+				if(comp.sortOptions() == null){
+					result.setObj(obj);
+				}else{ // has sort options
+					result.setObj(TimeSeriesPostFlatAggregateSort.sort(sortAgg.result(), tsAgg.getMetric(), comp.sortOptions(), top));
+				}
+				result.setTotalResults(0);
+				result.setSuccess(true);
+				result.setLastTimestamp(reader.getLastTimestamp());
+				result.setFirstTimestamp(reader.getFirstTimestamp());
+			}
+			else{ // use hierarchical aggregate mode
+				List<String> groupbyFields = comp.groupbyFields();
+				List<String> aggregateFields = comp.aggregateFields();
+				Set<String> filterFields = comp.getFilterFields();
+				if(groupbyFields != null) outputFields.addAll(groupbyFields);
+				if(filterFields != null) outputFields.addAll(filterFields);
+				outputFields.addAll(aggregateFields);
+				if(GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(serviceName) && !outputFields.contains(GenericMetricEntity.VALUE_FIELD)){
+					outputFields.add(GenericMetricEntity.VALUE_FIELD);
+				}
+				condition.setOutputFields(outputFields);
+				logOutputFields(condition);
+				GenericEntityStreamReader reader = new GenericEntityStreamReader(serviceName, condition);
+				HierarchicalAggregator agg = new HierarchicalAggregator(groupbyFields, comp.aggregateFunctionTypes(), comp.aggregateFields());
+				reader.register(agg);
+				reader.readAsStream();
+				if(comp.sortOptions() == null){
+					result.setObj(agg.result());
+				}else{ // has sort options
+					result.setObj(PostHierarchicalAggregateSort.sort(agg.result(), comp.sortOptions()));
+				}
+				result.setTotalResults(0);
+				result.setSuccess(true);
+				result.setLastTimestamp(reader.getLastTimestamp());
+				result.setFirstTimestamp(reader.getFirstTimestamp());
+			}
+		}catch(Exception ex){
+			LOG.error("Fail executing list query: " + query, ex);
+			result.setException(EagleExceptionWrapper.wrap(ex));
+			result.setSuccess(false);
+			return result;
+		}finally{
+			// Runs on the failure path too, so the elapsed time is always reported.
+			watch.stop();
+			result.setElapsedms(watch.getTime());
+		}
+		LOG.info("Query done " + watch.getTime() + " ms");
+		return result;
+	}
+
+	/**
+	 * Logs which fields the given condition will output: "ALL" when the condition is flagged to
+	 * output everything, otherwise the comma-separated output field names.
+	 */
+	private static void logOutputFields(SearchCondition condition){
+		if(condition.isOutputAll()){
+			LOG.info("Output: ALL");
+		}else{
+			LOG.info("Output: " + StringUtils.join(condition.getOutputFields(), ", "));
+		}
+	}
+
+
+	/**
+	 * JSONP variant of the list query: runs {@code listQuery} and wraps its response, together with
+	 * the callback name, in a {@link JSONWithPadding}.
+	 *
+	 * <p>{@code verbose} is a nullable {@code Boolean} so that an absent query parameter reaches
+	 * {@code listQuery} as {@code null}, which that method treats as {@code true}. With a primitive
+	 * {@code boolean} an absent parameter would arrive as {@code false}, giving this endpoint a
+	 * different default from the plain list endpoint.</p>
+	 *
+	 * <p>NOTE(review): the callback name is read from the query parameter {@code update}, not
+	 * {@code callback}; left unchanged because existing clients may depend on it - confirm.
+	 * NOTE(review): {@code metricName} is always passed as {@code null} here - confirm whether
+	 * metric queries are meant to be unsupported through this endpoint.</p>
+	 *
+	 * @param callback name of the padding function (query parameter {@code update})
+	 * @param verbose  verbose-output flag; {@code null} (parameter absent) means verbose
+	 * @return the list query response wrapped together with {@code callback}
+	 */
+	@GET
+	@Path("/jsonp")
+	@Produces({"application/x-javascript", "application/json", "application/xml"})
+	public JSONWithPadding listQueryWithJsonp(@QueryParam("query") String query,
+			@QueryParam("startTime") String startTime, @QueryParam("endTime") String endTime,
+			@QueryParam("pageSize") int pageSize, @QueryParam("startRowkey") String startRowkey,
+			@QueryParam("treeAgg") boolean treeAgg, @QueryParam("timeSeries") boolean timeSeries,
+			@QueryParam("intervalmin") long intervalmin, @QueryParam("top") int top,
+			@QueryParam("filterIfMissing") boolean filterIfMissing,
+			@QueryParam("parallel") int parallel,
+			@QueryParam("update") String callback,
+			@QueryParam("verbose") Boolean verbose) {
+		ListQueryAPIResponseEntity result = listQuery(query, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin, top, filterIfMissing, parallel, null,verbose);
+		return new JSONWithPadding(new GenericEntity<ListQueryAPIResponseEntity>(result){}, callback);
+	}
+	
+	private void validateQueryParameters(String startRowkey, int pageSize){
+		if(pageSize <= 0){
+			throw new IllegalArgumentException("Positive pageSize value should be always provided. The list query format is:\n" + "eagle-service/rest/list?query=<querystring>&pageSize=10&startRowkey=xyz&startTime=xxx&endTime=xxx");
+		}
+		
+		if(startRowkey != null && startRowkey.equals("null")){
+			LOG.warn("startRowkey being null string is not same to startRowkey == null");
+		}
+		return;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/MetadataResource.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/MetadataResource.java
new file mode 100755
index 0000000..d5ab87f
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/MetadataResource.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.generic;
+
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.apache.eagle.log.entity.meta.MetricDefinition;
+import com.sun.jersey.api.model.AbstractResource;
+import com.sun.jersey.api.model.AbstractResourceMethod;
+import com.sun.jersey.api.model.AbstractSubResourceMethod;
+import com.sun.jersey.server.impl.modelapi.annotation.IntrospectionModeller;
+
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.JsonNodeFactory;
+import org.codehaus.jackson.node.ObjectNode;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.Map;
+
+/**
+ * @since : 7/3/14,2014
+ */
+@Path(MetadataResource.PATH_META)
+public class MetadataResource {
+	final static String PATH_META = "meta";
+	final static String PATH_RESOURCE = "resource";
+	final static String PATH_SERVICE = "service";
+
+	@GET
+	@Produces(MediaType.APPLICATION_JSON)
+	public Response index(@Context Application application,
+                        @Context HttpServletRequest request){
+		String basePath = request.getRequestURL().toString();
+		ObjectNode root = JsonNodeFactory.instance.objectNode();
+
+		root.put(PATH_RESOURCE,joinUri(basePath,PATH_RESOURCE));
+		root.put(PATH_SERVICE,joinUri(basePath,PATH_SERVICE));
+		return Response.ok().entity(root).build();
+	}
+
+	@GET
+	@Path(PATH_RESOURCE)
+	@Produces(MediaType.APPLICATION_JSON)
+	public Response listAllResourcesRoutes(@Context Application application,
+	                                       @Context HttpServletRequest request){
+		String basePath = request.getRequestURL().toString();
+		basePath = basePath.substring(0,basePath.length() - PATH_META.length() - PATH_RESOURCE.length() -1);
+		ObjectNode root = JsonNodeFactory.instance.objectNode();
+		root.put("base",basePath);
+		ArrayNode resources = JsonNodeFactory.instance.arrayNode();
+		root.put( "resources", resources );
+
+		for ( Class<?> aClass : application.getClasses()){
+			if ( isAnnotatedResourceClass(aClass)){
+				AbstractResource resource = IntrospectionModeller.createResource(aClass);
+				ObjectNode resourceNode = JsonNodeFactory.instance.objectNode();
+				String uriPrefix = resource.getPath().getValue();
+
+				for ( AbstractSubResourceMethod srm : resource.getSubResourceMethods() ) {
+					String uri = uriPrefix + "/" + srm.getPath().getValue();
+					addTo( resourceNode, uri, srm, joinUri(basePath, uri) );
+				}
+
+				for ( AbstractResourceMethod srm : resource.getResourceMethods() ) {
+					addTo( resourceNode, uriPrefix, srm, joinUri( basePath, uriPrefix ) );
+				}
+				resources.add( resourceNode );
+			}
+		}
+
+		return Response.ok().entity( root ).build();
+	}
+
+	private String joinUri(String basePath, String uriPrefix) {
+		if(basePath.endsWith("/") && uriPrefix.startsWith("/")){
+			return basePath.substring(0,basePath.length()-2)+uriPrefix;
+		}else if(basePath.endsWith("/") || uriPrefix.startsWith("/")){
+			return basePath+ uriPrefix;
+		}
+		return basePath+"/"+uriPrefix;
+	}
+
+	private void addTo( ObjectNode resourceNode, String uriPrefix, AbstractResourceMethod srm, String path ){
+		if(resourceNode.get( uriPrefix ) == null){
+			ObjectNode inner = JsonNodeFactory.instance.objectNode();
+			inner.put("path", path);
+			inner.put("methods", JsonNodeFactory.instance.arrayNode());
+			resourceNode.put( uriPrefix, inner );
+		}
+
+		((ArrayNode) resourceNode.get( uriPrefix ).get("methods")).add( srm.getHttpMethod() );
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	private boolean isAnnotatedResourceClass( Class rc ){
+		if ( rc.isAnnotationPresent( Path.class ) ) {
+			return true;
+		}
+
+		for ( Class i : rc.getInterfaces() ) {
+			if ( i.isAnnotationPresent( Path.class ) ) {
+				return true;
+			}
+		}
+
+		return false;
+	}
+
+	@GET
+	@Path(PATH_SERVICE)
+	@Produces(MediaType.APPLICATION_JSON)
+	public Response listAllEntities(@Context Application application,
+                                     @Context HttpServletRequest request) throws Exception {
+		Map<String,EntityDefinition> entities = EntityDefinitionManager.entities();
+		ObjectNode root = JsonNodeFactory.instance.objectNode();
+
+		ArrayNode services = JsonNodeFactory.instance.arrayNode();
+
+		for(Map.Entry<String,EntityDefinition> entry : entities.entrySet()){
+//			ObjectNode serviceNode = JsonNodeFactory.instance.objectNode();
+//			serviceNode.put(entry.getKey(),entityDefationitionAsJson(entry.getValue()));
+			services.add(entityDefationitionAsJson(entry.getValue()));
+		}
+		root.put("count",entities.keySet().size());
+		root.put("services",services);
+		return Response.ok().entity(root).build();
+	}
+
+	private JsonNode entityDefationitionAsJson(EntityDefinition def) {
+		ObjectNode node = JsonNodeFactory.instance.objectNode();
+		node.put("service",def.getService());
+		node.put("entityClass",def.getEntityClass().getName());
+		node.put("table",def.getTable());
+		node.put("columnFamily",def.getColumnFamily());
+		node.put("prefix",def.getPrefix());
+		if(def.getPartitions()!=null){
+			node.put("partitions",arrayNode(def.getPartitions()));
+		}
+		node.put("isTimeSeries",def.isTimeSeries());
+
+		MetricDefinition mdf = def.getMetricDefinition();
+		if(mdf!=null){
+			node.put("interval", mdf.getInterval());
+		}
+
+		IndexDefinition[] indexDef = def.getIndexes();
+		if(indexDef!=null){
+			ArrayNode indexDefArray = JsonNodeFactory.instance.arrayNode();
+			for(IndexDefinition idef : indexDef){
+				ObjectNode idn = JsonNodeFactory.instance.objectNode();
+				idn.put("indexPrefix",idef.getIndexPrefix());
+
+				if(idef.getIndex()!=null){
+					ObjectNode index = JsonNodeFactory.instance.objectNode();
+					index.put("name",idef.getIndex().name());
+					index.put("columns",arrayNode(idef.getIndex().columns()));
+					idn.put("index",index);
+				}
+
+				indexDefArray.add(idn);
+			}
+			node.put("indexs",indexDefArray);
+		}
+		return node;
+	}
+
+	private ArrayNode arrayNode(String[] values){
+		ArrayNode an = JsonNodeFactory.instance.arrayNode();
+		for(String v:values){
+			an.add(v);
+		}
+		return an;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/metric/EagleMetricResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/metric/EagleMetricResource.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/metric/EagleMetricResource.java
new file mode 100644
index 0000000..f3ff420
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/metric/EagleMetricResource.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.metric;
+
+import java.util.List;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.log.entity.GenericCreateAPIResponseEntity;
+import org.apache.eagle.log.entity.GenericEntityWriter;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+
+/**
+ * REST resource that accepts a POSTed JSON list of {@link GenericMetricEntity}
+ * and writes it through a {@link GenericEntityWriter}.
+ */
+@Path(EagleMetricResource.METRIC_URL_PATH)
+public class EagleMetricResource {
+	private static final Logger LOG = LoggerFactory.getLogger(EagleMetricResource.class);
+	public static final String METRIC_URL_PATH = "/metric";
+
+	/**
+	 * Writes the given metric entities and reports the outcome.
+	 *
+	 * @param entities metric entities taken from the request body
+	 * @return response carrying the encoded rowkeys on success, or the exception message on failure
+	 */
+	@POST
+	@Consumes(MediaType.APPLICATION_JSON)
+	@Produces(MediaType.APPLICATION_JSON)
+	public GenericCreateAPIResponseEntity createGenericMetricEntity(List<GenericMetricEntity> entities) {
+		GenericCreateAPIResponseEntity response = new GenericCreateAPIResponseEntity();
+		try{
+			GenericEntityWriter metricWriter = new GenericEntityWriter(GenericMetricEntity.GENERIC_METRIC_SERVICE);
+			List<String> encodedRowkeys = metricWriter.write(entities);
+			response.setEncodedRowkeys(encodedRowkeys);
+			response.setSuccess(true);
+		}catch(Exception ex){
+			// any failure is reported in the response body rather than propagated
+			LOG.error("Fail writing Generic Metric entity", ex);
+			response.setSuccess(false);
+			response.setException(ex.getMessage());
+		}
+		return response;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/rowkey/RowKeyQueryResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/rowkey/RowKeyQueryResource.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/rowkey/RowKeyQueryResource.java
new file mode 100644
index 0000000..9480695
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/rowkey/RowKeyQueryResource.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package org.apache.eagle.service.rowkey;
+
+import java.io.IOException;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.eagle.service.common.EagleExceptionWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.log.base.taggedlog.NoSuchRowException;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.InternalLog;
+import org.apache.eagle.log.entity.RowkeyQueryAPIResponseEntity;
+import org.apache.eagle.log.entity.index.RowKeyLogReader;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.common.EagleBase64Wrapper;
+
+/**
+ * REST resource that looks up a single entity by its base64-encoded rowkey.
+ *
+ * @since Jan 26, 2015
+ */
+@Path("/rowkeyquery")
+public class RowKeyQueryResource {
+	private static final Logger LOG = LoggerFactory.getLogger(RowKeyQueryResource.class);
+
+	/**
+	 * Reads the row identified by {@code rowkey} and builds an entity from it using the
+	 * entity definition registered under the service name {@code query}.
+	 *
+	 * @param query service name used to resolve the entity definition
+	 * @param rowkey base64-encoded rowkey of the entity
+	 * @return response holding the entity on success, or the wrapped exception on failure
+	 */
+	@GET
+	@Produces({MediaType.APPLICATION_JSON})
+	public RowkeyQueryAPIResponseEntity getEntityByRowkey(@QueryParam("query") String query, @QueryParam("rowkey") String rowkey){
+		RowkeyQueryAPIResponseEntity result = new RowkeyQueryAPIResponseEntity();
+		RowKeyLogReader reader = null;
+
+		try {
+			EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(query);
+			reader = new RowKeyLogReader(ed, EagleBase64Wrapper.decode(rowkey));
+			reader.open();
+			InternalLog log = reader.read();
+			TaggedLogAPIEntity entity = HBaseInternalLogHelper.buildEntity(log, ed);
+			result.setObj(entity);
+			result.setSuccess(true);
+			return result;
+		}
+		catch(NoSuchRowException ex){
+			LOG.error("rowkey " + ex.getMessage() + " does not exist!", ex);
+			result.setSuccess(false);
+			result.setException(EagleExceptionWrapper.wrap(ex));
+			return result;
+		}
+		catch(Exception ex){
+			LOG.error("Cannot read alert by rowkey", ex);
+			result.setSuccess(false);
+			result.setException(EagleExceptionWrapper.wrap(ex));
+			return result;
+		}
+		finally{
+			if(reader != null){
+				try {
+					reader.close();
+				} catch (IOException e) {
+					// Closing is best effort and must not change the response,
+					// but the failure should still be visible in the logs.
+					LOG.warn("Fail closing RowKeyLogReader", e);
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/rowkey/RowkeyResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/rowkey/RowkeyResource.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/rowkey/RowkeyResource.java
new file mode 100644
index 0000000..d4a6a42
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/rowkey/RowkeyResource.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.rowkey;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.eagle.service.common.EagleExceptionWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.log.base.taggedlog.RowkeyAPIEntity;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.InternalLog;
+import org.apache.eagle.log.entity.old.GenericDeleter;
+import org.apache.eagle.log.entity.old.HBaseLogByRowkeyReader;
+import org.apache.eagle.common.ByteUtil;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.EagleBase64Wrapper;
+import org.apache.eagle.common.service.POSTResultEntityBase;
+
+@Deprecated
+@Path("rowkey")
+public class RowkeyResource {
+	private static final Logger LOG = LoggerFactory.getLogger(RowkeyResource.class);
+	
+	@GET
+	@Produces(MediaType.APPLICATION_JSON)
+	public RowkeyAPIEntity inspectRowkey(@QueryParam("table") String table, @QueryParam("cf") String columnFamily, 
+			@QueryParam("key") String key, @QueryParam("all") String all, @QueryParam("field") List<String> fields){
+		RowkeyAPIEntity entity = new RowkeyAPIEntity();
+		byte[] row = null;
+		boolean includingAllQualifiers = false;
+		if(all != null && all.equals("true"))
+			includingAllQualifiers = true;
+		HBaseLogByRowkeyReader getter = new HBaseLogByRowkeyReader(table, columnFamily, includingAllQualifiers, fields);
+		InternalLog log = null;
+		try{
+			getter.open();
+			row = EagleBase64Wrapper.decode(key);
+			log = getter.get(row);
+		}catch(Exception ex){
+			LOG.error("Cannot get rowkey", ex);
+			entity.setSuccess(false);
+			entity.setException(EagleExceptionWrapper.wrap(ex));
+			return entity;
+		}finally{
+			try{
+				getter.close();
+			}catch(Exception ex){}
+		}
+		
+		Map<String, String> fieldNameValueMap = new TreeMap<String, String>();
+		entity.setFieldNameValueMap(fieldNameValueMap);
+		// populate qualifiers
+		Map<String, byte[]> qualifierValues = log.getQualifierValues();
+		for(Map.Entry<String, byte[]> qualifier : qualifierValues.entrySet()){
+			if(qualifier.getValue() != null){
+				fieldNameValueMap.put(qualifier.getKey(), new String(qualifier.getValue()));
+			}
+		}
+		
+		// decode rowkey
+		// the first integer is prefix hashcode
+		entity.setPrefixHashCode(ByteUtil.bytesToInt(row, 0));
+		long ts = Long.MAX_VALUE-ByteUtil.bytesToLong(row, 4);
+		entity.setTimestamp(ts);
+		entity.setHumanTime(DateTimeUtil.millisecondsToHumanDateWithMilliseconds(ts));
+		int offset = 4+8;
+		int len = row.length;
+		Map<Integer, Integer> tagNameHashValueHashMap = new HashMap<Integer, Integer>();
+		// TODO boundary check please
+		while(offset < len){
+			int tagNameHash = ByteUtil.bytesToInt(row, offset);
+			offset += 4;
+			int tagValueHash = ByteUtil.bytesToInt(row, offset);
+			offset += 4;
+			tagNameHashValueHashMap.put(tagNameHash, tagValueHash);
+		}
+		
+		entity.setSuccess(true);
+		return entity;
+	}
+	
+	/**
+	 * for entities, the only required field is encodedRowkey
+	 * @param table
+	 * @param columnFamily
+	 * @param entities
+	 */
+	@DELETE
+	@Consumes(MediaType.APPLICATION_JSON)
+	@Produces(MediaType.APPLICATION_JSON)
+	public POSTResultEntityBase deleteEntityByEncodedRowkey(@QueryParam("table") String table, @QueryParam("cf") String columnFamily,
+			List<TaggedLogAPIEntity> entities){
+		GenericDeleter deleter = new GenericDeleter(table, columnFamily);
+		POSTResultEntityBase result = new POSTResultEntityBase();
+		try{
+			deleter.delete(entities);
+		}catch(Exception ex){
+			LOG.error("Fail deleting entity " + table + ":" + columnFamily, ex);
+			result.setSuccess(false);
+			result.setException(ex.getMessage());
+			return result;
+		}
+		result.setSuccess(true);
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/selfcheck/EagleServiceSelfCheckAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/selfcheck/EagleServiceSelfCheckAPIEntity.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/selfcheck/EagleServiceSelfCheckAPIEntity.java
new file mode 100644
index 0000000..0e52a2e
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/selfcheck/EagleServiceSelfCheckAPIEntity.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.selfcheck;
+
+/**
+ * Plain bean that exposes internal configuration or metrics of the service:
+ * the environment name and the HBase ZooKeeper quorum and client port.
+ */
+//@XmlRootElement
+//@XmlAccessorType(XmlAccessType.FIELD)
+//@XmlType(propOrder = {"env", "hbaseZookeeperQuorum", "hbaseZookeeperClientPort"})
+public class EagleServiceSelfCheckAPIEntity {
+	private String env;
+	private String hbaseZookeeperQuorum;
+	private String hbaseZookeeperClientPort;
+
+	/** @return the environment value, or null if it was never set */
+	public String getEnv() {
+		return this.env;
+	}
+
+	/** @param env the environment value to expose */
+	public void setEnv(String env) {
+		this.env = env;
+	}
+
+	/** @return the HBase ZooKeeper quorum, or null if it was never set */
+	public String getHbaseZookeeperQuorum() {
+		return this.hbaseZookeeperQuorum;
+	}
+
+	/** @param hbaseZookeeperQuorum the HBase ZooKeeper quorum to expose */
+	public void setHbaseZookeeperQuorum(String hbaseZookeeperQuorum) {
+		this.hbaseZookeeperQuorum = hbaseZookeeperQuorum;
+	}
+
+	/** @return the HBase ZooKeeper client port as text, or null if it was never set */
+	public String getHbaseZookeeperClientPort() {
+		return this.hbaseZookeeperClientPort;
+	}
+
+	/** @param hbaseZookeeperClientPort the HBase ZooKeeper client port to expose, as text */
+	public void setHbaseZookeeperClientPort(String hbaseZookeeperClientPort) {
+		this.hbaseZookeeperClientPort = hbaseZookeeperClientPort;
+	}
+}



Mime
View raw message