gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lewi...@apache.org
Subject [28/37] gora git commit: Add tests for avro
Date Wed, 23 Aug 2017 20:55:26 GMT
Add tests for avro


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/a9a3ad49
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/a9a3ad49
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/a9a3ad49

Branch: refs/heads/master
Commit: a9a3ad4922a45a76c8c6b1b76c011d72736e0c30
Parents: 962d7a6
Author: madhawa-gunasekara <madhawag@wso2.com>
Authored: Fri Aug 4 21:35:14 2017 +0530
Committer: madhawa-gunasekara <madhawag@wso2.com>
Committed: Thu Aug 10 23:45:56 2017 +0530

----------------------------------------------------------------------
 gora-cassandra-cql/pom.xml                      |   5 +-
 .../nativeSerialization/ComplexTypes.java       |  85 ++-
 .../generated/nativeSerialization/Customer.java |  81 +++
 .../generated/nativeSerialization/Document.java | 137 +++++
 .../generated/nativeSerialization/User.java     |  86 ++-
 .../org/apache/gora/cassandra/bean/Field.java   |   1 +
 .../persistent/CassandraNativePersistent.java   | 108 ----
 .../cassandra/query/CassandraResultSet.java     |   6 +-
 .../serializers/AvroCassandraUtils.java         |  20 +-
 .../cassandra/serializers/AvroSerializer.java   | 126 +++--
 .../serializers/CassandraQueryFactory.java      | 526 +++++++++----------
 .../serializers/CassandraSerializer.java        |  87 +--
 .../cassandra/serializers/NativeSerializer.java |  10 +-
 .../gora/cassandra/store/CassandraMapping.java  |   8 +-
 .../store/CassandraMappingBuilder.java          |   3 +
 .../gora/cassandra/store/CassandraStore.java    |  26 +-
 .../conf/avroUDT/gora-cassandra-mapping.xml     |  50 ++
 .../src/test/conf/gora.properties               |   4 +-
 .../conf/nativeUDT/gora-cassandra-mapping.xml   |  33 ++
 .../gora/cassandra/GoraCassandraTestDriver.java |   8 +-
 .../store/TestAvroSerializationWithUDT.java     |  91 ++++
 .../cassandra/store/TestCassandraStore.java     |  97 +++-
 .../TestCassandraStoreWithCassandraKey.java     |  68 +++
 .../store/TestNativeSerializationWithUDT.java   |  91 ++++
 pom.xml                                         |   4 +-
 25 files changed, 1248 insertions(+), 513 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/pom.xml
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/pom.xml b/gora-cassandra-cql/pom.xml
index ed98a18..b78db24 100644
--- a/gora-cassandra-cql/pom.xml
+++ b/gora-cassandra-cql/pom.xml
@@ -167,11 +167,10 @@
             </exclusions>
         </dependency>
 
-        <dependency>
+<!--        <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-transport-native-epoll</artifactId>
-            <version>4.0.37.Final</version>
-        </dependency>
+        </dependency>-->
 
         <dependency>
             <groupId>org.apache.gora</groupId>

http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/ComplexTypes.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/ComplexTypes.java b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/ComplexTypes.java
index 797ea62..c161ef9 100644
--- a/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/ComplexTypes.java
+++ b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/ComplexTypes.java
@@ -3,7 +3,10 @@ package org.apache.gora.cassandra.example.generated.nativeSerialization;
 import com.datastax.driver.mapping.annotations.Column;
 import com.datastax.driver.mapping.annotations.PartitionKey;
 import com.datastax.driver.mapping.annotations.Table;
-import org.apache.gora.cassandra.persistent.CassandraNativePersistent;
+import com.datastax.driver.mapping.annotations.Transient;
+import org.apache.avro.Schema;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.Tombstone;
 
 import java.util.List;
 import java.util.Map;
@@ -18,7 +21,7 @@ import java.util.UUID;
         writeConsistency = "QUORUM",
         caseSensitiveKeyspace = false,
         caseSensitiveTable = true)
