eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [2/3] incubator-eagle git commit: EAGLE-190 JBDC metadata storage extension and support derby embedded database as default storage
Date Wed, 16 Mar 2016 05:19:00 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/pom.xml b/eagle-core/eagle-query/eagle-storage-jdbc/pom.xml
index c099e98..c37247e 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/pom.xml
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/pom.xml
@@ -51,5 +51,9 @@
             <groupId>org.apache.torque</groupId>
             <artifactId>torque-runtime</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derby</artifactId>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcConstants.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcConstants.java
index 7fb4dbd..af5be8f 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcConstants.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcConstants.java
@@ -16,6 +16,8 @@
  */
 package org.apache.eagle.storage.jdbc;
 
+import java.sql.Types;
+
 /**
  * Jdbc Storage Constants
  */
@@ -25,6 +27,9 @@ public class JdbcConstants {
     public static final String METRIC_NAME_COLUMN_NAME = "metric";
     public static final String ROW_KEY_COLUMN_NAME = "uuid";
 
+    public static final int DEFAULT_TYPE_FOR_COMPLEX_TYPE = Types.BLOB;
+    public static final int DEFAULT_VARCHAR_SIZE =1024;
+
     // Eagle JDBC Storage Configuration
     public final static String EAGLE_DB_USERNAME = "eagle.service.storage-username";
     public final static String EAGLE_DB_PASSWORD = "eagle.service.storage-password";
@@ -34,4 +39,8 @@ public class JdbcConstants {
     public final static String EAGLE_DATABASE= "eagle.service.storage-database";
     public final static String EAGLE_DRIVER_CLASS= "eagle.service.storage-driver-class";
     public final static String EAGLE_CONN_MAX_SIZE= "eagle.service.storage-connection-max";
+
+    public static final boolean isReservedField(String columnName){
+        return TIMESTAMP_COLUMN_NAME.equals(columnName) || METRIC_NAME_COLUMN_NAME.equals(columnName) || ROW_KEY_COLUMN_NAME.equals(columnName);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcStorage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcStorage.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcStorage.java
index 66b3c5a..c490dcd 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcStorage.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcStorage.java
@@ -30,13 +30,17 @@ import org.apache.eagle.storage.jdbc.entity.impl.JdbcEntityUpdaterImpl;
 import org.apache.eagle.storage.jdbc.entity.impl.JdbcEntityWriterImpl;
 import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition;
 import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinitionManager;
+import org.apache.eagle.storage.jdbc.schema.JdbcEntitySchemaManager;
 import org.apache.eagle.storage.operation.CompiledQuery;
 import org.apache.eagle.storage.result.ModifyResult;
 import org.apache.eagle.storage.result.QueryResult;
+import org.apache.torque.ConstraintViolationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.sql.SQLIntegrityConstraintViolationException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -52,8 +56,9 @@ public class JdbcStorage extends DataStorageBase {
         try {
             JdbcEntityDefinitionManager.load();
             ConnectionManagerFactory.getInstance();
+            JdbcEntitySchemaManager.getInstance().init();
         } catch (Exception e) {
-            LOG.error("Failed to initialize connection manager",e);
+            LOG.error("Failed to start connection manager",e);
             throw new IOException(e);
         }
     }
@@ -70,14 +75,14 @@ public class JdbcStorage extends DataStorageBase {
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
             result.setSuccess(false);
-            throw new IOException(e);
+            throw new IOException(e.getCause());
         }
         return result;
     }
 
     @Override
     public <E extends TaggedLogAPIEntity> ModifyResult<String> create(List<E> entities, EntityDefinition entityDefinition) throws IOException {
-        ModifyResult<String> result = new ModifyResult<String>();
+        ModifyResult<String> result = new ModifyResult<>();
         try {
             JdbcEntityDefinition jdbcEntityDefinition =  JdbcEntityDefinitionManager.getJdbcEntityDefinition(entityDefinition);
             JdbcEntityWriter writer = new JdbcEntityWriterImpl(jdbcEntityDefinition);
@@ -86,9 +91,9 @@ public class JdbcStorage extends DataStorageBase {
             result.setSize(keys.size());
             result.setSuccess(true);
         } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            LOG.error(e.getMessage(), e.getCause());
             result.setSuccess(false);
-            throw new IOException(e);
+            throw new IOException(e.getCause());
         }
         return result;
     }
@@ -105,7 +110,7 @@ public class JdbcStorage extends DataStorageBase {
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
             result.setSuccess(false);
-            throw new IOException(e);
+            throw new IOException(e.getCause());
         }
         return result;
     }
@@ -123,7 +128,7 @@ public class JdbcStorage extends DataStorageBase {
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
             result.setSuccess(false);
-            throw new IOException(e);
+            throw new IOException(e.getCause());
         }
         return result;
     }
@@ -140,7 +145,7 @@ public class JdbcStorage extends DataStorageBase {
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
             result.setSuccess(false);
-            throw new IOException(e);
+            throw new IOException(e.getCause());
         }
         return result;
     }
@@ -170,7 +175,7 @@ public class JdbcStorage extends DataStorageBase {
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
             result.setSuccess(false);
-            throw new IOException(e);
+            throw new IOException(e.getCause());
         }
         return result;
     }
@@ -195,7 +200,7 @@ public class JdbcStorage extends DataStorageBase {
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
             result.setSuccess(false);
-            throw new IOException(e);
+            throw new IOException(e.getCause());
         }
         return result;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/EncodedRowkeyPrimaryKeyBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/EncodedRowkeyPrimaryKeyBuilder.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/EncodedRowkeyPrimaryKeyBuilder.java
