gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lewi...@apache.org
Subject [11/37] gora git commit: Refactored the code
Date Wed, 23 Aug 2017 20:55:09 GMT
Refactored the code


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/5e383ef9
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/5e383ef9
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/5e383ef9

Branch: refs/heads/master
Commit: 5e383ef974541804382a639b79e3efbf6b03d2b2
Parents: 89109b8
Author: madhawa <madhawa30@gmail.com>
Authored: Wed Jun 28 23:45:18 2017 +0530
Committer: madhawa <madhawa30@gmail.com>
Committed: Wed Jun 28 23:48:54 2017 +0530

----------------------------------------------------------------------
 gora-cassandra-cql/pom.xml                      |  10 -
 gora-cassandra-cql/src/examples/java/.gitignore |  15 -
 .../generated/nativeSerialization/User.java     |  66 ++
 .../persistent/CassandraNativePersistent.java   | 109 ++++
 .../cassandra/serializers/AvroSerializer.java   |  51 ++
 .../serializers/CassandraNativePersistent.java  |  91 ---
 .../serializers/CassandraQueryFactory.java      | 248 ++++++++
 .../serializers/CassandraSerializer.java        | 105 +++
 .../cassandra/serializers/NativeSerializer.java |  61 ++
 .../gora/cassandra/store/CassandraClient.java   | 368 +++++++++++
 .../store/CassandraMappingBuilder.java          | 234 +++++++
 .../cassandra/store/CassandraQueryFactory.java  | 224 -------
 .../gora/cassandra/store/CassandraStore.java    | 636 ++-----------------
 .../store/CassandraStoreParameters.java         |   2 +-
 .../test/conf/avro/gora-cassandra-mapping.xml   |  73 +++
 .../gora-cassandra-mapping.xml                  |   2 +-
 gora-cassandra-cql/src/test/java/.gitignore     |  15 -
 ...stCassandraStoreWithNativeSerialization.java |  25 +-
 .../nativeSerialization/DateAsStringCodec.java  |  17 +
 .../test/nativeSerialization/User.java          |  66 --
 20 files changed, 1404 insertions(+), 1014 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/pom.xml
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/pom.xml b/gora-cassandra-cql/pom.xml
index d56e07a..13e5a1a 100644
--- a/gora-cassandra-cql/pom.xml
+++ b/gora-cassandra-cql/pom.xml
@@ -110,16 +110,6 @@
     </build>
 
     <dependencies>