-public class ComplexTypes extends CassandraNativePersistent {
+public class ComplexTypes implements Persistent {
 
   @Column
   private List<String> listDataType;
@@ -98,4 +101,82 @@ public class ComplexTypes extends CassandraNativePersistent {
   public void setId(String id) {
     this.id = id;
   }
+
+  @Transient
+  @Override
+  public void clear() {
+
+  }
+
+  @Transient
+  @Override
+  public boolean isDirty(int fieldIndex) {
+    return false;
+  }
+
+  @Transient
+  @Override
+  public boolean isDirty(String field) {
+    return false;
+  }
+
+  @Transient
+  @Override
+  public void setDirty() {
+
+  }
+
+  @Transient
+  @Override
+  public void setDirty(int fieldIndex) {
+
+  }
+
+  @Transient
+  @Override
+  public void clearDirty(int fieldIndex) {
+
+  }
+
+  @Transient
+  @Override
+  public void clearDirty(String field) {
+
+  }
+
+  @Transient
+  @Override
+  public Tombstone getTombstone() {
+    return null;
+  }
+
+  @Transient
+  @Override
+  public List<Schema.Field> getUnmanagedFields() {
+    return null;
+  }
+
+  @Transient
+  @Override
+  public Persistent newInstance() {
+    return new ComplexTypes();
+  }
+
+  @Transient
+  @Override
+  public boolean isDirty() {
+    return false;
+  }
+
+  @Transient
+  @Override
+  public void setDirty(String field) {
+
+  }
+
+  @Transient
+  @Override
+  public void clearDirty() {
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/Customer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/Customer.java b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/Customer.java
new file mode 100644
index 0000000..1b2de10
--- /dev/null
+++ b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/Customer.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.example.generated.nativeSerialization;
+
+import com.datastax.driver.mapping.annotations.Field;
+import com.datastax.driver.mapping.annotations.UDT;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@UDT(name = "customer", keyspace = "nativeTestKeySpace")
+public class Customer {
+
+  public Customer() {
+
+  }
+  @Field(name = "id")
+  private String id;
+
+  @Field(name = "name")
+  private String name;
+
+  @Field
+  private UUID age;
+
+  @Field(name = "coupon_code")
+  private String couponCode;
+
+  @Field(name = "address")
+  private String address;
+
+  public String getId() {
+    return id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public String getCouponCode() {
+    return couponCode;
+  }
+
+  public void setCouponCode(String couponCode) {
+    this.couponCode = couponCode;
+  }
+
+  public String getAddress() {
+    return address;
+  }
+
+  public void setAddress(String address) {
+    this.address = address;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/Document.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/Document.java b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/Document.java
new file mode 100644
index 0000000..f841a4e
--- /dev/null
+++ b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/Document.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.example.generated.nativeSerialization;
+
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.PartitionKey;
+import com.datastax.driver.mapping.annotations.Table;
+import com.datastax.driver.mapping.annotations.Transient;
+import org.apache.avro.Schema;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.Tombstone;
+
+import java.util.List;
+
+@Table(keyspace = "nativeTestKeySpace", name = "documents",
+        readConsistency = "QUORUM",
+        writeConsistency = "QUORUM",
+        caseSensitiveKeyspace = false,
+        caseSensitiveTable = false)
+public class Document implements Persistent {
+  @Column
+  Customer customer;
+  @PartitionKey
+  @Column
+  String defaultId;
+
+  public String getDefaultId() {
+    return defaultId;
+  }
+
+  public void setDefaultId(String defaultId) {
+    this.defaultId = defaultId;
+  }
+
+  public Customer getCustomer() {
+    return customer;
+  }
+
+  public void setCustomer(Customer customer) {
+    this.customer = customer;
+  }
+
+  @Transient
+  @Override
+  public void clear() {
+
+  }
+
+  @Transient
+  @Override
+  public boolean isDirty(int fieldIndex) {
+    return false;
+  }
+
+  @Transient
+  @Override
+  public boolean isDirty(String field) {
+    return false;
+  }
+
+  @Transient
+  @Override
+  public void setDirty() {
+
+  }
+
+  @Transient
+  @Override
+  public void setDirty(int fieldIndex) {
+
+  }
+
+  @Transient
+  @Override
+  public void clearDirty(int fieldIndex) {
+
+  }
+
+  @Transient
+  @Override
+  public void clearDirty(String field) {
+
+  }
+
+  @Transient
+  @Override
+  public Tombstone getTombstone() {
+    return null;
+  }
+
+  @Transient
+  @Override
+  public List<Schema.Field> getUnmanagedFields() {
+    return null;
+  }
+
+  @Transient
+  @Override
+  public Persistent newInstance() {
+    return new Document();
+  }
+
+  @Transient
+  @Override
+  public boolean isDirty() {
+    return false;
+  }
+
+  @Transient
+  @Override
+  public void setDirty(String field) {
+
+  }
+
+  @Transient
+  @Override
+  public void clearDirty() {
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java
index c8d7a78..08abcd1 100644
--- a/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java
+++ b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java
@@ -20,9 +20,13 @@ package org.apache.gora.cassandra.example.generated.nativeSerialization;
 import com.datastax.driver.mapping.annotations.Column;
 import com.datastax.driver.mapping.annotations.PartitionKey;
 import com.datastax.driver.mapping.annotations.Table;
-import org.apache.gora.cassandra.persistent.CassandraNativePersistent;
+import com.datastax.driver.mapping.annotations.Transient;
+import org.apache.avro.Schema;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.Tombstone;
 
 import java.util.Date;
+import java.util.List;
 import java.util.UUID;
 
 /**
@@ -33,7 +37,7 @@ import java.util.UUID;
         writeConsistency = "QUORUM",
         caseSensitiveKeyspace = false,
         caseSensitiveTable = false)
-public class User extends CassandraNativePersistent {
+public class User implements Persistent {
   @PartitionKey
   @Column(name = "user_id")
   private UUID userId;
@@ -75,4 +79,82 @@ public class User extends CassandraNativePersistent {
   public void setDateOfBirth(Date dateOfBirth) {
     this.dateOfBirth = dateOfBirth;
   }
+
+  @Transient
+  @Override
+  public void clear() {
+
+  }
+
+  @Transient
+  @Override
+  public boolean isDirty(int fieldIndex) {
+    return false;
+  }
+
+  @Transient
+  @Override
+  public boolean isDirty(String field) {
+    return false;
+  }
+
+  @Transient
+  @Override
+  public void setDirty() {
+
+  }
+
+  @Transient
+  @Override
+  public void setDirty(int fieldIndex) {
+
+  }
+
+  @Transient
+  @Override
+  public void clearDirty(int fieldIndex) {
+
+  }
+
+  @Transient
+  @Override
+  public void clearDirty(String field) {
+
+  }
+
+  @Transient
+  @Override
+  public Tombstone getTombstone() {
+    return null;
+  }
+
+  @Transient
+  @Override
+  public List<Schema.Field> getUnmanagedFields() {
+    return null;
+  }
+
+  @Transient
+  @Override
+  public Persistent newInstance() {
+    return new User();
+  }
+
+  @Transient
+  @Override
+  public boolean isDirty() {
+    return false;
+  }
+
+  @Transient
+  @Override
+  public void setDirty(String field) {
+
+  }
+
+  @Transient
+  @Override
+  public void clearDirty() {
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/Field.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/Field.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/Field.java
index 3bbda6d..237601d 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/Field.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/Field.java
@@ -30,6 +30,7 @@ public class Field {
   private String columnName;
 
   private String type;
+
   private Map<String, String> properties;
 
   public Field() {

http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java
deleted file mode 100644
index 9d6e103..0000000
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.gora.cassandra.persistent;
-
-import com.datastax.driver.mapping.annotations.Transient;
-import org.apache.avro.Schema;
-import org.apache.gora.persistency.Persistent;
-import org.apache.gora.persistency.Tombstone;
-
-import java.util.List;
-
-/**
- * This class should be used with Native Cassandra Serialization.
- */
-public abstract class CassandraNativePersistent implements Persistent {
-  @Transient
-  @Override
-  public void clear() {
-
-  }
-
-  @Transient
-  @Override
-  public boolean isDirty(int fieldIndex) {
-    return false;
-  }
-
-  @Transient
-  @Override
-  public boolean isDirty(String field) {
-    return false;
-  }
-
-  @Transient
-  @Override
-  public void setDirty() {
-
-  }
-
-  @Transient
-  @Override
-  public void setDirty(int fieldIndex) {
-
-  }
-
-  @Transient
-  @Override
-  public void clearDirty(int fieldIndex) {
-
-  }
-
-  @Transient
-  @Override
-  public void clearDirty(String field) {
-
-  }
-
-  @Transient
-  @Override
-  public Tombstone getTombstone() {
-    return null;
-  }
-
-  @Transient
-  @Override
-  public List<Schema.Field> getUnmanagedFields() {
-    return null;
-  }
-
-  @Transient
-  @Override
-  public Persistent newInstance() {
-    return null;
-  }
-
-  @Transient
-  @Override
-  public boolean isDirty() {
-    return false;
-  }
-
-  @Transient
-  @Override
-  public void setDirty(String field) {
-
-  }
-
-  @Transient
-  @Override
-  public void clearDirty() {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
index c3b2e59..7ad106d 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
@@ -99,8 +99,10 @@ public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T
   }
 
   /**
-   * @param key
-   * @param token
+   * This method adds Result Element into result lists, So when user retrieves values from the Result these objects will be passed.
+   *
+   * @param key   key
+   * @param token persistent Object
    */
   public void addResultElement(K key, T token) {
     this.persistentKey.add(key);

http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
index 7baa1b1..252cf7b 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
@@ -19,7 +19,6 @@
 package org.apache.gora.cassandra.serializers;
 
 import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificData;
 import org.apache.avro.util.Utf8;
 import org.apache.gora.cassandra.bean.CassandraKey;
 import org.apache.gora.cassandra.bean.Field;
@@ -91,20 +90,7 @@ class AvroCassandraUtils {
     switch (type) {
       // Record can be persist with two ways, udt and bytes
       case RECORD:
-        PersistentBase persistent = (PersistentBase) fieldValue;
-        if (field.getType().contains("frozen")) {
-          PersistentBase newRecord = (PersistentBase) SpecificData.get().newRecord(persistent, persistent.getSchema());
-          for (Schema.Field member : fieldSchema.getFields()) {
-            if (member.pos() == 0 || !persistent.isDirty()) {
-              continue;
-            }
-            Schema memberSchema = member.schema();
-            Schema.Type memberType = memberSchema.getType();
-            Object memberValue = persistent.get(member.pos());
-            newRecord.put(member.pos(), getFieldValueFromAvroBean(memberSchema, memberType, memberValue, field));
-          }
-          fieldValue = newRecord;
-        } else if (field.getType().contains("blob")) {
+        if (field.getType().contains("blob")) {
           try {
             byte[] serializedBytes = HBaseByteInterface.toBytes(fieldValue, fieldSchema);
             fieldValue = ByteBuffer.wrap(serializedBytes);
@@ -112,13 +98,13 @@ class AvroCassandraUtils {
             LOG.error("Error occurred when serializing {} field. {}", new Object[]{field.getFieldName(), e.getMessage()});
           }
         } else {
-          throw new RuntimeException("");
+          throw new RuntimeException("Unsupported Data Type for Record, Currently Supported Data Types are blob and UDT for Records");
         }
         break;
       case MAP:
         Schema valueSchema = fieldSchema.getValueType();
         Schema.Type valuetype = valueSchema.getType();
-        HashMap<String, Object> map = new HashMap<>();
+        Map<String, Object> map = new HashMap<>();
         for (Map.Entry<CharSequence, ?> e : ((Map<CharSequence, ?>) fieldValue).entrySet()) {
           String mapKey = e.getKey().toString();
           Object mapValue = e.getValue();

http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
index 57d03f1..58b57dc 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
@@ -17,11 +17,15 @@
 
 package org.apache.gora.cassandra.serializers;
 
+import com.datastax.driver.core.AbstractGettableData;
 import com.datastax.driver.core.ColumnDefinitions;
 import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
+import com.datastax.driver.core.UDTValue;
+import com.datastax.driver.core.UserType;
 import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.gora.cassandra.bean.CassandraKey;
 import org.apache.gora.cassandra.bean.Field;
@@ -36,10 +40,16 @@ import org.apache.gora.store.DataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
 /**
  * This class contains the operations relates to Avro Serialization.
@@ -51,8 +61,8 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
 
   private DataStore<K, T> cassandraDataStore;
 
-  AvroSerializer(CassandraClient cassandraClient, DataStore<K, T> dataStore, CassandraMapping mapping) {
-    super(cassandraClient, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping);
+  AvroSerializer(CassandraClient cassandraClient, DataStore<K, T> dataStore, CassandraMapping mapping, Schema schema) {
+    super(cassandraClient, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping, schema);
     this.cassandraDataStore = dataStore;
   }
 
@@ -71,7 +81,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
     T obj = null;
     if (iterator.hasNext()) {
       obj = cassandraDataStore.newPersistent();
-      Row row = iterator.next();
+      AbstractGettableData row = (AbstractGettableData) iterator.next();
       populateValuesToPersistent(row, definitions, obj, fields);
     }
     return obj;
@@ -94,7 +104,35 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
           }
           if (persistent.isDirty(f.pos()) || mapping.getInlinedDefinedPartitionKey().equals(mapping.getFieldFromFieldName(fieldName))) {
             Object value = persistentBase.get(f.pos());
-            value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), f.schema().getType(), value, field);
+            String fieldType = field.getType();
+            if (fieldType.contains("frozen")) {
+              fieldType = fieldType.substring(fieldType.indexOf("<") + 1, fieldType.indexOf(">"));
+              UserType userType = client.getSession().getCluster().getMetadata().getKeyspace(mapping.getKeySpace().getName()).getUserType(fieldType);
+              UDTValue udtValue = userType.newValue();
+              Schema udtSchema = f.schema();
+              if (udtSchema.getType().equals(Schema.Type.UNION)) {
+                for (Schema schema : udtSchema.getTypes()) {
+                  if (schema.getType().equals(Schema.Type.RECORD)) {
+                    udtSchema = schema;
+                    break;
+                  }
+                }
+              }
+              PersistentBase udtObjectBase = (PersistentBase) value;
+              for (Schema.Field udtField : udtSchema.getFields()) {
+                Object udtFieldValue = AvroCassandraUtils.getFieldValueFromAvroBean(udtField.schema(), udtField.schema().getType(), udtObjectBase.get(udtField.name()), field);
+                if (udtField.schema().getType().equals(Schema.Type.MAP)) {
+                  udtValue.setMap(udtField.name(), (Map) udtFieldValue);
+                } else if (udtField.schema().getType().equals(Schema.Type.ARRAY)) {
+                  udtValue.setList(udtField.name(), (List) udtFieldValue);
+                } else {
+                  udtValue.set(udtField.name(), udtFieldValue, (Class) udtFieldValue.getClass());
+                }
+              }
+              value = udtValue;
+            } else {
+              value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), f.schema().getType(), value, field);
+            }
             values.add(value);
             fields.add(fieldName);
           }
@@ -122,7 +160,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
     T obj = null;
     if (iterator.hasNext()) {
       obj = cassandraDataStore.newPersistent();
-      Row row = iterator.next();
+      AbstractGettableData row = (AbstractGettableData) iterator.next();
       populateValuesToPersistent(row, definitions, obj, mapping.getFieldNames());
     }
     return obj;
@@ -131,7 +169,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
   /**
    * This method wraps result set data in to DataEntry and creates a list of DataEntry.
    **/
-  private void populateValuesToPersistent(Row row, ColumnDefinitions columnDefinitions, PersistentBase base, String[] fields) {
+  private void populateValuesToPersistent(AbstractGettableData row, ColumnDefinitions columnDefinitions, PersistentBase base, String[] fields) {
     Object paramValue;
     for (String fieldName : fields) {
       Schema.Field avroField = base.getSchema().getField(fieldName);
@@ -142,16 +180,15 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
       }
       Schema fieldSchema = avroField.schema();
       String columnName = field.getColumnName();
-      paramValue = getValue(row, columnDefinitions, columnName);
+      paramValue = getValue(row, columnDefinitions.getType(columnName), columnName, fieldSchema);
       Object value = AvroCassandraUtils.getAvroFieldValue(paramValue, fieldSchema);
       base.put(avroField.pos(), value);
     }
   }
 
-  private Object getValue(Row row, ColumnDefinitions columnDefinitions, String columnName) {
+  private Object getValue(AbstractGettableData row, DataType columnType, String columnName, Schema schema) {
     Object paramValue;
-    Field field = mapping.getFieldFromColumnName(columnName);
-    DataType columnType = columnDefinitions.getType(columnName);
+    String dataType;
     switch (columnType.getName()) {
       case ASCII:
         paramValue = row.getString(columnName);
@@ -202,24 +239,33 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
         paramValue = row.isNull(columnName) ? null : row.getUUID(columnName);
         break;
       case LIST:
-        String dataType = field.getType();
-        dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">"));
+        dataType = columnType.getTypeArguments().get(0).toString();
         paramValue = row.isNull(columnName) ? null : row.getList(columnName, getRelevantClassForCassandraDataType(dataType));
         break;
       case SET:
-        dataType = field.getType();
-        dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">"));
+        dataType = columnType.getTypeArguments().get(0).toString();
         paramValue = row.isNull(columnName) ? null : row.getList(columnName, getRelevantClassForCassandraDataType(dataType));
         break;
       case MAP:
-        dataType = field.getType();
-        dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">"));
-        dataType = dataType.split(",")[1];
+        dataType = columnType.getTypeArguments().get(1).toString();
         // Avro supports only String for keys
         paramValue = row.isNull(columnName) ? null : row.getMap(columnName, String.class, getRelevantClassForCassandraDataType(dataType));
         break;
       case UDT:
         paramValue = row.isNull(columnName) ? null : row.getUDTValue(columnName);
+        if (paramValue != null) {
+          try {
+            PersistentBase udtObject = (PersistentBase) SpecificData.newInstance(Class.forName(schema.getFullName()), schema);
+            for (Schema.Field f : udtObject.getSchema().getFields()) {
+              DataType dType = ((UDTValue) paramValue).getType().getFieldType(f.name());
+              Object fieldValue = getValue((UDTValue) paramValue, dType, f.name(), f.schema());
+              udtObject.put(f.pos(), fieldValue);
+            }
+            paramValue = udtObject;
+          } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Error occurred while populating data to " + schema.getFullName() + " : " + e.getMessage());
+          }
+        }
         break;
       case TUPLE:
         paramValue = row.isNull(columnName) ? null : row.getTupleValue(columnName).toString();
@@ -234,35 +280,35 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
     return paramValue;
   }
 
-/*  public Collection<Object> getFieldValues(Object o) {
-    UDTValue udtValue = (UDTValue) o;
-    UserType type = udtValue.getType();
-
-    Collection<Object> values = new ArrayList<Object>(type.size());
-
- *//*   for (UserType.Field field : type) {
-      udtValue.
-      ByteBuffer bytes = udtValue.getBytesUnsafe(field.getName());
-      DataType value = field.getType();
-      for(DataType type1 : value.getTypeArguments()) {
-        type1.
-      }
-      values.add(value);
-    }*//*
-
-    return values;
-  }*/
-
-
   private Class getRelevantClassForCassandraDataType(String dataType) {
     switch (dataType) {
-      //// TODO: 7/25/17 support all the datatypes 
       case "ascii":
       case "text":
       case "varchar":
         return String.class;
       case "blob":
         return ByteBuffer.class;
+      case "int":
+        return Integer.class;
+      case "double":
+        return Double.class;
+      case "bigint":
+      case "counter":
+        return Long.class;
+      case "decimal":
+        return BigDecimal.class;
+      case "float":
+        return Float.class;
+      case "boolean":
+        return Boolean.class;
+      case "inet":
+        return InetAddress.class;
+      case "varint":
+        return BigInteger.class;
+      case "uuid":
+        return UUID.class;
+      case "timestamp":
+        return Date.class;
       default:
         throw new RuntimeException("Invalid Cassandra DataType");
     }
@@ -302,7 +348,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
     K keyObject;
     CassandraKey cassandraKey = mapping.getCassandraKey();
     while (iterator.hasNext()) {
-      Row row = iterator.next();
+      AbstractGettableData row = (AbstractGettableData) iterator.next();
       obj = cassandraDataStore.newPersistent();
       keyObject = cassandraDataStore.newKey();
       populateValuesToPersistent(row, definitions, obj, fields);
@@ -310,7 +356,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
         populateValuesToPersistent(row, definitions, (PersistentBase) keyObject, cassandraKey.getFieldNames());
       } else {
         Field key = mapping.getInlinedDefinedPartitionKey();
-        keyObject = (K) getValue(row, definitions, key.getColumnName());
+        keyObject = (K) getValue(row, definitions.getType(key.getColumnName()), key.getColumnName(), null);
       }
       cassandraResult.addResultElement(keyObject, obj);
     }

http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
index 10c8f68..184955c 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
@@ -16,28 +16,31 @@
  */
 package org.apache.gora.cassandra.serializers;
 
+import com.datastax.driver.core.querybuilder.BuiltStatement;
 import com.datastax.driver.core.querybuilder.Delete;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.datastax.driver.core.querybuilder.Select;
 import com.datastax.driver.core.querybuilder.Update;
+import com.datastax.driver.mapping.annotations.UDT;
 import org.apache.avro.Schema;
 import org.apache.gora.cassandra.bean.CassandraKey;
 import org.apache.gora.cassandra.bean.ClusterKeyField;
 import org.apache.gora.cassandra.bean.Field;
 import org.apache.gora.cassandra.bean.KeySpace;
 import org.apache.gora.cassandra.bean.PartitionKeyField;
-import org.apache.gora.cassandra.persistent.CassandraNativePersistent;
 import org.apache.gora.cassandra.query.CassandraQuery;
 import org.apache.gora.cassandra.store.CassandraMapping;
+import org.apache.gora.cassandra.store.CassandraStore;
 import org.apache.gora.query.Query;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * This class is used create Cassandra Queries.
@@ -95,14 +98,12 @@ class CassandraQueryFactory {
   static String getCreateTableQuery(CassandraMapping mapping) {
     StringBuilder stringBuffer = new StringBuilder();
     stringBuffer.append("CREATE TABLE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(mapping.getCoreName()).append(" (");
-    boolean isCommaNeeded = false;
     CassandraKey cassandraKey = mapping.getCassandraKey();
     // appending Cassandra Persistent columns into db schema
-    processFieldsForCreateTableQuery(mapping.getFieldList(), isCommaNeeded, stringBuffer);
+    processFieldsForCreateTableQuery(mapping.getFieldList(), false, stringBuffer);
 
     if (cassandraKey != null) {
-      isCommaNeeded = true;
-      processFieldsForCreateTableQuery(cassandraKey.getFieldList(), isCommaNeeded, stringBuffer);
+      processFieldsForCreateTableQuery(cassandraKey.getFieldList(), true, stringBuffer);
       List<PartitionKeyField> partitionKeys = cassandraKey.getPartitionKeyFields();
       if (partitionKeys != null) {
         stringBuffer.append(", PRIMARY KEY (");
@@ -253,17 +254,29 @@ class CassandraQueryFactory {
     String[] objects = new String[fields.size()];
     Arrays.fill(objects, "?");
     Delete delete = QueryBuilder.delete().from(mapping.getKeySpace().getName(), mapping.getCoreName());
-    Delete.Where query = null;
+    return processKeys(columnNames, delete);
+  }
+
+  private static String processKeys(String[] columnNames, BuiltStatement delete) {
+    BuiltStatement query = null;
     boolean isWhereNeeded = true;
     for (String columnName : columnNames) {
       if (isWhereNeeded) {
-        query = delete.where(QueryBuilder.eq(columnName, "?"));
+        if (delete instanceof Delete) {
+          query = ((Delete) delete).where(QueryBuilder.eq(columnName, "?"));
+        } else {
+          query = ((Select) delete).where(QueryBuilder.eq(columnName, "?"));
+        }
         isWhereNeeded = false;
       } else {
-        query = query.and(QueryBuilder.eq(columnName, "?"));
+        if (delete instanceof Delete) {
+          query = ((Delete.Where) query).and(QueryBuilder.eq(columnName, "?"));
+        } else {
+          query = ((Select.Where) query).and(QueryBuilder.eq(columnName, "?"));
+        }
       }
     }
-    return query.getQueryString();
+    return query != null ? query.getQueryString() : null;
   }
 
   /**
@@ -280,17 +293,7 @@ class CassandraQueryFactory {
       select.allowFiltering();
     }
     String[] columnNames = getColumnNames(mapping, keyFields);
-    Select.Where query = null;
-    boolean isWhereNeeded = true;
-    for (String columnName : columnNames) {
-      if (isWhereNeeded) {
-        query = select.where(QueryBuilder.eq(columnName, "?"));
-        isWhereNeeded = false;
-      } else {
-        query = query.and(QueryBuilder.eq(columnName, "?"));
-      }
-    }
-    return query.getQueryString();
+    return processKeys(columnNames, select);
   }
 
   /**
@@ -309,17 +312,7 @@ class CassandraQueryFactory {
       select.allowFiltering();
     }
     String[] columnNames = getColumnNames(mapping, keyFields);
-    Select.Where query = null;
-    boolean isWhereNeeded = true;
-    for (String columnName : columnNames) {
-      if (isWhereNeeded) {
-        query = select.where(QueryBuilder.eq(columnName, "?"));
-        isWhereNeeded = false;
-      } else {
-        query = query.and(QueryBuilder.eq(columnName, "?"));
-      }
-    }
-    return query.getQueryString();
+    return processKeys(columnNames, select);
   }
 
   /**
@@ -373,10 +366,6 @@ class CassandraQueryFactory {
    * @return CQL Query
    */
   static String getExecuteQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects, String[] fields) {
-    Object startKey = cassandraQuery.getStartKey();
-    Object endKey = cassandraQuery.getEndKey();
-    Object key = cassandraQuery.getKey();
-    String primaryKey = null;
     long limit = cassandraQuery.getLimit();
     Select select = QueryBuilder.select(getColumnNames(mapping, Arrays.asList(fields))).from(mapping.getKeySpace().getName(), mapping.getCoreName());
     if (limit > 0) {
@@ -385,7 +374,15 @@ class CassandraQueryFactory {
     if (Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) {
       select.allowFiltering();
     }
-    Select.Where query = null;
+    return processQuery(cassandraQuery, select, mapping, objects);
+  }
+
+  private static String processQuery(Query cassandraQuery, BuiltStatement select, CassandraMapping mapping, List<Object> objects) {
+    String primaryKey = null;
+    BuiltStatement query = null;
+    Object startKey = cassandraQuery.getStartKey();
+    Object endKey = cassandraQuery.getEndKey();
+    Object key = cassandraQuery.getKey();
     boolean isWhereNeeded = true;
     if (key != null) {
       if (mapping.getCassandraKey() != null) {
@@ -395,17 +392,35 @@ class CassandraQueryFactory {
         String[] columnKeys = getColumnNames(mapping, cassandraKeys);
         for (int i = 0; i < cassandraKeys.size(); i++) {
           if (isWhereNeeded) {
-            query = select.where(QueryBuilder.eq(columnKeys[i], "?"));
+            if (select instanceof Select) {
+              query = ((Select) select).where(QueryBuilder.eq(columnKeys[i], "?"));
+            } else if (select instanceof Delete) {
+              query = ((Delete) select).where(QueryBuilder.eq(columnKeys[i], "?"));
+            } else {
+              query = ((Update.Assignments) select).where(QueryBuilder.eq(columnKeys[i], "?"));
+            }
             objects.add(cassandraValues.get(i));
             isWhereNeeded = false;
           } else {
-            query = query.and(QueryBuilder.eq(columnKeys[i], "?"));
+            if (select instanceof Select) {
+              query = ((Select.Where) query).and(QueryBuilder.eq(columnKeys[i], "?"));
+            } else if (select instanceof Delete) {
+              query = ((Delete.Where) query).and(QueryBuilder.eq(columnKeys[i], "?"));
+            } else {
+              query = ((Update.Where) query).and(QueryBuilder.eq(columnKeys[i], "?"));
+            }
             objects.add(cassandraValues.get(i));
           }
         }
       } else {
         primaryKey = getPKey(mapping.getFieldList());
-        query = select.where(QueryBuilder.eq(primaryKey, "?"));
+        if (select instanceof Select) {
+          query = ((Select) select).where(QueryBuilder.eq(primaryKey, "?"));
+        } else if (select instanceof Delete) {
+          query = ((Delete) select).where(QueryBuilder.eq(primaryKey, "?"));
+        } else {
+          query = ((Update.Assignments) select).where(QueryBuilder.eq(primaryKey, "?"));
+        }
         objects.add(key);
       }
     } else {
@@ -417,17 +432,44 @@ class CassandraQueryFactory {
           String[] columnKeys = getColumnNames(mapping, cassandraKeys);
           for (int i = 0; i < cassandraKeys.size(); i++) {
             if (isWhereNeeded) {
-              query = select.where(QueryBuilder.gte(columnKeys[i], "?"));
+              if (select instanceof Select) {
+                query = ((Select) select).where(QueryBuilder.gte(columnKeys[i], "?"));
+              } else if (select instanceof Delete) {
+                /*
+                According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet.
+                 */
+                throw new RuntimeException("Delete by Query is not suppoted for Key Ranges.");
+              } else {
+                query = ((Update.Assignments) select).where(QueryBuilder.gte(columnKeys[i], "?"));
+              }
               objects.add(cassandraValues.get(i));
               isWhereNeeded = false;
             } else {
-              query = query.and(QueryBuilder.gte(columnKeys[i], "?"));
+              if (select instanceof Select) {
+                query = ((Select.Where) query).and(QueryBuilder.gte(columnKeys[i], "?"));
+              } else if (select instanceof Delete) {
+                       /*
+                According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet.
+                 */
+                throw new RuntimeException("Delete by Query is not suppoted for Key Ranges.");
+              } else {
+                query = ((Update.Where) query).and(QueryBuilder.gte(columnKeys[i], "?"));
+              }
               objects.add(cassandraValues.get(i));
             }
           }
         } else {
           primaryKey = getPKey(mapping.getFieldList());
-          query = select.where(QueryBuilder.gte(primaryKey, "?"));
+          if (select instanceof Select) {
+            query = ((Select) select).where(QueryBuilder.gte(primaryKey, "?"));
+          } else if (select instanceof Delete) {
+                           /*
+                According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet.
+                 */
+            throw new RuntimeException("Delete by Query is not suppoted for Key Ranges.");
+          } else {
+            query = ((Update.Assignments) select).where(QueryBuilder.gte(primaryKey, "?"));
+          }
           objects.add(startKey);
           isWhereNeeded = false;
         }
@@ -440,20 +482,56 @@ class CassandraQueryFactory {
           String[] columnKeys = getColumnNames(mapping, cassandraKeys);
           for (int i = 0; i < cassandraKeys.size(); i++) {
             if (isWhereNeeded) {
-              query = select.where(QueryBuilder.lte(columnKeys[i], "?"));
+              if (select instanceof Select) {
+                query = ((Select) select).where(QueryBuilder.lte(columnKeys[i], "?"));
+              } else if (select instanceof Delete) {
+                               /*
+                According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet.
+                 */
+                throw new RuntimeException("Delete by Query is not suppoted for Key Ranges.");
+              } else {
+                query = ((Update.Assignments) select).where(QueryBuilder.lte(columnKeys[i], "?"));
+              }
               objects.add(cassandraValues.get(i));
               isWhereNeeded = false;
             } else {
-              query = query.and(QueryBuilder.lte(columnKeys[i], "?"));
+              if (select instanceof Select) {
+                query = ((Select.Where) query).and(QueryBuilder.lte(columnKeys[i], "?"));
+              } else if (select instanceof Delete) {
+                               /*
+                According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet.
+                 */
+                throw new RuntimeException("Delete by Query is not suppoted for Key Ranges.");
+              } else {
+                query = ((Update.Where) query).and(QueryBuilder.lte(columnKeys[i], "?"));
+              }
               objects.add(cassandraValues.get(i));
             }
           }
         } else {
           primaryKey = primaryKey != null ? primaryKey : getPKey(mapping.getFieldList());
           if (isWhereNeeded) {
-            query = select.where(QueryBuilder.lte(primaryKey, "?"));
+            if (select instanceof Select) {
+              query = ((Select) select).where(QueryBuilder.lte(primaryKey, "?"));
+            } else if (select instanceof Delete) {
+                             /*
+                According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet.
+                 */
+              throw new RuntimeException("Delete by Query is not suppoted for Key Ranges.");
+            } else {
+              query = ((Update.Assignments) select).where(QueryBuilder.lte(primaryKey, "?"));
+            }
           } else {
-            query = query.and(QueryBuilder.lte(primaryKey, "?"));
+            if (select instanceof Select) {
+              query = ((Select.Where) query).and(QueryBuilder.lte(primaryKey, "?"));
+            } else if (select instanceof Delete) {
+              /*
+                According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet.
+                */
+              throw new RuntimeException("Delete by Query is not suppoted for Key Ranges.");
+            } else {
+              query = ((Update.Where) query).and(QueryBuilder.lte(primaryKey, "?"));
+            }
           }
           objects.add(endKey);
         }
@@ -462,7 +540,7 @@ class CassandraQueryFactory {
     if (startKey == null && endKey == null && key == null) {
       return select.getQueryString();
     }
-    return query.getQueryString();
+    return query != null ? query.getQueryString() : null;
   }
 
   private static String[] getColumnNames(CassandraMapping mapping, List<String> fields) {
@@ -509,94 +587,13 @@ class CassandraQueryFactory {
     if (cassandraQuery.getFields() != null) {
       columns = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()));
     }
-    Object startKey = cassandraQuery.getStartKey();
-    Object endKey = cassandraQuery.getEndKey();
-    Object key = cassandraQuery.getKey();
-    String primaryKey = null;
     Delete delete;
     if (columns != null) {
       delete = QueryBuilder.delete(columns).from(mapping.getKeySpace().getName(), mapping.getCoreName());
     } else {
       delete = QueryBuilder.delete().from(mapping.getKeySpace().getName(), mapping.getCoreName());
     }
-    Delete.Where query = null;
-    boolean isWhereNeeded = true;
-    if (key != null) {
-      if (mapping.getCassandraKey() != null) {
-        ArrayList<String> cassandraKeys = new ArrayList<>();
-        ArrayList<Object> cassandraValues = new ArrayList<>();
-        AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues);
-        String[] columnKeys = getColumnNames(mapping, cassandraKeys);
-        for (int i = 0; i < cassandraKeys.size(); i++) {
-          if (isWhereNeeded) {
-            query = delete.where(QueryBuilder.eq(columnKeys[i], "?"));
-            objects.add(cassandraValues.get(i));
-            isWhereNeeded = false;
-          } else {
-            query = query.and(QueryBuilder.eq(columnKeys[i], "?"));
-            objects.add(cassandraValues.get(i));
-          }
-        }
-      } else {
-        primaryKey = getPKey(mapping.getFieldList());
-        query = delete.where(QueryBuilder.eq(primaryKey, "?"));
-        objects.add(key);
-      }
-    } else {
-      if (startKey != null) {
-        if (mapping.getCassandraKey() != null) {
-          ArrayList<String> cassandraKeys = new ArrayList<>();
-          ArrayList<Object> cassandraValues = new ArrayList<>();
-          AvroCassandraUtils.processKeys(mapping, startKey, cassandraKeys, cassandraValues);
-          String[] columnKeys = getColumnNames(mapping, cassandraKeys);
-          for (int i = 0; i < cassandraKeys.size(); i++) {
-            if (isWhereNeeded) {
-              query = delete.where(QueryBuilder.gte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?")));
-              objects.add(cassandraValues.get(i));
-              isWhereNeeded = false;
-            } else {
-              query = query.and(QueryBuilder.gte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?")));
-              objects.add(cassandraValues.get(i));
-            }
-          }
-        } else {
-          primaryKey = getPKey(mapping.getFieldList());
-          query = delete.where(QueryBuilder.gte(QueryBuilder.token(primaryKey), QueryBuilder.token("?")));
-          objects.add(startKey);
-          isWhereNeeded = false;
-        }
-      }
-      if (endKey != null) {
-        if (mapping.getCassandraKey() != null) {
-          ArrayList<String> cassandraKeys = new ArrayList<>();
-          ArrayList<Object> cassandraValues = new ArrayList<>();
-          AvroCassandraUtils.processKeys(mapping, endKey, cassandraKeys, cassandraValues);
-          String[] columnKeys = getColumnNames(mapping, cassandraKeys);
-          for (int i = 0; i < cassandraKeys.size(); i++) {
-            if (isWhereNeeded) {
-              query = delete.where(QueryBuilder.lte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?")));
-              objects.add(cassandraValues.get(i));
-              isWhereNeeded = false;
-            } else {
-              query = query.and(QueryBuilder.lte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?")));
-              objects.add(cassandraValues.get(i));
-            }
-          }
-        } else {
-          primaryKey = primaryKey != null ? primaryKey : getPKey(mapping.getFieldList());
-          if (isWhereNeeded) {
-            query = delete.where(QueryBuilder.lte(QueryBuilder.token(primaryKey), QueryBuilder.token("?")));
-          } else {
-            query = query.and(QueryBuilder.lte(QueryBuilder.token(primaryKey), QueryBuilder.token("?")));
-          }
-          objects.add(endKey);
-        }
-      }
-    }
-    if (startKey == null && endKey == null && key == null) {
-      return delete.getQueryString();
-    }
-    return query.getQueryString();
+    return processQuery(cassandraQuery, delete, mapping, objects);
   }
 
   /**
@@ -608,12 +605,12 @@ class CassandraQueryFactory {
    * @param objects        field Objects list
    * @return CQL Query
    */
-  static String getUpdateByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) {
+  static String getUpdateByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects, Schema schema) {
     Update update = QueryBuilder.update(mapping.getKeySpace().getName(), mapping.getCoreName());
     Update.Assignments updateAssignments = null;
     if (cassandraQuery instanceof CassandraQuery) {
       String[] columnNames = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()));
-      if (CassandraNativePersistent.class.isAssignableFrom(mapping.getPersistentClass())) {
+      if (((CassandraStore) cassandraQuery.getDataStore()).getSerializationType().equalsIgnoreCase("NATIVE")) {
         for (String column : columnNames) {
           updateAssignments = update.with(QueryBuilder.set(column, "?"));
           objects.add(((CassandraQuery) cassandraQuery).getUpdateFieldValue(mapping.getFieldFromColumnName(column).getFieldName()));
@@ -624,159 +621,33 @@ class CassandraQueryFactory {
           Field field = mapping.getFieldFromColumnName(column);
           Object value = ((CassandraQuery) cassandraQuery).getUpdateFieldValue(field.getFieldName());
           try {
-            Schema schema = (Schema) mapping.getPersistentClass().getField("SCHEMA$").get(null);
             Schema schemaField = schema.getField(field.getFieldName()).schema();
             objects.add(AvroCassandraUtils.getFieldValueFromAvroBean(schemaField, schemaField.getType(), value, field));
-          } catch (IllegalAccessException | NoSuchFieldException e) {
-            throw new RuntimeException("SCHEMA$ field can't accessible, Please recompile the Avro schema with goracompiler.");
           } catch (NullPointerException e) {
             throw new RuntimeException(field + " field couldn't find in the class " + mapping.getPersistentClass() + ".");
           }
         }
       }
-    }
-    String primaryKey = null;
-    Update.Where query = null;
-    Object startKey = cassandraQuery.getStartKey();
-    Object endKey = cassandraQuery.getEndKey();
-    Object key = cassandraQuery.getKey();
-    boolean isWhereNeeded = true;
-    if (key != null) {
-      if (mapping.getCassandraKey() != null) {
-        ArrayList<String> cassandraKeys = new ArrayList<>();
-        ArrayList<Object> cassandraValues = new ArrayList<>();
-        AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues);
-        String[] columnKeys = getColumnNames(mapping, cassandraKeys);
-        for (int i = 0; i < cassandraKeys.size(); i++) {
-          if (isWhereNeeded) {
-            query = updateAssignments.where(QueryBuilder.eq(columnKeys[i], "?"));
-            objects.add(cassandraValues.get(i));
-            isWhereNeeded = false;
-          } else {
-            query = query.and(QueryBuilder.eq(columnKeys[i], "?"));
-            objects.add(cassandraValues.get(i));
-          }
-        }
-      } else {
-        primaryKey = getPKey(mapping.getFieldList());
-        query = updateAssignments.where(QueryBuilder.eq(primaryKey, "?"));
-        objects.add(key);
-      }
     } else {
-      if (startKey != null) {
-        if (mapping.getCassandraKey() != null) {
-          ArrayList<String> cassandraKeys = new ArrayList<>();
-          ArrayList<Object> cassandraValues = new ArrayList<>();
-          AvroCassandraUtils.processKeys(mapping, startKey, cassandraKeys, cassandraValues);
-          String[] columnKeys = getColumnNames(mapping, cassandraKeys);
-          for (int i = 0; i < cassandraKeys.size(); i++) {
-            if (isWhereNeeded) {
-              query = updateAssignments.where(QueryBuilder.gte(columnKeys[i], "?"));
-              objects.add(cassandraValues.get(i));
-              isWhereNeeded = false;
-            } else {
-              query = query.and(QueryBuilder.gte(columnKeys[i], "?"));
-              objects.add(cassandraValues.get(i));
-            }
-          }
-        } else {
-          primaryKey = getPKey(mapping.getFieldList());
-          query = updateAssignments.where(QueryBuilder.gte(primaryKey, "?"));
-          objects.add(startKey);
-          isWhereNeeded = false;
-        }
-      }
-      if (endKey != null) {
-        if (mapping.getCassandraKey() != null) {
-          ArrayList<String> cassandraKeys = new ArrayList<>();
-          ArrayList<Object> cassandraValues = new ArrayList<>();
-          AvroCassandraUtils.processKeys(mapping, endKey, cassandraKeys, cassandraValues);
-          String[] columnKeys = getColumnNames(mapping, cassandraKeys);
-          for (int i = 0; i < cassandraKeys.size(); i++) {
-            if (isWhereNeeded) {
-              query = updateAssignments.where(QueryBuilder.lte(columnKeys[i], "?"));
-              objects.add(cassandraValues.get(i));
-              isWhereNeeded = false;
-            } else {
-              query = query.and(QueryBuilder.lte(columnKeys[i], "?"));
-              objects.add(cassandraValues.get(i));
-            }
-          }
-        } else {
-          primaryKey = primaryKey != null ? primaryKey : getPKey(mapping.getFieldList());
-          if (isWhereNeeded) {
-            query = updateAssignments.where(QueryBuilder.lte(primaryKey, "?"));
-          } else {
-            query = query.and(QueryBuilder.lte(primaryKey, "?"));
-          }
-          objects.add(endKey);
-        }
-      }
-    }
-    if (startKey == null && endKey == null && key == null) {
-      return updateAssignments.getQueryString();
+      throw new RuntimeException("Please use Cassandra Query object to invoke, UpdateByQuery method.");
     }
-    return query.getQueryString();
-  }
 
-  /**
-   * This method returns create Type CQL query to create user define types.
-   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/cqlRefcreateType.html
-   *
-   * @param fieldSchema avroSchema {@link Schema}
-   * @param mapping Cassandra mapping {@link CassandraMapping}
-   * @return CQL Query
-   */
-  static String getCreateUDTType(Schema fieldSchema, CassandraMapping mapping, Set<String> udtQueryStack) {
-    StringBuilder stringBuffer = new StringBuilder();
-    if (fieldSchema.getType().equals(Schema.Type.UNION)) {
-      for (Schema fieldTypeSchema : fieldSchema.getTypes()) {
-        if (fieldTypeSchema.getType().equals(Schema.Type.RECORD)) {
-          fieldSchema = fieldTypeSchema;
-          break;
-        }
-      }
-    }
-    stringBuffer.append("CREATE TYPE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(fieldSchema.getName()).append(" (");
-    processRecord(fieldSchema, stringBuffer, mapping, udtQueryStack);
-    stringBuffer.append(")");
-    return stringBuffer.toString();
+    return processQuery(cassandraQuery, updateAssignments, mapping, objects);
   }
 
-  private static void processRecord(Schema recordSchema, StringBuilder stringBuilder, CassandraMapping mapping, Set<String> udtQueryStack) {
-    boolean isCommaNeeded = false;
-    for (Schema.Field field : recordSchema.getFields()) {
-      if (isCommaNeeded) {
-        stringBuilder.append(", ");
-      }
-      String fieldName = field.name();
-      stringBuilder.append(fieldName).append(" ");
-      try {
-        populateFieldsToQuery(field.schema(), stringBuilder, mapping, udtQueryStack);
-        isCommaNeeded = true;
-      } catch (Exception e) {
-        int i = stringBuilder.indexOf(fieldName);
-        if (i != -1) {
-          stringBuilder.delete(i, i + fieldName.length());
-          isCommaNeeded = false;
-        }
-      }
-    }
-  }
-
-  private static void populateFieldsToQuery(Schema schema, StringBuilder builder, CassandraMapping mapping, Set<String> udtQueryStack) throws Exception {
+  private static void populateFieldsToQuery(Schema schema, StringBuilder builder) throws Exception {
     switch (schema.getType()) {
       case INT:
         builder.append("int");
         break;
       case MAP:
         builder.append("map<text,");
-        populateFieldsToQuery(schema.getValueType(), builder, mapping, udtQueryStack);
+        populateFieldsToQuery(schema.getValueType(), builder);
         builder.append(">");
         break;
       case ARRAY:
         builder.append("list<");
-        populateFieldsToQuery(schema.getElementType(), builder, mapping, udtQueryStack);
+        populateFieldsToQuery(schema.getElementType(), builder);
         builder.append(">");
         break;
       case LONG:
@@ -796,8 +667,6 @@ class CassandraQueryFactory {
         break;
       case RECORD:
         builder.append("frozen<").append(schema.getName()).append(">");
-        String query = getCreateUDTType(schema, mapping, udtQueryStack);
-        udtQueryStack.add(query);
         break;
       case STRING:
       case FIXED:
@@ -810,15 +679,13 @@ class CassandraQueryFactory {
             String recordName = unionElementSchema.getName();
             if (!builder.toString().contains(recordName)) {
               builder.append("frozen<").append(recordName).append(">");
-              query = getCreateUDTType(unionElementSchema, mapping, udtQueryStack);
-              udtQueryStack.add(query);
             } else {
               LOG.warn("Same Field Type can't be mapped recursively. This is not supported with Cassandra UDT types, Please use byte dataType for recursive mapping.");
               throw new Exception("Same Field Type has mapped recursively");
             }
             break;
           } else if (!unionElementSchema.getType().equals(Schema.Type.NULL)) {
-            populateFieldsToQuery(unionElementSchema, builder, mapping, udtQueryStack);
+            populateFieldsToQuery(unionElementSchema, builder);
             break;
           }
         }
@@ -826,4 +693,127 @@ class CassandraQueryFactory {
     }
   }
 
+  static void processRecord(Schema recordSchema, StringBuilder stringBuilder) {
+    boolean isCommaNeeded = false;
+    for (Schema.Field field : recordSchema.getFields()) {
+      if (isCommaNeeded) {
+        stringBuilder.append(", ");
+      }
+      String fieldName = field.name();
+      stringBuilder.append(fieldName).append(" ");
+      try {
+        populateFieldsToQuery(field.schema(), stringBuilder);
+        isCommaNeeded = true;
+      } catch (Exception e) {
+        int i = stringBuilder.indexOf(fieldName);
+        if (i != -1) {
+          stringBuilder.delete(i, i + fieldName.length());
+          isCommaNeeded = false;
+        }
+      }
+    }
+  }
+
+  static String getCreateUDTTypeForNative(CassandraMapping mapping, Class persistentClass, String udtType, String fieldName) throws NoSuchFieldException {
+    StringBuilder stringBuffer = new StringBuilder();
+    Class udtClass = persistentClass.getDeclaredField(fieldName).getType();
+    UDT annotation = (UDT) udtClass.getAnnotation(UDT.class);
+    if (annotation != null) {
+      stringBuffer.append("CREATE TYPE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(udtType).append(" (");
+      boolean isCommaNeeded = false;
+      for (java.lang.reflect.Field udtField : udtClass.getDeclaredFields()) {
+        com.datastax.driver.mapping.annotations.Field fieldAnnotation = udtField.getDeclaredAnnotation(com.datastax.driver.mapping.annotations.Field.class);
+        if (fieldAnnotation != null) {
+          if (isCommaNeeded) {
+            stringBuffer.append(", ");
+          }
+          if(!fieldAnnotation.name().isEmpty()) {
+            stringBuffer.append(fieldAnnotation.name()).append(" ");
+          } else {
+            stringBuffer.append(udtField.getName()).append(" ");
+          }
+          stringBuffer.append(dataType(udtField, null));
+          isCommaNeeded = true;
+        }
+      }
+      stringBuffer.append(")");
+    } else {
+      throw new RuntimeException("");
+    }
+    return stringBuffer.toString();
+  }
+
+  static String getCreateUDTTypeForAvro(CassandraMapping mapping, String udtType, Schema fieldSchema) {
+    StringBuilder stringBuffer = new StringBuilder();
+    stringBuffer.append("CREATE TYPE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(udtType).append(" (");
+    CassandraQueryFactory.processRecord(fieldSchema, stringBuffer);
+    stringBuffer.append(")");
+    return stringBuffer.toString();
+  }
+
+  private static String dataType(java.lang.reflect.Field field, Type fieldType) {
+    String type;
+    if (field != null) {
+      type = field.getType().getName();
+    } else {
+      type = fieldType.getTypeName();
+    }
+    String dataType;
+    String s = type;
+    if (s.equals("java.lang.String") || s.equals("java.lang.CharSequence")) {
+      dataType = "text";
+    } else if (s.equals("int") || s.equals("java.lang.Integer")) {
+      dataType = "int";
+    } else if (s.equals("double") || s.equals("java.lang.Double")) {
+      dataType = "double";
+    }  else if (s.equals("float") || s.equals("java.lang.Float")) {
+      dataType = "float";
+    } else if (s.equals("boolean") || s.equals("java.lang.Boolean")) {
+      dataType = "boolean";
+    } else if (s.equals("java.util.UUID")) {
+      dataType = "uuid";
+    } else if (s.equals("java.lang.Long")) {
+      dataType = "bigint";
+    }else if (s.equals("java.math.BigDecimal")) {
+      dataType = "decimal";
+    }else if (s.equals("java.net.InetAddress")) {
+      dataType = "inet";
+    }else if (s.equals("java.math.BigInteger")) {
+      dataType = "varint";
+    } else if (s.equals("java.nio.ByteBuffer")) {
+      dataType = "blob";
+    }else if (s.contains("Map")) {
+      ParameterizedType mapType;
+      if (field != null) {
+        mapType = (ParameterizedType) field.getGenericType();
+      } else {
+        mapType = (ParameterizedType) fieldType;
+      }
+      Type value1 = mapType.getActualTypeArguments()[0];
+      Type value2 = mapType.getActualTypeArguments()[1];
+      dataType = "map<" +dataType(null, value1) + "," + dataType(null, value2) + ">";
+    } else if (s.contains("List")) {
+      ParameterizedType listType;
+      if (field != null) {
+        listType = (ParameterizedType) field.getGenericType();
+      } else {
+        listType = (ParameterizedType) fieldType;
+      }
+      Type value = listType.getActualTypeArguments()[0];
+      dataType = "list<" + dataType(null, value) + ">";
+    } else if (s.contains("Set")) {
+      ParameterizedType setType;
+      if (field != null) {
+        setType = (ParameterizedType) field.getGenericType();
+      } else {
+        setType = (ParameterizedType) fieldType;
+      }
+      Type value = setType.getActualTypeArguments()[0];
+      dataType = "set<" + dataType(null, value) + ">";
+    } else {
+      throw new RuntimeException("Unsupported Cassandra DataType");
+    }
+    return dataType;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
index 17e0568..651dfdb 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
@@ -26,6 +26,7 @@ import org.apache.gora.cassandra.store.CassandraClient;
 import org.apache.gora.cassandra.store.CassandraMapping;
 import org.apache.gora.cassandra.store.CassandraStore;
 import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
 import org.apache.gora.store.DataStore;
@@ -33,10 +34,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.LinkedHashSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
-import java.util.Set;
+import java.util.Map;
 
 /**
  * This is the abstract Cassandra Serializer class.
@@ -47,14 +48,23 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
 
   protected Class<T> persistentClass;
 
+  private Map<String, String> userDefineTypeMaps;
+
   protected CassandraMapping mapping;
+  private Schema persistentSchema;
   CassandraClient client;
 
-  CassandraSerializer(CassandraClient cc, Class<K> keyClass, Class<T> persistantClass, CassandraMapping mapping) {
+  CassandraSerializer(CassandraClient cc, Class<K> keyClass, Class<T> persistantClass, CassandraMapping mapping, Schema schema) {
     this.keyClass = keyClass;
     this.persistentClass = persistantClass;
     this.client = cc;
     this.mapping = mapping;
+    persistentSchema = schema;
+    try {
+      analyzePersistent();
+    } catch (Exception e) {
+      throw new RuntimeException("Error occurred while analyzing the persistent class, :" + e.getMessage());
+    }
   }
 
   /**
@@ -68,24 +78,59 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
    * @param <T>       persistent class
    * @return Serializer
    */
-  public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K, T> dataStore, CassandraMapping mapping) {
+  public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K, T> dataStore, CassandraMapping mapping, Schema schema) {
     CassandraStore.SerializerType serType = type.isEmpty() ? CassandraStore.SerializerType.NATIVE : CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH));
     CassandraSerializer serializer;
     switch (serType) {
       case AVRO:
-        serializer = new AvroSerializer(cc, dataStore, mapping);
+        serializer = new AvroSerializer(cc, dataStore, mapping, schema);
         break;
       case NATIVE:
       default:
-        serializer = new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping);
+        serializer = new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping, schema);
     }
     return serializer;
   }
 
+  private void analyzePersistent() throws Exception {
+    userDefineTypeMaps = new HashMap<>();
+    for (Field field : mapping.getFieldList()) {
+      String fieldType = field.getType();
+      if (fieldType.contains("frozen")) {
+        String udtType = fieldType.substring(fieldType.indexOf("<") + 1, fieldType.indexOf(">"));
+        if (this instanceof AvroSerializer) {
+          if (PersistentBase.class.isAssignableFrom(persistentClass)) {
+            Schema fieldSchema = persistentSchema.getField(field.getFieldName()).schema();
+            if (fieldSchema.getType().equals(Schema.Type.UNION)) {
+              for (Schema currentSchema : fieldSchema.getTypes()) {
+                if (currentSchema.getType().equals(Schema.Type.RECORD)) {
+                  fieldSchema = currentSchema;
+                  break;
+                }
+              }
+            }
+            String createQuery = CassandraQueryFactory.getCreateUDTTypeForAvro(mapping, udtType, fieldSchema);
+            userDefineTypeMaps.put(udtType, createQuery);
+          } else {
+            throw new RuntimeException("Unsupported Class for User Define Types, Please use PersistentBase class. field : " + udtType);
+          }
+        } else {
+          String createQuery = CassandraQueryFactory.getCreateUDTTypeForNative(mapping, persistentClass, udtType, field.getFieldName());
+          userDefineTypeMaps.put(udtType, createQuery);
+        }
+      }
+    }
+
+  }
+
+
   public void createSchema() {
     LOG.debug("creating Cassandra keyspace {}", mapping.getKeySpace().getName());
     this.client.getSession().execute(CassandraQueryFactory.getCreateKeySpaceQuery(mapping));
-    processUDTSchemas(); //TODO complete functionality
+    for (Map.Entry udtType : userDefineTypeMaps.entrySet()) {
+      LOG.debug("creating Cassandra User Define Type {}", udtType.getKey());
+      this.client.getSession().execute((String) udtType.getValue());
+    }
     LOG.debug("creating Cassandra column family / table {}", mapping.getCoreName());
     this.client.getSession().execute(CassandraQueryFactory.getCreateTableQuery(mapping));
   }
@@ -116,32 +161,6 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
     }
   }
 
-  private void processUDTSchemas() {
-    Set<String> schemaStack = new LinkedHashSet<>();
-    for (Field field : mapping.getFieldList()) {
-      if (field.getType().contains("frozen")) {
-        try {
-          Schema schema = (Schema) mapping.getPersistentClass().getField("SCHEMA$").get(null);
-          Schema schemaField = schema.getField(field.getFieldName()).schema();
-          String cqlQuery = CassandraQueryFactory.getCreateUDTType(schemaField, mapping, schemaStack);
-          schemaStack.add(cqlQuery);
-        } catch (IllegalAccessException | NoSuchFieldException e) {
-          throw new RuntimeException("SCHEMA$ field can't accessible, Please recompile the Avro schema with goracompiler.");
-        } catch (NullPointerException e) {
-          throw new RuntimeException(field + " field couldn't find in the class " + mapping.getPersistentClass() + ".");
-        }
-      }
-    }
-    createUserDefineTypes(schemaStack);
-
-  }
-
-  private void createUserDefineTypes(Set<String> queries) {
-    for (String cqlQuery : queries) {
-      this.client.getSession().execute(cqlQuery);
-    }
-  }
-
   protected String[] getFields() {
     List<String> fields = new ArrayList<>();
     for (Field field : mapping.getFieldList()) {
@@ -162,7 +181,7 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
 
   public boolean updateByQuery(Query query) {
     List<Object> objectArrayList = new ArrayList<>();
-    String cqlQuery = CassandraQueryFactory.getUpdateByQuery(mapping, query, objectArrayList);
+    String cqlQuery = CassandraQueryFactory.getUpdateByQuery(mapping, query, objectArrayList, persistentSchema);
     ResultSet results;
     if (objectArrayList.size() == 0) {
       results = client.getSession().execute(cqlQuery);

http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
index f8bb066..abff409 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
@@ -21,9 +21,9 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.mapping.Mapper;
 import com.datastax.driver.mapping.MappingManager;
 import com.datastax.driver.mapping.Result;
+import org.apache.avro.Schema;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.gora.cassandra.bean.Field;
-import org.apache.gora.cassandra.persistent.CassandraNativePersistent;
 import org.apache.gora.cassandra.query.CassandraResultSet;
 import org.apache.gora.cassandra.store.CassandraClient;
 import org.apache.gora.cassandra.store.CassandraMapping;
@@ -41,14 +41,14 @@ import java.util.List;
 /**
  * This Class contains the operation relates to Native Serialization.
  */
-class NativeSerializer<K, T extends CassandraNativePersistent> extends CassandraSerializer {
+class NativeSerializer<K, T extends Persistent> extends CassandraSerializer {
 
   private static final Logger LOG = LoggerFactory.getLogger(NativeSerializer.class);
 
   private Mapper<T> mapper;
 
-  NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) {
-    super(cassandraClient, keyClass, persistentClass, mapping);
+  NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping, Schema schema) {
+      super(cassandraClient, keyClass, persistentClass, mapping, schema);
     this.createSchema();
     MappingManager mappingManager = new MappingManager(cassandraClient.getSession());
     mapper = mappingManager.mapper(persistentClass);
@@ -131,7 +131,7 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra
         break;
       }
     }
-    K key = null;
+    K key;
     Method keyMethod = null;
     try {
       for (Method method : this.persistentClass.getMethods()) {

http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
index ac46a30..807a99d 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
@@ -51,6 +51,7 @@ public class CassandraMapping {
 
   /**
    * This method returns the KeySpace in the mapping file,
+   *
    * @return Key space {@link KeySpace}
    */
   public KeySpace getKeySpace() {
@@ -59,6 +60,7 @@ public class CassandraMapping {
 
   /**
    * This method set the KeySpace in the Cassandra mapping.
+   *
    * @param keySpace Key space {@link KeySpace}
    */
   void setKeySpace(KeySpace keySpace) {
@@ -67,6 +69,7 @@ public class CassandraMapping {
 
   /**
    * Thi method returns only the fields which belongs only for the Persistent Object.
+   *
    * @return List of Fields
    */
   public List<Field> getFieldList() {
@@ -75,6 +78,7 @@ public class CassandraMapping {
 
   /**
    * This method returns the Persistent Object's Field from the mapping, according to the FieldName.
+   *
    * @param fieldName Field Name
    * @return Field {@link Field}
    */
@@ -89,6 +93,7 @@ public class CassandraMapping {
 
   /**
    * This method returns the Persistent Object's Field from the mapping, according to the ColumnName.
+   *
    * @param columnName Column Name
    * @return Field {@link Field}
    */
@@ -103,6 +108,7 @@ public class CassandraMapping {
 
   /**
    * This method returns the Field Names
+   *
    * @return array of Field Names
    */
   public String[] getFieldNames() {
@@ -116,6 +122,7 @@ public class CassandraMapping {
 
   /**
    * This method returns
+   *
    * @return
    */
   public String[] getAllFieldsIncludingKeys() {
@@ -133,7 +140,6 @@ public class CassandraMapping {
   }
 
   /**
-   *
    * @return
    */
   public String[] getAllKeys() {

http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
index f151458..1d787de 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
@@ -242,6 +242,9 @@ class CassandraMappingBuilder<K, T extends Persistent> {
             attributeValue = attributeValue.replace("udt(", "frozen(");
           }
           fieldKey.setType(attributeValue.replace("(", "<").replace(")", ">"));
+          if (fieldKey.getType().equalsIgnoreCase("udt")) {
+            throw new RuntimeException("Invalid udt type, Please enter dataType for udt with a unique name for particular user define data type, like udt(metadata).");
+          }
           break;
         default:
           fieldKey.addProperty(attributeName, attributeValue);

http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
index c481610..5e31fcc 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
@@ -17,12 +17,12 @@
 
 package org.apache.gora.cassandra.store;
 
-import org.apache.avro.data.RecordBuilder;
-import org.apache.gora.cassandra.persistent.CassandraNativePersistent;
+import org.apache.avro.Schema;
 import org.apache.gora.cassandra.query.CassandraQuery;
 import org.apache.gora.cassandra.serializers.CassandraSerializer;
 import org.apache.gora.persistency.BeanFactory;
 import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.BeanFactoryImpl;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
@@ -54,16 +54,23 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
 
   private Class<K> keyClass;
 
+  private Schema persistentSchema;
+
   private Class<T> persistentClass;
 
   private CassandraMapping mapping;
 
   private CassandraSerializer cassandraSerializer;
+  private String serializationType;
 
   public CassandraStore() {
     super();
   }
 
+  public String getSerializationType() {
+    return serializationType;
+  }
+
   /**
    * In initializing the cassandra datastore, read the mapping file, creates the basic connection to cassandra cluster,
    * according to the gora properties
@@ -77,12 +84,21 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
     try {
       this.keyClass = keyClass;
       this.persistentClass = persistentClass;
+      if (this.beanFactory == null) {
+        this.beanFactory = new BeanFactoryImpl<>(keyClass, persistentClass);
+      }
+      if (PersistentBase.class.isAssignableFrom(persistentClass)) {
+        persistentSchema = ((PersistentBase) this.beanFactory.getCachedPersistent()).getSchema();
+      } else {
+        persistentSchema = null;
+      }
       String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
+      serializationType = properties.getProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE);
       CassandraMappingBuilder mappingBuilder = new CassandraMappingBuilder(this);
       mapping = mappingBuilder.readMapping(mappingFile);
       CassandraClient cassandraClient = new CassandraClient();
       cassandraClient.initialize(properties, mapping);
-      cassandraSerializer = CassandraSerializer.getSerializer(cassandraClient, properties.getProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE), this, mapping);
+      cassandraSerializer = CassandraSerializer.getSerializer(cassandraClient, serializationType, this, mapping, persistentSchema);
     } catch (Exception e) {
       throw new RuntimeException("Error while initializing Cassandra store: " + e.getMessage(), e);
     }
@@ -100,7 +116,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
    * This is a setter method to set the class of persistent objects.
    *
    * @param persistentClass class of persistent objects
-   *                        {@link CassandraNativePersistent}
    *                        {@link  org.apache.gora.persistency.Persistent}
    */
   @Override
@@ -153,9 +168,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
     try {
       if (beanFactory != null) {
         return this.beanFactory.newPersistent();
-      } else if (PersistentBase.class.isAssignableFrom(persistentClass)) {
-        RecordBuilder builder = (RecordBuilder) persistentClass.getMethod("newBuilder").invoke(null);
-        return (T) RecordBuilder.class.getMethod("build").invoke(builder);
       } else {
         return persistentClass.newInstance();
       }

http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/test/conf/avroUDT/gora-cassandra-mapping.xml
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/test/conf/avroUDT/gora-cassandra-mapping.xml b/gora-cassandra-cql/src/test/conf/avroUDT/gora-cassandra-mapping.xml
new file mode 100644
index 0000000..9dc3c0b
--- /dev/null
+++ b/gora-cassandra-cql/src/test/conf/avroUDT/gora-cassandra-mapping.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<gora-otd>
+
+    <keyspace name="avroKeySpace" durableWrite="false">
+        <placementStrategy name="SimpleStrategy" replication_factor="1"/>
+    </keyspace>
+
+    <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" table="WebPage"
+           allowFiltering="true"
+           keyspace="avroKeySpace">
+        <field name="url" column="url" type="ascii"/>
+        <field name="content" column="content" type="blob"/>
+        <field name="parsedContent" column="parsedContent" type="list(ascii)"/>
+        <field name="outlinks" column="outlinks" type="map(text,text)"/>
+        <field name="headers" column="headers" type="map(text,text)"/>
+        <field name="byteData" column="byteData" type="map(text,blob)"/>
+        <field name="metadata" column="metadata" type="udt(metadata)"/>
+        <field name="stringData" column="stringData" type="map(text,ascii)"/>
+    </class>
+
+    <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" keyspace="avroKeySpace"
+           allowFiltering="true"
+           table="Employee" compactStorage="true">
+        <field name="name" column="name" type="text"/>
+        <field name="dateOfBirth" column="dob" type="bigint"/>
+        <field name="ssn" column="ssn" type="text"/>
+        <field name="salary" column="salary" type="int"/>
+        <field name="boss" column="boss" type="blob"/>
+        <field name="webpage" column="webpage" type="blob"/>
+    </class>
+
+</gora-otd>


Mime
View raw message