new file mode 100644
index 0000000..4ab8f56
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/EncodedRowkeyPrimaryKeyBuilder.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.storage.jdbc.conn.impl;
+
+
+import org.apache.eagle.common.EagleBase64Wrapper;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.log.entity.old.RowkeyHelper;
+import org.apache.eagle.storage.jdbc.conn.PrimaryKeyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EncodedRowkeyPrimaryKeyBuilder implements PrimaryKeyBuilder<String> {
+    private static final Logger LOG = LoggerFactory.getLogger(EncodedRowkeyPrimaryKeyBuilder.class);
+
+    @Override
+    public <T> String build(T t) {
+        if(t == null) return null;
+
+        try {
+            EntityDefinition entityDefinition
+                    = EntityDefinitionManager.getEntityDefinitionByEntityClass((Class<? extends TaggedLogAPIEntity>) t.getClass());
+            return EagleBase64Wrapper.encodeByteArray2URLSafeString(RowkeyHelper.getRowkey((TaggedLogAPIEntity) t,entityDefinition));
+        } catch (Exception e) {
+            LOG.error("Got error to build rowKey for {}",t,e);
+            throw new RuntimeException("Got error to build rowKey",e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/TorqueStatementPeerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/TorqueStatementPeerImpl.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/TorqueStatementPeerImpl.java
index 69e1192..833d7ae 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/TorqueStatementPeerImpl.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/TorqueStatementPeerImpl.java
@@ -45,7 +45,8 @@ public class TorqueStatementPeerImpl<T> implements StatementExecutor {
         this.basePeer.setTableMap(tableMap);
     }
 
-    private static PrimaryKeyBuilder<String> _primaryKeyBuilderInstance = new UUIDPrimaryKeyBuilder();;
+    private static PrimaryKeyBuilder<String> _primaryKeyBuilderInstance = new EncodedRowkeyPrimaryKeyBuilder();
+
     @Override
     public PrimaryKeyBuilder<String> getPrimaryKeyBuilder() {
         return _primaryKeyBuilderInstance;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/ExpressionCriterionBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/ExpressionCriterionBuilder.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/ExpressionCriterionBuilder.java
index ec92849..b3eb462 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/ExpressionCriterionBuilder.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/ExpressionCriterionBuilder.java
@@ -19,6 +19,7 @@ package org.apache.eagle.storage.jdbc.criteria.impl;
 import org.apache.eagle.log.entity.EntityQualifierUtils;
 import org.apache.eagle.storage.jdbc.criteria.CriterionBuilder;
 import org.apache.eagle.query.parser.*;
+import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition;
 import org.apache.torque.ColumnImpl;
 import org.apache.torque.criteria.Criterion;
 import org.apache.torque.criteria.SqlEnum;
@@ -33,10 +34,12 @@ import java.util.regex.Matcher;
 public class ExpressionCriterionBuilder implements CriterionBuilder {
     private final String tableName;
     private final ORExpression expression;
+    private final JdbcEntityDefinition jdbcEntityDefinition;
 
-    public ExpressionCriterionBuilder(ORExpression expression,String tableName) {
+    public ExpressionCriterionBuilder(ORExpression expression, JdbcEntityDefinition entityDefinition) {
         this.expression = expression;
-        this.tableName = tableName;
+        this.tableName = entityDefinition.getJdbcTableName();
+        this.jdbcEntityDefinition = entityDefinition;
     }
 
     @Override
@@ -65,23 +68,58 @@ public class ExpressionCriterionBuilder implements CriterionBuilder {
     }
 
     private Criterion toAtomicCriterion(AtomicExpression atomic){
-        Object left = toColumn(atomic.getKeyType(), atomic.getKey(),atomic.getOp());
-        Object right = toColumn(atomic.getValueType(), atomic.getValue(),atomic.getOp());
+        Class<?> columnType = locateColumnType(atomic);
+        Object left = toColumn(atomic.getKeyType(), atomic.getKey(),atomic.getOp(),columnType);
+        Object right = toColumn(atomic.getValueType(), atomic.getValue(), atomic.getOp(),columnType);
         SqlEnum op = toSqlEnum(atomic.getOp());
         return new Criterion(left,right,op);
     }
 
-    private Object toColumn(TokenType tokenType,String value,ComparisonOperator op){
-        if(op.equals(ComparisonOperator.CONTAINS) && tokenType.equals(TokenType.STRING)){
-            return "%"+value+"%";
-        }else if(tokenType.equals(TokenType.ID)){
-            return new ColumnImpl(this.tableName,parseEntityAttribute(value));
-        }else if(!tokenType.equals(TokenType.ID) && op.equals(ComparisonOperator.IN)){
+    private Class<?> locateColumnType(AtomicExpression atomic) {
+        String columnName = null;
+        if(atomic.getKeyType().equals(TokenType.ID)){
+            columnName =  parseEntityAttribute(atomic.getKey());
+        }else if(atomic.getValueType().equals(TokenType.ID)){
+            columnName = parseEntityAttribute(atomic.getValue());
+        }
+        if(jdbcEntityDefinition.getInternal().getDisplayNameMap().containsKey(columnName)){
+            try {
+                return jdbcEntityDefinition.getColumnType(columnName);
+            } catch (NoSuchFieldException e) {
+                throw new RuntimeException(e);
+            }
+        }else{
+            return null;
+        }
+    }
+
+    /**
+     * this place is used for rewriting query for jdbc connection
+     * @param tokenType
+     * @param value
+     * @param op
+     * @return
+     */
+    private Object toColumn(TokenType tokenType,String value,ComparisonOperator op, Class<?> columnType) {
+        if (op.equals(ComparisonOperator.CONTAINS) && tokenType.equals(TokenType.STRING)) {
+            return "%" + value + "%";
+        } else if (tokenType.equals(TokenType.ID)) {
+            return new ColumnImpl(this.tableName, parseEntityAttribute(value));
+        } else if (!tokenType.equals(TokenType.ID) && op.equals(ComparisonOperator.IN)) {
             return EntityQualifierUtils.parseList(value);
-        }else if(tokenType.equals(TokenType.NUMBER)){
+        } else if (tokenType.equals(TokenType.NUMBER)) {
             // TODO: currently only treat all number value as double
-            return Double.parseDouble(value);
-        }else{
+            if(columnType.equals(Long.class)) {
+                return Long.parseLong(value);
+            } else {
+                return Double.parseDouble(value);
+            }
+        } else if (op.equals(ComparisonOperator.LIKE) && value.equals(".*")){
+            return "%";
+        } else{
+            if((boolean.class.equals(columnType) || Boolean.class.equals(columnType)) && value != null){
+                return Boolean.valueOf(value);
+            }
             // TODO: parse type according entity field type
             return value;
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/QueryCriteriaBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/QueryCriteriaBuilder.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/QueryCriteriaBuilder.java
index 4d590de..a82d185 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/QueryCriteriaBuilder.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/QueryCriteriaBuilder.java
@@ -23,6 +23,7 @@ import org.apache.eagle.query.parser.ORExpression;
 import org.apache.eagle.storage.jdbc.criteria.CriteriaBuilder;
 import org.apache.eagle.storage.jdbc.criteria.CriterionBuilder;
 import org.apache.eagle.storage.jdbc.JdbcConstants;
+import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition;
 import org.apache.eagle.storage.operation.CompiledQuery;
 import org.apache.torque.ColumnImpl;
 import org.apache.torque.criteria.Criteria;
@@ -38,10 +39,18 @@ public class QueryCriteriaBuilder implements CriteriaBuilder {
 
     private final CompiledQuery query;
     private final String tableName;
+    private final Boolean limitEnabled;
+    private final JdbcEntityDefinition jdbcEntityDefinition;
 
-    public QueryCriteriaBuilder(CompiledQuery query, String tableName){
+    public QueryCriteriaBuilder(CompiledQuery query, JdbcEntityDefinition entityDefinition){
+        this(query,entityDefinition,true);
+    }
+
+    public QueryCriteriaBuilder(CompiledQuery query, JdbcEntityDefinition entityDefinition, Boolean limitEnabled){
         this.query = query;
-        this.tableName = tableName;
+        this.tableName = entityDefinition.getJdbcTableName();
+        this.limitEnabled = limitEnabled;
+        this.jdbcEntityDefinition = entityDefinition;
     }
 
     @Override
@@ -79,6 +88,12 @@ public class QueryCriteriaBuilder implements CriteriaBuilder {
             }
         }
 
+        // If no columns are specified, then select * by default
+        if(root.getSelectColumns() == null || root.getSelectColumns().size() ==0){
+            // SELECT *
+            root.addSelectColumn(new ColumnImpl(this.tableName, "*"));
+        }
+
         // FROM $tableName
         root.addFrom(this.tableName);
 
@@ -87,7 +102,7 @@ public class QueryCriteriaBuilder implements CriteriaBuilder {
                         .and(new Criterion(new ColumnImpl(this.tableName, JdbcConstants.TIMESTAMP_COLUMN_NAME),query.getEndTime(), SqlEnum.LESS_THAN));
         ORExpression expression = searchCondition.getQueryExpression();
         if(expression!=null){
-            CriterionBuilder criterionBuilder = new ExpressionCriterionBuilder(expression,tableName);
+            CriterionBuilder criterionBuilder = new ExpressionCriterionBuilder(expression,this.jdbcEntityDefinition);
             where = where.and(criterionBuilder.build());
         }
 
@@ -98,8 +113,10 @@ public class QueryCriteriaBuilder implements CriteriaBuilder {
 
         root.where(where);
 
-        // LIMITED BY $pageSize
-        root.setLimit((int) searchCondition.getPageSize());
+        if(this.limitEnabled) {
+            // LIMITED BY $pageSize
+            root.setLimit((int) searchCondition.getPageSize());
+        }
 
         // TODO: GROUP BY
         if(query.isHasAgg()){

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntitySerDeserHelper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntitySerDeserHelper.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntitySerDeserHelper.java
index 651ee19..3750f71 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntitySerDeserHelper.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntitySerDeserHelper.java
@@ -28,6 +28,8 @@ import org.apache.commons.beanutils.PropertyUtils;
 import org.apache.torque.ColumnImpl;
 import org.apache.torque.util.ColumnValues;
 import org.apache.torque.util.JdbcTypedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.beans.PropertyDescriptor;
 import java.io.IOException;
@@ -44,6 +46,7 @@ import java.util.Map;
  * @since 3/26/15
  */
 public class JdbcEntitySerDeserHelper {
+    private final static Logger LOG = LoggerFactory.getLogger(JdbcEntitySerDeserHelper.class);
     /**
      *
      * @param row
@@ -55,7 +58,7 @@ public class JdbcEntitySerDeserHelper {
      * @throws InvocationTargetException
      * @throws NoSuchMethodException
      */
-    public static <E extends TaggedLogAPIEntity> E buildEntity(Map<String, Object> row, JdbcEntityDefinition entityDefinition) throws IllegalAccessException, InstantiationException, InvocationTargetException, NoSuchMethodException {
+    public static <E extends TaggedLogAPIEntity> E buildEntity(Map<String, Object> row, JdbcEntityDefinition entityDefinition) throws IOException {
         EntityDefinition ed = entityDefinition.getInternal();
 
         Class<? extends TaggedLogAPIEntity> clazz = ed.getEntityClass();
@@ -63,24 +66,35 @@ public class JdbcEntitySerDeserHelper {
             throw new NullPointerException("Entity class of service "+ed.getService()+" is null");
         }
 
-        TaggedLogAPIEntity obj = clazz.newInstance();
-        Map<String, Qualifier> map = ed.getDisplayNameMap();
+        TaggedLogAPIEntity obj = null;
+        try {
+            obj = clazz.newInstance();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(),e.getCause());
+            throw new IOException(e);
+        }
+        Map<String, Qualifier> rawmap = ed.getDisplayNameMap();
+        // rdbms may contains field which is not case insensitive, we need convert all into lower case
+        Map<String, Qualifier> map = new HashMap<>();
+        for(Map.Entry<String, Qualifier> e : rawmap.entrySet()){
+            map.put(e.getKey().toLowerCase(), e.getValue());
+        }
         for(Map.Entry<String, Object> entry : row.entrySet()){
             // timestamp;
-            if(JdbcConstants.TIMESTAMP_COLUMN_NAME.equals(entry.getKey())){
+            if(JdbcConstants.TIMESTAMP_COLUMN_NAME.equalsIgnoreCase(entry.getKey())){
                 obj.setTimestamp((Long) entry.getValue());
                 continue;
             }
 
             // set metric as prefix for generic metric
             if(entityDefinition.getInternal().getService().equals(GenericMetricEntity.GENERIC_METRIC_SERVICE) &&
-                    JdbcConstants.METRIC_NAME_COLUMN_NAME.equals(entry.getKey())){
+                    JdbcConstants.METRIC_NAME_COLUMN_NAME.equalsIgnoreCase(entry.getKey())){
                 obj.setPrefix((String) entry.getValue());
                 continue;
             }
 
             // rowkey: uuid
-            if(JdbcConstants.ROW_KEY_COLUMN_NAME.equals(entry.getKey())){
+            if(JdbcConstants.ROW_KEY_COLUMN_NAME.equalsIgnoreCase(entry.getKey())){
                 obj.setEncodedRowkey((String) entry.getValue());
                 continue;
             }
@@ -91,16 +105,37 @@ public class JdbcEntitySerDeserHelper {
                 if(obj.getTags() == null){
                     obj.setTags(new HashMap<String, String>());
                 }
-                obj.getTags().put(entry.getKey(), (String) entry.getValue());
+                // get normalized tag name, not efficient, but we need make it work first
+                String key = null;
+                if(ed.getTags() != null) {
+                    for (String tag : ed.getTags()) {
+                        if (tag.equalsIgnoreCase(entry.getKey())) {
+                            key = tag;
+                            break;
+                        }
+                    }
+                }
+                try {
+                    obj.getTags().put(key == null ? entry.getKey() : key, (String) entry.getValue());
+                }catch (ClassCastException ex){
+                    LOG.error("Tag value {} = {} is not String",key,entry.getValue(),ex);
+                    throw ex;
+                }
                 continue;
             }
 
             // parse different types of qualifiers
             String fieldName = q.getDisplayName();
             // PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(obj, fieldName);
-            PropertyDescriptor pd = getPropertyDescriptor(obj,fieldName);
-            if(entry.getValue() != null){
-                pd.getWriteMethod().invoke(obj, entry.getValue());
+            PropertyDescriptor pd = null;
+            try {
+                pd = getPropertyDescriptor(obj,fieldName);
+                if(entry.getValue() != null) {
+                    pd.getWriteMethod().invoke(obj, entry.getValue());
+                }
+            } catch (Exception ex){
+                LOG.error("Failed to set value  {} = {}",fieldName,entry.getValue(),ex);
+                throw new IOException(String.format("Failed to set value  %s = %s",fieldName,entry.getValue()),ex);
             }
         }
 
@@ -166,7 +201,19 @@ public class JdbcEntitySerDeserHelper {
             if(serDeser==null){
                 throw new IOException("SQLSerDeser for column: "+columnName+" is null");
             }
-            Object value = serDeser.readValue(resultSet, columnName, entityDefinition);
+            Object value;
+
+            if (entityDefinition.isField(columnName)) {
+                try {
+                    value = serDeser.toJavaTypedValue(resultSet, entityDefinition.getColumnType(columnName), columnName, entityDefinition.getColumnQualifier(columnName));
+                } catch (NoSuchFieldException e) {
+                    LOG.error("No field {} in entity {}", columnName, entityDefinition.getInternal().getEntityClass());
+                    throw new IOException(String.format("No field %s in entity %s", columnName, entityDefinition.getInternal().getEntityClass()), e);
+                }
+            }else{
+                // treat as tag or others
+                value = resultSet.getObject(columnName);
+            }
             row.put(columnName,value);
         }
         return row;
@@ -205,7 +252,8 @@ public class JdbcEntitySerDeserHelper {
 
             Class<?> fieldType = qualifier.getSerDeser().type();
             JdbcSerDeser jdbcSerDeser = JdbcEntityDefinitionManager.getJdbcSerDeser(fieldType);
-            JdbcTypedValue jdbcTypedValue = jdbcSerDeser.getJdbcTypedValue(fieldValue, fieldType);
+
+            JdbcTypedValue jdbcTypedValue = jdbcSerDeser.toJdbcTypedValue(fieldValue, fieldType, qualifier);
             columnValues.put(new ColumnImpl(tableName,displayName),jdbcTypedValue);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityUtils.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityUtils.java
new file mode 100644
index 0000000..b2788ea
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityUtils.java
@@ -0,0 +1,26 @@
+package org.apache.eagle.storage.jdbc.entity;
+
+import org.apache.eagle.log.entity.meta.Qualifier;
+import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class JdbcEntityUtils {
+    public static Qualifier getColumnQualifier(JdbcEntityDefinition entityDefinition,String columnName){
+        return entityDefinition.getInternal().getDisplayNameMap().get(columnName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityWriter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityWriter.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityWriter.java
index 072174e..eb5a074 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityWriter.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityWriter.java
@@ -33,5 +33,5 @@ public interface JdbcEntityWriter<E extends TaggedLogAPIEntity> {
      * @return primary keys' list
      * @throws Exception
      */
-    public List<String> write(List<E> entities) throws Exception;
+    List<String> write(List<E> entities) throws Exception;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityDeleterImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityDeleterImpl.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityDeleterImpl.java
index 35e1078..4ebd792 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityDeleterImpl.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityDeleterImpl.java
@@ -61,7 +61,7 @@ public class JdbcEntityDeleterImpl<E extends TaggedLogAPIEntity> implements Jdbc
 
     @Override
     public int deleteByQuery(CompiledQuery query) throws Exception {
-        QueryCriteriaBuilder criteriaBuilder = new QueryCriteriaBuilder(query,this.jdbcEntityDefinition.getJdbcTableName());
+        QueryCriteriaBuilder criteriaBuilder = new QueryCriteriaBuilder(query,this.jdbcEntityDefinition,false);
         Criteria criteria = criteriaBuilder.build();
         return deleteByCriteria(criteria);
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityReaderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityReaderImpl.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityReaderImpl.java
index d0d6a61..a7b93b5 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityReaderImpl.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityReaderImpl.java
@@ -24,6 +24,7 @@ import org.apache.eagle.storage.jdbc.entity.JdbcEntityReader;
 import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition;
 import org.apache.eagle.storage.operation.CompiledQuery;
 import org.apache.commons.lang.time.StopWatch;
+import org.apache.torque.ColumnImpl;
 import org.apache.torque.criteria.Criteria;
 import org.apache.torque.om.mapper.RecordMapper;
 import org.apache.torque.sql.SqlBuilder;
@@ -46,9 +47,10 @@ public class JdbcEntityReaderImpl implements JdbcEntityReader {
         this.jdbcEntityDefinition = jdbcEntityDefinition;
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public <E extends Object> List<E> query(CompiledQuery query) throws Exception {
-        QueryCriteriaBuilder criteriaBuilder = new QueryCriteriaBuilder(query,this.jdbcEntityDefinition.getJdbcTableName());
+        QueryCriteriaBuilder criteriaBuilder = new QueryCriteriaBuilder(query,this.jdbcEntityDefinition);
         Criteria criteria = criteriaBuilder.build();
         String displaySql = SqlBuilder.buildQuery(criteria).getDisplayString();
 
@@ -88,6 +90,7 @@ public class JdbcEntityReaderImpl implements JdbcEntityReader {
         try {
             stopWatch.start();
             TorqueStatementPeerImpl peer = ConnectionManagerFactory.getInstance().getStatementExecutor();
+            criteria.addSelectColumn(new ColumnImpl(jdbcEntityDefinition.getJdbcTableName(),"*"));
             result = peer.delegate().doSelect(criteria, recordMapper);
             LOG.info(String.format("Read %s records in %s ms (sql: %s)",result.size(),stopWatch.getTime(),displaySql));
         }catch (Exception ex){

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityUpdaterImpl.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityUpdaterImpl.java
index 6f9bf3d..4baa216 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityUpdaterImpl.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityUpdaterImpl.java
@@ -16,7 +16,10 @@
  */
 package org.apache.eagle.storage.jdbc.entity.impl;
 
+import org.apache.commons.configuration.ConfigurationFactory;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.log.entity.old.RowkeyHelper;
 import org.apache.eagle.storage.jdbc.conn.ConnectionManager;
 import org.apache.eagle.storage.jdbc.conn.ConnectionManagerFactory;
 import org.apache.eagle.storage.jdbc.conn.impl.TorqueStatementPeerImpl;
@@ -33,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -59,7 +63,11 @@ public class JdbcEntityUpdaterImpl<E extends TaggedLogAPIEntity> implements Jdbc
         try {
             for (E entity : entities) {
                 String primaryKey = entity.getEncodedRowkey();
-                PrimaryKeyCriteriaBuilder pkBuilder = new PrimaryKeyCriteriaBuilder(Arrays.asList(primaryKey), this.jdbcEntityDefinition.getJdbcTableName());
+                if(primaryKey==null) {
+                    primaryKey = ConnectionManagerFactory.getInstance().getStatementExecutor().getPrimaryKeyBuilder().build(entity);
+                    entity.setEncodedRowkey(primaryKey);
+                }
+                PrimaryKeyCriteriaBuilder pkBuilder = new PrimaryKeyCriteriaBuilder(Collections.singletonList(primaryKey), this.jdbcEntityDefinition.getJdbcTableName());
                 Criteria selectCriteria = pkBuilder.build();
                 if(LOG.isDebugEnabled()) LOG.debug("Updating by query: "+SqlBuilder.buildQuery(selectCriteria).getDisplayString());
                 ColumnValues columnValues = JdbcEntitySerDeserHelper.buildColumnValues(entity, this.jdbcEntityDefinition);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityWriterImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityWriterImpl.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityWriterImpl.java
index d1dc9bd..de15384 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityWriterImpl.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityWriterImpl.java
@@ -20,17 +20,22 @@ import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.storage.jdbc.conn.ConnectionManager;
 import org.apache.eagle.storage.jdbc.conn.ConnectionManagerFactory;
 import org.apache.eagle.storage.jdbc.conn.impl.TorqueStatementPeerImpl;
+import org.apache.eagle.storage.jdbc.criteria.impl.PrimaryKeyCriteriaBuilder;
 import org.apache.eagle.storage.jdbc.entity.JdbcEntitySerDeserHelper;
 import org.apache.eagle.storage.jdbc.entity.JdbcEntityWriter;
 import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition;
 import org.apache.commons.lang.time.StopWatch;
+import org.apache.torque.ConstraintViolationException;
+import org.apache.torque.criteria.Criteria;
 import org.apache.torque.om.ObjectKey;
+import org.apache.torque.sql.SqlBuilder;
 import org.apache.torque.util.ColumnValues;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -68,17 +73,33 @@ public class JdbcEntityWriterImpl<E extends TaggedLogAPIEntity> implements JdbcE
                 entity.setEncodedRowkey(peer.getPrimaryKeyBuilder().build(entity));
                 ColumnValues columnValues = JdbcEntitySerDeserHelper.buildColumnValues(entity, this.jdbcEntityDefinition);
 
-                // TODO: implement batch insert for better performance
-                ObjectKey key = peer.delegate().doInsert(columnValues,connection);
-
+                ObjectKey key = null;
                 try {
+                    // TODO: implement batch insert for better performance
+                    key = peer.delegate().doInsert(columnValues,connection);
+
                     if (key != null) {
                         keys.add((String) key.getValue());
                     } else {
                         keys.add(entity.getEncodedRowkey());
                     }
                 } catch (ClassCastException ex) {
+                    assert key != null;
                     throw new RuntimeException("Key is not in type of String (VARCHAR) , but JdbcType (java.sql.Types): " + key.getJdbcType() + ", value: " + key.getValue(), ex);
+                } catch (ConstraintViolationException e){
+                    // Override with updating if duplicated key exception
+                    if(e.getMessage().contains("The statement was aborted because it would have caused a duplicate key value in a unique or primary key constraint or unique index identified by")){
+                        String primaryKey = entity.getEncodedRowkey();
+                        if(primaryKey==null) {
+                            primaryKey = ConnectionManagerFactory.getInstance().getStatementExecutor().getPrimaryKeyBuilder().build(entity);
+                            entity.setEncodedRowkey(primaryKey);
+                        }
+                        PrimaryKeyCriteriaBuilder pkBuilder = new PrimaryKeyCriteriaBuilder(Collections.singletonList(primaryKey), this.jdbcEntityDefinition.getJdbcTableName());
+                        Criteria selectCriteria = pkBuilder.build();
+                        if(LOG.isDebugEnabled()) LOG.debug("Updating by query: "+ SqlBuilder.buildQuery(selectCriteria).getDisplayString());
+                        peer.delegate().doUpdate(selectCriteria, columnValues, connection);
+                        keys.add(primaryKey);
+                    }
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/IJdbcEntityDDLManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/IJdbcEntityDDLManager.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/IJdbcEntityDDLManager.java
new file mode 100644
index 0000000..9e3f10b
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/IJdbcEntityDDLManager.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.storage.jdbc.schema;
+
+/**
+ * @since 3/27/15
+ */
+public interface IJdbcEntityDDLManager {
+    void init();
+    void reinit();
+    void shutdown();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinition.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinition.java
index 16cee99..eb5e874 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinition.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinition.java
@@ -19,8 +19,12 @@ package org.apache.eagle.storage.jdbc.schema;
 import org.apache.eagle.log.entity.GenericMetricEntity;
 import org.apache.eagle.log.entity.meta.EntityDefinition;
 import org.apache.eagle.log.entity.meta.Qualifier;
+import org.apache.eagle.storage.jdbc.JdbcConstants;
 import org.apache.eagle.storage.jdbc.schema.serializer.JdbcSerDeser;
 
+import java.util.Map;
+import java.util.Set;
+
 /**
  * @since 3/26/15
  */
@@ -52,42 +56,42 @@ public class JdbcEntityDefinition {
     }
 
     public Class<?> getColumnType(String fieldName) throws NoSuchFieldException {
-        return internal.getEntityClass().getField(fieldName).getType();
+        if (fieldName.equals(JdbcConstants.TIMESTAMP_COLUMN_NAME)){
+            return Long.class;
+        }else if(fieldName.equals(JdbcConstants.ROW_KEY_COLUMN_NAME)) {
+            return String.class;
+        }else if(fieldName.equals(JdbcConstants.METRIC_NAME_COLUMN_NAME)){
+            return String.class;
+        }
+        for(String realField:internal.getDisplayNameMap().keySet()){
+            if(realField.equalsIgnoreCase(fieldName)){
+                return internal.getEntityClass().getDeclaredField(realField).getType();
+            }
+        }
+        throw new NoSuchFieldException(fieldName);
     }
 
-    /**
-     *
-     * TODO: Generate table schema DDL according entity definition
-     *
-     * @link https://db.apache.org/ddlutils/
-     *
-     * CREATE TABLE ${prefix}${tableName}{
-     *      prefix prefix;
-     *      encodedRowkey varchar;
-     *      intField1 int;
-     *      longField bitint;
-     *      tag varchar;
-     * } PRIMARY KEY(encodedRowkey);
-     *
-     * CREATE TABLE ${metricTable}{
-     *      encodedRowkey varchar;
-     *      prefix varchar;
-     *      intField1 int;
-     *      longField bitint;
-     *      tag varchar;
-     * } PRIMARY KEY(rowkey,prefix);
-     *
-     * @param tagsFields
-     * @return
-     */
-    @SuppressWarnings("unused")
-    public String getJdbcSchemaDDL(String[] tagsFields){
-        throw new RuntimeException("TODO: not implemented yet");
+    public Class<?> getColumnTypeOrNull(String fieldName){
+        try {
+            return getColumnType(fieldName);
+        } catch (NoSuchFieldException e) {
+            return null;
+        }
+    }
+
+    public Integer getJdbcColumnTypeCodeOrNull(String fieldName){
+        Class<?> columnType;
+        try {
+            columnType = getColumnType(fieldName);
+            return JdbcEntityDefinitionManager.getJdbcType(columnType);
+        } catch (NoSuchFieldException e) {
+            return null;
+        }
     }
 
     @SuppressWarnings("unchecked")
     public JdbcSerDeser getJdbcSerDeser(String columnName) {
-        Qualifier qualifier = this.internal.getQualifierNameMap().get(columnName);
+        Qualifier qualifier = this.getColumnQualifier(columnName);
         if(qualifier == null){
             return JdbcEntityDefinitionManager.DEFAULT_JDBC_SERDESER;
         }else {
@@ -98,4 +102,22 @@ public class JdbcEntityDefinition {
     public boolean isGenericMetric(){
         return this.internal.getEntityClass().equals(GenericMetricEntity.class);
     }
+
+    public boolean isField(String columnName){
+        for(String name:this.internal.getDisplayNameMap().keySet()){
+            if(name.equalsIgnoreCase(columnName)){
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public Qualifier getColumnQualifier(String columnName) {
+        for(Map.Entry<String,Qualifier> entry:this.internal.getDisplayNameMap().entrySet()){
+            if(entry.getKey().equalsIgnoreCase(columnName)){
+                return entry.getValue();
+            }
+        }
+        return null;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinitionManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinitionManager.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinitionManager.java
index cfad442..cf163fb 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinitionManager.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinitionManager.java
@@ -18,9 +18,12 @@ package org.apache.eagle.storage.jdbc.schema;
 
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
 import org.apache.eagle.log.entity.meta.EntitySerDeser;
 import org.apache.eagle.storage.jdbc.schema.serializer.JdbcSerDeser;
 import org.apache.eagle.storage.jdbc.schema.serializer.DefaultJdbcSerDeser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.sql.Types;
 import java.util.HashMap;
@@ -33,9 +36,13 @@ import java.util.Map;
  * @since 3/27/15
  */
 public class JdbcEntityDefinitionManager {
+    private final static Logger LOG = LoggerFactory.getLogger(JdbcEntityDefinitionManager.class);
     private final static Map<Class<? extends TaggedLogAPIEntity>,JdbcEntityDefinition> sqlEntityDefinitionCache = new HashMap<Class<? extends TaggedLogAPIEntity>,JdbcEntityDefinition>();
+    private static Boolean initialized = false;
 
     public static JdbcEntityDefinition getJdbcEntityDefinition(EntityDefinition entityDefinition){
+        checkInit();
+
         Class<? extends TaggedLogAPIEntity> entityClass = entityDefinition.getEntityClass();
         JdbcEntityDefinition jdbcEntityDefinition = sqlEntityDefinitionCache.get(entityClass);
         if(jdbcEntityDefinition == null){
@@ -45,9 +52,37 @@ public class JdbcEntityDefinitionManager {
         return jdbcEntityDefinition;
     }
 
+    public static Map<Class<? extends TaggedLogAPIEntity>,JdbcEntityDefinition> getJdbcEntityDefinitionMap(){
+        checkInit();
+        return sqlEntityDefinitionCache;
+    }
+
+    public static JdbcEntityDefinition getJdbcEntityDefinition(Class<? extends TaggedLogAPIEntity> clazz) throws IllegalAccessException, InstantiationException {
+        checkInit();
+        return getJdbcEntityDefinition(EntityDefinitionManager.getEntityDefinitionByEntityClass(clazz));
+    }
+
+    private static void checkInit(){
+        if (!initialized) {
+            try {
+                Map<String,EntityDefinition> entries = EntityDefinitionManager.entities();
+                for (Map.Entry<String, EntityDefinition> entry : entries.entrySet()) {
+                    Class<? extends TaggedLogAPIEntity> entityClass = entry.getValue().getEntityClass();
+                    JdbcEntityDefinition jdbcEntityDefinition = sqlEntityDefinitionCache.get(entityClass);
+                    if(jdbcEntityDefinition == null){
+                        jdbcEntityDefinition = new JdbcEntityDefinition(entry.getValue());
+                        sqlEntityDefinitionCache.put(entityClass, jdbcEntityDefinition);
+                    }
+                }
+                initialized = true;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     public static void load(){
-        // TODO: 1. load all SQLEntityDefinition on init
-        // TODO: 2. do more initializing works
+        checkInit();
     }
 
     public static DefaultJdbcSerDeser DEFAULT_JDBC_SERDESER = new DefaultJdbcSerDeser();
@@ -83,15 +118,19 @@ public class JdbcEntityDefinitionManager {
      * @see java.sql.Types
      *
      * @param fieldType entity field type class
-     * @return java.sql.Types
+     * @return java.sql.Types, return Types.NULL if not found
      */
     public static Integer getJdbcType(Class<?> fieldType) {
-        if(!_classJdbcType.containsKey(fieldType)){
-            throw new IllegalArgumentException("Unable to locate jdbc type for: "+fieldType);
+        if(fieldType == null){
+            return Types.NULL;
+        } else if(!_classJdbcType.containsKey(fieldType)){
+            LOG.debug("Unable to locate simple jdbc type for: {}, return type as JAVA_OBJECT",fieldType);
+            return Types.JAVA_OBJECT;
         }
         return _classJdbcType.get(fieldType);
     }
 
+
     /**
      * Register fieldType with SQL types
      *
@@ -119,7 +158,10 @@ public class JdbcEntityDefinitionManager {
         registerJdbcType(double.class, Types.DOUBLE);
         registerJdbcType(long.class, Types.BIGINT);
         registerJdbcType(short.class, Types.INTEGER);
-        registerJdbcType(char[].class, Types.VARCHAR);
+       //  registerJdbcType(char[].class, Types.VARCHAR);
         registerJdbcType(char.class, Types.CHAR);
+
+        registerJdbcType(Boolean.class, Types.BOOLEAN);
+        registerJdbcType(boolean.class, Types.BOOLEAN);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntitySchemaManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntitySchemaManager.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntitySchemaManager.java
new file mode 100644
index 0000000..8489da7
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntitySchemaManager.java
@@ -0,0 +1,224 @@
+package org.apache.eagle.storage.jdbc.schema;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.ddlutils.Platform;
+import org.apache.ddlutils.PlatformFactory;
+import org.apache.ddlutils.model.*;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.log.entity.meta.Qualifier;
+import org.apache.eagle.storage.jdbc.JdbcConstants;
+import org.apache.eagle.storage.jdbc.conn.ConnectionConfig;
+import org.apache.eagle.storage.jdbc.conn.ConnectionConfigFactory;
+import org.apache.eagle.storage.jdbc.conn.ConnectionManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Collection;
+import java.util.Map;
+
+public class JdbcEntitySchemaManager implements IJdbcEntityDDLManager {
+    private final static Logger LOG = LoggerFactory.getLogger(JdbcEntitySchemaManager.class);
+    private Database database;
+    private Platform platform;
+
+    private static IJdbcEntityDDLManager instance;
+
+    private JdbcEntitySchemaManager(){
+        instance = null;
+        ConnectionConfig config = ConnectionConfigFactory.getFromEagleConfig();
+        this.platform = PlatformFactory.createNewPlatformInstance(config.getAdapter());
+        Connection connection = null;
+        try {
+            connection = ConnectionManagerFactory.getInstance().getConnection();
+            this.database = platform.readModelFromDatabase(connection,config.getDatabaseName());
+            LOG.info("Loaded "+database);
+        } catch (Exception e) {
+            throw new RuntimeException(e.getMessage(),e);
+        } finally {
+            if(connection!=null){
+                try {
+                    connection.close();
+                } catch (SQLException e) {
+                    LOG.warn(e.getMessage(),e);
+                }
+            }
+        }
+    }
+
+    public static IJdbcEntityDDLManager getInstance(){
+        if(instance == null){
+            instance = new JdbcEntitySchemaManager();
+        }
+        return instance;
+    }
+
+    @Override
+    public void init() {
+        Connection connection = null;
+        try {
+            Database _database = identifyNewTables();
+            if(_database.getTableCount() >0 ) {
+                LOG.info("Creating {} new tables (totally {} tables)", _database.getTableCount(),database.getTableCount());
+                connection = ConnectionManagerFactory.getInstance().getConnection();
+                this.platform.createTables(connection,_database, false, true);
+                LOG.info("Created {} new tables: ",_database.getTableCount(),_database.getTables());
+            } else {
+                LOG.debug("All the {} tables have already been created, no new tables", database.getTableCount());
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(),e);
+            throw new IllegalStateException(e);
+        } finally {
+            if(connection != null){
+                try {
+                    connection.close();
+                } catch (SQLException e) {
+                    LOG.warn(e.getMessage(),e);
+                }
+            }
+        }
+    }
+
+    private Database identifyNewTables(){
+        Database _database = new Database();
+        _database.setName(database.getName());
+        Collection<JdbcEntityDefinition> entityDefinitions = JdbcEntityDefinitionManager.getJdbcEntityDefinitionMap().values();
+        LOG.info("Initializing database and creating tables");
+        for (JdbcEntityDefinition entityDefinition : entityDefinitions) {
+            if (database.findTable(entityDefinition.getJdbcTableName()) == null) {
+                Table table = createTable(entityDefinition);
+                LOG.info("Creating {}", table.toVerboseString());
+                _database.addTable(table);
+                database.addTable(table);
+            } else {
+                LOG.debug("Table {} already exists", entityDefinition.getJdbcTableName());
+            }
+        }
+        return _database;
+    }
+
+    @Override
+    public void reinit(){
+        Connection connection = null;
+        try {
+            identifyNewTables();
+            connection = ConnectionManagerFactory.getInstance().getConnection();
+            this.platform.createTables(connection,database, true, true);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(),e);
+            throw new IllegalStateException(e);
+        } finally {
+            if(connection != null){
+                try {
+                    connection.close();
+                } catch (SQLException e) {
+                    LOG.warn(e.getMessage(),e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        this.platform.shutdownDatabase();
+    }
+
+    private Table createTable(JdbcEntityDefinition entityDefinition){
+        Table table = new Table();
+        table.setName(entityDefinition.getJdbcTableName());
+        buildTable(entityDefinition,table);
+        return table;
+    }
+
+    private Column createTagColumn(String tagName){
+        Column tagColumn = new Column();
+        tagColumn.setName(tagName);
+        tagColumn.setTypeCode(Types.VARCHAR);
+        tagColumn.setJavaName(tagName);
+//        tagColumn.setScale(1024);
+        tagColumn.setSize(String.valueOf(JdbcConstants.DEFAULT_VARCHAR_SIZE));
+        tagColumn.setDefaultValue(null);
+        tagColumn.setDescription("eagle entity tag column for "+tagName);
+        return tagColumn;
+    }
+
+    private void buildTable(JdbcEntityDefinition entityDefinition, Table table){
+        // METRIC
+        if(entityDefinition.getInternal().getService()
+                .equals(GenericMetricEntity.GENERIC_METRIC_SERVICE)){
+            Column metricColumn = new Column();
+            metricColumn.setName(JdbcConstants.METRIC_NAME_COLUMN_NAME);
+            metricColumn.setTypeCode(Types.VARCHAR);
+//            metricColumn.setSizeAndScale(1024,1024);
+            metricColumn.setDescription("eagle entity metric column");
+            table.addColumn(metricColumn);
+        }
+
+        // ROWKEY
+        Column pkColumn = new Column();
+        pkColumn.setName(JdbcConstants.ROW_KEY_COLUMN_NAME);
+        pkColumn.setPrimaryKey(true);
+        pkColumn.setRequired(true);
+        pkColumn.setTypeCode(Types.VARCHAR);
+        pkColumn.setSize("1024");
+//        pkColumn.setSizeAndScale(1024,10240);
+
+        pkColumn.setDescription("eagle entity row-key column");
+        table.addColumn(pkColumn);
+
+        // TIMESTAMP
+        Column tsColumn = new Column();
+        tsColumn.setName(JdbcConstants.TIMESTAMP_COLUMN_NAME);
+        tsColumn.setTypeCode(Types.BIGINT);
+        tsColumn.setDescription("eagle entity timestamp column");
+        table.addColumn(tsColumn);
+
+        // TAGS
+        if(entityDefinition.getInternal().getTags() != null) {
+//            Index index = new UniqueIndex();
+            for (String tag : entityDefinition.getInternal().getTags()) {
+                Column tagColumn = createTagColumn(tag);
+                tagColumn.setSize(String.valueOf(JdbcConstants.DEFAULT_VARCHAR_SIZE));
+                table.addColumn(tagColumn);
+//                IndexColumn indexColumn = new IndexColumn();
+//                indexColumn.setName(tag);
+//                indexColumn.setOrdinalPosition(0);
+//                index.addColumn(indexColumn);
+//                index.setName(entityDefinition.getJdbcTableName()+"_tags_unique_index");
+            }
+//            TODO: enable index when experiencing performance issue on tag filtering.
+//            table.addIndex(index);
+        }
+
+        for(Map.Entry<String,Qualifier> entry: entityDefinition.getInternal().getDisplayNameMap().entrySet()){
+            Column fieldColumn = new Column();
+            fieldColumn.setName(entry.getKey());
+            fieldColumn.setJavaName(entry.getKey());
+            Integer typeCode = entityDefinition.getJdbcColumnTypeCodeOrNull(entry.getKey());
+            typeCode = typeCode == null? Types.VARCHAR:typeCode;
+            if(typeCode == Types.VARCHAR) fieldColumn.setSize(String.valueOf(JdbcConstants.DEFAULT_VARCHAR_SIZE));
+            fieldColumn.setTypeCode(typeCode);
+            fieldColumn.setDescription("eagle field column "+entry.getKey()+":"+entityDefinition.getColumnTypeOrNull(entry.getKey()));
+            table.addColumn(fieldColumn);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/ddl/JdbcEntityDdlManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/ddl/JdbcEntityDdlManager.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/ddl/JdbcEntityDdlManager.java
deleted file mode 100644
index 1e622a5..0000000
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/ddl/JdbcEntityDdlManager.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.storage.jdbc.schema.ddl;
-
-/**
- * @since 3/27/15
- */
-public interface JdbcEntityDdlManager {
-    void init();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/ddl/package-info.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/ddl/package-info.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/ddl/package-info.java
deleted file mode 100644
index b732b31..0000000
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/ddl/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Manage RDBMS schemas according java entity definition
- *
- * TODO: not implemented yet
- *
- * @since 3/31/15
- */
-package org.apache.eagle.storage.jdbc.schema.ddl;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/package-info.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/package-info.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/package-info.java
new file mode 100644
index 0000000..adc653a
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Manage RDBMS schemas according java entity definition
+ *
+ * TODO: not implemented yet
+ *
+ * @since 3/31/15
+ */
+package org.apache.eagle.storage.jdbc.schema;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/DefaultJdbcSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/DefaultJdbcSerDeser.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/DefaultJdbcSerDeser.java
index f096d5f..3414e8c 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/DefaultJdbcSerDeser.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/DefaultJdbcSerDeser.java
@@ -16,13 +16,15 @@
  */
 package org.apache.eagle.storage.jdbc.schema.serializer;
 
-import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition;
+import org.apache.eagle.log.entity.meta.Qualifier;
+import org.apache.eagle.storage.jdbc.JdbcConstants;
 import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinitionManager;
 import org.apache.torque.util.JdbcTypedValue;
 
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Types;
 
 /**
  * @since 3/26/15
@@ -30,12 +32,19 @@ import java.sql.SQLException;
 public class DefaultJdbcSerDeser<T,R> implements JdbcSerDeser<T> {
 
     @Override
-    public T readValue(ResultSet result, String fieldName, JdbcEntityDefinition JdbcEntityDefinition) throws IOException {
+    public T toJavaTypedValue(ResultSet result, Class<?> fieldType, String fieldName, Qualifier qualifier) throws IOException {
+        int jdbcType = JdbcEntityDefinitionManager.getJdbcType(fieldType);
         try {
-            Object val = result.getObject(fieldName);
-            return (T) val;
+            if(Types.JAVA_OBJECT == jdbcType){
+                byte[] bytes = result.getBytes(fieldName);
+                return (T) qualifier.getSerDeser().deserialize(bytes);
+            } else if(Types.BOOLEAN == jdbcType){
+                return (T) new Boolean(result.getBoolean(fieldName));
+            } else {
+                return (T) result.getObject(fieldName);
+            }
         } catch (SQLException e) {
-            throw new IOException(e);
+            throw new IOException("Field: "+fieldName+", java type:"+fieldType+", jdbc type: "+jdbcType,e);
         }
     }
 
@@ -46,7 +55,13 @@ public class DefaultJdbcSerDeser<T,R> implements JdbcSerDeser<T> {
      * @return
      */
     @Override
-    public JdbcTypedValue getJdbcTypedValue(Object fieldValue, Class<?> fieldType) {
-        return new JdbcTypedValue(fieldValue, JdbcEntityDefinitionManager.getJdbcType(fieldType));
+    public JdbcTypedValue toJdbcTypedValue(Object fieldValue, Class<?> fieldType, Qualifier qualifier) {
+        int jdbcTypeCode = JdbcEntityDefinitionManager.getJdbcType(fieldType);
+        if(Types.JAVA_OBJECT == jdbcTypeCode){
+            byte[] bytes = qualifier.getSerDeser().serialize(fieldValue);
+            return new JdbcTypedValue(bytes, JdbcConstants.DEFAULT_TYPE_FOR_COMPLEX_TYPE);
+        } else {
+            return new JdbcTypedValue(fieldValue, jdbcTypeCode);
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JdbcSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JdbcSerDeser.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JdbcSerDeser.java
index 3d4f793..ffc5ea2 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JdbcSerDeser.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JdbcSerDeser.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.storage.jdbc.schema.serializer;
 
+import org.apache.eagle.log.entity.meta.Qualifier;
 import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition;
 import org.apache.torque.util.JdbcTypedValue;
 
@@ -26,6 +27,21 @@ import java.sql.ResultSet;
  * @since 3/26/15
  */
 public interface JdbcSerDeser<T> {
-    T readValue(ResultSet result, String fieldName, JdbcEntityDefinition JdbcEntityDefinition) throws IOException;
-    JdbcTypedValue getJdbcTypedValue(Object fieldValue, Class<?> fieldType);
+    /**
+     * for entity read
+     * convert value from jdbc storage into user-typed object
+     * @param result
+     * @return
+     * @throws IOException
+     */
+    T toJavaTypedValue(ResultSet result, Class<?> fieldType, String fieldName, Qualifier qualifier) throws IOException;
+
+    /**
+     * for write entity
+     * convert user-typed fieldValue into fieldType-compatible value and persist that value into jdbc storage
+     * @param fieldValue
+     * @param fieldType
+     * @return
+     */
+    JdbcTypedValue toJdbcTypedValue(Object fieldValue, Class<?> fieldType, Qualifier qualifier);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JsonJdbcSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JsonJdbcSerDeser.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JsonJdbcSerDeser.java
index cece94d..5a934b8 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JsonJdbcSerDeser.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JsonJdbcSerDeser.java
@@ -16,7 +16,7 @@
  */
 package org.apache.eagle.storage.jdbc.schema.serializer;
 
-import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition;
+import org.apache.eagle.log.entity.meta.Qualifier;
 import org.apache.torque.util.JdbcTypedValue;
 import org.codehaus.jackson.map.ObjectMapper;
 
@@ -33,21 +33,19 @@ public class JsonJdbcSerDeser<T extends Object> implements JdbcSerDeser<T> {
 
     @SuppressWarnings("unchecked")
     @Override
-    public T readValue(ResultSet result, String fieldName, JdbcEntityDefinition jdbcEntityDefinition) throws IOException {
+    public T toJavaTypedValue(ResultSet result, Class<?> fieldType, String fieldName, Qualifier qualifier) throws IOException {
         try {
             String jsonString = result.getString(fieldName);
-            return (T) objectMapper.readValue(jsonString, jdbcEntityDefinition.getColumnType(fieldName));
+            return (T) objectMapper.readValue(jsonString, fieldType);
         } catch (IOException e) {
            throw e;
         } catch (SQLException e) {
             throw new IOException(e);
-        } catch (NoSuchFieldException e) {
-            throw new IOException(e);
         }
     }
 
     @Override
-    public JdbcTypedValue getJdbcTypedValue(Object fieldValue, Class<?> fieldType) {
+    public JdbcTypedValue toJdbcTypedValue(Object fieldValue, Class<?> fieldType, Qualifier qualifier) {
         try {
             return new JdbcTypedValue(objectMapper.writeValueAsString(objectMapper.writeValueAsString(fieldValue)), Types.VARCHAR);
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java
index c0ba13c..0b4178e 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java
@@ -23,7 +23,6 @@ import org.apache.eagle.log.entity.meta.EntityDefinition;
 import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
 import org.apache.eagle.log.entity.test.TestTimeSeriesAPIEntity;
 import org.apache.eagle.storage.DataStorageManager;
-import org.apache.eagle.storage.exception.IllegalDataStorageTypeException;
 import org.apache.eagle.storage.exception.QueryCompileException;
 import org.apache.eagle.storage.operation.CompiledQuery;
 import org.apache.eagle.storage.operation.RawQuery;
@@ -35,60 +34,77 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 
 public class TestJdbcStorage {
-
     JdbcStorage storage;
     EntityDefinition entityDefinition;
+    long baseTimestamp;
     final static Logger LOG = LoggerFactory.getLogger(TestJdbcStorage.class);
 
     @Before
-    public void setUp() throws IOException, IllegalAccessException, InstantiationException, IllegalDataStorageTypeException {
+    public void setUp() throws Exception {
+        entityDefinition = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestTimeSeriesAPIEntity.class);
+        entityDefinition.setTags(new String[]{"cluster","datacenter","random"});
         storage = (JdbcStorage) DataStorageManager.getDataStorageByEagleConfig();
         storage.init();
-        entityDefinition = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestTimeSeriesAPIEntity.class);
+        GregorianCalendar gc = new GregorianCalendar();
+        gc.clear();
+        gc.set(2014, 1, 6, 1, 40, 12);
+        gc.setTimeZone(TimeZone.getTimeZone("UTC"));
+        baseTimestamp = gc.getTime().getTime();
+        System.out.println("timestamp:" + baseTimestamp);
     }
 
-    //@Test
+    @Test
     public void testReadBySimpleQuery() throws QueryCompileException, IOException {
         RawQuery rawQuery = new RawQuery();
-        rawQuery.setQuery("TestTimeSeriesAPIEntity[]{*}");
-        rawQuery.setStartTime("2014-01-06 01:40:02");
-        rawQuery.setEndTime("2016-01-06 01:40:02");
+        rawQuery.setQuery("TestTimeSeriesAPIEntity[@cluster=\"c4ut\"]{*}");
+        System.out.println(DateTimeUtil.millisecondsToHumanDateWithSeconds(baseTimestamp));
+        rawQuery.setStartTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(baseTimestamp));
+        rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithMilliseconds(baseTimestamp+2000));
         rawQuery.setPageSize(1000);
         CompiledQuery query = new CompiledQuery(rawQuery);
         QueryResult<TestTimeSeriesAPIEntity> result = storage.query(query, entityDefinition);
         Assert.assertNotNull(result);
     }
 
-//    @Test
+    @Test
     public void testReadByComplexQuery() throws QueryCompileException, IOException {
         RawQuery rawQuery = new RawQuery();
-        rawQuery.setQuery("TestTimeSeriesAPIEntity[@cluster=\"cluster\" AND @field4 > 1000 AND @field7 CONTAINS \"subtext\" OR @jobID =\"jobID\" ]{@field1,@field2}");
-        rawQuery.setStartTime("2015-01-06 01:40:02");
-        rawQuery.setEndTime("2016-01-06 01:40:02");
+        rawQuery.setQuery("TestTimeSeriesAPIEntity[@cluster=\"c4ut\" AND @field4 > 1000 OR @datacenter =\"d4ut\" ]{@field1,@field2}");
+        rawQuery.setStartTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(baseTimestamp));
+        rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(baseTimestamp + 2000));
+        rawQuery.setPageSize(1000);
+        CompiledQuery query = new CompiledQuery(rawQuery);
+        storage.query(query,entityDefinition);
+    }
+
+    @Test
+    public void testReadByComplexQueryWithLike() throws QueryCompileException, IOException {
+        RawQuery rawQuery = new RawQuery();
+        rawQuery.setQuery("TestTimeSeriesAPIEntity[@cluster=\"c4ut\" AND @field4 > 1000 AND @field7 CONTAINS \"99404f47e309\" OR @datacenter =\"d4ut\" ]{@field1,@field2}");
+        rawQuery.setStartTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(baseTimestamp));
+        rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(baseTimestamp + 2000));
         rawQuery.setPageSize(1000);
         CompiledQuery query = new CompiledQuery(rawQuery);
         storage.query(query,entityDefinition);
     }
 
-    //@Test
+    @Test
     public void testWrite() throws IOException {
         List<TestTimeSeriesAPIEntity> entityList = new ArrayList<TestTimeSeriesAPIEntity>();
-
         int i= 0;
         while( i++ < 1000){
-            entityList.add(newInstance());
+            TestTimeSeriesAPIEntity entity = newInstance();
+
+            entityList.add(entity);
         }
         ModifyResult<String> result = storage.create(entityList, entityDefinition);
         Assert.assertTrue(result.getSize() > 0);
     }
 
-    //@Test
+    @Test
     public void testWriteAndRead() throws IOException, QueryCompileException {
         // record insert init time
         long startTime = System.currentTimeMillis();
@@ -103,18 +119,18 @@ public class TestJdbcStorage {
         // record insertion finish time
         long endTime = System.currentTimeMillis();
 
-            // init read in time range [startTime, endTime)
+        // init read in time range [startTime, endTime)
         RawQuery rawQuery = new RawQuery();
         rawQuery.setQuery("TestTimeSeriesAPIEntity[]{*}");
         rawQuery.setStartTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(startTime));
-        rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(endTime+1));
-        rawQuery.setPageSize(1000000);
+        rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(endTime+1000));
+        rawQuery.setPageSize(10000);
         CompiledQuery query = new CompiledQuery(rawQuery);
         QueryResult queryResult = storage.query(query, entityDefinition);
         Assert.assertTrue(queryResult.getSize() >= 1000);
     }
 
-    //@Test
+    @Test
     public void testWriteAndAggregation() throws IOException, QueryCompileException {
         // record insert init time
         long startTime = System.currentTimeMillis();
@@ -133,14 +149,14 @@ public class TestJdbcStorage {
         RawQuery rawQuery = new RawQuery();
         rawQuery.setQuery("TestTimeSeriesAPIEntity[]<@cluster,@datacenter>{count,max(@field1),min(@field2),sum(@field3)}");
         rawQuery.setStartTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(startTime));
-        rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(endTime));
+        rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(endTime+1000));
         rawQuery.setPageSize(1000000);
         CompiledQuery query = new CompiledQuery(rawQuery);
         QueryResult queryResult = storage.query(query, entityDefinition);
         Assert.assertTrue(queryResult.getSize() >= 1);
     }
 
-    //@Test
+    @Test
     public void testWriteAndDelete() throws IOException, QueryCompileException {
         // record insert init time
         long startTime = System.currentTimeMillis();
@@ -158,15 +174,15 @@ public class TestJdbcStorage {
         // delete in time range [startTime, endTime)
         RawQuery rawQuery = new RawQuery();
         rawQuery.setQuery("TestTimeSeriesAPIEntity[]{*}");
-        rawQuery.setStartTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(startTime));
-        rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(endTime));
+        rawQuery.setStartTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(startTime-1000));
+        rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(endTime+1000));
         rawQuery.setPageSize(1000000);
         CompiledQuery query = new CompiledQuery(rawQuery);
         ModifyResult<String> queryResult = storage.delete(query, entityDefinition);
         Assert.assertTrue(queryResult.getSize() >= 1000);
     }
 
-    //@Test
+    @Test
     public void testWriteAndUpdate() throws IOException, QueryCompileException {
         // Write 1000 entities
         List<TestTimeSeriesAPIEntity> entityList = new ArrayList<TestTimeSeriesAPIEntity>();
@@ -190,7 +206,7 @@ public class TestJdbcStorage {
      *
      * @throws IOException
      */
-    //@Test
+//    @Test
     public void testWriterPerformance() throws IOException {
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
@@ -221,13 +237,14 @@ public class TestJdbcStorage {
         instance.setTags(new HashMap<String, String>() {{
             put("cluster", "c4ut");
             put("datacenter", "d4ut");
+            put("random",UUID.randomUUID().toString());
         }});
         instance.setTimestamp(System.currentTimeMillis());
         return instance;
     }
 
     @Test
-    public void test() {
+    public void testInitSuccessfully() {
 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/conn/TestConnectionFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/conn/TestConnectionFactory.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/conn/TestConnectionFactory.java
deleted file mode 100644
index 78b0210..0000000
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/conn/TestConnectionFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.storage.jdbc.conn;
-
-import junit.framework.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-/**
- * @since 3/27/15
- */
-public class TestConnectionFactory {
-    final static Logger LOG = LoggerFactory.getLogger(TestConnectionFactory.class);
-
-//    @Test
-    public void testConnection(){
-        try {
-            Connection connection = ConnectionManagerFactory.getInstance().getConnection();
-            Statement statement = connection.createStatement();
-            ResultSet resultSet = statement.executeQuery("select 1");
-            Assert.assertTrue(resultSet.next());
-            Assert.assertEquals(1, resultSet.getInt(1));
-        } catch (SQLException e) {
-            LOG.error(e.getMessage(),e);
-            Assert.fail(e.getMessage());
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Test
-    public void test() {
-
-    }
-}



Mime
View raw message