-
-        <!-- OSX JDK 7 -->
-        <!-- should be removed once fixed in Cassandra -->
-        <dependency>
-            <groupId>org.xerial.snappy</groupId>
-            <artifactId>snappy-java</artifactId>
-            <version>1.0.5-M3</version>
-            <scope>test</scope>
-        </dependency>
-
         <!-- Gora Internal Dependencies -->
         <dependency>
             <groupId>org.apache.gora</groupId>

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/examples/java/.gitignore
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/examples/java/.gitignore b/gora-cassandra-cql/src/examples/java/.gitignore
deleted file mode 100644
index 09697dc..0000000
--- a/gora-cassandra-cql/src/examples/java/.gitignore
+++ /dev/null
@@ -1,15 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java
new file mode 100644
index 0000000..1e810a0
--- /dev/null
+++ b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java
@@ -0,0 +1,66 @@
+package org.apache.gora.cassandra.example.generated.nativeSerialization;
+
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.PartitionKey;
+import com.datastax.driver.mapping.annotations.Table;
+import com.datastax.driver.mapping.annotations.Transient;
+import org.apache.gora.cassandra.persistent.CassandraNativePersistent;
+
+import java.util.Date;
+import java.util.UUID;
+
+/**
+ * Created by madhawa on 6/23/17.
+ */
+
+@Table(keyspace = "nativeTestKeySpace", name = "users",
+        readConsistency = "QUORUM",
+        writeConsistency = "QUORUM",
+        caseSensitiveKeyspace = false,
+        caseSensitiveTable = false)
+public class User extends CassandraNativePersistent {
+  @PartitionKey
+  @Column(name = "user_id")
+  private UUID userId;
+  @Column(name = "name")
+  private String name;
+  @Column(name = "dob")
+  private Date dateOfBirth;
+
+  @Transient
+  private boolean dirty;
+
+  public User() {
+
+  }
+
+  public User(UUID userId, String name, Date dateOfBirth) {
+    this.userId = userId;
+    this.name = name;
+    this.dateOfBirth = dateOfBirth;
+  }
+
+  public void setUserId(UUID userId) {
+    this.userId = userId;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public void setDateOfBirth(Date dateOfBirth) {
+    this.dateOfBirth = dateOfBirth;
+  }
+
+  public UUID getUserId() {
+    return userId;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public Date getDateOfBirth() {
+    return dateOfBirth;
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java
new file mode 100644
index 0000000..bd17dcd
--- /dev/null
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java
@@ -0,0 +1,109 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.gora.cassandra.persistent;
+
+import com.datastax.driver.mapping.annotations.Transient;
+import org.apache.avro.Schema;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.Tombstone;
+import org.apache.gora.persistency.impl.PersistentBase;
+
+import java.util.List;
+
+/**
+ * This class should be used with Native Cassandra Serialization.
+ */
+public abstract class CassandraNativePersistent implements Persistent {
+  @Transient
+  @Override
+  public void clear() {
+
+  }
+
+  @Transient
+  @Override
+  public boolean isDirty(int fieldIndex) {
+    return false;
+  }
+
+  @Transient
+  @Override
+  public boolean isDirty(String field) {
+    return false;
+  }
+
+  @Transient
+  @Override
+  public void setDirty() {
+
+  }
+
+  @Transient
+  @Override
+  public void setDirty(int fieldIndex) {
+
+  }
+
+  @Transient
+  @Override
+  public void setDirty(String field) {
+
+  }
+
+  @Transient
+  @Override
+  public void clearDirty(int fieldIndex) {
+
+  }
+
+  @Transient
+  @Override
+  public void clearDirty(String field) {
+
+  }
+
+  @Transient
+  @Override
+  public Tombstone getTombstone() {
+    return null;
+  }
+
+  @Transient
+  @Override
+  public List<Schema.Field> getUnmanagedFields() {
+    return null;
+  }
+
+  @Transient
+  @Override
+  public Persistent newInstance() {
+    return null;
+  }
+
+  @Transient
+  @Override
+  public boolean isDirty() {
+    return false;
+  }
+
+  @Transient
+  @Override
+  public void clearDirty() {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
new file mode 100644
index 0000000..bb9d99c
--- /dev/null
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
@@ -0,0 +1,51 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import org.apache.gora.cassandra.store.CassandraClient;
+import org.apache.gora.cassandra.store.CassandraMapping;
+import org.apache.gora.persistency.impl.PersistentBase;
+
+/**
+ * Created by madhawa on 6/26/17.
+ */
+public class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
+
+
+  public AvroSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) {
+    super(cassandraClient, keyClass, persistentClass, mapping);
+  }
+
+
+  @Override
+  public PersistentBase get(Object key) {
+    return null;
+  }
+
+  @Override
+  public void put(Object key, Object value) {
+
+  }
+
+  @Override
+  public boolean delete(Object key) {
+    return false;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraNativePersistent.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraNativePersistent.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraNativePersistent.java
deleted file mode 100644
index c493433..0000000
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraNativePersistent.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.gora.cassandra.serializers;
-
-import com.datastax.driver.mapping.annotations.Transient;
-import org.apache.avro.Schema;
-import org.apache.gora.persistency.Persistent;
-import org.apache.gora.persistency.Tombstone;
-
-import java.util.List;
-
-/**
- * This class should be used with Native Cassandra Serialization.
- */
-public abstract class CassandraNativePersistent implements Persistent {
-  @Transient
-  @Override
-  public void clear() {
-
-  }
-
-  @Transient
-  @Override
-  public boolean isDirty(int fieldIndex) {
-    return false;
-  }
-
-  @Transient
-  @Override
-  public boolean isDirty(String field) {
-    return false;
-  }
-
-  @Transient
-  @Override
-  public void setDirty() {
-
-  }
-
-  @Transient
-  @Override
-  public void setDirty(int fieldIndex) {
-
-  }
-
-  @Transient
-  @Override
-  public void setDirty(String field) {
-
-  }
-
-  @Transient
-  @Override
-  public void clearDirty(int fieldIndex) {
-
-  }
-
-  @Transient
-  @Override
-  public void clearDirty(String field) {
-
-  }
-
-  @Transient
-  @Override
-  public Tombstone getTombstone() {
-    return null;
-  }
-
-  @Transient
-  @Override
-  public List<Schema.Field> getUnmanagedFields() {
-    return null;
-  }
-
-  @Transient
-  @Override
-  public Persistent newInstance() {
-    return null;
-  }
-
-  @Transient
-  @Override
-  public boolean isDirty() {
-    return false;
-  }
-
-  @Transient
-  @Override
-  public void clearDirty() {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
new file mode 100644
index 0000000..84f5ccb
--- /dev/null
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
@@ -0,0 +1,248 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.gora.cassandra.serializers;
+
+import org.apache.gora.cassandra.bean.CassandraKey;
+import org.apache.gora.cassandra.bean.ClusterKeyField;
+import org.apache.gora.cassandra.bean.Field;
+import org.apache.gora.cassandra.bean.KeySpace;
+import org.apache.gora.cassandra.bean.PartitionKeyField;
+import org.apache.gora.cassandra.store.CassandraMapping;
+import org.apache.gora.persistency.Persistent;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is used to create Cassandra queries.
+ */
+class CassandraQueryFactory {
+
+  /**
+   * This method returns the CQL query to create key space.
+   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html
+   *
+   * @param mapping Cassandra Mapping
+   * @return CQL Query
+   */
+  static String getCreateKeySpaceQuery(CassandraMapping mapping) {
+    KeySpace keySpace = mapping.getKeySpace();
+    StringBuilder stringBuffer = new StringBuilder();
+    stringBuffer.append("CREATE KEYSPACE IF NOT EXISTS ").append(keySpace.getName()).append(" WITH REPLICATION = { 'class' : ");
+    KeySpace.PlacementStrategy placementStrategy = keySpace.getPlacementStrategy();
+    stringBuffer.append("'").append(placementStrategy).append("'").append(", ").append("'");
+    switch (placementStrategy) {
+      case SimpleStrategy:
+        stringBuffer.append("replication_factor").append("'").append(" : ").append(keySpace.getReplicationFactor()).append(" }");
+        break;
+      case NetworkTopologyStrategy:
+        boolean isCommaNeeded = false;
+        for (Map.Entry<String, Integer> entry : keySpace.getDataCenters().entrySet()) {
+          if (isCommaNeeded) {
+            stringBuffer.append(", '");
+          }
+          stringBuffer.append(entry.getKey()).append("'").append(" : ").append(entry.getValue());
+          isCommaNeeded = true;
+        }
+        stringBuffer.append(" }");
+        break;
+    }
+
+    if (keySpace.isDurableWritesEnabled()) {
+      stringBuffer.append(" AND DURABLE_WRITES = ").append(keySpace.isDurableWritesEnabled());
+    }
+    return stringBuffer.toString();
+  }
+
+  /**
+   * This method returns the CQL query to create a table.
+   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_table_r.html
+   * <p>
+   * Note: to keep the order of the columns consistent, partition keys are appended first, then cluster keys, and finally the other columns.
+   * The same order must be followed in the other CRUD operations as well.
+   *
+   * @param mapping Cassandra mapping
+   * @return CQL
+   */
+  static String getCreateTableQuery(CassandraMapping mapping) {
+    StringBuilder stringBuffer = new StringBuilder();
+    stringBuffer.append("CREATE TABLE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(mapping.getCoreName()).append(" (");
+    boolean isCommaNeeded = false;
+    CassandraKey cassandraKey = mapping.getCassandraKey();
+    // appending Cassandra key columns into db schema
+    if (cassandraKey != null) {
+      for (PartitionKeyField partitionKeyField : cassandraKey.getPartitionKeyFields()) {
+        if (partitionKeyField.isComposite()) {
+          for (Field compositeField : partitionKeyField.getFields()) {
+            stringBuffer = processFields(stringBuffer, compositeField, isCommaNeeded);
+          }
+
+        } else {
+          stringBuffer = processFields(stringBuffer, partitionKeyField, isCommaNeeded);
+        }
+        isCommaNeeded = true;
+      }
+      for (ClusterKeyField clusterKeyField : cassandraKey.getClusterKeyFields()) {
+        stringBuffer = processFields(stringBuffer, clusterKeyField, isCommaNeeded);
+      }
+    }
+    // appending Other columns
+    for (Field field : mapping.getFieldList()) {
+      if (isCommaNeeded) {
+        stringBuffer.append(", ");
+      }
+      stringBuffer.append(field.getColumnName()).append(" ").append(field.getType());
+      boolean isStaticColumn = Boolean.parseBoolean(field.getProperty("static"));
+      boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey"));
+      if (isStaticColumn) {
+        stringBuffer.append(" STATIC");
+      }
+      if (isPrimaryKey) {
+        stringBuffer.append("  PRIMARY KEY ");
+      }
+      isCommaNeeded = true;
+    }
+
+    if (cassandraKey != null) {
+      List<PartitionKeyField> pkey = cassandraKey.getPartitionKeyFields();
+      if (pkey != null) {
+        stringBuffer.append(", PRIMARY KEY (");
+        boolean isCommaNeededToApply = false;
+        for (PartitionKeyField keyField : pkey) {
+          if (isCommaNeededToApply) {
+            stringBuffer.append(",");
+          }
+          if (keyField.isComposite()) {
+            stringBuffer.append("(");
+            boolean isCommaNeededHere = false;
+            for (Field field : keyField.getFields()) {
+              if (isCommaNeededHere) {
+                stringBuffer.append(", ");
+              }
+              stringBuffer.append(field.getColumnName());
+              isCommaNeededHere = true;
+            }
+            stringBuffer.append(")");
+          } else {
+            stringBuffer.append(keyField.getColumnName());
+          }
+          isCommaNeededToApply = true;
+        }
+        stringBuffer.append(")");
+      }
+    }
+
+    stringBuffer.append(")");
+    boolean isWithNeeded = true;
+    if (Boolean.parseBoolean(mapping.getProperty("compactStorage"))) {
+      stringBuffer.append(" WITH COMPACT STORAGE ");
+      isWithNeeded = false;
+    }
+
+    String id = mapping.getProperty("id");
+    if (id != null) {
+      if (isWithNeeded) {
+        stringBuffer.append(" WITH ");
+      } else {
+        stringBuffer.append(" AND ");
+      }
+      stringBuffer.append("ID = '").append(id).append("'");
+      isWithNeeded = false;
+    }
+    if (cassandraKey != null) {
+      List<ClusterKeyField> clusterKeyFields = cassandraKey.getClusterKeyFields();
+      if (clusterKeyFields != null) {
+        if (isWithNeeded) {
+          stringBuffer.append(" WITH ");
+        } else {
+          stringBuffer.append(" AND ");
+        }
+        stringBuffer.append(" CLUSTERING ORDER BY (");
+        boolean isCommaNeededToApply = false;
+        for (ClusterKeyField keyField : clusterKeyFields) {
+          if (isCommaNeededToApply) {
+            stringBuffer.append(", ");
+          }
+          stringBuffer.append(keyField.getColumnName()).append(" ");
+          if (keyField.getOrder() != null) {
+            stringBuffer.append(keyField.getOrder());
+          }
+          isCommaNeededToApply = true;
+        }
+        stringBuffer.append(")");
+      }
+    }
+    return stringBuffer.toString();
+  }
+
+  private static StringBuilder processFields(StringBuilder stringBuilder, Field field, boolean isCommaNeeded) {
+    if (isCommaNeeded) {
+      stringBuilder.append(", ");
+    }
+    stringBuilder.append(field.getColumnName()).append(" ").append(field.getType());
+    boolean isStaticColumn = Boolean.parseBoolean(field.getProperty("static"));
+    if (isStaticColumn) {
+      stringBuilder.append(" STATIC");
+    }
+    return stringBuilder;
+  }
+
+  /**
+   * This method returns the CQL query to drop table.
+   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_table_r.html
+   *
+   * @param mapping Cassandra Mapping
+   * @return CQL query
+   */
+  static String getDropTableQuery(CassandraMapping mapping) {
+    return "DROP TABLE IF EXISTS " + mapping.getKeySpace().getName() + "." + mapping.getCoreName();
+  }
+
+  /**
+   * This method returns the CQL query to drop key space.
+   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_keyspace_r.html
+   *
+   * @param mapping Cassandra Mapping
+   * @return CQL query
+   */
+  static String getDropKeySpaceQuery(CassandraMapping mapping) {
+    return "DROP KEYSPACE IF EXISTS " + mapping.getKeySpace().getName();
+  }
+
+  /**
+   * This method returns the CQL query to truncate (removes all the data) in the table.
+   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/truncate_r.html
+   *
+   * @param mapping Cassandra Mapping
+   * @return CQL query
+   */
+  static String getTruncateTableQuery(CassandraMapping mapping) {
+    return "TRUNCATE TABLE " + mapping.getKeySpace().getName() + "." + mapping.getCoreName();
+  }
+
+  /**
+   *
+   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/insert_r.html
+   * @return
+   */
+  static String getInsertDataQuery(CassandraMapping mapping, Object obj) {
+//    ( (Persistent) obj).getS
+    StringBuilder stringBuffer = new StringBuilder();
+//    o
+return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
new file mode 100644
index 0000000..09272ce
--- /dev/null
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
@@ -0,0 +1,105 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.TableMetadata;
+import org.apache.gora.cassandra.store.CassandraClient;
+import org.apache.gora.cassandra.store.CassandraMapping;
+import org.apache.gora.cassandra.store.CassandraStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Locale;
+
+/**
+ * Created by madhawa on 6/26/17.
+ */
+public abstract class CassandraSerializer<K, T> {
+  CassandraClient client;
+
+  private Class<K> keyClass;
+
+  private Class<T> persistentClass;
+
+  private CassandraMapping mapping;
+
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
+
+  CassandraSerializer(CassandraClient cc, Class<K> keyClass, Class<T> persistantClass, CassandraMapping mapping) {
+    this.keyClass = keyClass;
+    this.persistentClass = persistantClass;
+    this.client = cc;
+    this.mapping = mapping;
+  }
+
+  public void createSchema() {
+    LOG.debug("creating Cassandra keyspace {}", mapping.getKeySpace().getName());
+    this.client.getSession().execute(CassandraQueryFactory.getCreateKeySpaceQuery(mapping));
+    LOG.debug("creating Cassandra column family / table {}", mapping.getCoreName());
+    this.client.getSession().execute(CassandraQueryFactory.getCreateTableQuery(mapping));
+  }
+
+  public void deleteSchema() {
+    LOG.debug("dropping Cassandra table {}", mapping.getCoreName());
+    this.client.getSession().execute(CassandraQueryFactory.getDropTableQuery(mapping));
+    LOG.debug("dropping Cassandra keyspace {}", mapping.getKeySpace().getName());
+    this.client.getSession().execute(CassandraQueryFactory.getDropKeySpaceQuery(mapping));
+  }
+
+  public void close() {
+    this.client.close();
+  }
+
+  public void truncateSchema() {
+    LOG.debug("truncating Cassandra table {}", mapping.getCoreName());
+    this.client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping));
+  }
+
+  public boolean schemaExists() {
+    KeyspaceMetadata keyspace = this.client.getCluster().getMetadata().getKeyspace(mapping.getKeySpace().getName());
+    if (keyspace != null) {
+      TableMetadata table = keyspace.getTable(mapping.getCoreName());
+      return table != null;
+    } else {
+      return false;
+    }
+  }
+
+  public static <K, T> CassandraSerializer getSerializer(CassandraClient cc, String type, final Class<K> keyClass, final Class<T> persistentClass, CassandraMapping mapping) {
+    CassandraStore.SerializerType serType = type.isEmpty() ? CassandraStore.SerializerType.NATIVE : CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH));
+    CassandraSerializer ser;
+    switch (serType) {
+      case AVRO:
+        ser = new AvroSerializer(cc,keyClass, persistentClass, mapping);
+        break;
+      case NATIVE:
+      default:
+        ser = new NativeSerializer(cc, keyClass, persistentClass, mapping);
+
+    }
+    return ser;
+  }
+
+  public abstract void put(K key, T value);
+
+  public abstract T get(K key);
+
+  public abstract boolean delete(K key);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
new file mode 100644
index 0000000..0c30cba
--- /dev/null
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
@@ -0,0 +1,61 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+import org.apache.gora.cassandra.persistent.CassandraNativePersistent;
+import org.apache.gora.cassandra.store.CassandraClient;
+import org.apache.gora.cassandra.store.CassandraMapping;
+
+/**
+ * Created by madhawa on 6/26/17.
+ */
+public class NativeSerializer<K, T extends CassandraNativePersistent> extends CassandraSerializer {
+
+  private Mapper<T> mapper;
+
+
+
+
+  @Override
+  public void put(Object key, Object value) {
+    mapper.save((T)value);
+  }
+
+  @Override
+  public T get(Object key) {
+    return mapper.get(key);
+  }
+
+  @Override
+  public boolean delete(Object key) {
+    mapper.delete(key);
+    return true;
+  }
+
+
+
+  public NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) {
+    super(cassandraClient, keyClass, persistentClass, mapping);
+    this.createSchema();
+    MappingManager mappingManager = new MappingManager(cassandraClient.getSession());
+    mapper = mappingManager.mapper(persistentClass);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
new file mode 100644
index 0000000..847343e
--- /dev/null
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
@@ -0,0 +1,368 @@
+package org.apache.gora.cassandra.store;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.ProtocolOptions;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.TypeCodec;
+import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.DefaultRetryPolicy;
+import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.FallthroughRetryPolicy;
+import com.datastax.driver.core.policies.LatencyAwarePolicy;
+import com.datastax.driver.core.policies.LoggingRetryPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+import org.jdom.input.SAXBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Created by madhawa on 6/28/17.
+ */
+public class CassandraClient {
+
+  public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
+
+
+  private Cluster cluster;
+
+  /**
+   * @return the session opened by {@link #initialize(Properties)}, or {@code null} if it has
+   * not been opened yet
+   */
+  public Session getSession() {
+    return this.session;
+  }
+
+  /**
+   * @return the cluster built by {@link #initialize(Properties)}, or {@code null} if it has
+   * not been built yet
+   */
+  public Cluster getCluster() {
+    return this.cluster;
+  }
+
+  private Session session;
+
+
+  public void initialize(Properties properties) throws Exception {
+    Cluster.Builder builder = Cluster.builder();
+    List<String> codecs = readCustomCodec(properties);
+    builder = populateSettings(builder, properties);
+    this.cluster = builder.build();
+    if (codecs != null) {
+      registerCustomCodecs(codecs);
+    }
+    this.session = this.cluster.connect();
+  }
+
+
+  /**
+   * Applies all connection related settings found in the properties to the cluster builder.
+   * <p>
+   * The servers property is mandatory and holds a comma separated list of contact points;
+   * surrounding whitespace of each entry is ignored. JMX reporting and metrics are switched
+   * off unless their properties are set to {@code true}.
+   *
+   * @param builder    cluster builder to configure
+   * @param properties data store properties
+   * @return the configured builder
+   * @throws IllegalArgumentException if the servers property is missing
+   */
+  private Cluster.Builder populateSettings(Cluster.Builder builder, Properties properties) {
+    String serversParam = properties.getProperty(CassandraStoreParameters.CASSANDRA_SERVERS);
+    if (serversParam == null) {
+      // Previously this surfaced as a bare NullPointerException from split().
+      throw new IllegalArgumentException(
+              "Missing required property: " + CassandraStoreParameters.CASSANDRA_SERVERS);
+    }
+    for (String server : serversParam.split(",")) {
+      // Tolerate "host1, host2" style lists.
+      builder = builder.addContactPoint(server.trim());
+    }
+    String portProp = properties.getProperty(CassandraStoreParameters.PORT);
+    if (portProp != null) {
+      builder = builder.withPort(Integer.parseInt(portProp));
+    }
+    String clusterNameProp = properties.getProperty(CassandraStoreParameters.CLUSTER_NAME);
+    if (clusterNameProp != null) {
+      builder = builder.withClusterName(clusterNameProp);
+    }
+    String compressionProp = properties.getProperty(CassandraStoreParameters.COMPRESSION);
+    if (compressionProp != null) {
+      builder = builder.withCompression(ProtocolOptions.Compression.valueOf(compressionProp));
+    }
+    builder = this.populateCredentials(properties, builder);
+    builder = this.populateLoadBalancingProp(properties, builder);
+    // parseBoolean(null) is false, so JMX reporting and metrics are disabled by default.
+    String enableJMXProp = properties.getProperty(CassandraStoreParameters.ENABLE_JMX_REPORTING);
+    if (!Boolean.parseBoolean(enableJMXProp)) {
+      builder = builder.withoutJMXReporting();
+    }
+    String enableMetricsProp = properties.getProperty(CassandraStoreParameters.ENABLE_METRICS);
+    if (!Boolean.parseBoolean(enableMetricsProp)) {
+      builder = builder.withoutMetrics();
+    }
+    builder = this.populatePoolingSettings(properties, builder);
+    String versionProp = properties.getProperty(CassandraStoreParameters.PROTOCOL_VERSION);
+    if (versionProp != null) {
+      builder = builder.withProtocolVersion(ProtocolVersion.fromInt(Integer.parseInt(versionProp)));
+    }
+    builder = this.populateQueryOptions(properties, builder);
+    builder = this.populateReconnectPolicy(properties, builder);
+    builder = this.populateRetrytPolicy(properties, builder);
+    builder = this.populateSocketOptions(properties, builder);
+    String enableSSLProp = properties.getProperty(CassandraStoreParameters.ENABLE_SSL);
+    if (Boolean.parseBoolean(enableSSLProp)) {
+      builder = builder.withSSL();
+    }
+    return builder;
+  }
+
+
+  /**
+   * Configures the load balancing policy named by the load balancing property.
+   * <p>
+   * Supported names are {@code LatencyAwareRoundRobinPolicy}, {@code RoundRobinPolicy},
+   * {@code DCAwareRoundRobinPolicy}, {@code TokenAwareRoundRobinPolicy} and
+   * {@code TokenAwareDCAwareRoundRobinPolicy}. An unknown name is logged as an error and the
+   * builder is left untouched; a missing property leaves the builder untouched as well.
+   *
+   * @param properties data store properties
+   * @param builder    cluster builder to configure
+   * @return the configured builder
+   */
+  private Cluster.Builder populateLoadBalancingProp(Properties properties, Cluster.Builder builder) {
+    String loadBalancingProp = properties.getProperty(CassandraStoreParameters.LOAD_BALANCING_POLICY);
+    if (loadBalancingProp == null) {
+      return builder;
+    }
+    switch (loadBalancingProp) {
+      case "LatencyAwareRoundRobinPolicy":
+        return builder.withLoadBalancingPolicy(LatencyAwarePolicy.builder(new RoundRobinPolicy()).build());
+      case "RoundRobinPolicy":
+        return builder.withLoadBalancingPolicy(new RoundRobinPolicy());
+      case "DCAwareRoundRobinPolicy":
+        return builder.withLoadBalancingPolicy(buildDcAwarePolicy(properties));
+      case "TokenAwareRoundRobinPolicy":
+        return builder.withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()));
+      case "TokenAwareDCAwareRoundRobinPolicy":
+        return builder.withLoadBalancingPolicy(new TokenAwarePolicy(buildDcAwarePolicy(properties)));
+      default:
+        LOG.error("Unsupported Cassandra load balancing  policy: {} ", loadBalancingProp);
+        return builder;
+    }
+  }
+
+  /**
+   * Builds a DC aware round robin policy from the data center and the
+   * "allow remote DCs for local consistency level" properties. The local data center is only
+   * set when the property is present and not empty.
+   */
+  private DCAwareRoundRobinPolicy buildDcAwarePolicy(Properties properties) {
+    String dataCenter = properties.getProperty(CassandraStoreParameters.DATA_CENTER);
+    boolean allowRemoteDCsForLocalConsistencyLevel = Boolean.parseBoolean(
+            properties.getProperty(CassandraStoreParameters.ALLOW_REMOTE_DCS_FOR_LOCAL_CONSISTENCY_LEVEL));
+    DCAwareRoundRobinPolicy.Builder policyBuilder = DCAwareRoundRobinPolicy.builder();
+    if (dataCenter != null && !dataCenter.isEmpty()) {
+      policyBuilder = policyBuilder.withLocalDc(dataCenter);
+    }
+    if (allowRemoteDCsForLocalConsistencyLevel) {
+      policyBuilder = policyBuilder.allowRemoteDCsForLocalConsistencyLevel();
+    }
+    return policyBuilder.build();
+  }
+
+  /**
+   * Applies the configured credentials to the builder. Nothing is applied when no username
+   * is configured; the password is passed on as read, even when it is absent.
+   *
+   * @param properties data store properties
+   * @param builder    cluster builder to configure
+   * @return the configured builder
+   */
+  private Cluster.Builder populateCredentials(Properties properties, Cluster.Builder builder) {
+    String user = properties.getProperty(CassandraStoreParameters.USERNAME);
+    String secret = properties.getProperty(CassandraStoreParameters.PASSWORD);
+    if (user == null) {
+      return builder;
+    }
+    return builder.withCredentials(user, secret);
+  }
+
+  /**
+   * Builds the connection pooling options from the properties and applies them to the
+   * builder. Each option is only set when its property is present; local and remote hosts
+   * are configured separately.
+   *
+   * @param properties data store properties
+   * @param builder    cluster builder to configure
+   * @return the configured builder
+   */
+  private Cluster.Builder populatePoolingSettings(Properties properties, Cluster.Builder builder) {
+    PoolingOptions pooling = new PoolingOptions();
+
+    // core connections
+    String value = properties.getProperty(CassandraStoreParameters.LOCAL_CORE_CONNECTIONS_PER_HOST);
+    if (value != null) {
+      pooling.setCoreConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(value));
+    }
+    value = properties.getProperty(CassandraStoreParameters.REMOTE_CORE_CONNECTIONS_PER_HOST);
+    if (value != null) {
+      pooling.setCoreConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(value));
+    }
+
+    // maximum connections
+    value = properties.getProperty(CassandraStoreParameters.LOCAL_MAX_CONNECTIONS_PER_HOST);
+    if (value != null) {
+      pooling.setMaxConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(value));
+    }
+    value = properties.getProperty(CassandraStoreParameters.REMOTE_MAX_CONNECTIONS_PER_HOST);
+    if (value != null) {
+      pooling.setMaxConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(value));
+    }
+
+    // new connection threshold
+    value = properties.getProperty(CassandraStoreParameters.LOCAL_NEW_CONNECTION_THRESHOLD);
+    if (value != null) {
+      pooling.setNewConnectionThreshold(HostDistance.LOCAL, Integer.parseInt(value));
+    }
+    value = properties.getProperty(CassandraStoreParameters.REMOTE_NEW_CONNECTION_THRESHOLD);
+    if (value != null) {
+      pooling.setNewConnectionThreshold(HostDistance.REMOTE, Integer.parseInt(value));
+    }
+
+    // maximum requests per connection
+    value = properties.getProperty(CassandraStoreParameters.LOCAL_MAX_REQUESTS_PER_CONNECTION);
+    if (value != null) {
+      pooling.setMaxRequestsPerConnection(HostDistance.LOCAL, Integer.parseInt(value));
+    }
+    value = properties.getProperty(CassandraStoreParameters.REMOTE_MAX_REQUESTS_PER_CONNECTION);
+    if (value != null) {
+      pooling.setMaxRequestsPerConnection(HostDistance.REMOTE, Integer.parseInt(value));
+    }
+
+    return builder.withPoolingOptions(pooling);
+  }
+
+  /**
+   * Builds the query options (consistency level, serial consistency level and fetch size)
+   * from the properties and applies them to the builder. Absent properties are left at the
+   * {@link QueryOptions} defaults.
+   *
+   * @param properties data store properties
+   * @param builder    cluster builder to configure
+   * @return the configured builder
+   */
+  private Cluster.Builder populateQueryOptions(Properties properties, Cluster.Builder builder) {
+    QueryOptions queryOptions = new QueryOptions();
+
+    String consistency = properties.getProperty(CassandraStoreParameters.CONSISTENCY_LEVEL);
+    if (consistency != null) {
+      queryOptions.setConsistencyLevel(ConsistencyLevel.valueOf(consistency));
+    }
+
+    String serialConsistency = properties.getProperty(CassandraStoreParameters.SERIAL_CONSISTENCY_LEVEL);
+    if (serialConsistency != null) {
+      queryOptions.setSerialConsistencyLevel(ConsistencyLevel.valueOf(serialConsistency));
+    }
+
+    String fetch = properties.getProperty(CassandraStoreParameters.FETCH_SIZE);
+    if (fetch != null) {
+      queryOptions.setFetchSize(Integer.parseInt(fetch));
+    }
+
+    builder = builder.withQueryOptions(queryOptions);
+    return builder;
+  }
+
+  /**
+   * Configures the reconnection policy named by the reconnection policy property.
+   * <p>
+   * {@code ConstantReconnectionPolicy} requires the constant delay property and
+   * {@code ExponentialReconnectionPolicy} requires the base and max delay properties. An
+   * unknown policy name is logged as an error and the builder is left untouched.
+   *
+   * @param properties data store properties
+   * @param builder    cluster builder to configure
+   * @return the configured builder
+   * @throws NumberFormatException if a required delay property is missing or is not a number
+   */
+  private Cluster.Builder populateReconnectPolicy(Properties properties, Cluster.Builder builder) {
+    String reconnectionPolicy = properties.getProperty(CassandraStoreParameters.RECONNECTION_POLICY);
+    if (reconnectionPolicy != null) {
+      switch (reconnectionPolicy) {
+        case "ConstantReconnectionPolicy": {
+          long delay = readRequiredDelay(properties,
+                  CassandraStoreParameters.CONSTANT_RECONNECTION_POLICY_DELAY, reconnectionPolicy);
+          builder = builder.withReconnectionPolicy(new ConstantReconnectionPolicy(delay));
+          break;
+        }
+        case "ExponentialReconnectionPolicy": {
+          long baseDelay = readRequiredDelay(properties,
+                  CassandraStoreParameters.EXPONENTIAL_RECONNECTION_POLICY_BASE_DELAY, reconnectionPolicy);
+          long maxDelay = readRequiredDelay(properties,
+                  CassandraStoreParameters.EXPONENTIAL_RECONNECTION_POLICY_MAX_DELAY, reconnectionPolicy);
+          builder = builder.withReconnectionPolicy(new ExponentialReconnectionPolicy(baseDelay, maxDelay));
+          break;
+        }
+        default:
+          LOG.error("Unsupported reconnection policy : {} ", reconnectionPolicy);
+          break;
+      }
+    }
+    return builder;
+  }
+
+  /**
+   * Reads a mandatory delay property as a long. The exception type for a missing property is
+   * kept as {@link NumberFormatException} (what {@code Long.parseLong(null)} used to throw),
+   * but the message now names the missing property instead of just saying "null".
+   */
+  private long readRequiredDelay(Properties properties, String key, String policyName) {
+    String value = properties.getProperty(key);
+    if (value == null) {
+      throw new NumberFormatException("Property '" + key + "' is required for " + policyName);
+    }
+    return Long.parseLong(value);
+  }
+
+  /**
+   * Configures the retry policy named by the retry policy property. The "Logging" variants
+   * wrap the corresponding policy in a {@link LoggingRetryPolicy}. An unknown name is logged
+   * as an error; a missing or unknown name leaves the builder untouched.
+   *
+   * @param properties data store properties
+   * @param builder    cluster builder to configure
+   * @return the configured builder
+   */
+  private Cluster.Builder populateRetrytPolicy(Properties properties, Cluster.Builder builder) {
+    String policyName = properties.getProperty(CassandraStoreParameters.RETRY_POLICY);
+    if (policyName == null) {
+      return builder;
+    }
+    if ("DefaultRetryPolicy".equals(policyName)) {
+      return builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE);
+    } else if ("DowngradingConsistencyRetryPolicy".equals(policyName)) {
+      return builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE);
+    } else if ("FallthroughRetryPolicy".equals(policyName)) {
+      return builder.withRetryPolicy(FallthroughRetryPolicy.INSTANCE);
+    } else if ("LoggingDefaultRetryPolicy".equals(policyName)) {
+      return builder.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE));
+    } else if ("LoggingDowngradingConsistencyRetryPolicy".equals(policyName)) {
+      return builder.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE));
+    } else if ("LoggingFallthroughRetryPolicy".equals(policyName)) {
+      return builder.withRetryPolicy(new LoggingRetryPolicy(FallthroughRetryPolicy.INSTANCE));
+    }
+    LOG.error("Unsupported retry policy : {} ", policyName);
+    return builder;
+  }
+
+  /**
+   * Builds the socket options from the properties and applies them to the builder. Each
+   * option is only set when its property is present.
+   *
+   * @param properties data store properties
+   * @param builder    cluster builder to configure
+   * @return the configured builder
+   */
+  private Cluster.Builder populateSocketOptions(Properties properties, Cluster.Builder builder) {
+    SocketOptions socket = new SocketOptions();
+
+    String value = properties.getProperty(CassandraStoreParameters.CONNECTION_TIMEOUT_MILLIS);
+    if (value != null) {
+      socket.setConnectTimeoutMillis(Integer.parseInt(value));
+    }
+    value = properties.getProperty(CassandraStoreParameters.KEEP_ALIVE);
+    if (value != null) {
+      socket.setKeepAlive(Boolean.parseBoolean(value));
+    }
+    value = properties.getProperty(CassandraStoreParameters.READ_TIMEOUT_MILLIS);
+    if (value != null) {
+      socket.setReadTimeoutMillis(Integer.parseInt(value));
+    }
+    value = properties.getProperty(CassandraStoreParameters.RECEIVER_BUFFER_SIZE);
+    if (value != null) {
+      socket.setReceiveBufferSize(Integer.parseInt(value));
+    }
+    value = properties.getProperty(CassandraStoreParameters.REUSE_ADDRESS);
+    if (value != null) {
+      socket.setReuseAddress(Boolean.parseBoolean(value));
+    }
+    value = properties.getProperty(CassandraStoreParameters.SEND_BUFFER_SIZE);
+    if (value != null) {
+      socket.setSendBufferSize(Integer.parseInt(value));
+    }
+    value = properties.getProperty(CassandraStoreParameters.SO_LINGER);
+    if (value != null) {
+      socket.setSoLinger(Integer.parseInt(value));
+    }
+    value = properties.getProperty(CassandraStoreParameters.TCP_NODELAY);
+    if (value != null) {
+      socket.setTcpNoDelay(Boolean.parseBoolean(value));
+    }
+
+    builder = builder.withSocketOptions(socket);
+    return builder;
+  }
+
+
+  /**
+   * Reads the class names of the custom codecs from the XML file named by the custom codec
+   * file property. The file is looked up as a classpath resource and every {@code codec}
+   * child of the root element contributes one class name (surrounding whitespace removed).
+   *
+   * @param properties data store properties
+   * @return the codec class names, or {@code null} when no codec file is configured
+   * @throws JDOMException if the file is not well-formed XML
+   * @throws IOException   if the file cannot be found on the classpath or cannot be read
+   */
+  @SuppressWarnings("unchecked")
+  private List<String> readCustomCodec(Properties properties) throws JDOMException, IOException {
+    String filename = properties.getProperty(CassandraStoreParameters.CUSTOM_CODEC_FILE);
+    if (filename == null) {
+      return null;
+    }
+    // try-with-resources: the stream was never closed before.
+    try (java.io.InputStream codecStream = getClass().getClassLoader().getResourceAsStream(filename)) {
+      if (codecStream == null) {
+        throw new IOException("Custom codec file '" + filename + "' was not found on the classpath");
+      }
+      Document doc = new SAXBuilder().build(codecStream);
+      List<Element> codecElementList = doc.getRootElement().getChildren("codec");
+      List<String> codecs = new ArrayList<>();
+      for (Element codec : codecElementList) {
+        // Pretty-printed XML may surround the class name with whitespace.
+        codecs.add(codec.getValue().trim());
+      }
+      return codecs;
+    }
+  }
+
+
+  /**
+   * Closes the session and the cluster. Safe to call when {@link #initialize(Properties)}
+   * never ran or failed part-way; the cluster is closed even if closing the session throws.
+   */
+  public void close() {
+    try {
+      if (this.session != null) {
+        this.session.close();
+      }
+    } finally {
+      if (this.cluster != null) {
+        this.cluster.close();
+      }
+    }
+  }
+
+  private void registerCustomCodecs(List<String> codecs) throws Exception {
+    for (String codec : codecs) {
+      this.cluster.getConfiguration().getCodecRegistry().register((TypeCodec<?>) Class.forName(codec).newInstance());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
new file mode 100644
index 0000000..bb8c6ad
--- /dev/null
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
@@ -0,0 +1,234 @@
+package org.apache.gora.cassandra.store;
+
+import org.apache.gora.cassandra.bean.CassandraKey;
+import org.apache.gora.cassandra.bean.ClusterKeyField;
+import org.apache.gora.cassandra.bean.Field;
+import org.apache.gora.cassandra.bean.KeySpace;
+import org.apache.gora.cassandra.bean.PartitionKeyField;
+import org.apache.gora.persistency.Persistent;
+import org.jdom.Attribute;
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.input.SAXBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Created by madhawa on 6/28/17.
+ */
+public class CassandraMappingBuilder<K, T extends Persistent> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraMappingBuilder.class);
+
+
+  private CassandraStore dataStore;
+
+
+  /**
+   * Creates a mapping builder bound to the given data store. The store supplies the key and
+   * persistent classes used when the mapping file is read.
+   *
+   * @param store data store the mapping is built for
+   */
+  public CassandraMappingBuilder(final CassandraStore<K, T> store) {
+    dataStore = store;
+  }
+
+  /**
+   * In this method we reads the mapping file and creates the Cassandra Mapping.
+   *
+   * @param filename mapping file name
+   * @return @{@link CassandraMapping}
+   * @throws IOException
+   */
+  @SuppressWarnings("all")
+  public CassandraMapping readMapping(String filename) throws IOException {
+    CassandraMapping map = new CassandraMapping();
+    Class keyClass = dataStore.getKeyClass();
+    Class persistentClass = dataStore. getPersistentClass();
+    try {
+      SAXBuilder builder = new SAXBuilder();
+      Document doc = builder.build(getClass().getClassLoader().getResourceAsStream(filename));
+
+      List<Element> keyspaces = doc.getRootElement().getChildren("keyspace");
+      List<Element> classes = doc.getRootElement().getChildren("class");
+      List<Element> keys = doc.getRootElement().getChildren("cassandraKey");
+
+      boolean classMatched = false;
+      for (Element classElement : classes) {
+        if (classElement.getAttributeValue("keyClass").equals(
+                keyClass.getCanonicalName())
+                && classElement.getAttributeValue("name").equals(
+                persistentClass.getCanonicalName())) {
+
+          classMatched = true;
+          String tableName = classElement.getAttributeValue("table");
+          map.setCoreName(tableName);
+
+          List classAttributes = classElement.getAttributes();
+          for (Object anAttributeList : classAttributes) {
+            Attribute attribute = (Attribute) anAttributeList;
+            String attributeName = attribute.getName();
+            String attributeValue = attribute.getValue();
+            map.addProperty(attributeName, attributeValue);
+          }
+
+          List<Element> fields = classElement.getChildren("field");
+
+          for (Element field : fields) {
+            Field cassandraField = new Field();
+
+            List fieldAttributes = field.getAttributes();
+            processAttributes(fieldAttributes, cassandraField);
+            map.addCassandraField(cassandraField);
+          }
+          break;
+        }
+        LOG.warn("Check that 'keyClass' and 'name' parameters in gora-solr-mapping.xml "
+                + "match with intended values. A mapping mismatch has been found therefore "
+                + "no mapping has been initialized for class mapping at position "
+                + " {} in mapping file.", classes.indexOf(classElement));
+      }
+      if (!classMatched) {
+        LOG.error("Check that 'keyClass' and 'name' parameters in {} no mapping has been initialized for {} class mapping", filename, persistentClass);
+      }
+
+      String keyspaceName = map.getProperty("keyspace");
+      if (keyspaceName != null) {
+        KeySpace keyspace;
+        for (Element keyspaceElement : keyspaces) {
+          if (keyspaceName.equals(keyspaceElement.getAttributeValue("name"))) {
+            keyspace = new KeySpace();
+            List fieldAttributes = keyspaceElement.getAttributes();
+            for (Object attributeObject : fieldAttributes) {
+              Attribute attribute = (Attribute) attributeObject;
+              String attributeName = attribute.getName();
+              String attributeValue = attribute.getValue();
+              switch (attributeName) {
+                case "name":
+                  keyspace.setName(attributeValue);
+                  break;
+                case "durableWrite":
+                  keyspace.setDurableWritesEnabled(Boolean.parseBoolean(attributeValue));
+                  break;
+                default:
+                  keyspace.addProperty(attributeName, attributeValue);
+                  break;
+              }
+            }
+            Element placementStrategy = keyspaceElement.getChild("placementStrategy");
+            switch (KeySpace.PlacementStrategy.valueOf(placementStrategy.getAttributeValue("name"))) {
+              case SimpleStrategy:
+                keyspace.setPlacementStrategy(KeySpace.PlacementStrategy.SimpleStrategy);
+                keyspace.setReplicationFactor(Integer.parseInt(placementStrategy.getAttributeValue("replication_factor")));
+                break;
+              case NetworkTopologyStrategy:
+                List<Element> dataCenters = placementStrategy.getChildren("datacenter");
+                keyspace.setPlacementStrategy(KeySpace.PlacementStrategy.NetworkTopologyStrategy);
+                for (Element dataCenter : dataCenters) {
+                  String dataCenterName = dataCenter.getAttributeValue("name");
+                  Integer dataCenterReplicationFactor = Integer.valueOf(dataCenter.getAttributeValue("replication_factor"));
+                  keyspace.addDataCenter(dataCenterName, dataCenterReplicationFactor);
+                }
+                break;
+            }
+            map.setKeySpace(keyspace);
+            break;
+          }
+
+        }
+
+      }
+
+      for (Element key : keys) {
+        if (keyClass.getName().equals(key.getAttributeValue("name"))) {
+          CassandraKey cassandraKey = new CassandraKey(keyClass.getName());
+          Element partitionKeys = key.getChild("partitionKey");
+          Element clusterKeys = key.getChild("clusterKey");
+          List<Element> partitionKeyFields = partitionKeys.getChildren("field");
+          List<Element> partitionCompositeKeyFields = partitionKeys.getChildren("compositeKey");
+          // process non composite partition keys
+          for (Element partitionKeyField : partitionKeyFields) {
+            PartitionKeyField fieldKey = new PartitionKeyField();
+            List fieldAttributes = partitionKeyField.getAttributes();
+            processAttributes(fieldAttributes, fieldKey);
+            cassandraKey.addPartitionKeyField(fieldKey);
+          }
+          // process composite partitions keys
+          for (Element partitionCompositeKeyField : partitionCompositeKeyFields) {
+            PartitionKeyField compositeFieldKey = new PartitionKeyField();
+            compositeFieldKey.setComposite(true);
+            List<Element> compositeKeyFields = partitionCompositeKeyField.getChildren("field");
+            for (Element partitionKeyField : compositeKeyFields) {
+              PartitionKeyField fieldKey = new PartitionKeyField();
+              List fieldAttributes = partitionKeyField.getAttributes();
+              processAttributes(fieldAttributes, fieldKey);
+              compositeFieldKey.addField(fieldKey);
+            }
+            cassandraKey.addPartitionKeyField(compositeFieldKey);
+          }
+
+          //process cluster keys
+          List<Element> clusterKeyFields = clusterKeys.getChildren("field");
+          for (Element clusterKeyField : clusterKeyFields) {
+            ClusterKeyField keyField = new ClusterKeyField();
+            List fieldAttributes = clusterKeyField.getAttributes();
+            for (Object anAttributeList : fieldAttributes) {
+              Attribute attribute = (Attribute) anAttributeList;
+              String attributeName = attribute.getName();
+              String attributeValue = attribute.getValue();
+              switch (attributeName) {
+                case "name":
+                  keyField.setFieldName(attributeValue);
+                  break;
+                case "column":
+                  keyField.setColumnName(attributeValue);
+                  break;
+                case "type":
+                  keyField.setType(attributeValue);
+                  break;
+                case "order":
+                  keyField.setOrder(ClusterKeyField.Order.valueOf(attributeValue.toUpperCase(Locale.ENGLISH)));
+                  break;
+                default:
+                  keyField.addProperty(attributeName, attributeValue);
+                  break;
+              }
+            }
+            cassandraKey.addClusterKeyField(keyField);
+          }
+          map.setCassandraKey(cassandraKey);
+        }
+      }
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+    return map;
+  }
+
+  /**
+   * Copies XML attributes onto a field definition: {@code name}, {@code column} and
+   * {@code type} go to the dedicated setters, every other attribute is stored as a property.
+   *
+   * @param attributes attributes of a field element; every entry must be an {@link Attribute}
+   *                   (the parameter was previously mis-declared as a list of elements)
+   * @param fieldKey   field definition to populate
+   */
+  private void processAttributes(List<?> attributes, Field fieldKey) {
+    for (Object attributeObject : attributes) {
+      Attribute attribute = (Attribute) attributeObject;
+      String attributeName = attribute.getName();
+      String attributeValue = attribute.getValue();
+      switch (attributeName) {
+        case "name":
+          fieldKey.setFieldName(attributeValue);
+          break;
+        case "column":
+          fieldKey.setColumnName(attributeValue);
+          break;
+        case "type":
+          fieldKey.setType(attributeValue);
+          break;
+        default:
+          fieldKey.addProperty(attributeName, attributeValue);
+          break;
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraQueryFactory.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraQueryFactory.java
deleted file mode 100644
index fc90c5f..0000000
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraQueryFactory.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.gora.cassandra.store;
-
-import org.apache.gora.cassandra.bean.CassandraKey;
-import org.apache.gora.cassandra.bean.ClusterKeyField;
-import org.apache.gora.cassandra.bean.Field;
-import org.apache.gora.cassandra.bean.KeySpace;
-import org.apache.gora.cassandra.bean.PartitionKeyField;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class is used create Cassandra Queries.
- */
-class CassandraQueryFactory {
-
-  /**
-   * This method returns the CQL query to create key space.
-   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html
-   *
-   * @param mapping Cassandra Mapping
-   * @return CQL Query
-   */
-  static String getCreateKeySpaceQuery(CassandraMapping mapping) {
-    KeySpace keySpace = mapping.getKeySpace();
-    StringBuilder stringBuffer = new StringBuilder();
-    stringBuffer.append("CREATE KEYSPACE IF NOT EXISTS ").append(keySpace.getName()).append(" WITH REPLICATION = { 'class' : ");
-    KeySpace.PlacementStrategy placementStrategy = keySpace.getPlacementStrategy();
-    stringBuffer.append("'").append(placementStrategy).append("'").append(", ").append("'");
-    switch (placementStrategy) {
-      case SimpleStrategy:
-        stringBuffer.append("replication_factor").append("'").append(" : ").append(keySpace.getReplicationFactor()).append(" }");
-        break;
-      case NetworkTopologyStrategy:
-        boolean isCommaNeeded = false;
-        for (Map.Entry<String, Integer> entry : keySpace.getDataCenters().entrySet()) {
-          if (isCommaNeeded) {
-            stringBuffer.append(", '");
-          }
-          stringBuffer.append(entry.getKey()).append("'").append(" : ").append(entry.getValue());
-          isCommaNeeded = true;
-        }
-        stringBuffer.append(" }");
-        break;
-    }
-
-    if (keySpace.isDurableWritesEnabled()) {
-      stringBuffer.append(" AND DURABLE_WRITES = ").append(keySpace.isDurableWritesEnabled());
-    }
-    return stringBuffer.toString();
-  }
-
-  static String getCreateTableQuery(CassandraMapping mapping) {
-    StringBuilder stringBuffer = new StringBuilder();
-    stringBuffer.append("CREATE TABLE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(mapping.getCoreName()).append(" (");
-    boolean isCommaNeeded = false;
-    CassandraKey cassandraKey = mapping.getCassandraKey();
-    // appending Cassandra key columns into db schema
-    if (cassandraKey != null) {
-      for (PartitionKeyField partitionKeyField : cassandraKey.getPartitionKeyFields()) {
-        if (partitionKeyField.isComposite()) {
-          for (Field compositeField : partitionKeyField.getFields()) {
-            stringBuffer = processFields(stringBuffer, compositeField, isCommaNeeded);
-          }
-
-        } else {
-          stringBuffer = processFields(stringBuffer, partitionKeyField, isCommaNeeded);
-        }
-        isCommaNeeded = true;
-      }
-      for (ClusterKeyField clusterKeyField : cassandraKey.getClusterKeyFields()) {
-        stringBuffer = processFields(stringBuffer, clusterKeyField, isCommaNeeded);
-      }
-    }
-    // appending Other columns
-    for (Field field : mapping.getFieldList()) {
-      if (isCommaNeeded) {
-        stringBuffer.append(", ");
-      }
-      stringBuffer.append(field.getColumnName()).append(" ").append(field.getType());
-      boolean isStaticColumn = Boolean.parseBoolean(field.getProperty("static"));
-      boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey"));
-      if (isStaticColumn) {
-        stringBuffer.append(" STATIC");
-      }
-      if (isPrimaryKey) {
-        stringBuffer.append("  PRIMARY KEY ");
-      }
-      isCommaNeeded = true;
-    }
-
-    if (cassandraKey != null) {
-      List<PartitionKeyField> pkey = cassandraKey.getPartitionKeyFields();
-      if (pkey != null) {
-        stringBuffer.append(", PRIMARY KEY (");
-        boolean isCommaNeededToApply = false;
-        for (PartitionKeyField keyField : pkey) {
-          if (isCommaNeededToApply) {
-            stringBuffer.append(",");
-          }
-          if (keyField.isComposite()) {
-            stringBuffer.append("(");
-            boolean isCommaNeededHere = false;
-            for (Field field : keyField.getFields()) {
-              if (isCommaNeededHere) {
-                stringBuffer.append(", ");
-              }
-              stringBuffer.append(field.getColumnName());
-              isCommaNeededHere = true;
-            }
-            stringBuffer.append(")");
-          } else {
-            stringBuffer.append(keyField.getColumnName());
-          }
-          isCommaNeededToApply = true;
-        }
-        stringBuffer.append(")");
-      }
-    }
-
-    stringBuffer.append(")");
-    boolean isWithNeeded = true;
-    if (Boolean.parseBoolean(mapping.getProperty("compactStorage"))) {
-      stringBuffer.append(" WITH COMPACT STORAGE ");
-      isWithNeeded = false;
-    }
-
-    String id = mapping.getProperty("id");
-    if (id != null) {
-      if (isWithNeeded) {
-        stringBuffer.append(" WITH ");
-      } else {
-        stringBuffer.append(" AND ");
-      }
-      stringBuffer.append("ID = '").append(id).append("'");
-      isWithNeeded = false;
-    }
-    if (cassandraKey != null) {
-      List<ClusterKeyField> clusterKeyFields = cassandraKey.getClusterKeyFields();
-      if (clusterKeyFields != null) {
-        if (isWithNeeded) {
-          stringBuffer.append(" WITH ");
-        } else {
-          stringBuffer.append(" AND ");
-        }
-        stringBuffer.append(" CLUSTERING ORDER BY (");
-        boolean isCommaNeededToApply = false;
-        for (ClusterKeyField keyField : clusterKeyFields) {
-          if (isCommaNeededToApply) {
-            stringBuffer.append(", ");
-          }
-          stringBuffer.append(keyField.getColumnName()).append(" ");
-          if (keyField.getOrder() != null) {
-            stringBuffer.append(keyField.getOrder());
-          }
-          isCommaNeededToApply = true;
-        }
-        stringBuffer.append(")");
-      }
-    }
-    return stringBuffer.toString();
-  }
-
-  private static StringBuilder processFields(StringBuilder stringBuilder, Field field, boolean isCommaNeeded) {
-    if (isCommaNeeded) {
-      stringBuilder.append(", ");
-    }
-    stringBuilder.append(field.getColumnName()).append(" ").append(field.getType());
-    boolean isStaticColumn = Boolean.parseBoolean(field.getProperty("static"));
-    if (isStaticColumn) {
-      stringBuilder.append(" STATIC");
-    }
-    return stringBuilder;
-  }
-
-  /**
-   * This method returns the CQL query to drop table.
-   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_table_r.html
-   *
-   * @param mapping Cassandra Mapping
-   * @return CQL query
-   */
-  static String getDropTableQuery(CassandraMapping mapping) {
-    return "DROP TABLE IF EXISTS " + mapping.getKeySpace().getName() + "." + mapping.getCoreName();
-  }
-
-  /**
-   * This method returns the CQL query to drop key space.
-   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_keyspace_r.html
-   *
-   * @param mapping Cassandra Mapping
-   * @return CQL query
-   */
-  static String getDropKeySpaceQuery(CassandraMapping mapping) {
-    return "DROP KEYSPACE IF EXISTS " + mapping.getKeySpace().getName();
-  }
-
-  /**
-   * This method returns the CQL query to truncate (removes all the data) in the table.
-   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/truncate_r.html
-   *
-   * @param mapping Cassandra Mapping
-   * @return CQL query
-   */
-  static String getTruncateTableQuery(CassandraMapping mapping) {
-    return "TRUNCATE TABLE " + mapping.getKeySpace().getName() + "." + mapping.getCoreName();
-  }
-}


Mime
View raw message