eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [18/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"
Date Thu, 19 Nov 2015 10:47:25 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/eagle/log/expression/ExpressionParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/eagle/log/expression/ExpressionParser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/eagle/log/expression/ExpressionParser.java
deleted file mode 100755
index 58a1e04..0000000
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/eagle/log/expression/ExpressionParser.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * 
- */
-package eagle.log.expression;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import eagle.log.entity.EntityQualifierUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import parsii.eval.Expression;
-import parsii.eval.Parser;
-import parsii.eval.Scope;
-import parsii.eval.Variable;
-import parsii.tokenizer.ParseException;
-
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * <h1>Expression Evaluation</h1>
- *
- * Given expression in string and set context variables, return value in double
- *
- * <br/>
- * <br/>
- * For example:
- * <code>EXP{(max(a, b)* min(a, b)) / abs(a-b+c-d)} => 600.0</code>
- *
- * <br/>
- * <br/>
- * <b>NOTE:</b>  Expression variable <b>must</b> be in format: <code>fieldName</code> instead of <code>@fieldName</code>
- *
- * <br/>
- * <br/>
- * <h2>Dependencies:</h2>
- * <ul>
- *     <li>
- *         <a href="https://github.com/scireum/parsii">scireum/parsii</a>
- *         <i>Super fast and simple evaluator for mathematical expressions written in Java</i>
- *     </li>
- * </ul>
- *
- */
-public class ExpressionParser{
-	private final static Logger LOG = LoggerFactory.getLogger(ExpressionParser.class);
-
-	private String exprStr;
-	private Expression expression;
-	private Scope scope;
-
-	@SuppressWarnings("unused")
-	public Scope getScope() {
-		return scope;
-	}
-
-	private List<String> dependentFields;
-
-	/**
-	 * @param exprStr expression string in format like: <code>(max(a, b)* min(a, b)) / abs(a-b+c-d)</code>
-	 *
-	 * @throws ParseException
-	 * @throws ParsiiInvalidException
-	 */
-	public ExpressionParser(String exprStr) throws ParseException, ParsiiInvalidException{
-		this.exprStr = exprStr;
-		scope = Scope.create();
-		expression = Parser.parse(this.exprStr,scope);
-	}
-
-	@SuppressWarnings("unused")
-	public ExpressionParser(String exprStr, Map<String, Double> context) throws ParsiiInvalidException, ParseException, ParsiiUnknowVariableException {
-		this(exprStr);
-		setVariables(context);
-	}
-	
-	public ExpressionParser setVariables(Map<String, Double> tuple) throws ParsiiUnknowVariableException{
-//		for(String valName : tuple.keySet()) {
-//			Double value = tuple.get(valName);
-		for(Map.Entry<String,Double> entry : tuple.entrySet()) {
-            String valName = entry.getKey();
-            Double value = entry.getValue();
-			Variable variable = scope.getVariable(valName);
-			if(variable!=null && value !=null) {
-				variable.setValue(value);
-			}else{
-				if(LOG.isDebugEnabled()) LOG.warn("Variable for "+valName+" is null in scope of expression: "+this.exprStr);
-			}
-		}
-		return this;
-	}
-
-	@SuppressWarnings("unused")
-	public ExpressionParser setVariable(Entry<String, Double> tuple) throws ParsiiUnknowVariableException{
-		if (getDependentFields().contains(tuple.getKey())) {
-			scope.getVariable(tuple.getKey()).setValue(tuple.getValue());
-		}
-		else {
-			throw new ParsiiUnknowVariableException("unknown variable: " + tuple.getKey());
-		}
-		return this;
-	}
-	
-	public ExpressionParser setVariable(String key, Double value) throws ParsiiUnknowVariableException{
-		scope.getVariable(key).setValue(value);
-		return this;
-	}
-
-	public double eval() throws Exception{
-		return expression.evaluate();
-	}
-
-	/**
-	 * Thread safe
-	 *
-	 * @param tuple
-	 * @return
-	 * @throws ParsiiUnknowVariableException
-	 */
-	public double eval(Map<String, Double> tuple) throws Exception {
-		synchronized (this){
-			this.setVariables(tuple);
-			return this.eval();
-		}
-	}
-
-	public List<String> getDependentFields() {
-		if (dependentFields == null) {
-			dependentFields = new ArrayList<String>();
-			for (String variable : scope.getNames()) {
-				if (!variable.equals("pi") && !variable.equals("E") && !variable.equals("euler"))
-					dependentFields.add(variable);
-			}
-		}
-		return dependentFields; 
-	}
-
-	private final static Map<String, ExpressionParser> _exprParserCache = new HashMap<String, ExpressionParser>();
-
-	/**
-	 * Thread safe
-	 *
-	 * @param expr
-	 * @return
-	 * @throws ParsiiInvalidException
-	 * @throws ParseException
-	 */
-	public static ExpressionParser parse(String expr) throws ParsiiInvalidException, ParseException {
-		if(expr == null) throw new IllegalStateException("Expression to parse is null");
-		synchronized (_exprParserCache) {
-			ExpressionParser parser = _exprParserCache.get(expr);
-			if (parser == null) {
-				parser = new ExpressionParser(expr);
-				_exprParserCache.put(expr, parser);
-			}
-			return parser;
-		}
-	}
-	public static double eval(String expression,Map<String,Double> context) throws Exception {
-		ExpressionParser parser = parse(expression);
-		return parser.eval(context);
-	}
-
-	private static final Map<String,Method> _entityMethodCache = new HashMap<String, Method>();
-	public static double eval(String expression,TaggedLogAPIEntity entity) throws Exception {
-		ExpressionParser parser = parse(expression);
-		List<String> dependencies = parser.getDependentFields();
-		Map<String,Double> context = new HashMap<String,Double>();
-		for(String field:dependencies){
-			String methodName = "get"+field.substring(0, 1).toUpperCase() + field.substring(1);
-			String methodUID = entity.getClass().getName()+"."+methodName;
-
-			Method m;
-			synchronized (_entityMethodCache) {
-				m = _entityMethodCache.get(methodUID);
-				if (m == null) {
-					m = entity.getClass().getMethod(methodName);
-					_entityMethodCache.put(methodUID, m);
-				}
-			}
-			Object obj = m.invoke(entity);
-			Double doubleValue = EntityQualifierUtils.convertObjToDouble(obj);
-			// if(doubleValue == Double.NaN) throw new IllegalArgumentException("Field "+field+": "+obj+" in expression "+expression+" is not number");
-			context.put(field,doubleValue);
-		}
-		return parser.eval(context);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/eagle/log/expression/ParsiiInvalidException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/eagle/log/expression/ParsiiInvalidException.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/eagle/log/expression/ParsiiInvalidException.java
deleted file mode 100755
index c674b8b..0000000
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/eagle/log/expression/ParsiiInvalidException.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * 
- */
-package eagle.log.expression;
-
-/**
- * @since Nov 7, 2014
- */
-public class ParsiiInvalidException extends Exception{
-	
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Default constructor
-	 */
-	public ParsiiInvalidException() {
-		super();
-	}
-
-	/**
-	 * @param message
-	 * @param cause
-	 */
-	public ParsiiInvalidException(String message, Throwable cause) {
-		super(message, cause);
-	}
-
-	/**
-	 * @param message
-	 */
-	public ParsiiInvalidException(String message) {
-		super(message);
-	}
-
-	/**
-	 * @param cause
-	 */
-	public ParsiiInvalidException(Throwable cause) {
-		super(cause);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/eagle/log/expression/ParsiiUnknowVariableException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/eagle/log/expression/ParsiiUnknowVariableException.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/eagle/log/expression/ParsiiUnknowVariableException.java
deleted file mode 100755
index b8746ff..0000000
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/eagle/log/expression/ParsiiUnknowVariableException.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * 
- */
-package eagle.log.expression;
-
-/**
- * @since Nov 7, 2014
- */
-public class ParsiiUnknowVariableException extends Exception{
-	
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Default constructor
-	 */
-	public ParsiiUnknowVariableException() {
-		super();
-	}
-
-	/**
-	 * @param message
-	 * @param cause
-	 */
-	public ParsiiUnknowVariableException(String message, Throwable cause) {
-		super(message, cause);
-	}
-
-	/**
-	 * @param message
-	 */
-	public ParsiiUnknowVariableException(String message) {
-		super(message);
-	}
-
-	/**
-	 * @param cause
-	 */
-	public ParsiiUnknowVariableException(Throwable cause) {
-		super(cause);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/EntityContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/EntityContext.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/EntityContext.java
new file mode 100644
index 0000000..17b3fdb
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/EntityContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.base.taggedlog;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class EntityContext {
+	private Map<String, Object> context;
+
+	public Map<String, Object> getContext() {
+		return context;
+	}
+	
+	public EntityContext() {
+		this.context = new HashMap<>();
+	}
+	
+	protected EntityContext(EntityContext context) {
+		this.context = new HashMap<>(context.context);
+	}
+	
+	public EntityContext cloneEntity() {
+		return new EntityContext(this);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/NoSuchRowException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/NoSuchRowException.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/NoSuchRowException.java
new file mode 100644
index 0000000..3304bea
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/NoSuchRowException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.base.taggedlog;
+
+public class NoSuchRowException extends RuntimeException{
+	static final long serialVersionUID = -4538233994503905943L;
+
+	public NoSuchRowException(){
+		super();
+	}
+	
+	public NoSuchRowException(String s){
+		super(s);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/RowkeyAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/RowkeyAPIEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/RowkeyAPIEntity.java
new file mode 100644
index 0000000..d72c35a
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/RowkeyAPIEntity.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.base.taggedlog;
+
+import java.util.Map;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlType;
+
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(propOrder = {"success", "exception", "prefixHashCode", "timestamp", "humanTime", "tagNameHashValueHashMap", "fieldNameValueMap"})
+public class RowkeyAPIEntity {
+	boolean success;
+	String exception;
+	int prefixHashCode;
+	long timestamp;
+	String humanTime;
+	Map<Integer, Integer> tagNameHashValueHashMap;
+	Map<String, String> fieldNameValueMap;
+	
+	public boolean isSuccess() {
+		return success;
+	}
+	public void setSuccess(boolean success) {
+		this.success = success;
+	}
+	public String getException() {
+		return exception;
+	}
+	public void setException(String exception) {
+		this.exception = exception;
+	}
+	public String getHumanTime() {
+		return humanTime;
+	}
+	public void setHumanTime(String humanTime) {
+		this.humanTime = humanTime;
+	}
+	public int getPrefixHashCode() {
+		return prefixHashCode;
+	}
+	public void setPrefixHashCode(int prefixHashcode) {
+		this.prefixHashCode = prefixHashcode;
+	}
+	public long getTimestamp() {
+		return timestamp;
+	}
+	public void setTimestamp(long timestamp) {
+		this.timestamp = timestamp;
+	}
+	public Map<Integer, Integer> getTagNameHashValueHashMap() {
+		return tagNameHashValueHashMap;
+	}
+	public void setTagNameHashValueHashMap(
+			Map<Integer, Integer> tagNameHashValueHashMap) {
+		this.tagNameHashValueHashMap = tagNameHashValueHashMap;
+	}
+	public Map<String, String> getFieldNameValueMap() {
+		return fieldNameValueMap;
+	}
+	public void setFieldNameValueMap(Map<String, String> fieldNameValueMap) {
+		this.fieldNameValueMap = fieldNameValueMap;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/TaggedLogAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/TaggedLogAPIEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/TaggedLogAPIEntity.java
new file mode 100755
index 0000000..b001955
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/TaggedLogAPIEntity.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.base.taggedlog;
+
+import java.beans.PropertyChangeEvent;
+import java.beans.PropertyChangeListener;
+import java.beans.PropertyChangeSupport;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializerProvider;
+import org.codehaus.jackson.map.annotate.JsonFilter;
+import org.codehaus.jackson.map.ser.BeanPropertyWriter;
+import org.codehaus.jackson.map.ser.FilterProvider;
+import org.codehaus.jackson.map.ser.impl.SimpleBeanPropertyFilter;
+import org.codehaus.jackson.map.ser.impl.SimpleFilterProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.common.DateTimeUtil;
+
+/**
+ * rowkey: prefix + timestamp + tagNameValues
+ * as of now, all tags will be persisted as a column in hbase table
+ * tag name is column qualifier name
+ * tag value is column value
+ */
+@JsonFilter(TaggedLogAPIEntity.PropertyBeanFilterName)
+public class TaggedLogAPIEntity implements PropertyChangeListener, Serializable{
+	private static final Logger LOG = LoggerFactory.getLogger(TaggedLogAPIEntity.class);
+	private String prefix;
+	private long timestamp;
+	private Map<String, String> tags;
+
+	public void setExp(Map<String, Object> exp) {
+		this.exp = exp;
+	}
+
+	public Map<String,Object> getExp() {
+		return this.exp;
+	}
+
+	/**
+	 * Extra dynamic attributes. 
+	 * TODO: can we move exp, _serializeAlias, _serializeVerbose to a wrapper class? 
+	 */
+	private Map<String,Object> exp;
+
+	private String encodedRowkey;
+	// track what qualifiers are changed
+	private Set<String> _modifiedProperties = new HashSet<String>();
+	protected PropertyChangeSupport _pcs = new PropertyChangeSupport(this);
+
+
+	public Map<String, String> getSerializeAlias() {
+		return _serializeAlias;
+	}
+
+	public void setSerializeAlias(Map<String, String> _serializeAlias) {
+		this._serializeAlias = _serializeAlias;
+	}
+
+	private Map<String,String> _serializeAlias = null;
+
+	public boolean isSerializeVerbose() {
+		return _serializeVerbose;
+	}
+
+	public void setSerializeVerbose(boolean _serializeVerbose) {
+		this._serializeVerbose = _serializeVerbose;
+	}
+
+	private boolean _serializeVerbose = true;
+
+	public TaggedLogAPIEntity(){
+		_pcs.addPropertyChangeListener(this);
+	}
+	
+	public String getPrefix() {
+		return prefix;
+	}
+	public void setPrefix(String prefix) {
+		this.prefix = prefix;
+	}
+	public long getTimestamp() {
+		return timestamp;
+	}
+	public void setTimestamp(long timestamp) {
+		this.timestamp = timestamp;
+	}
+	public Map<String, String> getTags() {
+		return tags;
+	}
+	public void setTags(Map<String, String> tags) {
+		this.tags = tags;
+	}
+	public String getEncodedRowkey() {
+		return encodedRowkey;
+	}
+	public void setEncodedRowkey(String encodedRowkey) {
+		this.encodedRowkey = encodedRowkey;
+	}
+	
+	protected void valueChanged(String fieldModified){
+		_pcs.firePropertyChange(fieldModified, null, null);
+	}
+	
+	public void propertyChange(PropertyChangeEvent evt) {
+		_modifiedProperties.add(evt.getPropertyName());
+	}
+	
+	public Set<String> modifiedQualifiers(){
+		return this._modifiedProperties;
+	}
+	
+	public String toString(){
+		StringBuffer sb = new StringBuffer();
+		sb.append("prefix:");
+		sb.append(prefix);
+		sb.append(", timestamp:");
+		sb.append(timestamp);
+		sb.append(", humanReadableDate:");
+		sb.append(DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timestamp));
+		sb.append(", tags: ");
+		if(tags != null){
+			for(Map.Entry<String, String> entry : tags.entrySet()){
+				sb.append(entry.toString());
+				sb.append(",");
+			}
+		}
+		sb.append(", encodedRowkey:");
+		sb.append(encodedRowkey);
+		return sb.toString();
+	}
+
+	private static Set<String> getPropertyNames(){
+		if(_propertyNames == null) {
+			Field[] fields = TaggedLogAPIEntity.class.getDeclaredFields();
+			Set<String> fieldName = new HashSet<String>();
+			for (Field f : fields) fieldName.add(f.getName());
+			_propertyNames = fieldName;
+		}
+		return _propertyNames;
+	}
+
+	private static class BeanPropertyFilter extends SimpleBeanPropertyFilter {
+		private final static String prefix = "prefix";
+		private final static String encodedRowkey = "encodedRowkey";
+		private final static String exp = "exp";
+		private final static String timestamp = "timestamp";
+		@SuppressWarnings("serial")
+		private final static Set<String> verboseFields = new HashSet<String>(){{
+			add(prefix);
+			add(encodedRowkey);
+		}};
+
+		@Override
+		public void serializeAsField(Object bean, JsonGenerator jgen, SerializerProvider provider, BeanPropertyWriter writer) throws Exception {
+			if(bean instanceof TaggedLogAPIEntity){
+				TaggedLogAPIEntity entity = (TaggedLogAPIEntity) bean;
+				Set<String> modified = entity.modifiedQualifiers();
+				Set<String> basePropertyNames = getPropertyNames();
+				String writerName = writer.getName();
+				if(modified.contains(writerName) || basePropertyNames.contains(writerName)){
+					if((!entity.isSerializeVerbose() && verboseFields.contains(writerName))||							// skip verbose fields
+							(timestamp.equals(writerName) && !EntityDefinitionManager.isTimeSeries(entity.getClass()))	// skip timestamp for non-timeseries entity
+					){
+						// log skip
+						if(LOG.isDebugEnabled()) LOG.debug("skip field");
+					}else{
+						// if serializeAlias is not null and exp is not null
+						if (exp.equals(writerName) && entity.getSerializeAlias()!=null && entity.getExp()!=null) {
+							Map<String, Object> _exp = new HashMap<String, Object>();
+							for (Map.Entry<String, Object> entry : entity.getExp().entrySet()) {
+								String alias = entity.getSerializeAlias().get(entry.getKey());
+								if (alias != null) {
+									_exp.put(alias, entry.getValue());
+								} else {
+									_exp.put(entry.getKey(), entry.getValue());
+								}
+							}
+							entity.setExp(_exp);
+						}
+						// write included field into serialized json output
+						writer.serializeAsField(bean, jgen, provider);
+					}
+				}
+			}else{
+				writer.serializeAsField(bean, jgen, provider);
+			}
+		}
+	}
+
+	public static FilterProvider getFilterProvider(){
+		if(_filterProvider == null){
+			SimpleFilterProvider _provider = new SimpleFilterProvider();
+			_provider.addFilter(PropertyBeanFilterName,new BeanPropertyFilter());
+			_filterProvider = _provider;
+		}
+		return _filterProvider;
+	}
+
+	//////////////////////////////////////
+	// Static fields
+	//////////////////////////////////////
+	private static Set<String> _propertyNames = null;
+	private static FilterProvider _filterProvider = null;
+	final static String PropertyBeanFilterName = "TaggedLogPropertyBeanFilter";
+
+    public static ObjectMapper buildObjectMapper(){
+        final JsonFactory factory = new JsonFactory();
+        final ObjectMapper mapper = new ObjectMapper(factory);
+        mapper.setFilters(TaggedLogAPIEntity.getFilterProvider());
+        return mapper;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/TaggedLogObjectMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/TaggedLogObjectMapper.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/TaggedLogObjectMapper.java
new file mode 100644
index 0000000..1df1c0d
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/TaggedLogObjectMapper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.base.taggedlog;
+
+import java.util.Map;
+
+public interface TaggedLogObjectMapper {
+	/**
+	 * when read, business logic should convert schema-less key/value into business object based on its own schema
+	 * @param entity
+	 * @param qualifierValues
+	 */
+	public void populateQualifierValues(TaggedLogAPIEntity entity, Map<String, byte[]> qualifierValues);
+	
+	/**
+	 * when write, business logic should convert business object to schema-less key value
+	 * @param entity
+	 * @return
+	 */
+	public Map<String, byte[]> createQualifierValues(TaggedLogAPIEntity entity);	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/AbstractHBaseLogReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/AbstractHBaseLogReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/AbstractHBaseLogReader.java
new file mode 100755
index 0000000..916706f
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/AbstractHBaseLogReader.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.common.ByteUtil;
+import org.apache.eagle.common.EagleBase64Wrapper;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HBase Log Reader basic initialization:
+ * <ol>
+ *   <li>Open HBase connection to target HBase table</li>
+ *   <li>Generate HBase filter,start and stop row key, output qualifier and Scan </li>
+ *   <li><code>onOpen(HTableInterface,Scan)</code>: Callback abstract method </li>
+ *   <li><code>close</code>: Close HBase connection</li>
+ * </ol>
+ *
+ * @param <T> Reader entity class type
+ *
+ */
+public abstract class AbstractHBaseLogReader<T> implements LogReader<T> {
+	private static Logger LOG = LoggerFactory.getLogger(AbstractHBaseLogReader.class);
+
+	protected byte[][] qualifiers;
+	private HTableInterface tbl;
+	private byte[] startKey;
+	private byte[] stopKey;
+	protected Map<String, List<String>> searchTags;
+	private Filter filter;
+	private Date startTime;
+	private Date endTime;
+
+//	protected ResultScanner rs;
+	private boolean isOpen = false;
+
+	/**
+	 * TODO it's ugly that both _ed and prefix fields can hold prefix information,
+	 * prefix field should be in precedence over _ed
+	 */
+	private String _prefix;
+	protected EntityDefinition _ed;
+
+	public AbstractHBaseLogReader(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime,
+	                              Filter filter, String lastScanKey, byte[][] outputQualifiers){
+		this(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers, null);
+	}
+	/**
+	 * This constructor supports partition.
+	 *
+	 * @param ed entity definition
+	 * @param partitions partition values, which is sorted in partition definition order. TODO: in future we need to support
+	 * multiple values for one partition field
+	 * @param startTime start time of the query
+	 * @param endTime end time of the query
+	 * @param filter filter for the hbase scan
+	 * @param lastScanKey the key of last scan
+	 * @param outputQualifiers the bytes of output qualifier names
+	 * @param prefix can be populated from outside world specifically for generic metric reader
+	 */
+	public AbstractHBaseLogReader(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime,
+	                              Filter filter, String lastScanKey, byte[][] outputQualifiers, String prefix){
+		this.startTime = startTime;
+		this.endTime = endTime;
+		this._ed = ed;
+		if (_ed.getPartitions() != null) {
+			if (partitions == null || _ed.getPartitions().length != partitions.size()) {
+				throw new IllegalArgumentException("Invalid argument. Entity " + ed.getClass().getSimpleName() + " defined "
+						+ "partitions, but argument partitions is null or number of partition values are different!");
+			}
+		}
+		/**
+		 * decide prefix field value
+		 */
+		if(prefix == null || prefix.isEmpty()){
+			this._prefix = _ed.getPrefix();
+		}else{
+			this._prefix = prefix;
+		}
+		this.qualifiers = outputQualifiers;
+		this.filter = filter;
+
+		this.startKey = buildRowKey(this._prefix, partitions, startTime);
+		
+		
+		/**
+		 * startTime should be inclusive, -128 is max value for hbase Bytes comparison, see PureJavaComparer.compareTo
+		 * as an alternative, we can use startTime-1000 and endTime-1000 to make sure startTime is inclusive and endTime is exclusive
+		 */
+		this.startKey = ByteUtil.concat(this.startKey, new byte[] {-1, -1,-1,-1});
+		if (lastScanKey == null) {
+			this.stopKey = buildRowKey(this._prefix, partitions, endTime);
+			// endTime should be exclusive
+			this.stopKey = ByteUtil.concat(this.stopKey, new byte[] {-1,-1,-1,-1,-1});
+		} else {
+			// build stop key
+			this.stopKey = EagleBase64Wrapper.decode(lastScanKey);
+			// TODO to-be-fixed, probably it's an issue because contacting 1 is not
+			// enough for lexicographical sorting
+			this.stopKey = ByteUtil.concat(this.stopKey, new byte[] { 1 });
+		}
+	}
+	
+	/**
+	 * TODO If the required field is null for a row, then this row will not be fetched. That could be a problem for counting
+	 * Need another version of read to strictly get the number of rows which will return all the columns for a column family
+	 */
+	@Override
+	public void open() throws IOException {
+		if (isOpen)
+			return; // silently return
+		try {
+			tbl = EagleConfigFactory.load().getHTable(_ed.getTable());
+		} catch (RuntimeException ex) {
+			throw new IOException(ex);
+		}
+
+		Scan s1 = new Scan();
+		// reverse timestamp, startRow is stopKey, and stopRow is startKey
+		s1.setStartRow(stopKey);
+		s1.setStopRow(startKey);
+		s1.setFilter(filter);
+		// TODO the # of cached rows should be minimum of (pagesize and 100)
+		int cs = EagleConfigFactory.load().getHBaseClientScanCacheSize();
+		s1.setCaching(cs);
+		// TODO not optimized for all applications
+		s1.setCacheBlocks(true)
+		;
+		// scan specified columnfamily and qualifiers
+		if(this.qualifiers == null) {
+			// Filter all
+			s1.addFamily(_ed.getColumnFamily().getBytes());
+		}else{
+			for (byte[] qualifier : qualifiers) {
+				s1.addColumn(_ed.getColumnFamily().getBytes(), qualifier);
+			}
+		}
+		// TODO: Work around https://issues.apache.org/jira/browse/HBASE-2198. More graceful implementation should use SingleColumnValueExcludeFilter, 
+		// but it's complicated in current implementation. 
+		workaroundHBASE2198(s1, filter);
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(s1.toString());
+		}
+//		rs = tbl.getScanner(s1);
+		this.onOpen(tbl,s1);
+		isOpen = true;
+	}
+
+	/**
+	 * HBase table connection callback function
+	 *
+	 * @param tbl   HBase table connection
+	 * @param scan  HBase scan
+	 * @throws IOException
+	 */
+	protected abstract void onOpen(HTableInterface tbl,Scan scan) throws IOException;
+
+	/**
+	 * <h2>History</h2>
+	 * <ul>
+	 * 	<li><b>Nov 19th, 2014</b>: Fix for out put all qualifiers</li>
+	 * </ul>
+	 * @param s1
+	 * @param filter
+	 */
+	protected void workaroundHBASE2198(Scan s1, Filter filter) {
+		if (filter instanceof SingleColumnValueFilter) {
+			if(this.qualifiers == null){
+				s1.addFamily(((SingleColumnValueFilter) filter).getFamily());
+			}else {
+				s1.addColumn(((SingleColumnValueFilter) filter).getFamily(), ((SingleColumnValueFilter) filter).getQualifier());
+			}
+			return;
+		}
+		if (filter instanceof FilterList) {
+			for (Filter f : ((FilterList)filter).getFilters()) {
+				workaroundHBASE2198(s1, f);
+			}
+		}
+	}
+
+	/**
+	 * <h2>Close:</h2>
+	 * 1. release current table connection
+	 *
+	 * @throws IOException
+	 */
+	@Override
+	public void close() throws IOException {
+		if(tbl != null){
+			new HTableFactory().releaseHTableInterface(tbl);
+		}
+//		if(rs != null){
+//			rs.close();
+//		}
+	}
+
+	private static byte[] buildRowKey(String prefix, List<String> partitions, Date t){
+		final int length = (partitions == null) ? (4 + 8) : (4 + 8 + partitions.size() * 4);
+		final byte[] key = new byte[length];
+		int offset = 0;
+		ByteUtil.intToBytes(prefix.hashCode(), key, offset);
+		offset += 4;
+		if (partitions != null) {
+			for (String partition : partitions) {
+				ByteUtil.intToBytes(partition.hashCode(), key, offset);
+				offset += 4;
+			}
+		}
+		// reverse timestamp
+		long ts = Long.MAX_VALUE - t.getTime();
+		ByteUtil.longToBytes(ts, key, offset);
+		return key;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/BaseEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/BaseEntityRepository.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/BaseEntityRepository.java
new file mode 100755
index 0000000..71253da
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/BaseEntityRepository.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+public class BaseEntityRepository  extends EntityRepository {
+
+	public BaseEntityRepository() {
+		entitySet.add(GenericMetricEntity.class);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/EntityCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/EntityCreationListener.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/EntityCreationListener.java
new file mode 100644
index 0000000..4ad8959
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/EntityCreationListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+
+/**
+ * Interface to notify creation event of an entity 
+ */
+public interface EntityCreationListener {
+	public void entityCreated(TaggedLogAPIEntity entity) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/EntityQualifierUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/EntityQualifierUtils.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/EntityQualifierUtils.java
new file mode 100755
index 0000000..52e7202
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/EntityQualifierUtils.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.log.entity.meta.EntitySerDeser;
+import org.apache.eagle.log.entity.meta.Qualifier;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.hbase.KeyValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class EntityQualifierUtils {
+	private final static Logger LOG = LoggerFactory.getLogger(EntityQualifierUtils.class);
+
+	public static Map<String,Object> keyValuesToMap(List<KeyValue> row,EntityDefinition ed){
+		Map<String,Object> result = new HashMap<String,Object>();
+		for(KeyValue kv:row){
+			String qualifierName = new String(kv.getQualifier());
+			if(!ed.isTag(qualifierName)){
+				Qualifier qualifier = ed.getDisplayNameMap().get(qualifierName);
+				if(qualifier == null){
+					qualifier = ed.getQualifierNameMap().get(qualifierName);
+				}
+				qualifierName = qualifier.getDisplayName();
+				Object value = qualifier.getSerDeser().deserialize(kv.getValue());
+				result.put(qualifierName,value);
+			}else{
+				result.put(qualifierName,new String(kv.getValue()));
+			}
+		}
+		return result;
+	}
+
+	public static Map<String,Double> keyValuesToDoubleMap(List<KeyValue> row,EntityDefinition ed){
+		Map<String,Double> result = new HashMap<String,Double>();
+		for(KeyValue kv:row){
+			String qualifierName = new String(kv.getQualifier());
+			if(!ed.isTag(qualifierName)){
+				Qualifier qualifier = ed.getDisplayNameMap().get(qualifierName);
+				if(qualifier == null){
+					qualifier = ed.getQualifierNameMap().get(qualifierName);
+				}
+				qualifierName = qualifier.getDisplayName();
+				Object value = qualifier.getSerDeser().deserialize(kv.getValue());
+				result.put(qualifierName,convertObjToDouble(value));
+			}else{
+				result.put(qualifierName,Double.NaN);
+			}
+		}
+		return result;
+	}
+
+	/**
+	 * Map[Display Name,Double Value]
+	 *
+	 * @param map
+	 * @param ed
+	 * @return
+	 */
+	public static Map<String,Double> bytesMapToDoubleMap(Map<String,byte[]> map,EntityDefinition ed){
+		Map<String,Double> result = new HashMap<String,Double>();
+		for(Map.Entry<String,byte[]> entry:map.entrySet()){
+			String qualifierName = entry.getKey();
+			Qualifier qualifier = ed.getDisplayNameMap().get(qualifierName);
+			if(qualifier == null) qualifier = ed.getQualifierNameMap().get(qualifierName);
+			if(qualifier!=null && entry.getValue()!=null) {
+				qualifierName = qualifier.getDisplayName();
+				Object value = qualifier.getSerDeser().deserialize(entry.getValue());
+				result.put(qualifierName, convertObjToDouble(value));
+			}else{
+				result.put(qualifierName,null);
+			}
+		}
+		return result;
+	}
+
+	public static byte[] toBytes(EntityDefinition ed, String qualifierName, String qualifierValueInStr){
+		// Get field type from entity class
+		// and skip for not-found fields query expression
+		Object typedValue = null;
+		EntitySerDeser serDeser = null;
+		if(ed.isTag(qualifierName)){
+			typedValue = qualifierValueInStr;
+			serDeser = EntityDefinitionManager.getSerDeser(String.class);
+		}else{
+			try{
+				Field field = ed.getEntityClass().getDeclaredField(qualifierName);
+				Class<?> fieldType = field.getType();
+				serDeser =  EntityDefinitionManager.getSerDeser(fieldType);
+				if(serDeser == null){
+					throw new IllegalArgumentException("Can't find EntitySerDeser for field: "+ qualifierName +"'s type: "+fieldType
+							+", so the field is not supported to be filtered yet");
+				}
+				typedValue = convertStringToObject(qualifierValueInStr, fieldType);
+			} catch (NoSuchFieldException ex) {
+				// Handle the field not found exception in caller
+				LOG.error("Field " + qualifierName + " not found in " + ed.getEntityClass());
+				throw new IllegalArgumentException("Field "+qualifierName+" not found in "+ed.getEntityClass(),ex);
+			}
+		}
+		return serDeser.serialize(typedValue);
+	}
+
+	public static Class<?> getType(EntityDefinition ed, String qualifierName) {
+		Field field;
+		try {
+			field = ed.getEntityClass().getDeclaredField(qualifierName);
+		} catch (NoSuchFieldException e) {
+			if(LOG.isDebugEnabled()) LOG.debug("Field "+qualifierName+" not found in "+ed.getEntityClass());
+			return null;
+		}
+		return field.getType();
+	}
+
+	/**
+	 * Not support negative numeric value:
+	 * - http://en.wikipedia.org/wiki/Double-precision_floating-point_format
+	 *
+	 * @param value
+	 * @param type
+	 * @return
+	 */
+	public static Object convertStringToObject(String value, Class<?> type){
+		Object obj = null;
+		try{
+			if(String.class.equals(type)){
+				obj =  value;
+			}if(Long.class.equals(type) || long.class.equals(type)){
+				obj = Long.parseLong(value);
+				// if((Long) obj < 0) throw new IllegalArgumentException("Don't support negative Long yet: "+obj);
+			}else if(Integer.class.equals(type) || int.class.equals(type)){
+				obj = Integer.parseInt(value);
+				// if((Integer) obj < 0) throw new IllegalArgumentException("Don't support negative Integer yet: "+obj);
+			}else if(Double.class.equals(type) || double.class.equals(type)){
+				obj = Double.parseDouble(value);
+				// if((Double) obj < 0) throw new IllegalArgumentException("Don't support negative Double yet: "+obj);
+			}else if(Float.class.equals(type) || float.class.equals(type)){
+				obj = Float.parseFloat(value);
+				// if((Double) obj < 0) throw new IllegalArgumentException("Don't support negative Float yet: "+obj);
+			}
+			if(obj != null) return obj;
+		}catch (NumberFormatException ex){
+			throw new IllegalArgumentException("Fail to convert string: "+value +" into type of "+type,ex);
+		}
+
+		throw new IllegalArgumentException("Fail to convert string: "+value +" into type of "+type+", illegal type: "+type);
+	}
+
+	/**
+	 *
+	 * @param obj
+	 * @return double value, otherwise Double.NaN
+	 */
+	public static double convertObjToDouble(Object obj){
+		if(Long.class.equals(obj.getClass()) || long.class.equals(obj.getClass())){
+			Long _value = (Long) obj;
+			return _value.doubleValue();
+		}else if(Integer.class.equals(obj.getClass()) || int.class.equals(obj.getClass())){
+			Integer _value = (Integer) obj;
+			return _value.doubleValue();
+		}else if(Double.class.equals(obj.getClass()) || double.class.equals(obj.getClass())) {
+			return (Double) obj;
+		}else if(Float.class.equals(obj.getClass()) || float.class.equals(obj.getClass())) {
+			Float _value = (Float) obj;
+			return _value.doubleValue();
+		}else if(Short.class.equals(obj.getClass()) || short.class.equals(obj.getClass())) {
+			Float _value = (Float) obj;
+			return _value.doubleValue();
+		}else if(Byte.class.equals(obj.getClass()) || byte.class.equals(obj.getClass())) {
+			Byte _value = (Byte) obj;
+			return _value.doubleValue();
+		}
+		LOG.warn("Failed to convert object " + obj.toString() + " in type of " + obj.getClass() + " to double");
+		return Double.NaN;
+	}
+
+	/**
+	 * Parse List String as Set without duplicate items
+	 *
+	 * <br></br>
+	 * Support:
+	 * <ul>
+	 * <li>normal string: ("a","b") => ["a","b"] </li>
+	 * <li>number: (1.5,"b") => [1.5,"b"] </li>
+	 * <li>inner string comma: ("va,lue","value",",") => ["va,lue","value",","]</li>
+	 * <li>inner escaped chars: ("va\"lue","value") => ["va\"lue","value"]</li>
+	 * <li>some bad formats list: ("va"lue","value") => ["va\"lue","value"]</li>
+	 * </ul>
+	 *
+	 * <b>Warning:</b> it will not throw exception if the format is not strictly valid
+	 *
+	 * @param listValue in format (item1,item2,...)
+	 * @return
+	 */
+	public static List<String> parseList(String listValue){
+		Matcher matcher = SET_PATTERN.matcher(listValue);
+		if(matcher.find()){
+			String content = matcher.group(1);
+			List<String> result = new ArrayList<String>();
+			StringBuilder str = null;
+			STATE state = null;
+			char last = 0;
+			for(char c: content.toCharArray()){
+				if(str == null) str = new StringBuilder();
+				if(c == DOUBLE_QUOTE && last != SLASH){
+					// Open or Close String
+					if(state == STATE.STRING)
+						state = null;
+					else state = STATE.STRING;
+				}else if(c == COMMA && state != STATE.STRING){
+					result.add(unescape(str.toString()));
+					str = null;
+					last = c;
+					continue;
+				}
+				last = c;
+				str.append(c);
+			}
+			if(str!=null) result.add(unescape(str.toString()));
+			return result;
+		}else{
+			LOG.error("Invalid list value: " + listValue);
+			throw new IllegalArgumentException("Invalid format of list value: "+listValue+", must be in format: (item1,item2,...)");
+		}
+	}
+
+	private static String unescape(String str){
+		int start=0,end = str.length();
+		if(str.startsWith("\"")) start = start +1;
+		if(str.endsWith("\"")) end = end -1;
+		str = str.substring(start,end);
+		return StringEscapeUtils.unescapeJava(str);
+	}
+
+	private final static Pattern SET_PATTERN = Pattern.compile("^\\((.*)\\)$");
+	private final static char COMMA = ',';
+	private final static char DOUBLE_QUOTE = '"';
+	private final static char SLASH = '\\';
+	private static enum STATE{ STRING }
+
+
+
+//  TODO: NOT FINISHED
+//  private final static Map<String,String> ESCAPE_REGEXP=new HashMap<String,String>(){{
+//			this.put("\\.","\\\\.");
+//	}};
+//
+//	public static String escapeRegExp(String value) {
+//		String _value = value;
+//		for(Map.Entry<String,String> entry:ESCAPE_REGEXP.entrySet()){
+//			_value = _value.replace(entry.getKey(),entry.getValue());
+//		}
+//		return _value;
+//	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/EntityUniq.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/EntityUniq.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/EntityUniq.java
new file mode 100755
index 0000000..36e1e0b
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/EntityUniq.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package org.apache.eagle.log.entity;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+
+/**
+ * @since Sep 12, 2014
+ */
+public class EntityUniq {
+	
+	public Map<String, String> tags;
+	public Long timestamp;
+	public long createdTime; // for cache removal;
+	
+	public EntityUniq(Map<String, String> tags, long timestamp) {
+		this.tags = new HashMap<String, String>(tags);
+		this.timestamp = timestamp;
+		this.createdTime = System.currentTimeMillis();
+	}
+	
+	@Override	
+	public boolean equals(Object obj) {		
+		if (obj instanceof EntityUniq) {
+			EntityUniq au = (EntityUniq) obj;
+			if (tags.size() != au.tags.size()) return false;
+			for (Entry<String, String> keyValue : au.tags.entrySet()) {
+				boolean keyExist = tags.containsKey(keyValue.getKey());
+				if ( !keyExist || !tags.get(keyValue.getKey()).equals(keyValue.getValue())) {				
+					return false;
+				}
+			}
+			if (!timestamp.equals(au.timestamp)) return false;
+			return true;
+		}
+		return false;
+	}
+	
+	@Override
+	public int hashCode() {	
+		int hashCode = 0;
+		for (String value : tags.values()) {
+			hashCode ^= value.hashCode();	
+		}
+		return hashCode ^= timestamp.hashCode();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericCreateAPIResponseEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericCreateAPIResponseEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericCreateAPIResponseEntity.java
new file mode 100644
index 0000000..6ffa621
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericCreateAPIResponseEntity.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import java.util.List;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlType;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(propOrder = {"success", "exception", "encodedRowkeys"})
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+public class GenericCreateAPIResponseEntity{
+	private boolean success;
+	private String exception;
+	private List<String> encodedRowkeys;
+	
+	public List<String> getEncodedRowkeys() {
+		return encodedRowkeys;
+	}
+	public void setEncodedRowkeys(List<String> encodedRowkeys) {
+		this.encodedRowkeys = encodedRowkeys;
+	}
+	public boolean isSuccess() {
+		return success;
+	}
+	public void setSuccess(boolean success) {
+		this.success = success;
+	}
+	public String getException() {
+		return exception;
+	}
+	public void setException(String exception) {
+		this.exception = exception;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityBatchReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityBatchReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityBatchReader.java
new file mode 100755
index 0000000..9c42ab2
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityBatchReader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class GenericEntityBatchReader implements EntityCreationListener{
+	private static final Logger LOG = LoggerFactory.getLogger(GenericEntityBatchReader.class);
+	
+	private List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
+	private StreamReader reader;
+	
+	public GenericEntityBatchReader(String serviceName, SearchCondition condition) throws InstantiationException, IllegalAccessException{
+		reader = new GenericEntityStreamReader(serviceName, condition);
+		reader.register(this);
+	}
+	
+	public GenericEntityBatchReader(StreamReader reader) throws InstantiationException, IllegalAccessException{
+		this.reader = reader;
+		reader.register(this);
+	}
+	
+	public long getLastTimestamp() {
+		return reader.getLastTimestamp();
+	}
+	public long getFirstTimestamp(){ return reader.getFirstTimestamp();}
+	
+	@Override
+	public void entityCreated(TaggedLogAPIEntity entity){
+		entities.add(entity);
+	}
+	
+	@SuppressWarnings("unchecked")
+	public <T> List<T> read() throws Exception{
+		if(LOG.isDebugEnabled()) LOG.debug("Start reading as batch mode");
+		reader.readAsStream();
+		return (List<T>)entities;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityScanStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityScanStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityScanStreamReader.java
new file mode 100755
index 0000000..3f97e78
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityScanStreamReader.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.EntityConstants;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.common.DateTimeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Date;
+
+public class GenericEntityScanStreamReader extends StreamReader {
+	private static final Logger LOG = LoggerFactory.getLogger(GenericEntityScanStreamReader.class);
+	
+	private EntityDefinition entityDef;
+	private SearchCondition condition;
+	private String prefix;
+	private long lastTimestamp = 0;
+	private long firstTimestamp = 0;
+	
+	public GenericEntityScanStreamReader(String serviceName, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{
+		this.prefix = prefix;
+		checkNotNull(serviceName, "serviceName");
+		this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
+		checkNotNull(entityDef, "EntityDefinition");
+		this.condition = condition;
+	}
+
+	public GenericEntityScanStreamReader(EntityDefinition entityDef, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{
+		this.prefix = prefix;
+		checkNotNull(entityDef, "entityDef");
+		this.entityDef = entityDef;
+		checkNotNull(entityDef, "EntityDefinition");
+		this.condition = condition;
+	}
+	
+	public long getLastTimestamp() {
+		return lastTimestamp;
+	}
+	
+	private void checkNotNull(Object o, String message){
+		if(o == null){
+			throw new IllegalArgumentException(message + " should not be null");
+		}
+	}
+
+	public EntityDefinition getEntityDefinition() {
+		return entityDef;
+	}
+
+	public SearchCondition getSearchCondition() {
+		return condition;
+	}
+
+	@Override
+	public void readAsStream() throws Exception{
+		Date start = null;
+		Date end = null;
+		// shortcut to avoid read when pageSize=0
+		if(condition.getPageSize() <= 0){
+			return; // return nothing
+		}
+		// Process the time range if needed
+		if(entityDef.isTimeSeries()){
+			start = DateTimeUtil.humanDateToDate(condition.getStartTime());
+			end = DateTimeUtil.humanDateToDate(condition.getEndTime());
+		}else{
+			start = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_START_HUMANTIME);
+			end = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_END_HUMANTIME);
+		}
+		byte[][] outputQualifiers = null;
+		if(!condition.isOutputAll()) {
+			// Generate the output qualifiers
+			outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef, condition.getOutputFields());
+		}
+		HBaseLogReader2 reader = new HBaseLogReader2(entityDef, condition.getPartitionValues(), start, end, condition.getFilter(), condition.getStartRowkey(), outputQualifiers, this.prefix);
+		try{
+			reader.open();
+			InternalLog log;
+			int count = 0;
+			while ((log = reader.read()) != null) {
+				TaggedLogAPIEntity entity = HBaseInternalLogHelper.buildEntity(log, entityDef);
+				if (lastTimestamp < entity.getTimestamp()) {
+					lastTimestamp = entity.getTimestamp();
+				}
+				if(firstTimestamp > entity.getTimestamp() || firstTimestamp == 0){
+					firstTimestamp = entity.getTimestamp();
+				}
+
+				entity.setSerializeVerbose(condition.isOutputVerbose());
+				entity.setSerializeAlias(condition.getOutputAlias());
+
+				for(EntityCreationListener l : _listeners){
+					l.entityCreated(entity);
+				}
+				if(++count == condition.getPageSize())
+					break;
+			}
+		}catch(IOException ioe){
+			LOG.error("Fail reading log", ioe);
+			throw ioe;
+		}finally{
+			reader.close();
+		}		
+	}
+
+	@Override
+	public long getFirstTimestamp() {
+		return this.firstTimestamp;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java
new file mode 100755
index 0000000..c3d916e
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.log.entity.index.NonClusteredIndexStreamReader;
+import org.apache.eagle.log.entity.index.UniqueIndexStreamReader;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.apache.eagle.query.parser.ORExpression;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class GenericEntityStreamReader extends StreamReader {
+	private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReader.class);
+	
+	private EntityDefinition entityDef;
+	private SearchCondition condition;
+	private String prefix;
+	private StreamReader readerAfterPlan;
+
+	public GenericEntityStreamReader(String serviceName, SearchCondition condition) throws InstantiationException, IllegalAccessException{
+		this(serviceName, condition, null);
+	}
+
+	public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition) throws InstantiationException, IllegalAccessException{
+		this(entityDef, condition, entityDef.getPrefix());
+	}
+	
+	public GenericEntityStreamReader(String serviceName, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{
+		this.prefix = prefix;
+		checkNotNull(serviceName, "serviceName");
+		this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
+		checkNotNull(entityDef, "EntityDefinition");
+		this.condition = condition;
+		this.readerAfterPlan = selectQueryReader();
+	}
+
+	public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{
+		this.prefix = prefix;
+		checkNotNull(entityDef, "entityDef");
+		this.entityDef = entityDef;
+		checkNotNull(entityDef, "EntityDefinition");
+		this.condition = condition;
+		this.readerAfterPlan = selectQueryReader();
+	}
+
+	private void checkNotNull(Object o, String message){
+		if(o == null){
+			throw new IllegalArgumentException(message + " should not be null");
+		}
+	}
+	
+	public EntityDefinition getEntityDefinition() {
+		return entityDef;
+	}
+	
+	public SearchCondition getSearchCondition() {
+		return condition;
+	}
+	
+	@Override
+	public void readAsStream() throws Exception{
+		readerAfterPlan._listeners.addAll(this._listeners);
+		readerAfterPlan.readAsStream();
+	}
+	
+	private StreamReader selectQueryReader() throws InstantiationException, IllegalAccessException {
+		final ORExpression query = condition.getQueryExpression();
+		IndexDefinition[] indexDefs = entityDef.getIndexes();
+
+        // Index just works with query condition
+		if (indexDefs != null && condition.getQueryExpression()!=null) {
+			List<byte[]> rowkeys = new ArrayList<>();
+			for (IndexDefinition index : indexDefs) {
+				// Check unique index first
+				if (index.isUnique()) {
+					final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys);
+					if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
+						LOG.info("Selectd query unique index " + index.getIndexName() + " for query: " + condition.getQueryExpression());
+						return new UniqueIndexStreamReader(index, condition, rowkeys);
+					}
+				}
+			}
+			for (IndexDefinition index : indexDefs) {
+				// Check non-clustered index
+				if (!index.isUnique()) {
+					final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys);
+					if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
+						LOG.info("Selectd query non clustered index " + index.getIndexName() + " for query: " + condition.getQueryExpression().toString());
+						return new NonClusteredIndexStreamReader(index, condition, rowkeys);
+					}
+				}
+			}
+		}
+		return new GenericEntityScanStreamReader(entityDef, condition, this.prefix);
+	}
+
+	@Override
+	public long getLastTimestamp() {
+		return readerAfterPlan.getLastTimestamp();
+	}
+
+	@Override
+	public long getFirstTimestamp() {
+		return readerAfterPlan.getFirstTimestamp();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java
new file mode 100755
index 0000000..1946d6c
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.common.DateTimeUtil;
+
+/**
+ * multi-threading stream readers which only applies to time-series entity where we split the query into
+ * different time range
+ * 
+ * When this class is used together with list query or aggregate query, be aware that the query's behavior could
+ * be changed for example pageSize does not work well, output sequence is not determined
+ */
+public class GenericEntityStreamReaderMT extends StreamReader{
+	private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReaderMT.class);
+	private List<GenericEntityStreamReader> readers = new ArrayList<GenericEntityStreamReader>(); 
+	
+	public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition, int numThreads) throws Exception{
+		checkIsTimeSeries(serviceName);
+		checkNumThreads(numThreads);
+		long queryStartTime = DateTimeUtil.humanDateToSeconds(condition.getStartTime())*1000;
+		long queryEndTime = DateTimeUtil.humanDateToSeconds(condition.getEndTime())*1000;
+		long subStartTime = queryStartTime;
+		long subEndTime = 0;
+		long interval = (queryEndTime-queryStartTime) / numThreads;
+		for(int i=0; i<numThreads; i++){
+			// split search condition by time range
+			subStartTime = queryStartTime + i*interval;
+			if(i == numThreads-1){
+				subEndTime = queryEndTime;
+			}else{
+				subEndTime = subStartTime + interval;
+			}
+			String strStartTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subStartTime);
+			String strEndTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subEndTime);
+			SearchCondition sc = new SearchCondition(condition);
+			sc.setStartTime(strStartTime);
+			sc.setEndTime(strEndTime);
+			GenericEntityStreamReader reader = new GenericEntityStreamReader(serviceName, sc);
+			readers.add(reader);
+		}
+	}
+	
+	private void checkIsTimeSeries(String serviceName) throws Exception{
+		EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
+		if(!ed.isTimeSeries()){
+			throw new IllegalArgumentException("Multi-threading stream reader must be applied to time series table");
+		}
+	}
+	
+	private void checkNumThreads(int numThreads){
+		if(numThreads <= 0){
+			throw new IllegalArgumentException("Multi-threading stream reader must have numThreads >= 1");
+		}
+	}
+	
+	/**
+	 * default to 2 threads
+	 * @param serviceName
+	 * @param condition
+	 */
+	public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition) throws Exception{
+		this(serviceName, condition, 2);
+	}
+	
+	@Override
+	public void readAsStream() throws Exception{
+		// populate listeners to all readers
+		for(EntityCreationListener l : _listeners){
+			for(GenericEntityStreamReader r : readers){
+				r.register(l);
+			}
+		}
+
+		List<Future<Void>> futures = new ArrayList<Future<Void>>();
+		for(GenericEntityStreamReader r : readers){
+			SingleReader reader = new SingleReader(r);
+			Future<Void> readFuture = EagleConfigFactory.load().getExecutor().submit(reader);
+			futures.add(readFuture);
+		}
+		
+		// join threads and check exceptions
+		for(Future<Void> future : futures){
+			try{
+				future.get();
+			}catch(Exception ex){
+				LOG.error("Error in read", ex);
+				throw ex;
+			}
+		}
+	}
+	
+	private static class SingleReader implements Callable<Void>{
+		private GenericEntityStreamReader reader;
+		public SingleReader(GenericEntityStreamReader reader){
+			this.reader = reader;
+		}
+		@Override
+		public Void call() throws Exception{
+			reader.readAsStream();
+			return null;
+		}
+	}
+
+	@Override
+	public long getLastTimestamp() {
+		long lastTimestamp = 0;
+		for (GenericEntityStreamReader reader : readers) {
+			if (lastTimestamp < reader.getLastTimestamp()) {
+				lastTimestamp = reader.getLastTimestamp();
+			}
+		}
+		return lastTimestamp;
+	}
+
+	@Override
+	public long getFirstTimestamp() {
+		long firstTimestamp = 0;
+		for (GenericEntityStreamReader reader : readers) {
+			if (firstTimestamp > reader.getLastTimestamp() || firstTimestamp == 0) {
+				firstTimestamp = reader.getLastTimestamp();
+			}
+		}
+		return firstTimestamp;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java
new file mode 100755
index 0000000..5c8b12d
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.common.EagleBase64Wrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class GenericEntityWriter {
+	private static final Logger LOG = LoggerFactory.getLogger(GenericEntityWriter.class);
+	private EntityDefinition entityDef;
+
+	public GenericEntityWriter(String serviceName) throws InstantiationException, IllegalAccessException{
+		this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
+		checkNotNull(entityDef, "serviceName");
+	}
+
+	public GenericEntityWriter(EntityDefinition entityDef) throws InstantiationException, IllegalAccessException{
+		this.entityDef = entityDef;
+		checkNotNull(entityDef, "serviceName");
+	}
+	
+	private void checkNotNull(Object o, String message) {
+		if(o == null){
+			throw new IllegalArgumentException(message + " should not be null");
+		}
+	}
+
+	/**
+	 * @param entities
+	 * @return row keys
+	 * @throws Exception
+	 */
+	public List<String> write(List<? extends TaggedLogAPIEntity> entities) throws Exception{
+		HBaseLogWriter writer = new HBaseLogWriter(entityDef.getTable(), entityDef.getColumnFamily());
+		List<String> rowkeys = new ArrayList<String>(entities.size());
+		List<InternalLog> logs = new ArrayList<InternalLog>(entities.size());
+		
+		try{
+			writer.open();
+			for(TaggedLogAPIEntity entity : entities){
+				final InternalLog entityLog = HBaseInternalLogHelper.convertToInternalLog(entity, entityDef);
+				logs.add(entityLog);
+			}
+			List<byte[]> bRowkeys  = writer.write(logs);
+			for (byte[] rowkey : bRowkeys) {
+				rowkeys.add(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey));
+			}
+
+		}catch(Exception ex){
+			LOG.error("fail writing tagged log", ex);
+			throw ex;
+		}finally{
+			writer.close();
+	 	}
+		return rowkeys;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java
new file mode 100755
index 0000000..a9a8c75
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.ServicePath;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import org.apache.eagle.log.entity.meta.Metric;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+
+/**
+ * GenericMetricEntity should use prefix field which is extended from TaggedLogAPIEntity as metric name
+ * metric name is used to partition the metric tables
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("eagle_metric")
+@ColumnFamily("f")
+@Prefix(GenericMetricEntity.GENERIC_METRIC_PREFIX_PLACE_HOLDER)
+@Service(GenericMetricEntity.GENERIC_METRIC_SERVICE)
+@TimeSeries(true)
+@Metric(interval=60000)
+@ServicePath(path = "/metric")
+public class GenericMetricEntity extends TaggedLogAPIEntity {
+	public static final String GENERIC_METRIC_SERVICE = "GenericMetricService";
+	public static final String GENERIC_METRIC_PREFIX_PLACE_HOLDER = "GENERIC_METRIC_PREFIX_PLACEHODLER";
+	public static final String VALUE_FIELD ="value";
+
+	@Column("a")
+	private double[] value;
+
+	public double[] getValue() {
+		return value;
+	}
+
+	public void setValue(double[] value) {
+		this.value = value;
+		_pcs.firePropertyChange("value", null, null);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java
new file mode 100755
index 0000000..84b02ae
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class GenericMetricEntityBatchReader  implements EntityCreationListener{
+	private static final Logger LOG = LoggerFactory.getLogger(GenericEntityBatchReader.class);
+	
+	private List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
+	private GenericEntityStreamReader reader;
+	
+	public GenericMetricEntityBatchReader(String metricName, SearchCondition condition) throws Exception{
+		reader = new GenericEntityStreamReader(GenericMetricEntity.GENERIC_METRIC_SERVICE, condition, metricName);
+	}
+	
+	public long getLastTimestamp() {
+		return reader.getLastTimestamp();
+	}
+	public long getFirstTimestamp() {
+		return reader.getFirstTimestamp();
+	}
+	@Override
+	public void entityCreated(TaggedLogAPIEntity entity){
+		entities.add(entity);
+	}
+	
+	@SuppressWarnings("unchecked")
+	public <T> List<T> read() throws Exception{
+		if(LOG.isDebugEnabled()) LOG.debug("Start reading as batch mode");
+		reader.register(this);
+		reader.readAsStream();
+		return (List<T>)entities;
+	}
+}



Mime
View raw message