gora-commits mailing list archives

From lewi...@apache.org
Subject [10/13] gora git commit: Rename gora-cassandra-cql module into gora-cassandra module
Date Wed, 23 Aug 2017 22:16:41 GMT
http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
deleted file mode 100644
index 928370c..0000000
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
+++ /dev/null
@@ -1,836 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.gora.cassandra.serializers;
-
-import com.datastax.driver.core.querybuilder.BuiltStatement;
-import com.datastax.driver.core.querybuilder.Delete;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.datastax.driver.core.querybuilder.Select;
-import com.datastax.driver.core.querybuilder.Update;
-import com.datastax.driver.mapping.annotations.UDT;
-import org.apache.avro.Schema;
-import org.apache.gora.cassandra.bean.CassandraKey;
-import org.apache.gora.cassandra.bean.ClusterKeyField;
-import org.apache.gora.cassandra.bean.Field;
-import org.apache.gora.cassandra.bean.KeySpace;
-import org.apache.gora.cassandra.bean.PartitionKeyField;
-import org.apache.gora.cassandra.query.CassandraQuery;
-import org.apache.gora.cassandra.store.CassandraMapping;
-import org.apache.gora.cassandra.store.CassandraStore;
-import org.apache.gora.query.Query;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class is used to create Cassandra queries.
- */
-class CassandraQueryFactory {
-
-  private static final Logger LOG = LoggerFactory.getLogger(CassandraQueryFactory.class);
-
-  /**
-   * This method returns the CQL query to create key space.
-   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html
-   *
-   * @param mapping Cassandra Mapping {@link CassandraMapping}
-   * @return CQL Query
-   */
-  static String getCreateKeySpaceQuery(CassandraMapping mapping) {
-    KeySpace keySpace = mapping.getKeySpace();
-    StringBuilder stringBuffer = new StringBuilder();
-    stringBuffer.append("CREATE KEYSPACE IF NOT EXISTS ").append(keySpace.getName()).append(" WITH REPLICATION = { 'class' : ");
-    KeySpace.PlacementStrategy placementStrategy = keySpace.getPlacementStrategy();
-    stringBuffer.append("'").append(placementStrategy).append("'").append(", ").append("'");
-    switch (placementStrategy) {
-      case SimpleStrategy:
-        stringBuffer.append("replication_factor").append("'").append(" : ").append(keySpace.getReplicationFactor()).append(" }");
-        break;
-      case NetworkTopologyStrategy:
-        boolean isCommaNeeded = false;
-        for (Map.Entry<String, Integer> entry : keySpace.getDataCenters().entrySet()) {
-          if (isCommaNeeded) {
-            stringBuffer.append(", '");
-          }
-          stringBuffer.append(entry.getKey()).append("'").append(" : ").append(entry.getValue());
-          isCommaNeeded = true;
-        }
-        stringBuffer.append(" }");
-        break;
-    }
-    if (keySpace.isDurableWritesEnabled()) {
-      stringBuffer.append(" AND DURABLE_WRITES = ").append(keySpace.isDurableWritesEnabled());
-    }
-    return stringBuffer.toString();
-  }
-
-  /**
-   * This method returns the CQL query to create the table.
-   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_table_r.html
-   * <p>
-   * Trick : To keep the column order consistent, we append the partition keys first, the cluster keys second, and finally the other columns.
-   * The same order must be followed in the other CRUD operations as well.
-   *
-   * @param mapping Cassandra mapping {@link CassandraMapping}
-   * @return CQL Query
-   */
-  static String getCreateTableQuery(CassandraMapping mapping) {
-    StringBuilder stringBuffer = new StringBuilder();
-    stringBuffer.append("CREATE TABLE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(mapping.getCoreName()).append(" (");
-    CassandraKey cassandraKey = mapping.getCassandraKey();
-    // appending Cassandra Persistent columns into db schema
-    processFieldsForCreateTableQuery(mapping.getFieldList(), false, stringBuffer);
-
-    if (cassandraKey != null) {
-      processFieldsForCreateTableQuery(cassandraKey.getFieldList(), true, stringBuffer);
-      List<PartitionKeyField> partitionKeys = cassandraKey.getPartitionKeyFields();
-      if (partitionKeys != null) {
-        stringBuffer.append(", PRIMARY KEY (");
-        boolean isCommaNeededToApply = false;
-        for (PartitionKeyField keyField : partitionKeys) {
-          if (isCommaNeededToApply) {
-            stringBuffer.append(",");
-          }
-          if (keyField.isComposite()) {
-            stringBuffer.append("(");
-            boolean isCommaNeededHere = false;
-            for (Field field : keyField.getFields()) {
-              if (isCommaNeededHere) {
-                stringBuffer.append(", ");
-              }
-              stringBuffer.append(field.getColumnName());
-              isCommaNeededHere = true;
-            }
-            stringBuffer.append(")");
-          } else {
-            stringBuffer.append(keyField.getColumnName());
-          }
-          isCommaNeededToApply = true;
-        }
-        stringBuffer.append(")");
-      }
-    }
-
-    stringBuffer.append(")");
-    boolean isWithNeeded = true;
-    if (Boolean.parseBoolean(mapping.getProperty("compactStorage"))) {
-      stringBuffer.append(" WITH COMPACT STORAGE ");
-      isWithNeeded = false;
-    }
-
-    String id = mapping.getProperty("id");
-    if (id != null) {
-      if (isWithNeeded) {
-        stringBuffer.append(" WITH ");
-      } else {
-        stringBuffer.append(" AND ");
-      }
-      stringBuffer.append("ID = '").append(id).append("'");
-      isWithNeeded = false;
-    }
-    if (cassandraKey != null) {
-      List<ClusterKeyField> clusterKeyFields = cassandraKey.getClusterKeyFields();
-      if (clusterKeyFields != null) {
-        if (isWithNeeded) {
-          stringBuffer.append(" WITH ");
-        } else {
-          stringBuffer.append(" AND ");
-        }
-        stringBuffer.append(" CLUSTERING ORDER BY (");
-        boolean isCommaNeededToApply = false;
-        for (ClusterKeyField keyField : clusterKeyFields) {
-          if (isCommaNeededToApply) {
-            stringBuffer.append(", ");
-          }
-          stringBuffer.append(keyField.getColumnName()).append(" ");
-          if (keyField.getOrder() != null) {
-            stringBuffer.append(keyField.getOrder());
-          }
-          isCommaNeededToApply = true;
-        }
-        stringBuffer.append(")");
-      }
-    }
-    return stringBuffer.toString();
-  }
-
-  private static void processFieldsForCreateTableQuery(List<Field> fields, boolean isCommaNeeded, StringBuilder stringBuilder) {
-    for (Field field : fields) {
-      if (isCommaNeeded) {
-        stringBuilder.append(", ");
-      }
-      stringBuilder.append(field.getColumnName()).append(" ").append(field.getType());
-      boolean isStaticColumn = Boolean.parseBoolean(field.getProperty("static"));
-      boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey"));
-      if (isStaticColumn) {
-        stringBuilder.append(" STATIC");
-      }
-      if (isPrimaryKey) {
-        stringBuilder.append("  PRIMARY KEY ");
-      }
-      isCommaNeeded = true;
-    }
-  }
-
-  /**
-   * This method returns the CQL query to drop table.
-   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_table_r.html
-   *
-   * @param mapping Cassandra Mapping {@link CassandraMapping}
-   * @return CQL query
-   */
-  static String getDropTableQuery(CassandraMapping mapping) {
-    return "DROP TABLE IF EXISTS " + mapping.getKeySpace().getName() + "." + mapping.getCoreName();
-  }
-
-  /**
-   * This method returns the CQL query to drop key space.
-   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_keyspace_r.html
-   *
-   * @param mapping Cassandra Mapping {@link CassandraMapping}
-   * @return CQL query
-   */
-  static String getDropKeySpaceQuery(CassandraMapping mapping) {
-    return "DROP KEYSPACE IF EXISTS " + mapping.getKeySpace().getName();
-  }
-
-  /**
-   * This method returns the CQL query to truncate the table (removes all of its data).
-   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/truncate_r.html
-   *
-   * @param mapping Cassandra Mapping {@link CassandraMapping}
-   * @return CQL query
-   */
-  static String getTruncateTableQuery(CassandraMapping mapping) {
-    return QueryBuilder.truncate(mapping.getKeySpace().getName(), mapping.getCoreName()).getQueryString();
-  }
-
-  /**
-   * This method returns the CQL query to insert data into the table.
-   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/insert_r.html
-   *
-   * @param mapping Cassandra Mapping {@link CassandraMapping}
-   * @param fields  available fields
-   * @return CQL Query
-   */
-  static String getInsertDataQuery(CassandraMapping mapping, List<String> fields) {
-    String[] columnNames = getColumnNames(mapping, fields);
-    String[] objects = new String[fields.size()];
-    Arrays.fill(objects, "?");
-    return QueryBuilder.insertInto(mapping.getKeySpace().getName(), mapping.getCoreName()).values(columnNames, objects).getQueryString();
-  }
-
-  /**
-   * This method returns the CQL query to delete a persistent object from the table.
-   * refer : http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlDelete.html
-   *
-   * @param mapping Cassandra Mapping {@link CassandraMapping}
-   * @param fields  field list to be deleted
-   * @return CQL Query
-   */
-  static String getDeleteDataQuery(CassandraMapping mapping, List<String> fields) {
-    String[] columnNames = getColumnNames(mapping, fields);
-    String[] objects = new String[fields.size()];
-    Arrays.fill(objects, "?");
-    Delete delete = QueryBuilder.delete().from(mapping.getKeySpace().getName(), mapping.getCoreName());
-    return processKeys(columnNames, delete);
-  }
-
-  private static String processKeys(String[] columnNames, BuiltStatement delete) {
-    BuiltStatement query = null;
-    boolean isWhereNeeded = true;
-    for (String columnName : columnNames) {
-      if (isWhereNeeded) {
-        if (delete instanceof Delete) {
-          query = ((Delete) delete).where(QueryBuilder.eq(columnName, "?"));
-        } else {
-          query = ((Select) delete).where(QueryBuilder.eq(columnName, "?"));
-        }
-        isWhereNeeded = false;
-      } else {
-        if (delete instanceof Delete) {
-          query = ((Delete.Where) query).and(QueryBuilder.eq(columnName, "?"));
-        } else {
-          query = ((Select.Where) query).and(QueryBuilder.eq(columnName, "?"));
-        }
-      }
-    }
-    return query != null ? query.getQueryString() : null;
-  }
-
-  /**
-   * This method returns the CQL Select query to retrieve data from the table.
-   * refer: http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlSelect.html
-   *
-   * @param mapping   Cassandra Mapping {@link CassandraMapping}
-   * @param keyFields key fields
-   * @return CQL Query
-   */
-  static String getSelectObjectQuery(CassandraMapping mapping, List<String> keyFields) {
-    Select select = QueryBuilder.select().from(mapping.getKeySpace().getName(), mapping.getCoreName());
-    if (Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) {
-      select.allowFiltering();
-    }
-    String[] columnNames = getColumnNames(mapping, keyFields);
-    return processKeys(columnNames, select);
-  }
-
-  /**
-   * This method returns the CQL Select query to retrieve data from the table with the given fields.
-   * This method is used for Avro serialization.
-   * refer: http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlSelect.html
-   *
-   * @param mapping   Cassandra Mapping {@link CassandraMapping}
-   * @param fields    Given fields to retrieve
-   * @param keyFields key fields
-   * @return CQL Query
-   */
-  static String getSelectObjectWithFieldsQuery(CassandraMapping mapping, String[] fields, List<String> keyFields) {
-    Select select = QueryBuilder.select(getColumnNames(mapping, Arrays.asList(fields))).from(mapping.getKeySpace().getName(), mapping.getCoreName());
-    if (Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) {
-      select.allowFiltering();
-    }
-    String[] columnNames = getColumnNames(mapping, keyFields);
-    return processKeys(columnNames, select);
-  }
-
-  /**
-   * This method returns the CQL Select query to retrieve data from the table with the given fields.
-   * This method is used for Native serialization.
-   * refer: http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlSelect.html
-   *
-   * @param mapping Cassandra Mapping {@link CassandraMapping}
-   * @param fields  Given fields to retrieve
-   * @return CQL Query
-   */
-  static String getSelectObjectWithFieldsQuery(CassandraMapping mapping, String[] fields) {
-    String cqlQuery = null;
-    String[] columnNames = getColumnNames(mapping, Arrays.asList(fields));
-    Select select = QueryBuilder.select(columnNames).from(mapping.getKeySpace().getName(), mapping.getCoreName());
-    if (Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) {
-      select.allowFiltering();
-    }
-    CassandraKey cKey = mapping.getCassandraKey();
-    if (cKey != null) {
-      Select.Where query = null;
-      boolean isWhereNeeded = true;
-      for (Field field : cKey.getFieldList()) {
-        if (isWhereNeeded) {
-          query = select.where(QueryBuilder.eq(field.getColumnName(), "?"));
-          isWhereNeeded = false;
-        } else {
-          query = query.and(QueryBuilder.eq(field.getColumnName(), "?"));
-        }
-      }
-      cqlQuery = query != null ? query.getQueryString() : null;
-    } else {
-      for (Field field : mapping.getFieldList()) {
-        boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey"));
-        if (isPrimaryKey) {
-          cqlQuery = select.where(QueryBuilder.eq(field.getColumnName(), "?")).getQueryString();
-          break;
-        }
-      }
-    }
-    return cqlQuery;
-  }
-
-
-  /**
-   * This method returns the CQL query for the execute method. The CQL contains a Select query to retrieve data from the table.
-   *
-   * @param mapping        Cassandra Mapping {@link CassandraMapping}
-   * @param cassandraQuery Query {@link CassandraQuery}
-   * @param objects        object list
-   * @return CQL Query
-   */
-  static String getExecuteQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects, String[] fields) {
-    long limit = cassandraQuery.getLimit();
-    Select select = QueryBuilder.select(getColumnNames(mapping, Arrays.asList(fields))).from(mapping.getKeySpace().getName(), mapping.getCoreName());
-    if (limit > 0) {
-      select = select.limit((int) limit);
-    }
-    if (Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) {
-      select.allowFiltering();
-    }
-    return processQuery(cassandraQuery, select, mapping, objects);
-  }
-
-  private static String processQuery(Query cassandraQuery, BuiltStatement select, CassandraMapping mapping, List<Object> objects) {
-    String primaryKey = null;
-    BuiltStatement query = null;
-    Object startKey = cassandraQuery.getStartKey();
-    Object endKey = cassandraQuery.getEndKey();
-    Object key = cassandraQuery.getKey();
-    boolean isWhereNeeded = true;
-    if (key != null) {
-      if (mapping.getCassandraKey() != null) {
-        ArrayList<String> cassandraKeys = new ArrayList<>();
-        ArrayList<Object> cassandraValues = new ArrayList<>();
-        AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues);
-        String[] columnKeys = getColumnNames(mapping, cassandraKeys);
-        for (int i = 0; i < cassandraKeys.size(); i++) {
-          if (isWhereNeeded) {
-            if (select instanceof Select) {
-              query = ((Select) select).where(QueryBuilder.eq(columnKeys[i], "?"));
-            } else if (select instanceof Delete) {
-              query = ((Delete) select).where(QueryBuilder.eq(columnKeys[i], "?"));
-            } else {
-              query = ((Update.Assignments) select).where(QueryBuilder.eq(columnKeys[i], "?"));
-            }
-            objects.add(cassandraValues.get(i));
-            isWhereNeeded = false;
-          } else {
-            if (select instanceof Select) {
-              query = ((Select.Where) query).and(QueryBuilder.eq(columnKeys[i], "?"));
-            } else if (select instanceof Delete) {
-              query = ((Delete.Where) query).and(QueryBuilder.eq(columnKeys[i], "?"));
-            } else {
-              query = ((Update.Where) query).and(QueryBuilder.eq(columnKeys[i], "?"));
-            }
-            objects.add(cassandraValues.get(i));
-          }
-        }
-      } else {
-        primaryKey = getPKey(mapping.getFieldList());
-        if (select instanceof Select) {
-          query = ((Select) select).where(QueryBuilder.eq(primaryKey, "?"));
-        } else if (select instanceof Delete) {
-          query = ((Delete) select).where(QueryBuilder.eq(primaryKey, "?"));
-        } else {
-          query = ((Update.Assignments) select).where(QueryBuilder.eq(primaryKey, "?"));
-        }
-        objects.add(key);
-      }
-    } else {
-      if (startKey != null) {
-        if (mapping.getCassandraKey() != null) {
-          ArrayList<String> cassandraKeys = new ArrayList<>();
-          ArrayList<Object> cassandraValues = new ArrayList<>();
-          AvroCassandraUtils.processKeys(mapping, startKey, cassandraKeys, cassandraValues);
-          String[] columnKeys = getColumnNames(mapping, cassandraKeys);
-          for (int i = 0; i < cassandraKeys.size(); i++) {
-            if (isWhereNeeded) {
-              if (select instanceof Select) {
-                query = ((Select) select).where(QueryBuilder.gte(columnKeys[i], "?"));
-              } else if (select instanceof Delete) {
-                /*
-                According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but it seems it is not fixed yet.
-                 */
-                throw new RuntimeException("Delete by Query is not supported for Key Ranges.");
-              } else {
-                query = ((Update.Assignments) select).where(QueryBuilder.gte(columnKeys[i], "?"));
-              }
-              objects.add(cassandraValues.get(i));
-              isWhereNeeded = false;
-            } else {
-              if (select instanceof Select) {
-                query = ((Select.Where) query).and(QueryBuilder.gte(columnKeys[i], "?"));
-              } else if (select instanceof Delete) {
-                /*
-                According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but it seems it is not fixed yet.
-                 */
-                throw new RuntimeException("Delete by Query is not supported for Key Ranges.");
-              } else {
-                query = ((Update.Where) query).and(QueryBuilder.gte(columnKeys[i], "?"));
-              }
-              objects.add(cassandraValues.get(i));
-            }
-          }
-        } else {
-          primaryKey = getPKey(mapping.getFieldList());
-          if (select instanceof Select) {
-            query = ((Select) select).where(QueryBuilder.gte(primaryKey, "?"));
-          } else if (select instanceof Delete) {
-            /*
-            According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but it seems it is not fixed yet.
-             */
-            throw new RuntimeException("Delete by Query is not supported for Key Ranges.");
-          } else {
-            query = ((Update.Assignments) select).where(QueryBuilder.gte(primaryKey, "?"));
-          }
-          objects.add(startKey);
-          isWhereNeeded = false;
-        }
-      }
-      if (endKey != null) {
-        if (mapping.getCassandraKey() != null) {
-          ArrayList<String> cassandraKeys = new ArrayList<>();
-          ArrayList<Object> cassandraValues = new ArrayList<>();
-          AvroCassandraUtils.processKeys(mapping, endKey, cassandraKeys, cassandraValues);
-          String[] columnKeys = getColumnNames(mapping, cassandraKeys);
-          for (int i = 0; i < cassandraKeys.size(); i++) {
-            if (isWhereNeeded) {
-              if (select instanceof Select) {
-                query = ((Select) select).where(QueryBuilder.lte(columnKeys[i], "?"));
-              } else if (select instanceof Delete) {
-                /*
-                According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but it seems it is not fixed yet.
-                 */
-                throw new RuntimeException("Delete by Query is not supported for Key Ranges.");
-              } else {
-                query = ((Update.Assignments) select).where(QueryBuilder.lte(columnKeys[i], "?"));
-              }
-              objects.add(cassandraValues.get(i));
-              isWhereNeeded = false;
-            } else {
-              if (select instanceof Select) {
-                query = ((Select.Where) query).and(QueryBuilder.lte(columnKeys[i], "?"));
-              } else if (select instanceof Delete) {
-                /*
-                According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but it seems it is not fixed yet.
-                 */
-                throw new RuntimeException("Delete by Query is not supported for Key Ranges.");
-              } else {
-                query = ((Update.Where) query).and(QueryBuilder.lte(columnKeys[i], "?"));
-              }
-              objects.add(cassandraValues.get(i));
-            }
-          }
-        } else {
-          primaryKey = primaryKey != null ? primaryKey : getPKey(mapping.getFieldList());
-          if (isWhereNeeded) {
-            if (select instanceof Select) {
-              query = ((Select) select).where(QueryBuilder.lte(primaryKey, "?"));
-            } else if (select instanceof Delete) {
-              /*
-              According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but it seems it is not fixed yet.
-               */
-              throw new RuntimeException("Delete by Query is not supported for Key Ranges.");
-            } else {
-              query = ((Update.Assignments) select).where(QueryBuilder.lte(primaryKey, "?"));
-            }
-          } else {
-            if (select instanceof Select) {
-              query = ((Select.Where) query).and(QueryBuilder.lte(primaryKey, "?"));
-            } else if (select instanceof Delete) {
-              /*
-              According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but it seems it is not fixed yet.
-               */
-              throw new RuntimeException("Delete by Query is not supported for Key Ranges.");
-            } else {
-              query = ((Update.Where) query).and(QueryBuilder.lte(primaryKey, "?"));
-            }
-          }
-          objects.add(endKey);
-        }
-      }
-    }
-    if (startKey == null && endKey == null && key == null) {
-      return select.getQueryString();
-    }
-    return query != null ? query.getQueryString() : null;
-  }
-
-  private static String[] getColumnNames(CassandraMapping mapping, List<String> fields) {
-    ArrayList<String> columnNames = new ArrayList<>();
-    for (String field : fields) {
-      Field fieldBean = mapping.getFieldFromFieldName(field);
-      CassandraKey cassandraKey = mapping.getCassandraKey();
-      Field keyBean = null;
-      if (cassandraKey != null) {
-        keyBean = cassandraKey.getFieldFromFieldName(field);
-      }
-      if (fieldBean != null) {
-        columnNames.add(fieldBean.getColumnName());
-      } else if (keyBean != null) {
-        columnNames.add(keyBean.getColumnName());
-      } else {
-        LOG.warn("{} field is ignored, couldn't find relevant field in the persistent mapping", field);
-      }
-    }
-    return columnNames.toArray(new String[0]);
-  }
-
-  private static String getPKey(List<Field> fields) {
-    for (Field field : fields) {
-      boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey"));
-      if (isPrimaryKey) {
-        return field.getColumnName();
-      }
-    }
-    return null;
-  }
-
-  /**
-   * This method returns the CQL query for the deleteByQuery method.
-   * refer: http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlDelete.html
-   *
-   * @param mapping        Cassandra Mapping {@link CassandraMapping}
-   * @param cassandraQuery Cassandra Query {@link CassandraQuery}
-   * @param objects        field values
-   * @return CQL Query
-   */
-  static String getDeleteByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) {
-    String[] columns = null;
-    if (cassandraQuery.getFields() != null) {
-      columns = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()));
-    }
-    Delete delete;
-    if (columns != null) {
-      delete = QueryBuilder.delete(columns).from(mapping.getKeySpace().getName(), mapping.getCoreName());
-    } else {
-      delete = QueryBuilder.delete().from(mapping.getKeySpace().getName(), mapping.getCoreName());
-    }
-    return processQuery(cassandraQuery, delete, mapping, objects);
-  }
-
-  /**
-   * This method returns the CQL query for the updateByQuery method (Avro serialization).
-   * refer : http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlUpdate.html
-   *
-   * @param mapping        Cassandra mapping {@link CassandraMapping}
-   * @param cassandraQuery Cassandra Query {@link CassandraQuery}
-   * @param objects        field Objects list
-   * @return CQL Query
-   */
-  static String getUpdateByQueryForAvro(CassandraMapping mapping, Query cassandraQuery, List<Object> objects, Schema schema) {
-    Update update = QueryBuilder.update(mapping.getKeySpace().getName(), mapping.getCoreName());
-    Update.Assignments updateAssignments = null;
-    if (cassandraQuery instanceof CassandraQuery) {
-      String[] columnNames = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()));
-        for (String column : columnNames) {
-          updateAssignments = update.with(QueryBuilder.set(column, "?"));
-          Field field = mapping.getFieldFromColumnName(column);
-          Object value = ((CassandraQuery) cassandraQuery).getUpdateFieldValue(field.getFieldName());
-          try {
-            Schema schemaField = schema.getField(field.getFieldName()).schema();
-            objects.add(AvroCassandraUtils.getFieldValueFromAvroBean(schemaField, schemaField.getType(), value, field));
-          } catch (NullPointerException e) {
-            throw new RuntimeException(field + " field couldn't be found in the class " + mapping.getPersistentClass() + ".");
-          }
-        }
-    } else {
-      throw new RuntimeException("Please use a CassandraQuery object to invoke the updateByQuery method.");
-    }
-    return processQuery(cassandraQuery, updateAssignments, mapping, objects);
-  }
-
-
-  /**
-   * This method returns the CQL query for the updateByQuery method (Native serialization).
-   * refer : http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlUpdate.html
-   *
-   * @param mapping        Cassandra mapping {@link CassandraMapping}
-   * @param cassandraQuery Cassandra Query {@link CassandraQuery}
-   * @param objects        field Objects list
-   * @return CQL Query
-   */
-  static String getUpdateByQueryForNative(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) {
-    Update update = QueryBuilder.update(mapping.getKeySpace().getName(), mapping.getCoreName());
-    Update.Assignments updateAssignments = null;
-    if (cassandraQuery instanceof CassandraQuery) {
-      String[] columnNames = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()));
-        for (String column : columnNames) {
-          updateAssignments = update.with(QueryBuilder.set(column, "?"));
-          objects.add(((CassandraQuery) cassandraQuery).getUpdateFieldValue(mapping.getFieldFromColumnName(column).getFieldName()));
-        }
-    } else {
-      throw new RuntimeException("Please use a CassandraQuery object to invoke the updateByQuery method.");
-    }
-    return processQuery(cassandraQuery, updateAssignments, mapping, objects);
-  }
-
-
-  private static void populateFieldsToQuery(Schema schema, StringBuilder builder) throws Exception {
-    switch (schema.getType()) {
-      case INT:
-        builder.append("int");
-        break;
-      case MAP:
-        builder.append("map<text,");
-        populateFieldsToQuery(schema.getValueType(), builder);
-        builder.append(">");
-        break;
-      case ARRAY:
-        builder.append("list<");
-        populateFieldsToQuery(schema.getElementType(), builder);
-        builder.append(">");
-        break;
-      case LONG:
-        builder.append("bigint");
-        break;
-      case FLOAT:
-        builder.append("float");
-        break;
-      case DOUBLE:
-        builder.append("double");
-        break;
-      case BOOLEAN:
-        builder.append("boolean");
-        break;
-      case BYTES:
-        builder.append("blob");
-        break;
-      case RECORD:
-        builder.append("frozen<").append(schema.getName()).append(">");
-        break;
-      case STRING:
-      case FIXED:
-      case ENUM:
-        builder.append("text");
-        break;
-      case UNION:
-        for (Schema unionElementSchema : schema.getTypes()) {
-          if (unionElementSchema.getType().equals(Schema.Type.RECORD)) {
-            String recordName = unionElementSchema.getName();
-            if (!builder.toString().contains(recordName)) {
-              builder.append("frozen<").append(recordName).append(">");
-            } else {
-              LOG.warn("The same field type can't be mapped recursively. This is not supported with Cassandra UDT types, please use the byte dataType for recursive mapping.");
-              throw new Exception("Same Field Type has been mapped recursively");
-            }
-            break;
-          } else if (!unionElementSchema.getType().equals(Schema.Type.NULL)) {
-            populateFieldsToQuery(unionElementSchema, builder);
-            break;
-          }
-        }
-        break;
-    }
-  }
-
-  static void processRecord(Schema recordSchema, StringBuilder stringBuilder) {
-    boolean isCommaNeeded = false;
-    for (Schema.Field field : recordSchema.getFields()) {
-      if (isCommaNeeded) {
-        stringBuilder.append(", ");
-      }
-      String fieldName = field.name();
-      stringBuilder.append(fieldName).append(" ");
-      try {
-        populateFieldsToQuery(field.schema(), stringBuilder);
-        isCommaNeeded = true;
-      } catch (Exception e) {
-        int i = stringBuilder.indexOf(fieldName);
-        if (i != -1) {
-          stringBuilder.delete(i, i + fieldName.length());
-          isCommaNeeded = false;
-        }
-      }
-    }
-  }
-
-  static String getCreateUDTTypeForNative(CassandraMapping mapping, Class persistentClass, String udtType, String fieldName) throws NoSuchFieldException {
-    StringBuilder stringBuffer = new StringBuilder();
-    Class udtClass = persistentClass.getDeclaredField(fieldName).getType();
-    UDT annotation = (UDT) udtClass.getAnnotation(UDT.class);
-    if (annotation != null) {
-      stringBuffer.append("CREATE TYPE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(udtType).append(" (");
-      boolean isCommaNeeded = false;
-      for (java.lang.reflect.Field udtField : udtClass.getDeclaredFields()) {
-        com.datastax.driver.mapping.annotations.Field fieldAnnotation = udtField.getDeclaredAnnotation(com.datastax.driver.mapping.annotations.Field.class);
-        if (fieldAnnotation != null) {
-          if (isCommaNeeded) {
-            stringBuffer.append(", ");
-          }
-          if (!fieldAnnotation.name().isEmpty()) {
-            stringBuffer.append(fieldAnnotation.name()).append(" ");
-          } else {
-            stringBuffer.append(udtField.getName()).append(" ");
-          }
-          stringBuffer.append(dataType(udtField, null));
-          isCommaNeeded = true;
-        }
-      }
-      stringBuffer.append(")");
-    } else {
-      throw new RuntimeException("Field " + fieldName + " in the class " + persistentClass + " is not annotated with the @UDT type.");
-    }
-    return stringBuffer.toString();
-  }
-
-  static String getCreateUDTTypeForAvro(CassandraMapping mapping, String udtType, Schema fieldSchema) {
-    StringBuilder stringBuffer = new StringBuilder();
-    stringBuffer.append("CREATE TYPE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(udtType).append(" (");
-    CassandraQueryFactory.processRecord(fieldSchema, stringBuffer);
-    stringBuffer.append(")");
-    return stringBuffer.toString();
-  }
-
-  private static String dataType(java.lang.reflect.Field field, Type fieldType) {
-    String type;
-    if (field != null) {
-      type = field.getType().getName();
-    } else {
-      type = fieldType.getTypeName();
-    }
-    String dataType;
-    String s = type;
-    if (s.equals("java.lang.String") || s.equals("java.lang.CharSequence")) {
-      dataType = "text";
-    } else if (s.equals("int") || s.equals("java.lang.Integer")) {
-      dataType = "int";
-    } else if (s.equals("double") || s.equals("java.lang.Double")) {
-      dataType = "double";
-    } else if (s.equals("float") || s.equals("java.lang.Float")) {
-      dataType = "float";
-    } else if (s.equals("boolean") || s.equals("java.lang.Boolean")) {
-      dataType = "boolean";
-    } else if (s.equals("java.util.UUID")) {
-      dataType = "uuid";
-    } else if (s.equals("java.lang.Long")) {
-      dataType = "bigint";
-    } else if (s.equals("java.math.BigDecimal")) {
-      dataType = "decimal";
-    } else if (s.equals("java.net.InetAddress")) {
-      dataType = "inet";
-    } else if (s.equals("java.math.BigInteger")) {
-      dataType = "varint";
-    } else if (s.equals("java.nio.ByteBuffer")) {
-      dataType = "blob";
-    } else if (s.contains("Map")) {
-      ParameterizedType mapType;
-      if (field != null) {
-        mapType = (ParameterizedType) field.getGenericType();
-      } else {
-        mapType = (ParameterizedType) fieldType;
-      }
-      Type value1 = mapType.getActualTypeArguments()[0];
-      Type value2 = mapType.getActualTypeArguments()[1];
-      dataType = "map<" + dataType(null, value1) + "," + dataType(null, value2) + ">";
-    } else if (s.contains("List")) {
-      ParameterizedType listType;
-      if (field != null) {
-        listType = (ParameterizedType) field.getGenericType();
-      } else {
-        listType = (ParameterizedType) fieldType;
-      }
-      Type value = listType.getActualTypeArguments()[0];
-      dataType = "list<" + dataType(null, value) + ">";
-    } else if (s.contains("Set")) {
-      ParameterizedType setType;
-      if (field != null) {
-        setType = (ParameterizedType) field.getGenericType();
-      } else {
-        setType = (ParameterizedType) fieldType;
-      }
-      Type value = setType.getActualTypeArguments()[0];
-      dataType = "set<" + dataType(null, value) + ">";
-    } else {
-      throw new RuntimeException("Unsupported Cassandra DataType");
-    }
-    return dataType;
-  }
-
-}

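For readers skimming the removed CassandraQueryFactory, a minimal sketch of the two query-building styles it combined: hand-assembled StringBuilder CQL for DDL (as in getCreateKeySpaceQuery) and the DataStax 3.x QueryBuilder with "?" placeholders for reads and writes (as in getSelectObjectQuery/processKeys). The keyspace, table, and column names below are illustrative assumptions, not taken from the commit.

import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;

public class QueryShapeSketch {
  public static void main(String[] args) {
    // DDL side: keyspace creation is plain string assembly, mirroring getCreateKeySpaceQuery().
    StringBuilder ddl = new StringBuilder();
    ddl.append("CREATE KEYSPACE IF NOT EXISTS demo_ks")
        .append(" WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
    System.out.println(ddl);

    // DML side: reads go through the driver's QueryBuilder; the factory appends "?"
    // placeholders and supplies the bound values later through a SimpleStatement.
    Select.Where select = QueryBuilder.select()
        .from("demo_ks", "users")
        .where(QueryBuilder.eq("user_id", "?"));
    System.out.println(select.getQueryString());
  }
}
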
http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
deleted file mode 100644
index 208428e..0000000
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.gora.cassandra.serializers;
-
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.KeyspaceMetadata;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.core.TableMetadata;
-import org.apache.gora.cassandra.bean.Field;
-import org.apache.gora.cassandra.store.CassandraClient;
-import org.apache.gora.cassandra.store.CassandraMapping;
-import org.apache.gora.cassandra.store.CassandraStore;
-import org.apache.gora.persistency.Persistent;
-import org.apache.gora.query.Query;
-import org.apache.gora.query.Result;
-import org.apache.gora.store.DataStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-/**
- * This is the abstract Cassandra Serializer class.
- */
-public abstract class CassandraSerializer<K, T extends Persistent> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
-
-  protected Class<K> keyClass;
-
-  protected Class<T> persistentClass;
-
-  protected CassandraMapping mapping;
-
-  protected CassandraClient client;
-
-  protected String readConsistencyLevel;
-
-  protected String writeConsistencyLevel;
-
-  protected Map<String, String> userDefineTypeMaps;
-
-  CassandraSerializer(CassandraClient cc, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) {
-    this.keyClass = keyClass;
-    this.persistentClass = persistentClass;
-    this.client = cc;
-    this.mapping = mapping;
-    this.readConsistencyLevel = client.getReadConsistencyLevel();
-    this.writeConsistencyLevel = client.getWriteConsistencyLevel();
-  }
-
-  /**
-   * This method returns the Cassandra Serializer according to the Cassandra serializer property.
-   *
-   * @param cc        Cassandra Client
-   * @param type      Serialization type
-   * @param dataStore Cassandra DataStore
-   * @param mapping   Cassandra Mapping
-   * @param <K>       key class
-   * @param <T>       persistent class
-   * @return Serializer
-   */
-  public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K, T> dataStore, CassandraMapping mapping) {
-    CassandraStore.SerializerType serType = type == null || type.isEmpty() ? CassandraStore.SerializerType.NATIVE : CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH));
-    CassandraSerializer serializer;
-    switch (serType) {
-      case AVRO:
-        serializer = new AvroSerializer(cc, dataStore, mapping);
-        break;
-      case NATIVE:
-      default:
-        serializer = new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping);
-    }
-    return serializer;
-  }
-
-  /**
-   * This method analyzes the persistent class to find inner records with UDT types; it should be called from every Cassandra serializer constructor.
-   *
-   * @throws Exception
-   */
-  protected abstract void analyzePersistent() throws Exception;
-
-
-  public void createSchema() {
-    LOG.debug("creating Cassandra keyspace {}", mapping.getKeySpace().getName());
-    this.client.getSession().execute(CassandraQueryFactory.getCreateKeySpaceQuery(mapping));
-    for (Map.Entry udtType : userDefineTypeMaps.entrySet()) {
-      LOG.debug("creating Cassandra User Defined Type {}", udtType.getKey());
-      this.client.getSession().execute((String) udtType.getValue());
-    }
-    LOG.debug("creating Cassandra column family / table {}", mapping.getCoreName());
-    this.client.getSession().execute(CassandraQueryFactory.getCreateTableQuery(mapping));
-  }
-
-  public void deleteSchema() {
-    LOG.debug("dropping Cassandra table {}", mapping.getCoreName());
-    this.client.getSession().execute(CassandraQueryFactory.getDropTableQuery(mapping));
-    LOG.debug("dropping Cassandra keyspace {}", mapping.getKeySpace().getName());
-    this.client.getSession().execute(CassandraQueryFactory.getDropKeySpaceQuery(mapping));
-  }
-
-  public void close() {
-    this.client.close();
-  }
-
-  public void truncateSchema() {
-    LOG.debug("truncating Cassandra table {}", mapping.getCoreName());
-    this.client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping));
-  }
-
-  public boolean schemaExists() {
-    KeyspaceMetadata keyspace = this.client.getCluster().getMetadata().getKeyspace(mapping.getKeySpace().getName());
-    if (keyspace != null) {
-      TableMetadata table = keyspace.getTable(mapping.getCoreName());
-      return table != null;
-    } else {
-      return false;
-    }
-  }
-
-  protected String[] getFields() {
-    List<String> fields = new ArrayList<>();
-    for (Field field : mapping.getFieldList()) {
-      fields.add(field.getFieldName());
-    }
-    return fields.toArray(new String[0]);
-  }
-
-  /**
-   * Inserts the persistent Object
-   *
-   * @param key   key value
-   * @param value persistent value
-   */
-  public abstract void put(K key, T value);
-
-  /**
-   * Retrieves the persistent value according to the key
-   *
-   * @param key key value
-   * @return persistent value
-   */
-  public abstract T get(K key);
-
-  /**
-   * Deletes persistent value according to the key
-   *
-   * @param key key value
-   * @return isDeleted
-   */
-  public abstract boolean delete(K key);
-
-  /**
-   * Retrieves the persistent value according to the key and fields
-   *
-   * @param key    key value
-   * @param fields fields
-   * @return persistent value
-   */
-  public abstract T get(K key, String[] fields);
-
-  /**
-   * Executes the given query and returns the results.
-   *
-   * @param dataStore Cassandra data store
-   * @param query     Cassandra Query
-   * @return Cassandra Result
-   */
-  public abstract Result<K, T> execute(DataStore<K, T> dataStore, Query<K, T> query);
-
-  /**
-   * Update the persistent objects
-   *
-   * @param query Cassandra Query
-   * @return isUpdated
-   */
-  public abstract boolean updateByQuery(Query query);
-
-  public long deleteByQuery(Query query) {
-    List<Object> objectArrayList = new ArrayList<>();
-    if (query.getKey() == null && query.getEndKey() == null && query.getStartKey() == null) {
-      if (query.getFields() == null) {
-        client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping));
-      } else {
-        LOG.error("Delete by Query is not supported for queries that specify fields but no query keys.");
-      }
-    } else {
-      String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList);
-      ResultSet results;
-      SimpleStatement statement;
-      if (objectArrayList.size() == 0) {
-        statement = new SimpleStatement(cqlQuery);
-      } else {
-        statement = new SimpleStatement(cqlQuery, objectArrayList.toArray());
-      }
-      if (writeConsistencyLevel != null) {
-        statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel));
-      }
-      results = client.getSession().execute(statement);
-      LOG.debug("Delete by Query was applied : " + results.wasApplied());
-    }
-    LOG.info("Delete By Query method doesn't return the deleted element count.");
-    return 0;
-  }
-
-}
\ No newline at end of file

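The deleteByQuery implementation above follows a pattern worth calling out: the generated CQL string and its positional values are wrapped in a SimpleStatement, and the configured write consistency level is applied before execution. Below is a minimal standalone sketch of that pattern with the DataStax 3.x driver; the contact point, keyspace, table, and QUORUM level are assumptions for illustration only.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;

public class StatementExecutionSketch {
  public static void main(String[] args) {
    try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
         Session session = cluster.connect()) {
      // Bind values are passed positionally, the way deleteByQuery() passes objectArrayList.
      SimpleStatement statement =
          new SimpleStatement("DELETE FROM demo_ks.users WHERE user_id = ?", "user-1");
      // The serializer only sets a consistency level when one is configured.
      statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
      ResultSet results = session.execute(statement);
      System.out.println("applied: " + results.wasApplied());
    }
  }
}
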
http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
deleted file mode 100644
index bf28ee0..0000000
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.gora.cassandra.serializers;
-
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.mapping.Mapper;
-import com.datastax.driver.mapping.MappingManager;
-import com.datastax.driver.mapping.Result;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.gora.cassandra.bean.Field;
-import org.apache.gora.cassandra.query.CassandraResultSet;
-import org.apache.gora.cassandra.store.CassandraClient;
-import org.apache.gora.cassandra.store.CassandraMapping;
-import org.apache.gora.persistency.Persistent;
-import org.apache.gora.query.Query;
-import org.apache.gora.store.DataStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * This class contains the operations related to Native serialization.
- */
-class NativeSerializer<K, T extends Persistent> extends CassandraSerializer {
-
-  private static final Logger LOG = LoggerFactory.getLogger(NativeSerializer.class);
-
-  private Mapper<T> mapper;
-
-  NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) {
-    super(cassandraClient, keyClass, persistentClass, mapping);
-    try {
-      analyzePersistent();
-    } catch (Exception e) {
-      throw new RuntimeException("Error occurred while analyzing the persistent class: " + e.getMessage());
-    }
-    this.createSchema();
-    MappingManager mappingManager = new MappingManager(cassandraClient.getSession());
-    mapper = mappingManager.mapper(persistentClass);
-    if (cassandraClient.getWriteConsistencyLevel() != null) {
-      mapper.setDefaultDeleteOptions(Mapper.Option.consistencyLevel(ConsistencyLevel.valueOf(cassandraClient.getWriteConsistencyLevel())));
-      mapper.setDefaultSaveOptions(Mapper.Option.consistencyLevel(ConsistencyLevel.valueOf(cassandraClient.getWriteConsistencyLevel())));
-    }
-    if (cassandraClient.getReadConsistencyLevel() != null) {
-      mapper.setDefaultGetOptions(Mapper.Option.consistencyLevel(ConsistencyLevel.valueOf(cassandraClient.getReadConsistencyLevel())));
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @param key
-   * @param value
-   */
-  @Override
-  public void put(Object key, Persistent value) {
-    LOG.debug("Object is saved with key : {} and value : {}", key, value);
-    mapper.save((T) value);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @param key
-   * @return
-   */
-  @Override
-  public T get(Object key) {
-    T object = mapper.get(key);
-    if (object != null) {
-      LOG.debug("Object is found for key : {}", key);
-    } else {
-      LOG.debug("Object is not found for key : {}", key);
-    }
-    return object;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @param key
-   * @return
-   */
-  @Override
-  public boolean delete(Object key) {
-    LOG.debug("Object is deleted for key : {}", key);
-    mapper.delete(key);
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @param key
-   * @param fields
-   * @return
-   */
-  @Override
-  public Persistent get(Object key, String[] fields) {
-    if (fields == null) {
-      fields = getFields();
-    }
-    String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields);
-    SimpleStatement statement = new SimpleStatement(cqlQuery, key);
-    if (readConsistencyLevel != null) {
-      statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel));
-    }
-    ResultSet results = client.getSession().execute(statement);
-    Result<T> objects = mapper.map(results);
-    List<T> objectList = objects.all();
-    if (objectList != null && !objectList.isEmpty()) {
-      LOG.debug("Object is found for key : {}", key);
-      return objectList.get(0);
-    }
-    LOG.debug("Object is not found for key : {}", key);
-    return null;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws Exception
-   */
-  @Override
-  protected void analyzePersistent() throws Exception {
-    userDefineTypeMaps = new HashMap<>();
-    for (Field field : mapping.getFieldList()) {
-      String fieldType = field.getType();
-      if (fieldType.contains("frozen")) {
-        String udtType = fieldType.substring(fieldType.indexOf("<") + 1, fieldType.indexOf(">"));
-        String createQuery = CassandraQueryFactory.getCreateUDTTypeForNative(mapping, persistentClass, udtType, field.getFieldName());
-        userDefineTypeMaps.put(udtType, createQuery);
-      }
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @param query
-   * @return
-   */
-  @Override
-  public boolean updateByQuery(Query query) {
-    List<Object> objectArrayList = new ArrayList<>();
-    String cqlQuery = CassandraQueryFactory.getUpdateByQueryForNative(mapping, query, objectArrayList);
-    ResultSet results;
-    SimpleStatement statement;
-    if (objectArrayList.size() == 0) {
-      statement = new SimpleStatement(cqlQuery);
-    } else {
-      statement = new SimpleStatement(cqlQuery, objectArrayList.toArray());
-    }
-    if (writeConsistencyLevel != null) {
-      statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel));
-    }
-    results = client.getSession().execute(statement);
-    return results.wasApplied();
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @param dataStore
-   * @param query
-   * @return
-   */
-  @Override
-  public org.apache.gora.query.Result execute(DataStore dataStore, Query query) {
-    List<Object> objectArrayList = new ArrayList<>();
-    String[] fields = query.getFields();
-    if (fields != null) {
-      fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys());
-    } else {
-      fields = mapping.getAllFieldsIncludingKeys();
-    }
-    CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query);
-    String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields);
-    ResultSet results;
-    if (objectArrayList.size() == 0) {
-      results = client.getSession().execute(cqlQuery);
-    } else {
-      results = client.getSession().execute(cqlQuery, objectArrayList.toArray());
-    }
-    Result<T> objects = mapper.map(results);
-    Iterator iterator = objects.iterator();
-    while (iterator.hasNext()) {
-      T result = (T) iterator.next();
-      K key = getKey(result);
-      cassandraResult.addResultElement(key, result);
-    }
-    return cassandraResult;
-  }
-
-  private K getKey(T object) {
-    String keyField = null;
-    for (Field field : mapping.getFieldList()) {
-      boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey"));
-      if (isPrimaryKey) {
-        keyField = field.getFieldName();
-        break;
-      }
-    }
-    K key;
-    Method keyMethod = null;
-    try {
-      for (Method method : this.persistentClass.getMethods()) {
-        if (method.getName().equalsIgnoreCase("get" + keyField)) {
-          keyMethod = method;
-        }
-      }
-      key = (K) keyMethod.invoke(object);
-    } catch (Exception e) {
-      try {
-        key = (K) this.persistentClass.getField(keyField).get(object);
-      } catch (Exception e1) {
-        throw new RuntimeException("Field " + keyField + " is not accessible in " + persistentClass + " : " + e1.getMessage());
-      }
-    }
-    return key;
-  }
-}

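NativeSerializer above delegates put/get/delete to the driver's object mapper. A minimal sketch of that MappingManager/Mapper wiring with a hypothetical @Table-annotated class follows; it assumes a demo_ks.users table already exists and that a Cassandra node is reachable at 127.0.0.1.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;

public class NativeMappingSketch {

  // Hypothetical persistent class; keyspace, table, and column layout are illustrative.
  @Table(keyspace = "demo_ks", name = "users")
  public static class User {
    @PartitionKey
    private String userId;
    private String name;

    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
  }

  public static void main(String[] args) {
    try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
         Session session = cluster.connect()) {
      // NativeSerializer creates the same mapper in its constructor.
      MappingManager manager = new MappingManager(session);
      Mapper<User> mapper = manager.mapper(User.class);

      User user = new User();
      user.setUserId("user-1");
      user.setName("Ada");
      mapper.save(user);                   // put(key, value) delegates to save()
      User loaded = mapper.get("user-1");  // get(key) delegates to get()
      mapper.delete("user-1");             // delete(key) delegates to delete()
      System.out.println(loaded != null ? loaded.getName() : "not found");
    }
  }
}
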
http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
deleted file mode 100644
index ce1e3e7..0000000
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains Cassandra store related util classes for serializer.
- */
-package org.apache.gora.cassandra.serializers;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
deleted file mode 100644
index f672884..0000000
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
+++ /dev/null
@@ -1,535 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.gora.cassandra.store;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.DataType;
-import com.datastax.driver.core.HostDistance;
-import com.datastax.driver.core.PoolingOptions;
-import com.datastax.driver.core.ProtocolOptions;
-import com.datastax.driver.core.ProtocolVersion;
-import com.datastax.driver.core.QueryOptions;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.SocketOptions;
-import com.datastax.driver.core.TypeCodec;
-import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
-import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
-import com.datastax.driver.core.policies.DefaultRetryPolicy;
-import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
-import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
-import com.datastax.driver.core.policies.FallthroughRetryPolicy;
-import com.datastax.driver.core.policies.LatencyAwarePolicy;
-import com.datastax.driver.core.policies.LoggingRetryPolicy;
-import com.datastax.driver.core.policies.RoundRobinPolicy;
-import com.datastax.driver.core.policies.TokenAwarePolicy;
-import com.datastax.driver.extras.codecs.arrays.DoubleArrayCodec;
-import com.datastax.driver.extras.codecs.arrays.FloatArrayCodec;
-import com.datastax.driver.extras.codecs.arrays.IntArrayCodec;
-import com.datastax.driver.extras.codecs.arrays.LongArrayCodec;
-import com.datastax.driver.extras.codecs.arrays.ObjectArrayCodec;
-import com.datastax.driver.extras.codecs.date.SimpleDateCodec;
-import com.datastax.driver.extras.codecs.date.SimpleTimestampCodec;
-import com.datastax.driver.extras.codecs.jdk8.OptionalCodec;
-import org.apache.gora.cassandra.bean.Field;
-import org.jdom.Document;
-import org.jdom.Element;
-import org.jdom.JDOMException;
-import org.jdom.input.SAXBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Properties;
-
-/**
- * This class provides the Cassandra client connection and
- * initializes it according to the given Properties.
- */
-public class CassandraClient {
-
-  private static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
-
-  private Cluster cluster;
-
-  private Session session;
-
-  private CassandraMapping mapping;
-
-  private String readConsistencyLevel;
-
-  private String writeConsistencyLevel;
-
-  public Session getSession() {
-    return session;
-  }
-
-  public Cluster getCluster() {
-    return cluster;
-  }
-
-  void initialize(Properties properties, CassandraMapping mapping) throws Exception {
-    Cluster.Builder builder = Cluster.builder();
-    List<String> codecs = readCustomCodec(properties);
-    builder = populateSettings(builder, properties);
-    this.mapping = mapping;
-    this.cluster = builder.build();
-    if (codecs != null) {
-      registerCustomCodecs(codecs);
-    }
-    readConsistencyLevel = properties.getProperty(CassandraStoreParameters.READ_CONSISTENCY_LEVEL);
-    writeConsistencyLevel = properties.getProperty(CassandraStoreParameters.WRITE_CONSISTENCY_LEVEL);
-    registerOptionalCodecs();
-    this.session = this.cluster.connect();
-  }
-
-  private void registerOptionalCodecs() {
-    // Optional Codecs for natives
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.ascii()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.bigint()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.blob()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.cboolean()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.cdouble()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.cfloat()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.cint()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.counter()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.date()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.decimal()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.inet()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.smallInt()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.time()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.timestamp()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.timeUUID()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.tinyInt()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.varint()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.varchar()));
-    this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.uuid()));
-    // Optional Array Codecs
-    this.cluster.getConfiguration().getCodecRegistry().register(new IntArrayCodec());
-    this.cluster.getConfiguration().getCodecRegistry().register(new DoubleArrayCodec());
-    this.cluster.getConfiguration().getCodecRegistry().register(new FloatArrayCodec());
-    this.cluster.getConfiguration().getCodecRegistry().register(new LongArrayCodec());
-    this.cluster.getConfiguration().getCodecRegistry().register(new ObjectArrayCodec<>(
-            DataType.list(DataType.varchar()),
-            String[].class,
-            TypeCodec.varchar()));
-    // Optional Time Codecs
-    this.cluster.getConfiguration().getCodecRegistry().register(new SimpleDateCodec());
-    this.cluster.getConfiguration().getCodecRegistry().register(new SimpleTimestampCodec());
-
-    for (Field field : this.mapping.getFieldList()) {
-      String columnType = field.getType().toLowerCase(Locale.ENGLISH);
-      //http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cql_data_types_c.html
-      if (columnType.contains("list")) {
-        columnType = columnType.substring(columnType.indexOf("<") + 1, columnType.indexOf(">"));
-        this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.list(getTypeCodec(columnType))));
-      } else if (columnType.contains("set")) {
-        columnType = columnType.substring(columnType.indexOf("<") + 1, columnType.indexOf(">"));
-        this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.set(getTypeCodec(columnType))));
-      } else if (columnType.contains("map")) {
-        String[] columnTypes = columnType.substring(columnType.indexOf("<") + 1, columnType.indexOf(">")).split(",");
-        this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.map(getTypeCodec(columnTypes[0].trim()), getTypeCodec(columnTypes[1].trim()))));
-      }
-    }
-  }
-
-  private TypeCodec getTypeCodec(String columnType) {
-    TypeCodec typeCodec;
-    switch (columnType) {
-      case "ascii":
-        typeCodec = TypeCodec.ascii();
-        break;
-      case "bigint":
-        typeCodec = TypeCodec.bigint();
-        break;
-      case "blob":
-        typeCodec = TypeCodec.blob();
-        break;
-      case "boolean":
-        typeCodec = TypeCodec.cboolean();
-        break;
-      case "counter":
-        typeCodec = TypeCodec.counter();
-        break;
-      case "date":
-        typeCodec = TypeCodec.date();
-        break;
-      case "decimal":
-        typeCodec = TypeCodec.decimal();
-        break;
-      case "double":
-        typeCodec = TypeCodec.cdouble();
-        break;
-      case "float":
-        typeCodec = TypeCodec.cfloat();
-        break;
-      case "inet":
-        typeCodec = TypeCodec.inet();
-        break;
-      case "int":
-        typeCodec = TypeCodec.cint();
-        break;
-      case "smallint":
-        typeCodec = TypeCodec.smallInt();
-        break;
-      case "time":
-        typeCodec = TypeCodec.time();
-        break;
-      case "timestamp":
-        typeCodec = TypeCodec.timestamp();
-        break;
-      case "timeuuid":
-        typeCodec = TypeCodec.timeUUID();
-        break;
-      case "tinyint":
-        typeCodec = TypeCodec.tinyInt();
-        break;
-      case "uuid":
-        typeCodec = TypeCodec.uuid();
-        break;
-      case "varint":
-        typeCodec = TypeCodec.varint();
-        break;
-      case "varchar":
-      case "text":
-        typeCodec = TypeCodec.varchar();
-        break;
-      default:
-        LOG.error("Unsupported Cassandra datatype: {} ", columnType);
-        throw new RuntimeException("Unsupported Cassandra datatype: " + columnType);
-    }
-    return typeCodec;
-  }
-
-  private Cluster.Builder populateSettings(Cluster.Builder builder, Properties properties) {
-    String serversParam = properties.getProperty(CassandraStoreParameters.CASSANDRA_SERVERS);
-    String[] servers = serversParam.split(",");
-    for (String server : servers) {
-      builder = builder.addContactPoint(server);
-    }
-    String portProp = properties.getProperty(CassandraStoreParameters.PORT);
-    if (portProp != null) {
-      builder = builder.withPort(Integer.parseInt(portProp));
-    }
-    String clusterNameProp = properties.getProperty(CassandraStoreParameters.CLUSTER_NAME);
-    if (clusterNameProp != null) {
-      builder = builder.withClusterName(clusterNameProp);
-    }
-    String compressionProp = properties.getProperty(CassandraStoreParameters.COMPRESSION);
-    if (compressionProp != null) {
-      builder = builder.withCompression(ProtocolOptions.Compression.valueOf(compressionProp));
-    }
-    builder = this.populateCredentials(properties, builder);
-    builder = this.populateLoadBalancingProp(properties, builder);
-    String enableJMXProp = properties.getProperty(CassandraStoreParameters.ENABLE_JMX_REPORTING);
-    if (!Boolean.parseBoolean(enableJMXProp)) {
-      builder = builder.withoutJMXReporting();
-    }
-    String enableMetricsProp = properties.getProperty(CassandraStoreParameters.ENABLE_METRICS);
-    if (!Boolean.parseBoolean(enableMetricsProp)) {
-      builder = builder.withoutMetrics();
-    }
-    builder = this.populatePoolingSettings(properties, builder);
-    String versionProp = properties.getProperty(CassandraStoreParameters.PROTOCOL_VERSION);
-    if (versionProp != null) {
-      builder = builder.withProtocolVersion(ProtocolVersion.fromInt(Integer.parseInt(versionProp)));
-    }
-    builder = this.populateQueryOptions(properties, builder);
-    builder = this.populateReconnectPolicy(properties, builder);
-    builder = this.populateRetryPolicy(properties, builder);
-    builder = this.populateSocketOptions(properties, builder);
-    String enableSSLProp = properties.getProperty(CassandraStoreParameters.ENABLE_SSL);
-    if (enableSSLProp != null) {
-      if (Boolean.parseBoolean(enableSSLProp)) {
-        builder = builder.withSSL();
-      }
-    }
-    return builder;
-  }
-
-  private Cluster.Builder populateLoadBalancingProp(Properties properties, Cluster.Builder builder) {
-    String loadBalancingProp = properties.getProperty(CassandraStoreParameters.LOAD_BALANCING_POLICY);
-    if (loadBalancingProp != null) {
-      switch (loadBalancingProp) {
-        case "LatencyAwareRoundRobinPolicy":
-          builder = builder.withLoadBalancingPolicy(LatencyAwarePolicy.builder(new RoundRobinPolicy()).build());
-          break;
-        case "RoundRobinPolicy":
-          builder = builder.withLoadBalancingPolicy(new RoundRobinPolicy());
-          break;
-        case "DCAwareRoundRobinPolicy": {
-          String dataCenter = properties.getProperty(CassandraStoreParameters.DATA_CENTER);
-          boolean allowRemoteDCsForLocalConsistencyLevel = Boolean.parseBoolean(
-                  properties.getProperty(CassandraStoreParameters.ALLOW_REMOTE_DCS_FOR_LOCAL_CONSISTENCY_LEVEL));
-          if (dataCenter != null && !dataCenter.isEmpty()) {
-            if (allowRemoteDCsForLocalConsistencyLevel) {
-              builder = builder.withLoadBalancingPolicy(
-                      DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter)
-                              .allowRemoteDCsForLocalConsistencyLevel().build());
-            } else {
-              builder = builder.withLoadBalancingPolicy(
-                      DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter).build());
-            }
-          } else {
-            if (allowRemoteDCsForLocalConsistencyLevel) {
-              builder = builder.withLoadBalancingPolicy(
-                      (DCAwareRoundRobinPolicy.builder().allowRemoteDCsForLocalConsistencyLevel().build()));
-            } else {
-              builder = builder.withLoadBalancingPolicy((DCAwareRoundRobinPolicy.builder().build()));
-            }
-          }
-          break;
-        }
-        case "TokenAwareRoundRobinPolicy":
-          builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()));
-          break;
-        case "TokenAwareDCAwareRoundRobinPolicy": {
-          String dataCenter = properties.getProperty(CassandraStoreParameters.DATA_CENTER);
-          boolean allowRemoteDCsForLocalConsistencyLevel = Boolean.parseBoolean(
-                  properties.getProperty(CassandraStoreParameters.ALLOW_REMOTE_DCS_FOR_LOCAL_CONSISTENCY_LEVEL));
-          if (dataCenter != null && !dataCenter.isEmpty()) {
-            if (allowRemoteDCsForLocalConsistencyLevel) {
-              builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy(
-                      DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter)
-                              .allowRemoteDCsForLocalConsistencyLevel().build()));
-            } else {
-              builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy(
-                      DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter).build()));
-            }
-          } else {
-            if (allowRemoteDCsForLocalConsistencyLevel) {
-              builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy(
-                      DCAwareRoundRobinPolicy.builder().allowRemoteDCsForLocalConsistencyLevel().build()));
-            } else {
-              builder = builder.withLoadBalancingPolicy(
-                      new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build()));
-            }
-          }
-          break;
-        }
-        default:
-          LOG.error("Unsupported Cassandra load balancing  policy: {} ", loadBalancingProp);
-          break;
-      }
-    }
-    return builder;
-  }
-
-  private Cluster.Builder populateCredentials(Properties properties, Cluster.Builder builder) {
-    String usernameProp = properties.getProperty(CassandraStoreParameters.USERNAME);
-    String passwordProp = properties.getProperty(CassandraStoreParameters.PASSWORD);
-    if (usernameProp != null) {
-      builder = builder.withCredentials(usernameProp, passwordProp);
-    }
-    return builder;
-  }
-
-  private Cluster.Builder populatePoolingSettings(Properties properties, Cluster.Builder builder) {
-    String localCoreConnectionsPerHost = properties.getProperty(CassandraStoreParameters.LOCAL_CORE_CONNECTIONS_PER_HOST);
-    String remoteCoreConnectionsPerHost = properties.getProperty(CassandraStoreParameters.REMOTE_CORE_CONNECTIONS_PER_HOST);
-    String localMaxConnectionsPerHost = properties.getProperty(CassandraStoreParameters.LOCAL_MAX_CONNECTIONS_PER_HOST);
-    String remoteMaxConnectionsPerHost = properties.getProperty(CassandraStoreParameters.REMOTE_MAX_CONNECTIONS_PER_HOST);
-    String localNewConnectionThreshold = properties.getProperty(CassandraStoreParameters.LOCAL_NEW_CONNECTION_THRESHOLD);
-    String remoteNewConnectionThreshold = properties.getProperty(CassandraStoreParameters.REMOTE_NEW_CONNECTION_THRESHOLD);
-    String localMaxRequestsPerConnection = properties.getProperty(CassandraStoreParameters.LOCAL_MAX_REQUESTS_PER_CONNECTION);
-    String remoteMaxRequestsPerConnection = properties.getProperty(CassandraStoreParameters.REMOTE_MAX_REQUESTS_PER_CONNECTION);
-    PoolingOptions options = new PoolingOptions();
-    if (localCoreConnectionsPerHost != null) {
-      options.setCoreConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(localCoreConnectionsPerHost));
-    }
-    if (remoteCoreConnectionsPerHost != null) {
-      options.setCoreConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(remoteCoreConnectionsPerHost));
-    }
-    if (localMaxConnectionsPerHost != null) {
-      options.setMaxConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(localMaxConnectionsPerHost));
-    }
-    if (remoteMaxConnectionsPerHost != null) {
-      options.setMaxConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(remoteMaxConnectionsPerHost));
-    }
-    if (localNewConnectionThreshold != null) {
-      options.setNewConnectionThreshold(HostDistance.LOCAL, Integer.parseInt(localNewConnectionThreshold));
-    }
-    if (remoteNewConnectionThreshold != null) {
-      options.setNewConnectionThreshold(HostDistance.REMOTE, Integer.parseInt(remoteNewConnectionThreshold));
-    }
-    if (localMaxRequestsPerConnection != null) {
-      options.setMaxRequestsPerConnection(HostDistance.LOCAL, Integer.parseInt(localMaxRequestsPerConnection));
-    }
-    if (remoteMaxRequestsPerConnection != null) {
-      options.setMaxRequestsPerConnection(HostDistance.REMOTE, Integer.parseInt(remoteMaxRequestsPerConnection));
-    }
-    builder = builder.withPoolingOptions(options);
-    return builder;
-  }
-
-  private Cluster.Builder populateQueryOptions(Properties properties, Cluster.Builder builder) {
-    String consistencyLevelProp = properties.getProperty(CassandraStoreParameters.CONSISTENCY_LEVEL);
-    String serialConsistencyLevelProp = properties.getProperty(CassandraStoreParameters.SERIAL_CONSISTENCY_LEVEL);
-    String fetchSize = properties.getProperty(CassandraStoreParameters.FETCH_SIZE);
-    QueryOptions options = new QueryOptions();
-    if (consistencyLevelProp != null) {
-      options.setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevelProp));
-    }
-    if (serialConsistencyLevelProp != null) {
-      options.setSerialConsistencyLevel(ConsistencyLevel.valueOf(serialConsistencyLevelProp));
-    }
-    if (fetchSize != null) {
-      options.setFetchSize(Integer.parseInt(fetchSize));
-    }
-    return builder.withQueryOptions(options);
-  }
-
-  private Cluster.Builder populateReconnectPolicy(Properties properties, Cluster.Builder builder) {
-    String reconnectionPolicy = properties.getProperty(CassandraStoreParameters.RECONNECTION_POLICY);
-    if (reconnectionPolicy != null) {
-      switch (reconnectionPolicy) {
-        case "ConstantReconnectionPolicy": {
-          String constantReconnectionPolicyDelay = properties.getProperty(CassandraStoreParameters.CONSTANT_RECONNECTION_POLICY_DELAY);
-          ConstantReconnectionPolicy policy = new ConstantReconnectionPolicy(Long.parseLong(constantReconnectionPolicyDelay));
-          builder = builder.withReconnectionPolicy(policy);
-          break;
-        }
-        case "ExponentialReconnectionPolicy": {
-          String exponentialReconnectionPolicyBaseDelay = properties.getProperty(CassandraStoreParameters.EXPONENTIAL_RECONNECTION_POLICY_BASE_DELAY);
-          String exponentialReconnectionPolicyMaxDelay = properties.getProperty(CassandraStoreParameters.EXPONENTIAL_RECONNECTION_POLICY_MAX_DELAY);
-
-          ExponentialReconnectionPolicy policy = new ExponentialReconnectionPolicy(Long.parseLong(exponentialReconnectionPolicyBaseDelay),
-                  Long.parseLong(exponentialReconnectionPolicyMaxDelay));
-          builder = builder.withReconnectionPolicy(policy);
-          break;
-        }
-        default:
-          LOG.error("Unsupported reconnection policy : {} ", reconnectionPolicy);
-      }
-    }
-    return builder;
-  }
-
-  private Cluster.Builder populateRetryPolicy(Properties properties, Cluster.Builder builder) {
-    String retryPolicy = properties.getProperty(CassandraStoreParameters.RETRY_POLICY);
-    if (retryPolicy != null) {
-      switch (retryPolicy) {
-        case "DefaultRetryPolicy":
-          builder = builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE);
-          break;
-        case "DowngradingConsistencyRetryPolicy":
-          builder = builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE);
-          break;
-        case "FallthroughRetryPolicy":
-          builder = builder.withRetryPolicy(FallthroughRetryPolicy.INSTANCE);
-          break;
-        case "LoggingDefaultRetryPolicy":
-          builder = builder.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE));
-          break;
-        case "LoggingDowngradingConsistencyRetryPolicy":
-          builder = builder.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE));
-          break;
-        case "LoggingFallthroughRetryPolicy":
-          builder = builder.withRetryPolicy(new LoggingRetryPolicy(FallthroughRetryPolicy.INSTANCE));
-          break;
-        default:
-          LOG.error("Unsupported retry policy : {} ", retryPolicy);
-          break;
-      }
-    }
-    return builder;
-  }
-
-  private Cluster.Builder populateSocketOptions(Properties properties, Cluster.Builder builder) {
-    String connectionTimeoutMillisProp = properties.getProperty(CassandraStoreParameters.CONNECTION_TIMEOUT_MILLIS);
-    String keepAliveProp = properties.getProperty(CassandraStoreParameters.KEEP_ALIVE);
-    String readTimeoutMillisProp = properties.getProperty(CassandraStoreParameters.READ_TIMEOUT_MILLIS);
-    String receiveBufferSizeProp = properties.getProperty(CassandraStoreParameters.RECEIVER_BUFFER_SIZE);
-    String reuseAddress = properties.getProperty(CassandraStoreParameters.REUSE_ADDRESS);
-    String sendBufferSize = properties.getProperty(CassandraStoreParameters.SEND_BUFFER_SIZE);
-    String soLinger = properties.getProperty(CassandraStoreParameters.SO_LINGER);
-    String tcpNoDelay = properties.getProperty(CassandraStoreParameters.TCP_NODELAY);
-    SocketOptions options = new SocketOptions();
-    if (connectionTimeoutMillisProp != null) {
-      options.setConnectTimeoutMillis(Integer.parseInt(connectionTimeoutMillisProp));
-    }
-    if (keepAliveProp != null) {
-      options.setKeepAlive(Boolean.parseBoolean(keepAliveProp));
-    }
-    if (readTimeoutMillisProp != null) {
-      options.setReadTimeoutMillis(Integer.parseInt(readTimeoutMillisProp));
-    }
-    if (receiveBufferSizeProp != null) {
-      options.setReceiveBufferSize(Integer.parseInt(receiveBufferSizeProp));
-    }
-    if (reuseAddress != null) {
-      options.setReuseAddress(Boolean.parseBoolean(reuseAddress));
-    }
-    if (sendBufferSize != null) {
-      options.setSendBufferSize(Integer.parseInt(sendBufferSize));
-    }
-    if (soLinger != null) {
-      options.setSoLinger(Integer.parseInt(soLinger));
-    }
-    if (tcpNoDelay != null) {
-      options.setTcpNoDelay(Boolean.parseBoolean(tcpNoDelay));
-    }
-    return builder.withSocketOptions(options);
-  }
-
-  private List<String> readCustomCodec(Properties properties) throws JDOMException, IOException {
-    String filename = properties.getProperty(CassandraStoreParameters.CUSTOM_CODEC_FILE);
-    if (filename != null) {
-      List<String> codecs = new ArrayList<>();
-      SAXBuilder builder = new SAXBuilder();
-      Document doc = builder.build(getClass().getClassLoader().getResourceAsStream(filename));
-      List<Element> codecElementList = doc.getRootElement().getChildren("codec");
-      for (Element codec : codecElementList) {
-        codecs.add(codec.getValue());
-      }
-      return codecs;
-    }
-    return null;
-  }
-
-  /**
-   * This method returns the configured read consistency level.
-   * @return read Consistency level
-   */
-  public String getReadConsistencyLevel() {
-    return readConsistencyLevel;
-  }
-
-  /**
-   * This method returns the configured write consistency level.
-   * @return write Consistency level
-   */
-  public String getWriteConsistencyLevel() {
-    return writeConsistencyLevel;
-  }
-
-  public void close() {
-    this.session.close();
-    this.cluster.close();
-  }
-
-  private void registerCustomCodecs(List<String> codecs) throws Exception {
-    for (String codec : codecs) {
-      this.cluster.getConfiguration().getCodecRegistry().register((TypeCodec<?>) Class.forName(codec).newInstance());
-    }
-  }
-
-}
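
For context, here is a minimal standalone sketch of the driver configuration that populateSettings and its helper methods above assemble from properties. The contact point, port, credentials, and option values are placeholder assumptions (and a reachable Cassandra node is assumed); the builder and option calls are the same DataStax driver APIs used by the deleted class.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SocketOptions;

public class ClusterConfigSketch {
  public static void main(String[] args) {
    // Placeholder contact point and credentials; in CassandraClient these come from
    // the CassandraStoreParameters properties.
    Cluster cluster = Cluster.builder()
        .addContactPoint("127.0.0.1")
        .withPort(9042)
        .withCredentials("cassandra", "cassandra")
        .withPoolingOptions(new PoolingOptions()
            .setCoreConnectionsPerHost(HostDistance.LOCAL, 2)
            .setMaxConnectionsPerHost(HostDistance.LOCAL, 8))
        .withSocketOptions(new SocketOptions()
            .setConnectTimeoutMillis(10000)
            .setReadTimeoutMillis(20000))
        .withQueryOptions(new QueryOptions()
            .setConsistencyLevel(ConsistencyLevel.QUORUM)
            .setFetchSize(500))
        .build();
    Session session = cluster.connect();
    try {
      System.out.println("Connected to cluster: " + cluster.getClusterName());
    } finally {
      // Mirrors CassandraClient.close(): release the session, then the cluster.
      session.close();
      cluster.close();
    }
  }
}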

http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
deleted file mode 100644
index 7d5ac05..0000000
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.gora.cassandra.store;
-
-import org.apache.gora.cassandra.bean.CassandraKey;
-import org.apache.gora.cassandra.bean.Field;
-import org.apache.gora.cassandra.bean.KeySpace;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class represents the Cassandra Mapping.
- */
-public class CassandraMapping {
-
-  private static final String PRIMARY_KEY = "primarykey";
-  private CassandraKey cassandraKey;
-  private Map<String, String> tableProperties;
-  private Class keyClass;
-  private Class persistentClass;
-  private KeySpace keySpace;
-  private List<Field> fieldList;
-  private Field inlinedDefinedPartitionKey;
-  private String coreName;
-
-  /**
-   * Constructor of the class
-   */
-  CassandraMapping() {
-    this.fieldList = new ArrayList<>();
-    this.tableProperties = new HashMap<>();
-  }
-
-  /**
-   * This method returns the KeySpace in the mapping file.
-   *
-   * @return Key space {@link KeySpace}
-   */
-  public KeySpace getKeySpace() {
-    return keySpace;
-  }
-
-  /**
-   * This method sets the KeySpace in the Cassandra mapping.
-   *
-   * @param keySpace Key space {@link KeySpace}
-   */
-  void setKeySpace(KeySpace keySpace) {
-    this.keySpace = keySpace;
-  }
-
-  /**
-   * This method returns only the fields that belong to the Persistent Object.
-   *
-   * @return List of Fields
-   */
-  public List<Field> getFieldList() {
-    return fieldList;
-  }
-
-  /**
-   * This method returns the Persistent Object's Field from the mapping, according to the FieldName.
-   *
-   * @param fieldName Field Name
-   * @return Field {@link Field}
-   */
-  public Field getFieldFromFieldName(String fieldName) {
-    for (Field field1 : fieldList) {
-      if (field1.getFieldName().equalsIgnoreCase(fieldName)) {
-        return field1;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * This method returns the Persistent Object's Field from the mapping, according to the ColumnName.
-   *
-   * @param columnName Column Name
-   * @return Field {@link Field}
-   */
-  public Field getFieldFromColumnName(String columnName) {
-    for (Field field1 : fieldList) {
-      if (field1.getColumnName().equalsIgnoreCase(columnName)) {
-        return field1;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * This method returns the Field Names
-   *
-   * @return array of Field Names
-   */
-  public String[] getFieldNames() {
-    List<String> fieldNames = new ArrayList<>(fieldList.size());
-    for (Field field : fieldList) {
-      fieldNames.add(field.getFieldName());
-    }
-    String[] fieldNameArray = new String[fieldNames.size()];
-    return fieldNames.toArray(fieldNameArray);
-  }
-
-  /**
-   * This method returns all the field names, including the key fields defined in the Cassandra key.
-   *
-   * @return field names including keys
-   */
-  public String[] getAllFieldsIncludingKeys() {
-    List<String> fieldNames = new ArrayList<>(fieldList.size());
-    for (Field field : fieldList) {
-      fieldNames.add(field.getFieldName());
-    }
-    if (cassandraKey != null) {
-      for (Field field : cassandraKey.getFieldList()) {
-        fieldNames.add(field.getFieldName());
-      }
-    }
-    String[] fieldNameArray = new String[fieldNames.size()];
-    return fieldNames.toArray(fieldNameArray);
-  }
-
-  /**
-   * This method returns all the fields involved in partition keys, including composite keys.
-   *
-   * @return field names
-   */
-  public String[] getAllKeys() {
-    List<String> fieldNames = new ArrayList<>();
-    Field keyField = getInlinedDefinedPartitionKey();
-    if (cassandraKey != null) {
-      for (Field field : cassandraKey.getFieldList()) {
-        fieldNames.add(field.getFieldName());
-      }
-    } else {
-      fieldNames.add(keyField.getFieldName());
-    }
-    String[] fieldNameArray = new String[fieldNames.size()];
-    return fieldNames.toArray(fieldNameArray);
-  }
-
-  public CassandraKey getCassandraKey() {
-    return cassandraKey;
-  }
-
-  void setCassandraKey(CassandraKey cassandraKey) {
-    this.cassandraKey = cassandraKey;
-  }
-
-  public String getCoreName() {
-    return coreName;
-  }
-
-  void setCoreName(String coreName) {
-    this.coreName = coreName;
-  }
-
-  void addCassandraField(Field field) {
-    this.fieldList.add(field);
-  }
-
-  void addProperty(String key, String value) {
-    this.tableProperties.put(key, value);
-  }
-
-  public String getProperty(String key) {
-    return this.tableProperties.get(key);
-  }
-
-  private Field getDefaultCassandraKey() {
-    Field field = new Field();
-    field.setFieldName("defaultId");
-    field.setColumnName("defaultId");
-    field.setType("varchar");
-    field.addProperty("primarykey", "true");
-    return field;
-  }
-
-  public Class getKeyClass() {
-    return keyClass;
-  }
-
-  public void setKeyClass(Class keyClass) {
-    this.keyClass = keyClass;
-  }
-
-  public Class getPersistentClass() {
-    return persistentClass;
-  }
-
-  void setPersistentClass(Class persistentClass) {
-    this.persistentClass = persistentClass;
-  }
-
-  /**
-   * This method returns the inline-defined partition key.
-   * If there is no inline-defined partition key,
-   * it returns the default predefined partition key "defaultId".
-   *
-   * @return Partition Key {@link Field}
-   */
-  public Field getInlinedDefinedPartitionKey() {
-    if (inlinedDefinedPartitionKey != null) {
-      return inlinedDefinedPartitionKey;
-    } else {
-      for (Field field : fieldList) {
-        if (Boolean.parseBoolean(field.getProperty(PRIMARY_KEY))) {
-          inlinedDefinedPartitionKey = field;
-          break;
-        }
-      }
-      if (inlinedDefinedPartitionKey == null) {
-        return getDefaultCassandraKey();
-      }
-      return inlinedDefinedPartitionKey;
-    }
-  }
-
-  void finalized() {
-    Field field = getInlinedDefinedPartitionKey();
-    if (!fieldList.contains(field) && cassandraKey == null) {
-      fieldList.add(field);
-    }
-  }
-}
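
For context, here is a minimal sketch of the inline-defined partition key lookup and the "defaultId" fallback implemented by getInlinedDefinedPartitionKey() and getDefaultCassandraKey() above. The field names "name" and "id" are hypothetical; the Field bean calls are those used in the deleted code.

import org.apache.gora.cassandra.bean.Field;

import java.util.ArrayList;
import java.util.List;

public class PartitionKeySketch {
  public static void main(String[] args) {
    List<Field> fields = new ArrayList<>();

    Field name = new Field();
    name.setFieldName("name");
    name.setColumnName("name");
    name.setType("varchar");
    fields.add(name);

    Field id = new Field();
    id.setFieldName("id");
    id.setColumnName("id");
    id.setType("varchar");
    id.addProperty("primarykey", "true"); // marks the inline-defined partition key
    fields.add(id);

    // Same lookup as getInlinedDefinedPartitionKey(): first field flagged "primarykey".
    Field partitionKey = null;
    for (Field field : fields) {
      if (Boolean.parseBoolean(field.getProperty("primarykey"))) {
        partitionKey = field;
        break;
      }
    }
    // Fallback mirrors getDefaultCassandraKey() when no field is flagged.
    if (partitionKey == null) {
      partitionKey = new Field();
      partitionKey.setFieldName("defaultId");
      partitionKey.setColumnName("defaultId");
      partitionKey.setType("varchar");
      partitionKey.addProperty("primarykey", "true");
    }
    System.out.println("Partition key field: " + partitionKey.getFieldName()); // prints "id"
  }
}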

