eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [17/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"
Date Thu, 19 Nov 2015 10:47:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java
new file mode 100755
index 0000000..b1dd64c
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.common.DateTimeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+
+public class GenericMetricEntityDecompactionStreamReader extends StreamReader implements EntityCreationListener{
+	@SuppressWarnings("unused")
+	private static final Logger LOG = LoggerFactory.getLogger(GenericMetricEntityDecompactionStreamReader.class);
+	private GenericEntityStreamReader reader;
+	private EntityDefinition ed;
+	private String serviceName = GenericMetricEntity.GENERIC_METRIC_SERVICE;
+	private long start;
+	private long end;
+	private GenericMetricShadowEntity single = new GenericMetricShadowEntity();
+	
+	/**
+	 * it makes sense that serviceName should not be provided while metric name should be provided as prefix
+	 * @param metricName
+	 * @param condition
+	 * @throws InstantiationException
+	 * @throws IllegalAccessException
+	 * @throws ParseException
+	 */
+	public GenericMetricEntityDecompactionStreamReader(String metricName, SearchCondition condition) throws InstantiationException, IllegalAccessException, ParseException{
+		ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
+		checkIsMetric(ed);
+		reader = new GenericEntityStreamReader(serviceName, condition, metricName);
+		start = DateTimeUtil.humanDateToSeconds(condition.getStartTime())*1000;
+		end = DateTimeUtil.humanDateToSeconds(condition.getEndTime())*1000;
+	}
+	
+	private void checkIsMetric(EntityDefinition ed){
+		if(ed.getMetricDefinition() == null)
+			throw new IllegalArgumentException("Only metric entity comes here");
+	}
+	
+	@Override
+	public void entityCreated(TaggedLogAPIEntity entity) throws Exception{
+		GenericMetricEntity e = (GenericMetricEntity)entity;
+		double[] value = e.getValue();
+		if(value != null) {
+			int count =value.length;
+			@SuppressWarnings("unused")
+			Class<?> cls = ed.getMetricDefinition().getSingleTimestampEntityClass();
+			for (int i = 0; i < count; i++) {
+				long ts = entity.getTimestamp() + i * ed.getMetricDefinition().getInterval();
+				// exclude those entity which is not within the time range in search condition. [start, end)
+				if (ts < start || ts >= end) {
+					continue;
+				}
+				single.setTimestamp(ts);
+				single.setTags(entity.getTags());
+				single.setValue(e.getValue()[i]);
+				for (EntityCreationListener l : _listeners) {
+					l.entityCreated(single);
+				}
+			}
+		}
+	}
+	
+	@Override
+	public void readAsStream() throws Exception{
+		reader.register(this);
+		reader.readAsStream();
+	}
+
+	@Override
+	public long getLastTimestamp() {
+		return reader.getLastTimestamp();
+	}
+
+	@Override
+	public long getFirstTimestamp() {
+		return reader.getFirstTimestamp();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java
new file mode 100644
index 0000000..acd1290
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+
+/**
+ * just a shadow class to avoid dynamically create the class and instantiate using reflection
+ */
+public class GenericMetricShadowEntity extends TaggedLogAPIEntity {
+	private double value;
+
+	public double getValue() {
+		return value;
+	}
+
+	public void setValue(double value) {
+		this.value = value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
new file mode 100644
index 0000000..edb9fc0
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.common.EagleExceptionWrapper;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonDeserialize;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlType;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The only GenericServiceAPIResponseEntity for both client and server side
+ *
+ * @see GenericServiceAPIResponseEntityDeserializer
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(propOrder = {"success","exception","meta","type","obj"})
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonDeserialize(using = GenericServiceAPIResponseEntityDeserializer.class)
+@JsonIgnoreProperties(ignoreUnknown=true)
+public class GenericServiceAPIResponseEntity<T>{
+    /**
+     * Please use primitive type of value in meta as possible
+     */
+    private Map<String,Object> meta;
+	private boolean success;
+	private String exception;
+    private List<T> obj;
+    private Class<T> type;
+
+    public GenericServiceAPIResponseEntity(){
+        // default constructor
+    }
+    public GenericServiceAPIResponseEntity(Class<T> type){
+        this.setType(type);
+    }
+
+    public Map<String, Object> getMeta() {
+        return meta;
+    }
+
+    public void setMeta(Map<String, Object> meta) {
+        this.meta = meta;
+    }
+
+    public List<T> getObj() {
+        return obj;
+    }
+
+    public void setObj(List<T> obj) {
+        this.obj = obj;
+    }
+
+    public void setObj(List<T> obj,Class<T> type) {
+        this.setObj(obj);
+        this.setType(type);
+    }
+
+    public Class<T> getType() {
+        return type;
+    }
+
+    /**
+     * Set the first object's class as type
+     */
+    @SuppressWarnings("unused")
+    public void setTypeByObj(){
+        for(T t:this.obj){
+            if(this.type == null && t!=null){
+                this.type = (Class<T>) t.getClass();
+            }
+        }
+    }
+
+    /**
+     * can explicitly change type class
+     *
+     * @param type
+     */
+    public void setType(Class<T> type) {
+        this.type = type;
+    }
+
+	public boolean isSuccess() {
+		return success;
+	}
+	public void setSuccess(boolean success) {
+		this.success = success;
+	}
+	public String getException() {
+		return exception;
+	}
+	public void setException(String exception) {
+		this.exception = exception;
+	}
+
+    public void setException(Exception exception){
+        if(exception!=null) this.exception = EagleExceptionWrapper.wrap(exception);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
new file mode 100644
index 0000000..85b875d
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.ObjectCodec;
+import org.codehaus.jackson.map.DeserializationContext;
+import org.codehaus.jackson.map.JsonDeserializer;
+import org.codehaus.jackson.map.type.TypeFactory;
+import org.codehaus.jackson.type.JavaType;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * @since 3/18/15
+ */
+public class GenericServiceAPIResponseEntityDeserializer extends JsonDeserializer<GenericServiceAPIResponseEntity> {
+    private final static String META_FIELD="meta";
+    private final static String SUCCESS_FIELD="success";
+    private final static String EXCEPTION_FIELD="exception";
+    private final static String OBJ_FIELD="obj";
+    private final static String TYPE_FIELD="type";
+
+    @Override
+    public GenericServiceAPIResponseEntity deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
+        GenericServiceAPIResponseEntity entity = new GenericServiceAPIResponseEntity();
+        ObjectCodec objectCodec = jp.getCodec();
+
+        JsonNode rootNode = jp.getCodec().readTree(jp);
+        if(rootNode.isObject()){
+            Iterator<Map.Entry<String,JsonNode>> fields = rootNode.getFields();
+            JsonNode objNode = null;
+            while(fields.hasNext()){
+                Map.Entry<String,JsonNode> field = fields.next();
+                if (META_FIELD.equals(field.getKey()) && field.getValue() != null)
+                    entity.setMeta(objectCodec.readValue(field.getValue().traverse(), Map.class));
+                else if(SUCCESS_FIELD.equals(field.getKey()) && field.getValue() != null){
+                    entity.setSuccess(field.getValue().getValueAsBoolean(false));
+                }else if(EXCEPTION_FIELD.equals(field.getKey()) && field.getValue() != null){
+                    entity.setException(field.getValue().getTextValue());
+                }else if(TYPE_FIELD.endsWith(field.getKey())  && field.getValue() != null){
+                    try {
+                        entity.setType(Class.forName(field.getValue().getTextValue()));
+                    } catch (ClassNotFoundException e) {
+                        throw new IOException(e);
+                    }
+                }else if(OBJ_FIELD.equals(field.getKey()) && field.getValue() != null){
+                    objNode = field.getValue();
+                }
+            }
+
+            if(objNode!=null) {
+                JavaType collectionType=null;
+                if (entity.getType() != null) {
+                    collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, entity.getType());
+                }else{
+                    collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, Map.class);
+                }
+                List obj = objectCodec.readValue(objNode.traverse(), collectionType);
+                entity.setObj(obj);
+            }
+        }else{
+            throw new IOException("root node is not object");
+        }
+        return entity;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java
new file mode 100755
index 0000000..7a38033
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.expression.ExpressionParser;
+import org.apache.eagle.log.entity.meta.*;
+import org.apache.eagle.query.parser.TokenConstant;
+import org.apache.eagle.common.ByteUtil;
+import org.apache.eagle.common.EagleBase64Wrapper;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class HBaseInternalLogHelper {
+	private final static Logger LOG  = LoggerFactory.getLogger(HBaseInternalLogHelper.class);
+
+	private static final EntitySerDeserializer ENTITY_SERDESER = new EntitySerDeserializer();
+
+	/**
+	 *
+	 * @param ed
+	 * @param r
+	 * @param qualifiers if null, return all qualifiers defined in ed
+	 * @return
+	 */
+	public static InternalLog parse(EntityDefinition ed, Result r, byte[][] qualifiers) {
+		final byte[] row = r.getRow();
+		// skip the first 4 bytes : prefix
+		final int offset = (ed.getPartitions() == null) ? (4) : (4 + ed.getPartitions().length * 4);
+		long timestamp = ByteUtil.bytesToLong(row, offset);
+		// reverse timestamp
+		timestamp = Long.MAX_VALUE - timestamp;
+		final byte[] family = ed.getColumnFamily().getBytes();
+		final Map<String, byte[]> allQualifierValues = new HashMap<String, byte[]>();
+
+		if (qualifiers != null) {
+			int count = qualifiers.length;
+			final byte[][] values = new byte[count][];
+			for (int i = 0; i < count; i++) {
+				// TODO if returned value is null, it means no this column for this row, so why set null to the object?
+				values[i] = r.getValue(family, qualifiers[i]);
+				allQualifierValues.put(new String(qualifiers[i]), values[i]);
+			}
+		}else{
+			// return all qualifiers
+			for(KeyValue kv:r.list()){
+				byte[] qualifier = kv.getQualifier();
+				byte[] value = kv.getValue();
+				allQualifierValues.put(new String(qualifier),value);
+			}
+		}
+		final InternalLog log = buildObject(ed, row, timestamp, allQualifierValues);
+		return log;
+	}
+
+	/**
+	 *
+	 * @param ed
+	 * @param row
+	 * @param timestamp
+	 * @param allQualifierValues <code>Map &lt; Qualifier name (not display name),Value in bytes array &gt;</code>
+	 * @return
+	 */
+	public static InternalLog buildObject(EntityDefinition ed, byte[] row, long timestamp, Map<String, byte[]> allQualifierValues) {
+		InternalLog log = new InternalLog();
+		String myRow = EagleBase64Wrapper.encodeByteArray2URLSafeString(row);
+		log.setEncodedRowkey(myRow);
+		log.setPrefix(ed.getPrefix());
+		log.setTimestamp(timestamp);
+
+		Map<String, byte[]> logQualifierValues = new HashMap<String, byte[]>();
+		Map<String, String> logTags = new HashMap<String, String>();
+		Map<String, Object> extra = null;
+
+		Map<String,Double> doubleMap = null;
+		// handle with metric
+		boolean isMetricEntity = GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(ed.getService());
+		double[] metricValueArray = null;
+
+		for (Map.Entry<String, byte[]> entry : allQualifierValues.entrySet()) {
+			if (ed.isTag(entry.getKey())) {
+				if (entry.getValue() != null) {
+					logTags.put(entry.getKey(), new String(entry.getValue()));
+				}else if (TokenConstant.isExpression(entry.getKey())){
+					if(doubleMap == null) doubleMap = EntityQualifierUtils.bytesMapToDoubleMap(allQualifierValues, ed);
+					// Caculate expression based fields
+					String expression = TokenConstant.parseExpressionContent(entry.getKey());
+					if (extra == null) extra = new HashMap<String, Object>();
+
+					// Evaluation expression as output based on entity
+					// -----------------------------------------------
+					// 1) Firstly, check whether is metric entity and expression requires value and also value is not number (i.e. double[])
+					// 2) Treat all required fields as double, if not number, then set result as NaN
+
+					try {
+						ExpressionParser parser = ExpressionParser.parse(expression);
+						boolean isRequiringValue = parser.getDependentFields().contains(GenericMetricEntity.VALUE_FIELD);
+
+						if(isMetricEntity && isRequiringValue && doubleMap.get(GenericMetricEntity.VALUE_FIELD)!=null
+								&& Double.isNaN(doubleMap.get(GenericMetricEntity.VALUE_FIELD))) // EntityQualifierUtils will convert non-number field into Double.NaN
+						{
+							// if dependent fields require "value"
+							// and value exists but value's type is double[] instead of double
+
+							// handle with metric value array based expression
+							// lazily extract metric value as double array if required
+							if(metricValueArray == null){
+								// if(allQualifierValues.containsKey(GenericMetricEntity.VALUE_FIELD)){
+								Qualifier qualifier = ed.getDisplayNameMap().get(GenericMetricEntity.VALUE_FIELD);
+								EntitySerDeser serDeser = qualifier.getSerDeser();
+								if(serDeser instanceof DoubleArraySerDeser){
+									byte[] value = allQualifierValues.get(qualifier.getQualifierName());
+									if(value !=null ) metricValueArray = (double[]) serDeser.deserialize(value);
+								}
+								// }
+							}
+
+							if(metricValueArray!=null){
+								double[] resultBucket = new double[metricValueArray.length];
+								Map<String, Double> _doubleMap = new HashMap<String,Double>(doubleMap);
+								_doubleMap.remove(entry.getKey());
+								for(int i=0;i< resultBucket.length;i++) {
+									_doubleMap.put(GenericMetricEntity.VALUE_FIELD, metricValueArray[i]);
+									resultBucket[i]=  parser.eval(_doubleMap);
+								}
+								extra.put(expression,resultBucket);
+							}else{
+								LOG.warn("Failed convert metric value into double[] type which is required by expression: "+expression);
+								// if require value in double[] is NaN
+								double value = parser.eval(doubleMap);
+								extra.put(expression, value);
+							}
+						}else {
+							double value = parser.eval(doubleMap);
+							extra.put(expression, value);
+							// LOG.info("DEBUG: "+entry.getKey()+" = "+ value);
+						}
+					} catch (Exception e) {
+						LOG.error("Failed to eval expression "+expression+", exception: "+e.getMessage(),e);
+					}
+				}
+			} else {
+				logQualifierValues.put(entry.getKey(),entry.getValue());
+			}
+		}
+		log.setQualifierValues(logQualifierValues);
+		log.setTags(logTags);
+		log.setExtraValues(extra);
+		return log;
+	}
+	
+	public static TaggedLogAPIEntity buildEntity(InternalLog log, EntityDefinition entityDef) throws Exception {
+		Map<String, byte[]> qualifierValues = log.getQualifierValues();
+		TaggedLogAPIEntity entity = ENTITY_SERDESER.readValue(qualifierValues, entityDef);
+		if (entity.getTags() == null && log.getTags() != null) {
+			entity.setTags(log.getTags());
+		}
+		entity.setExp(log.getExtraValues());
+		entity.setTimestamp(log.getTimestamp());
+		entity.setEncodedRowkey(log.getEncodedRowkey());
+		entity.setPrefix(log.getPrefix());
+		return entity;
+	}
+	
+	public static List<TaggedLogAPIEntity> buildEntities(List<InternalLog> logs, EntityDefinition entityDef) throws Exception {
+		final List<TaggedLogAPIEntity> result = new ArrayList<TaggedLogAPIEntity>(logs.size());
+		for (InternalLog log : logs) {
+			result.add(buildEntity(log, entityDef));
+		}
+		return result;
+	}
+	
+	public static byte[][] getOutputQualifiers(EntityDefinition entityDef, List<String> outputFields) {
+		final byte[][] result = new byte[outputFields.size()][];
+		int index = 0;
+		for(String field : outputFields){
+			// convert displayName to qualifierName
+			Qualifier q = entityDef.getDisplayNameMap().get(field);
+			if(q == null){ // for tag case
+				result[index++] = field.getBytes();
+			}else{ // for qualifier case
+				result[index++] = q.getQualifierName().getBytes();
+			}
+		}
+		return result;
+	}
+
+	public static InternalLog convertToInternalLog(TaggedLogAPIEntity entity, EntityDefinition entityDef) throws Exception {
+		final InternalLog log = new InternalLog();
+		final Map<String, String> inputTags = entity.getTags();
+		final Map<String, String> tags = new TreeMap<String, String>();
+		if(inputTags!=null) {
+			for (Map.Entry<String, String> entry : inputTags.entrySet()) {
+				tags.put(entry.getKey(), entry.getValue());
+			}
+		}
+		log.setTags(tags);
+		if(entityDef.isTimeSeries()){
+			log.setTimestamp(entity.getTimestamp());
+		}else{
+			log.setTimestamp(EntityConstants.FIXED_WRITE_TIMESTAMP); // set timestamp to MAX, then actually stored 0
+		}
+		
+		// For Metric entity, prefix is populated along with entity instead of EntityDefinition
+		if(entity.getPrefix() != null && !entity.getPrefix().isEmpty()){
+			log.setPrefix(entity.getPrefix());
+		}else{
+			log.setPrefix(entityDef.getPrefix());
+		}
+		
+		log.setPartitions(entityDef.getPartitions());
+		EntitySerDeserializer des = new EntitySerDeserializer();
+		log.setQualifierValues(des.writeValue(entity, entityDef));
+		
+		final IndexDefinition[] indexDefs = entityDef.getIndexes();
+		if (indexDefs != null) {
+			final List<byte[]> indexRowkeys = new ArrayList<byte[]>();
+			for (int i = 0; i < indexDefs.length; ++i) {
+				final IndexDefinition indexDef = indexDefs[i];
+				final byte[] indexRowkey = indexDef.generateIndexRowkey(entity);
+				indexRowkeys.add(indexRowkey);
+			}
+			log.setIndexRowkeys(indexRowkeys);
+		}
+		return log;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java
new file mode 100755
index 0000000..c8b9a33
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+
+public class HBaseLogReader2 extends AbstractHBaseLogReader<InternalLog> {
+	protected ResultScanner rs;
+
+	public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime, Filter filter, String lastScanKey, byte[][] outputQualifiers) {
+		super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers);
+	}
+
+	/**
+	 * This constructor supports partition.
+	 *
+	 * @param ed               entity definition
+	 * @param partitions       partition values, which is sorted in partition definition order. TODO: in future we need to support
+	 *                         multiple values for one partition field
+	 * @param startTime        start time of the query
+	 * @param endTime          end time of the query
+	 * @param filter           filter for the hbase scan
+	 * @param lastScanKey      the key of last scan
+	 * @param outputQualifiers the bytes of output qualifier names
+	 * @param prefix           can be populated from outside world specifically for generic metric reader
+	 */
+	public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime, Filter filter, String lastScanKey, byte[][] outputQualifiers, String prefix) {
+		super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers, prefix);
+	}
+
+	@Override
+	protected void onOpen(HTableInterface tbl, Scan scan) throws IOException {
+		rs = tbl.getScanner(scan);
+	}
+
+	/**
+	 * <h2>Close:</h2>
+	 * 1. Call super.close(): release current table connection <br></br>
+	 * 2. Close Scanner<br></br>
+	 *
+	 * @throws IOException
+	 */
+	@Override
+	public void close() throws IOException {
+		super.close();
+		if(rs != null){
+			rs.close();
+		}
+	}
+
+	@Override
+	public InternalLog read() throws IOException {
+		if (rs == null)
+			throw new IllegalArgumentException(
+					"ResultScanner must be initialized before reading");
+		InternalLog t = null;
+		Result r = rs.next();
+		if (r != null) {
+			t = HBaseInternalLogHelper.parse(_ed, r, qualifiers);
+		}
+		return t;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java
new file mode 100644
index 0000000..059ee7f
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.hadoop.hbase.client.HTableFactory;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HBaseLogWriter implements LogWriter {
+	private static Logger LOG = LoggerFactory.getLogger(HBaseLogWriter.class);
+	private static byte[] EMPTY_INDEX_QUALIFER_VALUE = "".getBytes();
+	
+	private HTableInterface tbl;
+	private String table;
+	private String columnFamily;
+	
+	public HBaseLogWriter(String table, String columnFamily) {
+		// TODO assert for non-null of table and columnFamily
+		this.table = table;
+		this.columnFamily = columnFamily;
+	}
+	
+	@Override
+	public void open() throws IOException {
+		try{
+			tbl = EagleConfigFactory.load().getHTable(this.table);
+//			LOGGER.info("HBase table " + table + " audo reflush is " + (tbl.isAutoFlush() ? "enabled" : "disabled"));
+		}catch(Exception ex){
+			LOG.error("Cannot create htable", ex);
+			throw new IOException(ex);
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		if(tbl != null){
+			new HTableFactory().releaseHTableInterface(tbl);
+		}
+	}
+
+	@Override
+	public void flush() throws IOException {
+		tbl.flushCommits();
+	}
+	
+	protected void populateColumnValues(Put p, InternalLog log){
+		Map<String, byte[]> qualifierValues = log.getQualifierValues();
+		// iterate all qualifierValues
+		for(Map.Entry<String, byte[]> entry : qualifierValues.entrySet()){
+			p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue());
+		}
+		
+		Map<String, String> tags = log.getTags();
+		// iterate all tags, each tag will be stored as a column qualifier
+		if(tags != null){
+			for(Map.Entry<String, String> entry : tags.entrySet()){
+				// TODO need a consistent handling of null values
+				if(entry.getValue() != null)
+					p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue().getBytes());
+			}
+		}
+	}
+
+	/**
+	 * TODO need think about if multi-PUT is necessary, by checking if autoFlush works
+	 */
+	@Override
+	public byte[] write(InternalLog log) throws IOException{
+		final byte[] rowkey = RowkeyBuilder.buildRowkey(log);
+		final Put p = new Put(rowkey);
+		populateColumnValues(p, log);
+		tbl.put(p);
+		final List<byte[]> indexRowkeys = log.getIndexRowkeys();
+		if (indexRowkeys != null) {
+			writeIndexes(rowkey, indexRowkeys);
+		}
+		return rowkey;
+	}
+
+	/**
+	 * TODO need think about if multi-PUT is necessary, by checking if autoFlush works
+	 */
+	public List<byte[]> write(List<InternalLog> logs) throws IOException{
+		final List<Put> puts = new ArrayList<Put>(logs.size());
+		final List<byte[]> result = new ArrayList<byte[]>(logs.size());
+		for (InternalLog log : logs) {
+			final byte[] rowkey = RowkeyBuilder.buildRowkey(log);
+			final Put p = new Put(rowkey);
+			populateColumnValues(p, log);
+			puts.add(p);
+			final List<byte[]> indexRowkeys = log.getIndexRowkeys();
+			if (indexRowkeys != null) {
+				writeIndexes(rowkey, indexRowkeys, puts);
+			}
+			result.add(rowkey);
+		}
+		tbl.put(puts);
+		return result;
+	}
+	
+	@Override
+	public void updateByRowkey(byte[] rowkey, InternalLog log) throws IOException{
+		Put p = new Put(rowkey);
+		populateColumnValues(p, log);
+		tbl.put(p);
+		final List<byte[]> indexRowkeys = log.getIndexRowkeys();
+		if (indexRowkeys != null) {
+			writeIndexes(rowkey, indexRowkeys);
+		}
+	}
+
+	private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys) throws IOException {
+		for (byte[] indexRowkey : indexRowkeys) {
+			Put p = new Put(indexRowkey);
+			p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE);
+			tbl.put(p);
+		}
+	}
+
+	private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys, List<Put> puts) throws IOException {
+		for (byte[] indexRowkey : indexRowkeys) {
+			Put p = new Put(indexRowkey);
+			p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE);
+			puts.add(p);
+//			tbl.put(p);
+		}
+	}
+
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java
new file mode 100755
index 0000000..8276640
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.common.DateTimeUtil;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * TODO we should decouple BaseLog during write time and BaseLog during read time
+ */
+public class InternalLog {
+	private String encodedRowkey;
+	private String prefix;
+	private String[] partitions;
+	private long timestamp;
+	private Map<String, byte[]> qualifierValues;
+
+	private Map<String,Object> extraValues;
+	private Map<String, String> tags;
+	private Map<String, List<String>> searchTags;
+	private List<byte[]> indexRowkeys;
+
+	public String getEncodedRowkey() {
+		return encodedRowkey;
+	}
+
+	public void setEncodedRowkey(String encodedRowkey) {
+		this.encodedRowkey = encodedRowkey;
+	}
+	
+	public Map<String, byte[]> getQualifierValues() {
+		return qualifierValues;
+	}
+	public void setQualifierValues(Map<String, byte[]> qualifierValues) {
+		this.qualifierValues = qualifierValues;
+	}
+
+	public Map<String, List<String>> getSearchTags() {
+		return searchTags;
+	}
+	public void setSearchTags(Map<String, List<String>> searchTags) {
+		this.searchTags = searchTags;
+	}
+	public String getPrefix() {
+		return prefix;
+	}
+	public void setPrefix(String prefix) {
+		this.prefix = prefix;
+	}
+	public String[] getPartitions() {
+		return partitions;
+	}
+	public void setPartitions(String[] partitions) {
+		this.partitions = partitions;
+	}
+	public long getTimestamp() {
+		return timestamp;
+	}
+	public void setTimestamp(long timestamp) {
+		this.timestamp = timestamp;
+	}
+	public Map<String, String> getTags() {
+		return tags;
+	}
+	public void setTags(Map<String, String> tags) {
+		this.tags = tags;
+	}
+	public List<byte[]> getIndexRowkeys() {
+		return indexRowkeys;
+	}
+	public void setIndexRowkeys(List<byte[]> indexRowkeys) {
+		this.indexRowkeys = indexRowkeys;
+	}
+	public Map<String, Object> getExtraValues() { return extraValues; }
+	public void setExtraValues(Map<String, Object> extraValues) { this.extraValues = extraValues; }
+
+	public String toString(){
+		StringBuffer sb = new StringBuffer();
+		sb.append(prefix);
+		sb.append("|");
+		sb.append(DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timestamp));
+		sb.append("(");
+		sb.append(timestamp);
+		sb.append(")");
+		sb.append("|searchTags:");
+		if(searchTags != null){
+			for(String tagkey : searchTags.keySet()){
+				sb.append(tagkey);
+				sb.append('=');
+				List<String> tagValues = searchTags.get(tagkey);
+				sb.append("(");
+				for(String tagValue : tagValues){
+					sb.append(tagValue);
+					sb.append(",");
+				}
+				sb.append(")");
+				sb.append(",");
+			}
+		}
+		sb.append("|tags:");
+		if(tags != null){
+			for(Map.Entry<String, String> entry : tags.entrySet()){
+				sb.append(entry.getKey());
+				sb.append("=");
+				sb.append(entry.getValue());
+				sb.append(",");
+			}
+		}
+		sb.append("|columns:");
+		if(qualifierValues != null){
+			for(String qualifier : qualifierValues.keySet()){
+				byte[] value = qualifierValues.get(qualifier);
+				sb.append(qualifier);
+				sb.append("=");
+				if(value != null){
+					sb.append(new String(value));
+				}
+				sb.append(",");
+			}
+		}
+		return sb.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/ListQueryAPIResponseEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/ListQueryAPIResponseEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/ListQueryAPIResponseEntity.java
new file mode 100755
index 0000000..15de946
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/ListQueryAPIResponseEntity.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * TODO: (hchen9) currently we disable firstTimestamp in response avoid breaking older client implementation, but we may need to remove "firstTimestamp" from @JsonIgnoreProperties(ignoreUnknown = true,value={"firstTimestamp"}) to enable the feature later
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true,value={"firstTimestamp"})
+public class ListQueryAPIResponseEntity {
+	private boolean success;
+	private String exception;
+	private int totalResults;
+	private long elapsedms;
+	private long lastTimestamp;
+	private long firstTimestamp;
+	public long getFirstTimestamp() {
+		return firstTimestamp;
+	}
+	public void setFirstTimestamp(long firstTimestamp) {
+		this.firstTimestamp = firstTimestamp;
+	}
+	private Object obj;
+
+	public long getElapsedms() {
+		return elapsedms;
+	}
+	public void setElapsedms(long elapsedms) {
+		this.elapsedms = elapsedms;
+	}
+	public boolean isSuccess() {
+		return success;
+	}
+	public void setSuccess(boolean success) {
+		this.success = success;
+	}
+	public String getException() {
+		return exception;
+	}
+	public void setException(String exception) {
+		this.exception = exception;
+	}
+	public int getTotalResults() {
+		return totalResults;
+	}
+	public void setTotalResults(int totalResults) {
+		this.totalResults = totalResults;
+	}
+	public long getLastTimestamp() {
+		return lastTimestamp;
+	}
+	public void setLastTimestamp(long lastTimestamp) {
+		this.lastTimestamp = lastTimestamp;
+	}
+	public Object getObj() {
+		return obj;
+	}
+	public void setObj(Object obj) {
+		this.obj = obj;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/LogReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/LogReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/LogReader.java
new file mode 100755
index 0000000..da1e1ab
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/LogReader.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface LogReader<T> extends Closeable{
+	public void open() throws IOException;
+
+	public void close() throws IOException;
+	
+	public T read() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/LogWriter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/LogWriter.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/LogWriter.java
new file mode 100644
index 0000000..6ef4ee3
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/LogWriter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface LogWriter extends Closeable{
+	public void flush() throws IOException;
+
+	public void open() throws IOException;
+
+	public void close() throws IOException;
+
+	public byte[] write(InternalLog log) throws IOException;
+	
+	public void updateByRowkey(byte[] rowkey, InternalLog log) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/MetricMetadataEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/MetricMetadataEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/MetricMetadataEntity.java
new file mode 100755
index 0000000..f655ba0
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/MetricMetadataEntity.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Indexes;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Index;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("eagle_metric")
+@ColumnFamily("f")
+@Prefix("dmeta")
+@Service("MetricMetadataService")
+@TimeSeries(false)
+@Indexes({
+	@Index(name="Index_1_name", columns = { "name" }, unique = true)
+	})
+public class MetricMetadataEntity extends TaggedLogAPIEntity {
+	
+	@Column("a")
+	private String storeType;
+	@Column("b")
+	private String displayName;
+	@Column("c")
+	private String defaultDownSamplingFunction;
+	@Column("d")
+	private String defaultAggregateFunction;
+	@Column("e")
+	private String aggFunctions;
+	@Column("f")
+	private String downSamplingFunctions;
+	@Column("g")
+	private String resolutions;
+	@Column("h")
+	private String drillDownPaths;
+	
+	public String getStoreType() {
+		return storeType;
+	}
+	public void setStoreType(String storeType) {
+		this.storeType = storeType;
+		_pcs.firePropertyChange("storeType", null, null);
+	}
+	public String getDisplayName() {
+		return displayName;
+	}
+	public void setDisplayName(String displayName) {
+		this.displayName = displayName;
+		_pcs.firePropertyChange("displayName", null, null);
+	}
+	public String getDefaultDownSamplingFunction() {
+		return defaultDownSamplingFunction;
+	}
+	public void setDefaultDownSamplingFunction(String defaultDownSamplingFunction) {
+		this.defaultDownSamplingFunction = defaultDownSamplingFunction;
+		_pcs.firePropertyChange("defaultDownSamplingFunction", null, null);
+	}
+	public String getDefaultAggregateFunction() {
+		return defaultAggregateFunction;
+	}
+	public void setDefaultAggregateFunction(String defaultAggregateFunction) {
+		this.defaultAggregateFunction = defaultAggregateFunction;
+		_pcs.firePropertyChange("defaultAggregateFunction", null, null);
+	}
+	public String getAggFunctions() {
+		return aggFunctions;
+	}
+	public void setAggFunctions(String aggFunctions) {
+		this.aggFunctions = aggFunctions;
+		_pcs.firePropertyChange("aggFunctions", null, null);
+	}
+	public String getDownSamplingFunctions() {
+		return downSamplingFunctions;
+	}
+	public void setDownSamplingFunctions(String downSamplingFunctions) {
+		this.downSamplingFunctions = downSamplingFunctions;
+		_pcs.firePropertyChange("downSamplingFunctions", null, null);
+	}
+	public String getResolutions() {
+		return resolutions;
+	}
+	public void setResolutions(String resolutions) {
+		this.resolutions = resolutions;
+		_pcs.firePropertyChange("resolutions", null, null);
+	}
+	public String getDrillDownPaths() {
+		return drillDownPaths;
+	}
+	public void setDrillDownPaths(String drillDownPaths) {
+		this.drillDownPaths = drillDownPaths;
+		_pcs.firePropertyChange("drillDownPaths", null, null);
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/QualifierCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/QualifierCreationListener.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/QualifierCreationListener.java
new file mode 100755
index 0000000..b0eeaed
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/QualifierCreationListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import java.util.Map;
+
+public interface QualifierCreationListener {
+	/**
+	 * Qualifier <b>display name</b> mapped to qualifier value in bytes[]
+	 *
+	 * @param qualifiers
+	 */
+	public void qualifierCreated(Map<String, byte[]> qualifiers);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/QualifierNotDefinedException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/QualifierNotDefinedException.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/QualifierNotDefinedException.java
new file mode 100644
index 0000000..88135bb
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/QualifierNotDefinedException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+public class QualifierNotDefinedException extends Exception{
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 1L;
+
+	public QualifierNotDefinedException(String message){
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyBuilder.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyBuilder.java
new file mode 100755
index 0000000..1978d43
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyBuilder.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.eagle.log.entity.meta.EntityConstants;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.common.ByteUtil;
+
+public class RowkeyBuilder {
+	
+	public static final int EMPTY_PARTITION_DEFAULT_HASH_CODE = 0;
+	
+	/**
+	 * Generate the internal sorted hashmap for tags. Please note the partition tags should not be included in the result map.
+	 * @param partitions array of partition tags in order
+	 * @param tags tags of the entity
+	 * @return the sorted hash map of the tags
+	 */
+	public static SortedMap<Integer, Integer> generateSortedTagMap(String[] partitions, Map<String, String> tags) {
+		final SortedMap<Integer, Integer> tagHashMap = new TreeMap<Integer, Integer>();
+		for (Map.Entry<String, String> entry: tags.entrySet()) {
+			final String tagName = entry.getKey();
+			final String tagValue = entry.getValue();
+			// If it's a partition tag, we need to remove it from tag hash list. It need to 
+			// put to the fix partition hash slot in rowkey.
+			if (tagValue == null || isPartitionTag(partitions, tagName))
+				continue;
+			tagHashMap.put(tagName.hashCode(), tagValue.hashCode());
+		}
+		return tagHashMap;
+	}
+	
+	/**
+	 * build rowkey from InternalLog object
+	 * @param log internal log entity to write
+	 * @return the rowkey of the entity
+	 */
+	public static byte[] buildRowkey(InternalLog log) {
+		final String[] partitions = log.getPartitions();
+		final Map<String, String> tags = log.getTags();
+		final SortedMap<Integer, Integer> tagHashMap = generateSortedTagMap(partitions, tags);
+		
+		// reverse timestamp
+		long ts = Long.MAX_VALUE - log.getTimestamp();
+		
+		List<Integer> partitionHashValues = new ArrayList<Integer>();
+		if (partitions != null) {
+			for (String partition : partitions) {
+				final String tagValue = tags.get(partition);
+				if (tagValue != null) {
+					partitionHashValues.add(tagValue.hashCode());
+				} else {
+					partitionHashValues.add(EMPTY_PARTITION_DEFAULT_HASH_CODE);
+				}
+			}
+		}
+		return buildRowkey(log.getPrefix().hashCode(), partitionHashValues, ts, tagHashMap);
+	}
+	
+	public static long getTimestamp(byte[] rowkey, EntityDefinition ed) {
+		if (!ed.isTimeSeries()) {
+			return EntityConstants.FIXED_WRITE_TIMESTAMP;
+		}
+		final int offset = (ed.getPartitions() == null) ? 4 : (4 + ed.getPartitions().length * 4);
+		return Long.MAX_VALUE - ByteUtil.bytesToLong(rowkey, offset);
+	}
+	
+	/**
+	 * Check if the tagName is one of the partition tags
+	 * @param partitions paritition tags of the entity
+	 * @param tagName the tag name that needs to check
+	 * @return
+	 */
+	private static boolean isPartitionTag(String[] partitions, String tagName) {
+		if (partitions != null) {
+			for (String partition : partitions) {
+				if (partition.equals(tagName)) {
+					return true;
+				}
+			}
+		}
+		return false;
+	}
+
+	/**
+	 * rowkey is: prefixHash:4 + (partitionValueHash:4)* + timestamp:8 + (tagnameHash:4 + tagvalueHash:4)*
+	 * partition fields are sorted by partition definition order, while tag fields are sorted by tag name's 
+	 * hash code values. 
+	 */
+	private static byte[] buildRowkey(int prefixHash, List<Integer> partitionHashValues, long timestamp, SortedMap<Integer, Integer> tags){
+		// alloacate byte array for rowkey
+		final int len = 4 + 8 + tags.size() * (4 + 4) + (partitionHashValues.size() * 4);
+		final byte[] rowkey = new byte[len];
+		int offset = 0;
+
+		// 1. set prefix
+		ByteUtil.intToBytes(prefixHash, rowkey, offset);
+		offset += 4;
+		
+		// 2. set partition
+		for (Integer partHash : partitionHashValues) {
+			ByteUtil.intToBytes(partHash, rowkey, offset);
+			offset += 4;
+		}
+		
+		// 3. set timestamp
+		ByteUtil.longToBytes(timestamp, rowkey, offset);
+		offset += 8;
+
+		// 4. set tag key/value hashes
+		for (Map.Entry<Integer, Integer> entry : tags.entrySet()) {
+			ByteUtil.intToBytes(entry.getKey(), rowkey, offset);
+			offset += 4;
+			ByteUtil.intToBytes(entry.getValue(), rowkey, offset);
+			offset += 4;
+		}
+		
+		return rowkey;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyQueryAPIResponseEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyQueryAPIResponseEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyQueryAPIResponseEntity.java
new file mode 100644
index 0000000..1745f74
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyQueryAPIResponseEntity.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+public class RowkeyQueryAPIResponseEntity {
+	private boolean success;
+	private String exception;
+	private Object obj;
+
+	public boolean isSuccess() {
+		return success;
+	}
+	public void setSuccess(boolean success) {
+		this.success = success;
+	}
+	public String getException() {
+		return exception;
+	}
+	public void setException(String exception) {
+		this.exception = exception;
+	}
+	public Object getObj() {
+		return obj;
+	}
+	public void setObj(Object obj) {
+		this.obj = obj;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/SearchCondition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/SearchCondition.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/SearchCondition.java
new file mode 100755
index 0000000..fada0e2
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/SearchCondition.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.query.parser.ORExpression;
+import org.apache.hadoop.hbase.filter.Filter;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * search condition includes the following:
+ * 1. prefix - part of rowkey
+ * 2. startTime,endTime - timestamp, part of rowkey
+ * 3. hbase filter converted from query 
+ * 4. aggregate parameters
+ * 4. sort options
+ * 5. output fields and tags
+ * 6. entityName
+ * 7. pagination: pageSize and startRowkey
+ */
+public class SearchCondition{
+	private String startTime;
+	private String endTime;
+	private Filter filter;
+	private List<String> outputFields;
+	private boolean outputAll;
+	private long pageSize;
+	private String startRowkey;
+	private String entityName;
+	private List<String> partitionValues;
+	private ORExpression queryExpression;
+
+	public boolean isOutputVerbose() {
+		return outputVerbose;
+	}
+
+	public void setOutputVerbose(boolean outputVerbose) {
+		this.outputVerbose = outputVerbose;
+	}
+
+	public Map<String, String> getOutputAlias() {
+		return outputAlias;
+	}
+
+	public void setOutputAlias(Map<String, String> outputAlias) {
+		this.outputAlias = outputAlias;
+	}
+
+	private boolean outputVerbose;
+	private Map<String,String> outputAlias;
+
+	/**
+	 * copy constructor
+	 * @param sc
+	 */
+	public SearchCondition(SearchCondition sc){
+		this.startTime = sc.startTime;
+		this.endTime = sc.endTime;
+		this.filter = sc.filter;
+		this.outputFields = sc.outputFields;
+		this.pageSize = sc.pageSize;
+		this.startRowkey = sc.startRowkey;
+		this.entityName = sc.entityName;
+		this.partitionValues = sc.partitionValues;
+		this.queryExpression = sc.queryExpression;
+	}
+	
+	public SearchCondition(){
+	}
+	
+	public Filter getFilter() {
+		return filter;
+	}
+	public void setFilter(Filter filter) {
+		this.filter = filter;
+	}
+	public long getPageSize() {
+		return pageSize;
+	}
+	public void setPageSize(long pageSize) {
+		this.pageSize = pageSize;
+	}
+	public String getStartRowkey() {
+		return startRowkey;
+	}
+	public void setStartRowkey(String startRowkey) {
+		this.startRowkey = startRowkey;
+	}
+	public String getEntityName() {
+		return entityName;
+	}
+	public void setEntityName(String entityName) {
+		this.entityName = entityName;
+	}
+	public List<String> getOutputFields() {
+		return outputFields;
+	}
+	public void setOutputFields(List<String> outputFields) {
+		this.outputFields = outputFields;
+	}
+	public String getStartTime() {
+		return startTime;
+	}
+	public void setStartTime(String startTime) {
+		this.startTime = startTime;
+	}
+	public String getEndTime() {
+		return endTime;
+	}
+	public void setEndTime(String endTime) {
+		this.endTime = endTime;
+	}
+	public List<String> getPartitionValues() {
+		return partitionValues;
+	}
+	public void setPartitionValues(List<String> partitionValues) {
+		this.partitionValues = partitionValues;
+	}
+	public ORExpression getQueryExpression() {
+		return queryExpression;
+	}
+	public void setQueryExpression(ORExpression queryExpression) {
+		this.queryExpression = queryExpression;
+	}
+
+	public boolean isOutputAll() {
+		return outputAll;
+	}
+
+	public void setOutputAll(boolean outputAll) {
+		this.outputAll = outputAll;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/StreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/StreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/StreamReader.java
new file mode 100755
index 0000000..005a2e2
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/StreamReader.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class StreamReader {
+	protected List<EntityCreationListener> _listeners = new ArrayList<EntityCreationListener>();
+
+	/**
+	 * Listener can be only notified after it is added to listener list
+	 * @param listener
+	 */
+	public synchronized void register(EntityCreationListener listener){
+		_listeners.add(listener);
+	}
+	
+	/**
+	 * Listener can not get notification once after it is removed from listener list
+	 * @param listener
+	 */
+	public synchronized void unregister(EntityCreationListener listener){
+		_listeners.remove(listener);
+	}
+	
+	public abstract void readAsStream() throws Exception;
+	
+	/**
+	 * Get scanned last entity timestamp
+	 * 
+	 * @return
+	 */
+	public abstract long getLastTimestamp();
+	
+	/**
+	 * Get scanned first entity timestamp
+	 * @return
+	 */
+	public abstract long getFirstTimestamp();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/BooleanExpressionComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/BooleanExpressionComparator.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/BooleanExpressionComparator.java
new file mode 100755
index 0000000..0d71e10
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/BooleanExpressionComparator.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.filter;
+
+import org.apache.eagle.log.entity.EntityQualifierUtils;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntitySerDeser;
+import org.apache.eagle.log.entity.meta.Qualifier;
+import org.apache.eagle.log.expression.ExpressionParser;
+import org.apache.eagle.log.expression.ParsiiInvalidException;
+import org.apache.eagle.query.parser.ComparisonOperator;
+import org.apache.eagle.query.parser.TokenType;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.io.WritableComparable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import parsii.tokenizer.ParseException;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * BooleanExpressionComparator
+ *
+ * Currently support double expression only.
+ *
+ * TODO: 1) thread-safe? 2) Rewrite filter expression to evaluate once
+ *
+ */
+public class BooleanExpressionComparator implements WritableComparable<List<KeyValue>> {
+    private final static Logger LOG = LoggerFactory.getLogger(BooleanExpressionComparator.class);
+
+    // Should be Writable
+    private QualifierFilterEntity filterEntity;
+    private EntityDefinition ed;
+
+    // Should not be writable
+    private double leftValue;
+    private double rightValue;
+    private BooleanExprFunc func = null;
+
+    public Set<String> getRequiredFields() {
+        return requiredFields;
+    }
+
+    private Set<String> requiredFields = new HashSet<String>();
+
+    public BooleanExpressionComparator(){}
+
+    public BooleanExpressionComparator(QualifierFilterEntity entity,EntityDefinition ed){
+        this.filterEntity  = entity;
+        this.ed = ed;
+        try {
+            this.init();
+        } catch (Exception ex) {
+            // Client side expression validation to fast fail if having error
+            LOG.error("Got exception: "+ex.getMessage(),ex);
+            throw new ExpressionEvaluationException(ex.getMessage(),ex);
+        }
+    }
+
+    private void init() throws ParsiiInvalidException, ParseException {
+        LOG.info("Filter expression: "+filterEntity.toString());
+        if (filterEntity.getKey() != null) {
+            if (filterEntity.getKeyType() == TokenType.NUMBER) {
+                leftValue = Double.parseDouble(filterEntity.getKey());
+            } else {
+                ExpressionParser parser = ExpressionParser.parse(filterEntity.getKey());
+                requiredFields.addAll(parser.getDependentFields());
+            }
+        } else {
+            throw new IllegalStateException("QualifierFilterEntity key is null");
+        }
+
+        if (filterEntity.getValue() != null) {
+            if (filterEntity.getValueType() == TokenType.NUMBER) {
+                rightValue = Double.parseDouble(filterEntity.getValue());
+            } else {
+                ExpressionParser parser = ExpressionParser.parse(filterEntity.getValue());
+                requiredFields.addAll(parser.getDependentFields());
+            }
+        } else {
+            throw new IllegalStateException("QualifierFilterEntity value is null");
+        }
+
+        if (this.filterEntity.getOp() == null)
+            throw new IllegalStateException("QualifierFilterEntity op is null");
+        this.func = _opExprFuncMap.get(this.filterEntity.getOp());
+        if (this.func == null)
+            throw new IllegalStateException("No boolean evaluation function found for operation: " + this.filterEntity.getOp());
+    }
+
+    /**
+     * if(Double.isInfinite(leftValue) || Double.isInfinite(rightValue)) return false;
+     *
+     * @param context Map[String,Double]
+     * @return evaluation result as true (1) or false (0)
+     * @throws Exception
+     */
+    private boolean eval(Map<String,Double> context) throws Exception {
+        if(filterEntity.getKeyType() != TokenType.NUMBER){
+            leftValue = eval(filterEntity.getKey(),context);
+        }
+        if(filterEntity.getValueType() != TokenType.NUMBER){
+            rightValue = eval(filterEntity.getValue(),context);
+        }
+        if(Double.isInfinite(leftValue) || Double.isInfinite(rightValue)){
+//            if(LOG.isDebugEnabled()) {
+            if (Double.isInfinite(leftValue)) {
+                LOG.warn("Evaluation result of key: " + this.filterEntity.getKey() + " is " + leftValue + " (Infinite), ignore");
+            } else {
+                LOG.warn("Evaluation result of value: "+this.filterEntity.getValue()+" is "+rightValue+" (Infinite), ignore");
+            }
+//            }
+            return false;
+        }
+        return func.eval(leftValue,rightValue);
+    }
+
+    /**
+     * if(Double.isInfinite(leftValue) || Double.isInfinite(rightValue)) return false;
+     *
+     * @param expr
+     * @param context
+     * @return
+     * @throws Exception
+     */
+    private double eval(String expr,Map<String,Double> context) throws Exception {
+        return ExpressionParser.parse(expr).eval(context);
+    }
+
+    /**
+     *
+     * @param row List[KeyValue] All key values in a row
+     *
+     * @return 0 to filter out row [false], otherwise to include row into scanner [true]
+     */
+    @Override
+    public int compareTo(List<KeyValue> row) {
+        Map<String,Double> context = new HashMap<String, Double>();
+        for(KeyValue kv:row){
+            String qualifierName = new String(kv.getQualifier());
+
+            // Because assume just handle about double value
+            // so ignore tag whose value is String
+            if(!this.ed.isTag(qualifierName)){
+                Qualifier qualifier = this.ed.getQualifierNameMap().get(qualifierName);
+                String displayName = qualifier.getDisplayName();
+                if(displayName == null) displayName = qualifierName;
+                try {
+                    if(this.requiredFields.contains(displayName)) {
+                        EntitySerDeser serDeser = qualifier.getSerDeser();
+                        double value = EntityQualifierUtils.convertObjToDouble(serDeser.deserialize(kv.getValue()));
+                        if (Double.isNaN(value)) {
+                            context.put(displayName, value);
+                        }
+                    }
+                }catch (Exception ex){
+                    LOG.warn("Failed to parse value of field "+displayName+" as double, ignore: "+ex.getMessage(),ex);
+                }
+            }
+        }
+        return compareTo(context);
+    }
+
+    /**
+     * @param context Map[String,Double]
+     *
+     * @return context.keySet().containsAll(this.requiredFields) && eval(context) ? 1:0;
+     */
+    int compareTo(Map<String,Double> context){
+        try {
+            if(context.keySet().containsAll(this.requiredFields)){
+                return eval(context)? 1:0;
+            }else{
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Require variables: [" + StringUtils.join(this.requiredFields, ",") + "], but just given: [" + StringUtils.join(context.keySet(), ",") + "]");
+                }
+                return 0;
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(),e);
+            throw new ExpressionEvaluationException(e.getMessage(),e);
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        this.filterEntity.write(out);
+        this.ed.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.filterEntity = new QualifierFilterEntity();
+        this.filterEntity.readFields(in);
+        this.ed = new EntityDefinition();
+        this.ed.readFields(in);
+
+        try {
+            this.init();
+        } catch (Exception ex){
+            LOG.error("Got exception: "+ex.getMessage(),ex);
+            throw new IOException(ex.getMessage(),ex);
+        }
+    }
+
+    private static Map<ComparisonOperator,BooleanExprFunc> _opExprFuncMap = new HashMap<ComparisonOperator, BooleanExprFunc>();
+
+    static {
+        _opExprFuncMap.put(ComparisonOperator.EQUAL,new EqualExprFunc());
+        _opExprFuncMap.put(ComparisonOperator.IS,new EqualExprFunc());
+
+        _opExprFuncMap.put(ComparisonOperator.NOT_EQUAL,new NotEqualExprFunc());
+        _opExprFuncMap.put(ComparisonOperator.IS_NOT,new NotEqualExprFunc());
+
+        _opExprFuncMap.put(ComparisonOperator.LESS,new LessExprFunc());
+        _opExprFuncMap.put(ComparisonOperator.LESS_OR_EQUAL,new LessOrEqualExprFunc());
+        _opExprFuncMap.put(ComparisonOperator.GREATER,new GreaterExprFunc());
+        _opExprFuncMap.put(ComparisonOperator.GREATER_OR_EQUAL,new GreaterOrEqualExprFunc());
+
+        // "Life should be much better with functional programming language" - Hao Chen Nov 18th, 2014
+    }
+
+    private static interface BooleanExprFunc {
+        boolean eval(double val1,double val2);
+    }
+
+    private static class EqualExprFunc implements BooleanExprFunc {
+        @Override
+        public boolean eval(double val1, double val2) {
+            return val1 == val2;
+        }
+    }
+    private static class NotEqualExprFunc implements BooleanExprFunc {
+        @Override
+        public boolean eval(double val1, double val2) {
+            return val1 != val2;
+        }
+    }
+
+    private static class LessExprFunc implements BooleanExprFunc {
+        @Override
+        public boolean eval(double val1, double val2) {
+            return val1 < val2;
+        }
+    }
+    private static class LessOrEqualExprFunc implements BooleanExprFunc {
+        @Override
+        public boolean eval(double val1, double val2) {
+            return val1 <= val2;
+        }
+    }
+    private static class GreaterExprFunc implements BooleanExprFunc {
+        @Override
+        public boolean eval(double val1, double val2) {
+            return val1 > val2;
+        }
+    }
+    private static class GreaterOrEqualExprFunc implements BooleanExprFunc {
+        @Override
+        public boolean eval(double val1, double val2) {
+            return val1 >= val2;
+        }
+    }
+
+    public static class ExpressionEvaluationException extends RuntimeException{
+        public ExpressionEvaluationException(String message, Throwable cause) {
+            super(message, cause);
+        }
+        public ExpressionEvaluationException(String message) {
+            super(message);
+        }
+        public ExpressionEvaluationException(Throwable cause) {
+            super(cause);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return this.getClass().getSimpleName()+" ("+this.filterEntity.toString()+")";
+    }
+}
\ No newline at end of file


Mime
View raw message