gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lewi...@apache.org
Subject [18/37] gora git commit: Add avro serialization support
Date Wed, 23 Aug 2017 20:55:16 GMT
http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
new file mode 100644
index 0000000..70e0ecf
--- /dev/null
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
@@ -0,0 +1,274 @@
+package org.apache.gora.cassandra.serializers;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.cassandra.bean.CassandraKey;
+import org.apache.gora.cassandra.bean.Field;
+import org.apache.gora.cassandra.store.CassandraMapping;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility methods used by the Avro serialization support of the Cassandra data store.
+ */
+public class AvroCassandraUtils {
+
+  /**
+   * Default schema index with value "0" used when AVRO Union data types are stored.
+   */
+  private static final int DEFAULT_UNION_SCHEMA = 0;
+
+  private static final Logger LOG = LoggerFactory.getLogger(AvroCassandraUtils.class);
+
+  static void processKeys(CassandraMapping cassandraMapping, Object key, List<String> keys, List<Object> values) {
+    CassandraKey cassandraKey = cassandraMapping.getCassandraKey();
+    if (cassandraMapping.isPartitionKeyDefined()) {
+      if (cassandraKey != null) {
+        if (key instanceof PersistentBase) {
+          PersistentBase keyBase = (PersistentBase) key;
+          for (Schema.Field field : keyBase.getSchema().getFields()) {
+            if (cassandraMapping.getFieldFromFieldName(field.name()) != null) {
+              keys.add(field.name());
+              Object value = keyBase.get(field.pos());
+              value = getFieldValueFromAvroBean(field.schema(), field.schema().getType(), value);
+              values.add(value);
+            } else {
+              LOG.debug("Ignoring field {}, Since field couldn't find in the {} mapping", new Object[]{field.name(), cassandraMapping.getPersistentClass()});
+            }
+          }
+        } else {
+          LOG.error("Key bean isn't extended by {} .", new Object[]{cassandraMapping.getKeyClass(), PersistentBase.class});
+        }
+      } else {
+        for (Field field : cassandraMapping.getInlinedDefinedPartitionKeys()) {
+          keys.add(field.getFieldName());
+          values.add(key);
+        }
+      }
+    } else {
+      keys.add(cassandraMapping.getDefaultCassandraKey().getFieldName());
+      values.add(key.toString());
+    }
+  }
+
+  /**
+   * For every field within an object, we pass in a field schema, Type and value.
+   * This enables us to process fields (based on their characteristics)
+   * preparing them for persistence.
+   * <p>
+   * Records, maps and arrays are converted recursively, a union is resolved to the branch
+   * matching the value, and strings are converted with {@code toString()}. Values of any
+   * other type are returned unchanged.
+   *
+   * @param fieldSchema the associated field schema
+   * @param type        the field type
+   * @param fieldValue  the field value, may be {@code null}
+   * @return the converted value, or {@code null} when {@code fieldValue} is {@code null}
+   */
+  static Object getFieldValueFromAvroBean(Schema fieldSchema, Schema.Type type, Object fieldValue) {
+    if (fieldValue == null) {
+      // Nothing to convert; this also keeps the RECORD, MAP, ARRAY and STRING branches
+      // from failing with a NullPointerException on unset fields.
+      return null;
+    }
+    switch (type) {
+      case RECORD:
+        PersistentBase persistent = (PersistentBase) fieldValue;
+        PersistentBase newRecord = (PersistentBase) SpecificData.get().newRecord(persistent, persistent.getSchema());
+        for (Schema.Field member : fieldSchema.getFields()) {
+          // NOTE(review): the member at position 0 is always skipped - confirm this is intended.
+          if (member.pos() == 0 || !persistent.isDirty()) {
+            continue;
+          }
+          Schema memberSchema = member.schema();
+          Schema.Type memberType = memberSchema.getType();
+          Object memberValue = persistent.get(member.pos());
+          newRecord.put(member.pos(), getFieldValueFromAvroBean(memberSchema, memberType, memberValue));
+        }
+        fieldValue = newRecord;
+        break;
+      case MAP:
+        Schema valueSchema = fieldSchema.getValueType();
+        Schema.Type valuetype = valueSchema.getType();
+        HashMap<String, Object> map = new HashMap<>();
+        for (Map.Entry<?, ?> e : ((Map<?, ?>) fieldValue).entrySet()) {
+          // Map keys are stored as plain strings.
+          String mapKey = e.getKey().toString();
+          Object mapValue = getFieldValueFromAvroBean(valueSchema, valuetype, e.getValue());
+          map.put(mapKey, mapValue);
+        }
+        fieldValue = map;
+        break;
+      case ARRAY:
+        valueSchema = fieldSchema.getElementType();
+        valuetype = valueSchema.getType();
+        ArrayList<Object> list = new ArrayList<>();
+        for (Object item : (Collection<?>) fieldValue) {
+          list.add(getFieldValueFromAvroBean(valueSchema, valuetype, item));
+        }
+        fieldValue = list;
+        break;
+      case UNION:
+        // Resolve the branch of the union the value belongs to and convert it with that schema.
+        int schemaPos = getUnionSchema(fieldValue, fieldSchema);
+        Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
+        Schema.Type unionType = unionSchema.getType();
+        fieldValue = getFieldValueFromAvroBean(unionSchema, unionType, fieldValue);
+        break;
+      case STRING:
+        fieldValue = fieldValue.toString();
+        break;
+      default:
+        break;
+    }
+    return fieldValue;
+  }
+
+  /**
+   * Finds the branch of a UNION schema that fits the runtime type of a value.
+   * The branches are inspected in declaration order and the position of the first
+   * one whose Avro type matches the Java type of the value is returned. When no
+   * branch matches, {@link #DEFAULT_UNION_SCHEMA} is returned.
+   *
+   * @param pValue       the value to classify
+   * @param pUnionSchema the union schema holding the candidate branches
+   * @return the position of the matching branch inside the union.
+   */
+  private static int getUnionSchema(Object pValue, Schema pUnionSchema) {
+    List<Schema> branches = pUnionSchema.getTypes();
+    for (int position = 0; position < branches.size(); position++) {
+      boolean matches;
+      switch (branches.get(position).getType()) {
+        case STRING:
+          matches = pValue instanceof CharSequence;
+          break;
+        case BYTES:
+          matches = pValue instanceof ByteBuffer;
+          break;
+        case INT:
+          matches = pValue instanceof Integer;
+          break;
+        case LONG:
+          matches = pValue instanceof Long;
+          break;
+        case DOUBLE:
+          matches = pValue instanceof Double;
+          break;
+        case FLOAT:
+          matches = pValue instanceof Float;
+          break;
+        case BOOLEAN:
+          matches = pValue instanceof Boolean;
+          break;
+        case MAP:
+          matches = pValue instanceof Map;
+          break;
+        case ARRAY:
+          matches = pValue instanceof List;
+          break;
+        case RECORD:
+          matches = pValue instanceof Persistent;
+          break;
+        default:
+          matches = false;
+          break;
+      }
+      if (matches) {
+        return position;
+      }
+    }
+    // none of the branches fits the value, fall back to the default position
+    return DEFAULT_UNION_SCHEMA;
+  }
+
+  static String encodeFieldKey(final String key) {
+    if (key == null) {
+      return null;
+    }
+    return key.replace(".", "\u00B7")
+            .replace(":", "\u00FF")
+            .replace(";", "\u00FE")
+            .replace(" ", "\u00FD")
+            .replace("%", "\u00FC")
+            .replace("=", "\u00FB");
+  }
+
+  static String decodeFieldKey(final String key) {
+    if (key == null) {
+      return null;
+    }
+    return key.replace("\u00B7", ".")
+            .replace("\u00FF", ":")
+            .replace("\u00FE", ";")
+            .replace("\u00FD", " ")
+            .replace("\u00FC", "%")
+            .replace("\u00FB", "=");
+  }
+
+  /**
+   * Converts a value read from the store into the representation expected by an Avro bean
+   * for the given schema.
+   * <ul>
+   * <li>MAP: a {@link DirtyMapWrapper} with {@link Utf8} keys and recursively converted values;
+   * a {@code null} value yields an empty wrapper.</li>
+   * <li>ARRAY: a {@link DirtyListWrapper} with recursively converted elements; a {@code null}
+   * value yields an empty wrapper.</li>
+   * <li>UNION: the value is converted with the branch selected by
+   * {@link #getUnionSchema(Object, Schema)}.</li>
+   * <li>ENUM, BYTES, STRING: converted to the enum constant, a {@link ByteBuffer} or a
+   * {@link Utf8}; a {@code null} value stays {@code null}.</li>
+   * <li>any other type: the value is returned unchanged.</li>
+   * </ul>
+   *
+   * @param value  the raw value, may be {@code null}
+   * @param schema the Avro schema of the target field
+   * @return the value to put into the Avro bean
+   */
+  static Object getAvroFieldValue(Object value, Schema schema) {
+    Object result;
+    switch (schema.getType()) {
+
+      case MAP:
+        Map<Utf8, Object> deserializableMap = new HashMap<>();
+        if (value != null) {
+          Schema innerSchema = schema.getValueType();
+          for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
+            Object rawKey = e.getKey();
+            // A real type check instead of comparing the simple class name.
+            Utf8 avroKey = rawKey instanceof Utf8 ? (Utf8) rawKey : new Utf8(rawKey.toString());
+            deserializableMap.put(avroKey, getAvroFieldValue(e.getValue(), innerSchema));
+          }
+        }
+        result = new DirtyMapWrapper<>(deserializableMap);
+        break;
+
+      case ARRAY:
+        List<Object> deserializableList = new ArrayList<>();
+        if (value != null) {
+          Schema elementSchema = schema.getElementType();
+          // Any collection is accepted, not only lists.
+          for (Object item : (Collection<?>) value) {
+            deserializableList.add(getAvroFieldValue(item, elementSchema));
+          }
+        }
+        result = new DirtyListWrapper<>(deserializableList);
+        break;
+
+      case RECORD:
+        result = (PersistentBase) value;
+        break;
+
+      case UNION:
+        int index = getUnionSchema(value, schema);
+        Schema resolvedSchema = schema.getTypes().get(index);
+        result = getAvroFieldValue(value, resolvedSchema);
+        break;
+
+      case ENUM:
+        // An empty column must not fail the whole read.
+        result = value == null ? null : org.apache.gora.util.AvroUtils.getEnumValue(schema, value.toString());
+        break;
+
+      case BYTES:
+        if (value == null || value instanceof ByteBuffer) {
+          result = value;
+        } else {
+          result = ByteBuffer.wrap((byte[]) value);
+        }
+        break;
+
+      case STRING:
+        if (value == null || value instanceof Utf8) {
+          result = value;
+        } else {
+          result = new Utf8(value.toString());
+        }
+        break;
+
+      default:
+        result = value;
+    }
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
index 3b626a4..21d548d 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
@@ -17,11 +17,13 @@
 
 package org.apache.gora.cassandra.serializers;
 
-import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
 import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificData;
-import org.apache.gora.cassandra.query.CassandraColumn;
-import org.apache.gora.cassandra.query.CassandraRow;
+import org.apache.gora.cassandra.bean.Field;
+import org.apache.gora.cassandra.query.CassandraResultSet;
 import org.apache.gora.cassandra.store.CassandraClient;
 import org.apache.gora.cassandra.store.CassandraMapping;
 import org.apache.gora.persistency.Persistent;
@@ -29,59 +31,253 @@ import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
 import org.apache.gora.store.DataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 /**
- * This class contains the operations relates to Avro Serialization
+ * This class contains the operations relates to Avro Serialization.
  */
 class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
 
 
-  /**
-   * Default schema index with value "0" used when AVRO Union data types are stored
-   */
-  public static final int DEFAULT_UNION_SCHEMA = 0;
+  private static final Logger LOG = LoggerFactory.getLogger(AvroSerializer.class);
+
+  private DataStore<K, T> cassandraDataStore;
 
-  AvroSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) {
-    super(cassandraClient, keyClass, persistentClass, mapping);
+  /**
+   * Creates a serializer bound to the given data store.
+   *
+   * @param cassandraClient the Cassandra client
+   * @param dataStore       the data store providing the key class, the persistent class and new bean instances
+   * @param mapping         the Cassandra mapping
+   */
+  AvroSerializer(CassandraClient cassandraClient, DataStore<K, T> dataStore, CassandraMapping mapping) {
+    // The third argument is the persistent class, not the key class a second time.
+    super(cassandraClient, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping);
+    this.cassandraDataStore = dataStore;
   }
 
   @Override
   public Persistent get(Object key, String[] fields) {
-    return null;
+    if (fields == null) {
+      fields = getFields();
+    }
+    ArrayList<String> cassandraKeys = new ArrayList<>();
+    ArrayList<Object> cassandraValues = new ArrayList<>();
+    AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues);
+    String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields, cassandraKeys);
+    ResultSet resultSet = this.client.getSession().execute(cqlQuery, cassandraValues.toArray());
+    Iterator<Row> iterator = resultSet.iterator();
+    ColumnDefinitions definitions = resultSet.getColumnDefinitions();
+    T obj = null;
+    if (iterator.hasNext()) {
+      obj = cassandraDataStore.newPersistent();
+      Row row = iterator.next();
+      populateValuesToPersistent(row, definitions, obj);
+    }
+    return obj;
   }
 
   @Override
-  public void put(Object key, Persistent value) {
-
+  public void put(Object key, Persistent persistent) {
+    if (persistent instanceof PersistentBase) {
+      if (persistent.isDirty()) {
+        PersistentBase persistentBase = (PersistentBase) persistent;
+        ArrayList<String> fields = new ArrayList<>();
+        ArrayList<Object> values = new ArrayList<>();
+        AvroCassandraUtils.processKeys(mapping, key, fields, values);
+        for (Schema.Field f : persistentBase.getSchema().getFields()) {
+          String fieldName = f.name();
+          if (mapping.getFieldFromFieldName(fieldName) == null) {
+            LOG.debug("Ignoring {} adding field, {} field can't find in {} mapping", new Object[]{fieldName, fieldName, persistentClass});
+            continue;
+          }
+          if (persistent.isDirty(f.pos()) || mapping.getInlinedDefinedPartitionKeys().contains(mapping.getFieldFromFieldName(fieldName))) {
+            Object value = persistentBase.get(f.pos());
+            value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), f.schema().getType(), value);
+            values.add(value);
+            fields.add(fieldName);
+          }
+        }
+        String cqlQuery = CassandraQueryFactory.getInsertDataQuery(mapping, fields);
+        client.getSession().execute(cqlQuery, values.toArray());
+      } else {
+        LOG.info("Ignored putting persistent bean {} in the store as it is neither "
+                + "new, neither dirty.", new Object[]{persistent});
+      }
+    } else {
+      LOG.error("{} Persistent bean isn't extended by {} .", new Object[]{this.persistentClass, PersistentBase.class});
+    }
   }
 
   @Override
   public Persistent get(Object key) {
-    return null;
+    ArrayList<String> cassandraKeys = new ArrayList<>();
+    ArrayList<Object> cassandraValues = new ArrayList<>();
+    AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues);
+    String cqlQuery = CassandraQueryFactory.getSelectObjectQuery(mapping, cassandraKeys);
+    ResultSet resultSet = this.client.getSession().execute(cqlQuery, cassandraValues.toArray());
+    Iterator<Row> iterator = resultSet.iterator();
+    ColumnDefinitions definitions = resultSet.getColumnDefinitions();
+    T obj = null;
+    if (iterator.hasNext()) {
+      obj = cassandraDataStore.newPersistent();
+      Row row = iterator.next();
+      populateValuesToPersistent(row, definitions, obj);
+    }
+    return obj;
   }
 
-  @Override
-  public boolean delete(Object key) {
-    return false;
-  }
+  /**
+   * Copies the column values of a row into the fields of a persistent bean.
+   * <p>
+   * Avro fields without an entry in the mapping are skipped. Every other column is read with
+   * the getter matching its Cassandra type and converted with
+   * {@link AvroCassandraUtils#getAvroFieldValue(Object, Schema)} before it is put into the bean.
+   *
+   * @param row               the row to read from
+   * @param columnDefinitions the column definitions used to look up the Cassandra type of each column
+   * @param base              the bean to populate
+   */
+  private void populateValuesToPersistent(Row row, ColumnDefinitions columnDefinitions, PersistentBase base) {
 
+    Object paramValue;
+    for (Schema.Field avrofield : base.getSchema().getFields()) {
 
-  @Override
-  public Result execute(DataStore dataStore,Query query) {
-    return null;
+      Field field = mapping.getFieldFromFieldName(avrofield.name());
+      //to ignore unspecified fields in the mapping
+      if (field == null) {
+        continue;
+      }
+      Schema fieldSchema = avrofield.schema();
+      String columnName = field.getColumnName();
+      DataType columnType = columnDefinitions.getType(columnName);
+
+      switch (columnType.getName()) {
+        case ASCII:
+        case TEXT:
+        case VARCHAR:
+          paramValue = row.getString(columnName);
+          break;
+        case BIGINT:
+        case COUNTER:
+          paramValue = row.isNull(columnName) ? null : row.getLong(columnName);
+          break;
+        case BLOB:
+        case CUSTOM:
+          paramValue = row.isNull(columnName) ? null : row.getBytes(columnName);
+          break;
+        case BOOLEAN:
+          paramValue = row.isNull(columnName) ? null : row.getBool(columnName);
+          break;
+        case DECIMAL:
+          paramValue = row.isNull(columnName) ? null : row.getDecimal(columnName);
+          break;
+        case DOUBLE:
+          paramValue = row.isNull(columnName) ? null : row.getDouble(columnName);
+          break;
+        case FLOAT:
+          paramValue = row.isNull(columnName) ? null : row.getFloat(columnName);
+          break;
+        case INET:
+          paramValue = row.isNull(columnName) ? null : row.getInet(columnName).toString();
+          break;
+        case INT:
+          paramValue = row.isNull(columnName) ? null : row.getInt(columnName);
+          break;
+        case TIMESTAMP:
+          // NOTE(review): confirm getDate is the right getter for timestamp columns
+          // with the driver version in use.
+          paramValue = row.isNull(columnName) ? null : row.getDate(columnName);
+          break;
+        case UUID:
+        case TIMEUUID:
+          paramValue = row.isNull(columnName) ? null : row.getUUID(columnName);
+          break;
+        case VARINT:
+          paramValue = row.isNull(columnName) ? null : row.getVarint(columnName);
+          break;
+        case LIST:
+          String dataType = field.getType();
+          dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">"));
+          paramValue = row.isNull(columnName) ? null : row.getList(columnName, getRelevantClassForCassandraDataType(dataType));
+          break;
+        case SET:
+          dataType = field.getType();
+          dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">"));
+          // A set column has to be read with getSet. The result is copied into a list
+          // because the Avro conversion of ARRAY fields works on lists.
+          paramValue = row.isNull(columnName) ? null
+                  : new ArrayList<Object>(row.getSet(columnName, getRelevantClassForCassandraDataType(dataType)));
+          break;
+        case MAP:
+          dataType = field.getType();
+          dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">"));
+          // Keep only the value type; trim it so that "map<text, text>" is accepted as well.
+          dataType = dataType.split(",")[1].trim();
+          // Avro supports only String for keys
+          paramValue = row.isNull(columnName) ? null : row.getMap(columnName, String.class, getRelevantClassForCassandraDataType(dataType));
+          break;
+        case UDT:
+          paramValue = row.isNull(columnName) ? null : row.getUDTValue(columnName).toString();
+          break;
+        case TUPLE:
+          paramValue = row.isNull(columnName) ? null : row.getTupleValue(columnName).toString();
+          break;
+        default:
+          paramValue = row.getString(columnName);
+          break;
+      }
+      Object value = AvroCassandraUtils.getAvroFieldValue(paramValue, fieldSchema);
+      base.put(avrofield.pos(), value);
+    }
+  }
+
+  /**
+   * Returns the Java class used to read the elements of a Cassandra collection column.
+   * Surrounding whitespace of the type name is ignored.
+   *
+   * @param dataType the Cassandra element type name, for example {@code text} or {@code blob}
+   * @return {@code String.class} for ascii, text and varchar, {@code ByteBuffer.class} for blob
+   * @throws RuntimeException when the type is not one of the supported types
+   */
+  private Class getRelevantClassForCassandraDataType(String dataType) {
+    // Type names cut out of declarations such as "map<text, text>" may carry leading spaces.
+    switch (dataType.trim()) {
+      //// TODO: 7/25/17 support all the datatypes 
+      case "ascii":
+      case "text":
+      case "varchar":
+        return String.class;
+      case "blob":
+        return ByteBuffer.class;
+      default:
+        throw new RuntimeException("Invalid Cassandra DataType : " + dataType);
+    }
   }
 
   @Override
-  public long deleteByQuery(Query query) {
-    return 0;
+  public boolean delete(Object key) {
+    // Resolve the key columns and the values to bind, then run the delete statement.
+    List<String> keyNames = new ArrayList<>();
+    List<Object> keyValues = new ArrayList<>();
+    AvroCassandraUtils.processKeys(mapping, key, keyNames, keyValues);
+    String cql = CassandraQueryFactory.getDeleteDataQuery(mapping, keyNames);
+    return this.client.getSession().execute(cql, keyValues.toArray()).wasApplied();
   }
 
+
   @Override
-  public boolean updateByQuery(Query query) {
-    return false;
+  public Result execute(DataStore dataStore, Query query) {
+    // The query factory fills this list with the values to bind to the statement.
+    List<Object> boundValues = new ArrayList<>();
+    CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query);
+    String cql = CassandraQueryFactory.getExecuteQuery(mapping, query, boundValues);
+    ResultSet rows = boundValues.isEmpty()
+            ? client.getSession().execute(cql)
+            : client.getSession().execute(cql, boundValues.toArray());
+    Iterator<Row> cursor = rows.iterator();
+    ColumnDefinitions columns = rows.getColumnDefinitions();
+    long rowCount = 0;
+    while (cursor.hasNext()) {
+      Row row = cursor.next();
+      // Both the bean and the key are populated from the same row.
+      T persistent = cassandraDataStore.newPersistent();
+      K rowKey = cassandraDataStore.newKey();
+      populateValuesToPersistent(row, columns, persistent);
+      populateValuesToPersistent(row, columns, (PersistentBase) rowKey);
+      cassandraResult.addResultElement(rowKey, persistent);
+      rowCount++;
+    }
+    cassandraResult.setLimit(rowCount);
+    return cassandraResult;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
index ebb7c20..865a8b3 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
@@ -26,12 +26,15 @@ import org.apache.gora.cassandra.bean.Field;
 import org.apache.gora.cassandra.bean.KeySpace;
 import org.apache.gora.cassandra.bean.PartitionKeyField;
 import org.apache.gora.cassandra.query.CassandraQuery;
-import org.apache.gora.cassandra.query.CassandraRow;
 import org.apache.gora.cassandra.store.CassandraMapping;
 import org.apache.gora.query.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 /**
@@ -39,6 +42,8 @@ import java.util.Map;
  */
 class CassandraQueryFactory {
 
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraQueryFactory.class);
+
   /**
    * This method returns the CQL query to create key space.
    * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html
@@ -83,7 +88,7 @@ class CassandraQueryFactory {
    * It's very much needed to follow the same order in other CRUD operations as well.
    *
    * @param mapping Cassandra mapping
-   * @return CQL
+   * @return CQL Query
    */
   static String getCreateTableQuery(CassandraMapping mapping) {
     StringBuilder stringBuffer = new StringBuilder();
@@ -91,23 +96,6 @@ class CassandraQueryFactory {
     boolean isCommaNeeded = false;
     CassandraKey cassandraKey = mapping.getCassandraKey();
     // appending Cassandra key columns into db schema
-    if (cassandraKey != null) {
-      for (PartitionKeyField partitionKeyField : cassandraKey.getPartitionKeyFields()) {
-        if (partitionKeyField.isComposite()) {
-          for (Field compositeField : partitionKeyField.getFields()) {
-            stringBuffer = processFields(stringBuffer, compositeField, isCommaNeeded);
-          }
-
-        } else {
-          stringBuffer = processFields(stringBuffer, partitionKeyField, isCommaNeeded);
-        }
-        isCommaNeeded = true;
-      }
-      for (ClusterKeyField clusterKeyField : cassandraKey.getClusterKeyFields()) {
-        stringBuffer = processFields(stringBuffer, clusterKeyField, isCommaNeeded);
-      }
-    }
-    // appending Other columns
     for (Field field : mapping.getFieldList()) {
       if (isCommaNeeded) {
         stringBuffer.append(", ");
@@ -151,6 +139,11 @@ class CassandraQueryFactory {
         }
         stringBuffer.append(")");
       }
+    } else {
+      if (!stringBuffer.toString().toLowerCase(Locale.ENGLISH).contains("primary key")) {
+        Field field = mapping.getDefaultCassandraKey();
+        stringBuffer.append(", ").append(field.getFieldName()).append(" ").append(field.getType()).append("  PRIMARY KEY ");
+      }
     }
 
     stringBuffer.append(")");
@@ -196,18 +189,6 @@ class CassandraQueryFactory {
     return stringBuffer.toString();
   }
 
-  private static StringBuilder processFields(StringBuilder stringBuilder, Field field, boolean isCommaNeeded) {
-    if (isCommaNeeded) {
-      stringBuilder.append(", ");
-    }
-    stringBuilder.append(field.getColumnName()).append(" ").append(field.getType());
-    boolean isStaticColumn = Boolean.parseBoolean(field.getProperty("static"));
-    if (isStaticColumn) {
-      stringBuilder.append(" STATIC");
-    }
-    return stringBuilder;
-  }
-
   /**
    * This method returns the CQL query to drop table.
    * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_table_r.html
@@ -246,32 +227,91 @@ class CassandraQueryFactory {
    *
    * @return
    */
-  static String getInsertDataQuery(CassandraMapping mapping, CassandraRow row) {
-    String query = QueryBuilder.insertInto(mapping.getKeySpace().getName(), mapping.getCoreName()).values(row.getFields(), row.getValues()).getQueryString();
-    return query;
+  static String getInsertDataQuery(CassandraMapping mapping, List<String> fields) {
+    String[] columns = getColumnNames(mapping, fields);
+    // one "?" per field
+    String[] placeholders = new String[fields.size()];
+    Arrays.fill(placeholders, "?");
+    String keySpaceName = mapping.getKeySpace().getName();
+    String tableName = mapping.getCoreName();
+    return QueryBuilder.insertInto(keySpaceName, tableName).values(columns, placeholders).getQueryString();
   }
 
-  static <K> String getObjectWithFieldsQuery(CassandraMapping mapping, String[] fields, K key, List<Object> objects) {
+  /**
+   * This method returns the CQL query to delete the data identified by the given key fields.
+   * Every key column is compared for equality against a "?" value.
+   *
+   * @param mapping Cassandra mapping
+   * @param fields  names of the key fields, translated to column names through the mapping
+   * @return CQL Query
+   * @throws IllegalArgumentException when no key column is available
+   */
+  static String getDeleteDataQuery(CassandraMapping mapping, List<String> fields) {
+    String[] columnNames = getColumnNames(mapping, fields);
+    if (columnNames.length == 0) {
+      // Without a key column there is no WHERE clause to build.
+      throw new IllegalArgumentException("At least one key field is required to build a delete query");
+    }
+    Delete delete = QueryBuilder.delete().from(mapping.getKeySpace().getName(), mapping.getCoreName());
+    // The first column opens the WHERE clause, the others are chained with AND.
+    Delete.Where query = delete.where(QueryBuilder.eq(columnNames[0], "?"));
+    for (int i = 1; i < columnNames.length; i++) {
+      query = query.and(QueryBuilder.eq(columnNames[i], "?"));
+    }
+    return query.getQueryString();
+  }
+
+  /**
+   * This method returns the CQL query to select all columns of the data identified by the given key fields.
+   *
+   * @param mapping   Cassandra mapping
+   * @param keyFields names of the key fields
+   * @return CQL Query
+   */
+  static String getSelectObjectQuery(CassandraMapping mapping, List<String> keyFields) {
+    String keySpaceName = mapping.getKeySpace().getName();
+    Select select = QueryBuilder.select().from(keySpaceName, mapping.getCoreName());
+    boolean allowFiltering = Boolean.parseBoolean(mapping.getProperty("allowFiltering"));
+    if (allowFiltering) {
+      select.allowFiltering();
+    }
+    Select.Where where = null;
+    for (String keyColumn : getColumnNames(mapping, keyFields)) {
+      // the first key column opens the WHERE clause, the others are chained with AND
+      where = (where == null)
+              ? select.where(QueryBuilder.eq(keyColumn, "?"))
+              : where.and(QueryBuilder.eq(keyColumn, "?"));
+    }
+    return where.getQueryString();
+  }
+
+  /**
+   * This method returns the CQL query to select the given fields of the data identified by the given key fields.
+   *
+   * @param mapping   Cassandra mapping
+   * @param fields    names of the fields to select
+   * @param keyFields names of the key fields
+   * @return CQL Query
+   */
+  static String getSelectObjectWithFieldsQuery(CassandraMapping mapping, String[] fields, List<String> keyFields) {
+    String[] selectedColumns = getColumnNames(mapping, Arrays.asList(fields));
+    String keySpaceName = mapping.getKeySpace().getName();
+    Select select = QueryBuilder.select(selectedColumns).from(keySpaceName, mapping.getCoreName());
+    boolean allowFiltering = Boolean.parseBoolean(mapping.getProperty("allowFiltering"));
+    if (allowFiltering) {
+      select.allowFiltering();
+    }
+    Select.Where where = null;
+    for (String keyColumn : getColumnNames(mapping, keyFields)) {
+      // the first key column opens the WHERE clause, the others are chained with AND
+      where = (where == null)
+              ? select.where(QueryBuilder.eq(keyColumn, "?"))
+              : where.and(QueryBuilder.eq(keyColumn, "?"));
+    }
+    return where.getQueryString();
+  }
+
+  static String getSelectObjectWithFieldsQuery(CassandraMapping mapping, String[] fields) {
     String cqlQuery = null;
-    Select select = QueryBuilder.select(fields).from(mapping.getKeySpace().getName(), mapping.getCoreName());
+    String[] columnNames = getColumnNames(mapping, Arrays.asList(fields));
+    Select select = QueryBuilder.select(columnNames).from(mapping.getKeySpace().getName(), mapping.getCoreName());
+    if(Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) {
+      select.allowFiltering();
+    }
     CassandraKey cKey = mapping.getCassandraKey();
     if (cKey != null) {
       Select.Where query = null;
       boolean isWhereNeeded = true;
-      for (PartitionKeyField field : cKey.getPartitionKeyFields()) {
-        if (field.isComposite()) {
-          for (Field compositeField : field.getFields()) {
-            if (isWhereNeeded) {
-              query = select.where(QueryBuilder.eq(compositeField.getColumnName(), "?"));
-              isWhereNeeded = false;
-            }
-            query = query.and(QueryBuilder.eq(compositeField.getColumnName(), "?"));
-          }
+      for (Field field : cKey.getFieldList()) {
+        if (isWhereNeeded) {
+          query = select.where(QueryBuilder.eq(field.getColumnName(), "?"));
+          isWhereNeeded = false;
         } else {
-          if (isWhereNeeded) {
-            query = select.where(QueryBuilder.eq(field.getColumnName(), "?"));
-            isWhereNeeded = false;
-          }
           query = query.and(QueryBuilder.eq(field.getColumnName(), "?"));
         }
       }
@@ -281,7 +321,6 @@ class CassandraQueryFactory {
         boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey"));
         if (isPrimaryKey) {
           cqlQuery = select.where(QueryBuilder.eq(field.getColumnName(), "?")).getQueryString();
-          objects.add(key);
           break;
         }
       }
@@ -298,20 +337,53 @@ class CassandraQueryFactory {
     Object key = cassandraQuery.getKey();
     String primaryKey = null;
     long limit = cassandraQuery.getLimit();
-    Select select = QueryBuilder.select(getColumnNames(mapping, fields)).from(mapping.getKeySpace().getName(), mapping.getCoreName());
+    Select select = QueryBuilder.select(getColumnNames(mapping, Arrays.asList(fields))).from(mapping.getKeySpace().getName(), mapping.getCoreName());
     if (limit > 0) {
       select = select.limit((int) limit);
     }
+    if(Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) {
+      select.allowFiltering();
+    }
     Select.Where query = null;
     boolean isWhereNeeded = true;
     if (key != null) {
-      primaryKey = getPKey(mapping.getFieldList());
-      query = select.where(QueryBuilder.eq(primaryKey, "?"));
-      objects.add(key);
+      if (mapping.getCassandraKey() != null) {
+        ArrayList<String> cassandraKeys = new ArrayList<>();
+        ArrayList<Object> cassandraValues = new ArrayList<>();
+        AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues);
+        String[] columnKeys = getColumnNames(mapping, cassandraKeys);
+        for (int i = 0; i < cassandraKeys.size(); i++) {
+          if (isWhereNeeded) {
+            query = select.where(QueryBuilder.eq(columnKeys[i], "?"));
+            objects.add(cassandraValues.get(i));
+            isWhereNeeded = false;
+          } else {
+            query = query.and(QueryBuilder.eq(columnKeys[i], "?"));
+            objects.add(cassandraValues.get(i));
+          }
+        }
+      } else {
+        primaryKey = getPKey(mapping.getFieldList());
+        query = select.where(QueryBuilder.eq(primaryKey, "?"));
+        objects.add(key);
+      }
     } else {
       if (startKey != null) {
         if (mapping.getCassandraKey() != null) {
-//todo avro serialization
+          ArrayList<String> cassandraKeys = new ArrayList<>();
+          ArrayList<Object> cassandraValues = new ArrayList<>();
+          AvroCassandraUtils.processKeys(mapping, startKey, cassandraKeys, cassandraValues);
+          String[] columnKeys = getColumnNames(mapping, cassandraKeys);
+          for (int i = 0; i < cassandraKeys.size(); i++) {
+            if (isWhereNeeded) {
+              query = select.where(QueryBuilder.gte(columnKeys[i], "?"));
+              objects.add(cassandraValues.get(i));
+              isWhereNeeded = false;
+            } else {
+              query = query.and(QueryBuilder.gte(columnKeys[i], "?"));
+              objects.add(cassandraValues.get(i));
+            }
+          }
         } else {
           primaryKey = getPKey(mapping.getFieldList());
           query = select.where(QueryBuilder.gte(primaryKey, "?"));
@@ -321,7 +393,20 @@ class CassandraQueryFactory {
       }
       if (endKey != null) {
         if (mapping.getCassandraKey() != null) {
-//todo avro serialization
+          ArrayList<String> cassandraKeys = new ArrayList<>();
+          ArrayList<Object> cassandraValues = new ArrayList<>();
+          AvroCassandraUtils.processKeys(mapping, endKey, cassandraKeys, cassandraValues);
+          String[] columnKeys = getColumnNames(mapping, cassandraKeys);
+          for (int i = 0; i < cassandraKeys.size(); i++) {
+            if (isWhereNeeded) {
+              query = select.where(QueryBuilder.lte(columnKeys[i], "?"));
+              objects.add(cassandraValues.get(i));
+              isWhereNeeded = false;
+            } else {
+              query = query.and(QueryBuilder.lte(columnKeys[i], "?"));
+              objects.add(cassandraValues.get(i));
+            }
+          }
         } else {
           primaryKey = primaryKey != null ? primaryKey : getPKey(mapping.getFieldList());
           if (isWhereNeeded) {
@@ -339,14 +424,21 @@ class CassandraQueryFactory {
     return query.getQueryString();
   }
 
-  private static String[] getColumnNames(CassandraMapping mapping, String[] fields) {
-    String[] columnNames = new String[fields.length];
-    int i = 0;
+  private static String[] getColumnNames(CassandraMapping mapping, List<String> fields) {
+    ArrayList<String> columnNames = new ArrayList<>();
     for (String field : fields) {
-      columnNames[i] = mapping.getField(field).getColumnName();
-      i++;
+      Field fieldBean = mapping.getFieldFromFieldName(field);
+      if (fieldBean != null) {
+        columnNames.add(fieldBean.getColumnName());
+      } else {
+        if (mapping.getDefaultCassandraKey().getFieldName().equals(field)) {
+          columnNames.add(field);
+        } else {
+          LOG.warn("{} field is ignored, couldn't find relevant field in the persistent mapping", field);
+        }
+      }
     }
-    return columnNames;
+    return columnNames.toArray(new String[0]);
   }
 
   private static String getPKey(List<Field> fields) {
@@ -362,7 +454,7 @@ class CassandraQueryFactory {
   static String getDeleteByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) {
     String[] columns = null;
     if (!Arrays.equals(cassandraQuery.getFields(), mapping.getFieldNames())) {
-      columns = getColumnNames(mapping, cassandraQuery.getFields());
+      columns = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()));
     }
     Object startKey = cassandraQuery.getStartKey();
     Object endKey = cassandraQuery.getEndKey();
@@ -377,13 +469,43 @@ class CassandraQueryFactory {
     Delete.Where query = null;
     boolean isWhereNeeded = true;
     if (key != null) {
-      primaryKey = getPKey(mapping.getFieldList());
-      query = delete.where(QueryBuilder.eq(primaryKey, "?"));
-      objects.add(key);
+      if (mapping.getCassandraKey() != null) {
+        ArrayList<String> cassandraKeys = new ArrayList<>();
+        ArrayList<Object> cassandraValues = new ArrayList<>();
+        AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues);
+        String[] columnKeys = getColumnNames(mapping, cassandraKeys);
+        for (int i = 0; i < cassandraKeys.size(); i++) {
+          if (isWhereNeeded) {
+            query = delete.where(QueryBuilder.eq(columnKeys[i], "?"));
+            objects.add(cassandraValues.get(i));
+            isWhereNeeded = false;
+          } else {
+            query = query.and(QueryBuilder.eq(columnKeys[i], "?"));
+            objects.add(cassandraValues.get(i));
+          }
+        }
+      } else {
+        primaryKey = getPKey(mapping.getFieldList());
+        query = delete.where(QueryBuilder.eq(primaryKey, "?"));
+        objects.add(key);
+      }
     } else {
       if (startKey != null) {
         if (mapping.getCassandraKey() != null) {
-//todo avro serialization
+          ArrayList<String> cassandraKeys = new ArrayList<>();
+          ArrayList<Object> cassandraValues = new ArrayList<>();
+          AvroCassandraUtils.processKeys(mapping, startKey, cassandraKeys, cassandraValues);
+          String[] columnKeys = getColumnNames(mapping, cassandraKeys);
+          for (int i = 0; i < cassandraKeys.size(); i++) {
+            if (isWhereNeeded) {
+              query = delete.where(QueryBuilder.gte(columnKeys[i], "?"));
+              objects.add(cassandraValues.get(i));
+              isWhereNeeded = false;
+            } else {
+              query = query.and(QueryBuilder.gte(columnKeys[i], "?"));
+              objects.add(cassandraValues.get(i));
+            }
+          }
         } else {
           primaryKey = getPKey(mapping.getFieldList());
           query = delete.where(QueryBuilder.gte(primaryKey, "?"));
@@ -393,7 +515,20 @@ class CassandraQueryFactory {
       }
       if (endKey != null) {
         if (mapping.getCassandraKey() != null) {
-//todo avro serialization
+          ArrayList<String> cassandraKeys = new ArrayList<>();
+          ArrayList<Object> cassandraValues = new ArrayList<>();
+          AvroCassandraUtils.processKeys(mapping, endKey, cassandraKeys, cassandraValues);
+          String[] columnKeys = getColumnNames(mapping, cassandraKeys);
+          for (int i = 0; i < cassandraKeys.size(); i++) {
+            if (isWhereNeeded) {
+              query = delete.where(QueryBuilder.lte(columnKeys[i], "?"));
+              objects.add(cassandraValues.get(i));
+              isWhereNeeded = false;
+            } else {
+              query = query.and(QueryBuilder.lte(columnKeys[i], "?"));
+              objects.add(cassandraValues.get(i));
+            }
+          }
         } else {
           primaryKey = primaryKey != null ? primaryKey : getPKey(mapping.getFieldList());
           if (isWhereNeeded) {
@@ -415,8 +550,7 @@ class CassandraQueryFactory {
     Update update = QueryBuilder.update(mapping.getKeySpace().getName(), mapping.getCoreName());
     Update.Assignments updateAssignments = null;
     if (cassandraQuery instanceof CassandraQuery) {
-      String[] fields = cassandraQuery.getFields();
-      String[] columnNames = getColumnNames(mapping, fields);
+      String[] columnNames = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()));
       for (String column : columnNames) {
         updateAssignments = update.with(QueryBuilder.set(column, "?"));
         objects.add(((CassandraQuery) cassandraQuery).getUpdateFieldValue(column));
@@ -429,13 +563,43 @@ class CassandraQueryFactory {
     Object key = cassandraQuery.getKey();
     boolean isWhereNeeded = true;
     if (key != null) {
-      primaryKey = getPKey(mapping.getFieldList());
-      query = updateAssignments.where(QueryBuilder.eq(primaryKey, "?"));
-      objects.add(key);
+      if (mapping.getCassandraKey() != null) {
+        ArrayList<String> cassandraKeys = new ArrayList<>();
+        ArrayList<Object> cassandraValues = new ArrayList<>();
+        AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues);
+        String[] columnKeys = getColumnNames(mapping, cassandraKeys);
+        for (int i = 0; i < cassandraKeys.size(); i++) {
+          if (isWhereNeeded) {
+            query = updateAssignments.where(QueryBuilder.eq(columnKeys[i], "?"));
+            objects.add(cassandraValues.get(i));
+            isWhereNeeded = false;
+          } else {
+            query = query.and(QueryBuilder.eq(columnKeys[i], "?"));
+            objects.add(cassandraValues.get(i));
+          }
+        }
+      } else {
+        primaryKey = getPKey(mapping.getFieldList());
+        query = updateAssignments.where(QueryBuilder.eq(primaryKey, "?"));
+        objects.add(key);
+      }
     } else {
       if (startKey != null) {
         if (mapping.getCassandraKey() != null) {
-//todo avro serialization
+          ArrayList<String> cassandraKeys = new ArrayList<>();
+          ArrayList<Object> cassandraValues = new ArrayList<>();
+          AvroCassandraUtils.processKeys(mapping, startKey, cassandraKeys, cassandraValues);
+          String[] columnKeys = getColumnNames(mapping, cassandraKeys);
+          for (int i = 0; i < cassandraKeys.size(); i++) {
+            if (isWhereNeeded) {
+              query = updateAssignments.where(QueryBuilder.gte(columnKeys[i], "?"));
+              objects.add(cassandraValues.get(i));
+              isWhereNeeded = false;
+            } else {
+              query = query.and(QueryBuilder.gte(columnKeys[i], "?"));
+              objects.add(cassandraValues.get(i));
+            }
+          }
         } else {
           primaryKey = getPKey(mapping.getFieldList());
           query = updateAssignments.where(QueryBuilder.gte(primaryKey, "?"));
@@ -445,7 +609,20 @@ class CassandraQueryFactory {
       }
       if (endKey != null) {
         if (mapping.getCassandraKey() != null) {
-//todo avro serialization
+          ArrayList<String> cassandraKeys = new ArrayList<>();
+          ArrayList<Object> cassandraValues = new ArrayList<>();
+          AvroCassandraUtils.processKeys(mapping, endKey, cassandraKeys, cassandraValues);
+          String[] columnKeys = getColumnNames(mapping, cassandraKeys);
+          for (int i = 0; i < cassandraKeys.size(); i++) {
+            if (isWhereNeeded) {
+              query = updateAssignments.where(QueryBuilder.lte(columnKeys[i], "?"));
+              objects.add(cassandraValues.get(i));
+              isWhereNeeded = false;
+            } else {
+              query = query.and(QueryBuilder.lte(columnKeys[i], "?"));
+              objects.add(cassandraValues.get(i));
+            }
+          }
         } else {
           primaryKey = primaryKey != null ? primaryKey : getPKey(mapping.getFieldList());
           if (isWhereNeeded) {

http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
index 237bd8a..a924a31 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
@@ -18,7 +18,9 @@
 package org.apache.gora.cassandra.serializers;
 
 import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.TableMetadata;
+import org.apache.gora.cassandra.bean.Field;
 import org.apache.gora.cassandra.store.CassandraClient;
 import org.apache.gora.cassandra.store.CassandraMapping;
 import org.apache.gora.cassandra.store.CassandraStore;
@@ -29,6 +31,8 @@ import org.apache.gora.store.DataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Locale;
 
 /**
@@ -90,27 +94,34 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
    *
    * @param cc              Cassandra Client
    * @param type            Serialization type
-   * @param keyClass        key class
-   * @param persistentClass persistent class
+   * @param dataStore        Cassandra DataStore
    * @param mapping         Cassandra Mapping
    * @param <K>             key class
    * @param <T>             persistent class
    * @return Serializer
    */
-  public static <K, T> CassandraSerializer getSerializer(CassandraClient cc, String type, final Class<K> keyClass, final Class<T> persistentClass, CassandraMapping mapping) {
+  public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K,T> dataStore, CassandraMapping mapping) {
     CassandraStore.SerializerType serType = type.isEmpty() ? CassandraStore.SerializerType.NATIVE : CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH));
     CassandraSerializer serializer;
     switch (serType) {
       case AVRO:
-        serializer = new AvroSerializer(cc, keyClass, persistentClass, mapping);
+        serializer = new AvroSerializer(cc, dataStore, mapping);
         break;
       case NATIVE:
       default:
-        serializer = new NativeSerializer(cc, keyClass, persistentClass, mapping);
+        serializer = new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping);
     }
     return serializer;
   }
 
+  protected String[] getFields() {
+    List<String> fields = new ArrayList<>();
+    for (Field field : mapping.getFieldList()) {
+      fields.add(field.getFieldName());
+    }
+    return fields.toArray(new String[0]);
+  }
+
   public abstract void put(K key, T value);
 
   public abstract T get(K key);
@@ -121,8 +132,42 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
 
   public abstract Result<K, T> execute(DataStore<K, T> dataStore, Query<K, T> query);
 
-  public abstract long deleteByQuery(Query<K, T> query);
+  /**
+   * Builds the CQL update statement for the given query and executes it.
+   *
+   * @param query query to translate into a CQL update statement
+   * @return the {@code wasApplied()} flag of the resulting {@link ResultSet}
+   */
+  public boolean updateByQuery(Query query) {
+    List<Object> objectArrayList = new ArrayList<>();
+    String cqlQuery = CassandraQueryFactory.getUpdateByQuery(mapping, query, objectArrayList);
+    ResultSet results;
+    if (objectArrayList.isEmpty()) {
+      results = client.getSession().execute(cqlQuery);
+    } else {
+      results = client.getSession().execute(cqlQuery, objectArrayList.toArray());
+    }
+    return results.wasApplied();
+  }
 
-  public abstract boolean updateByQuery(Query<K, T> query);
+  /**
+   * Builds the CQL delete statement for the given query and executes it.
+   *
+   * @param query query to translate into a CQL delete statement
+   * @return always 0, because the deleted element count is not available
+   */
+  public long deleteByQuery(Query query) {
+    List<Object> objectArrayList = new ArrayList<>();
+    String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList);
+    ResultSet results;
+    if (objectArrayList.isEmpty()) {
+      results = client.getSession().execute(cqlQuery);
+    } else {
+      results = client.getSession().execute(cqlQuery, objectArrayList.toArray());
+    }
+    LOG.debug("Delete by Query was applied : {}", results.wasApplied());
+    LOG.info("Delete By Query method doesn't return the deleted element count.");
+    return 0;
+  }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
index 6f64fa2..5f36ce2 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
@@ -42,7 +42,7 @@ import java.util.List;
  */
 class NativeSerializer<K, T extends CassandraNativePersistent> extends CassandraSerializer {
 
-  private static final Logger LOG = LoggerFactory.getLogger(CassandraNativePersistent.class);
+  private static final Logger LOG = LoggerFactory.getLogger(NativeSerializer.class);
 
   private Mapper<T> mapper;
 
@@ -72,9 +72,11 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra
 
   @Override
   public Persistent get(Object key, String[] fields) {
-    List<Object> objectArrayList = new ArrayList<>();
-    String cqlQuery = CassandraQueryFactory.getObjectWithFieldsQuery(mapping, fields, key, objectArrayList);
-    ResultSet results = client.getSession().execute(cqlQuery, objectArrayList.toArray());
+    if(fields == null) {
+      fields = getFields();
+    }
+    String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields);
+    ResultSet results = client.getSession().execute(cqlQuery, key);
     Result<T> objects = mapper.map(results);
     List<T> objectList = objects.all();
     if (objectList != null) {
@@ -86,24 +88,9 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra
   }
 
   @Override
-  public long deleteByQuery(Query query) {
-    List<Object> objectArrayList = new ArrayList<>();
-    String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList);
-    ResultSet results;
-    if (objectArrayList.size() == 0) {
-      results = client.getSession().execute(cqlQuery);
-    } else {
-      results = client.getSession().execute(cqlQuery, objectArrayList.toArray());
-    }
-    LOG.debug("Delete by Query was applied : " + results.wasApplied());
-    LOG.info("Delete By Query method doesn't return the deleted element count.");
-    return 0;
-  }
-
-  @Override
   public org.apache.gora.query.Result execute(DataStore dataStore, Query query) {
     List<Object> objectArrayList = new ArrayList<>();
-    CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<K, T>(dataStore, query);
+    CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query);
     String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList);
     ResultSet results;
     if (objectArrayList.size() == 0) {
@@ -113,11 +100,14 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra
     }
     Result<T> objects = mapper.map(results);
     Iterator iterator = objects.iterator();
+    long count = 0;
     while (iterator.hasNext()) {
       T result = (T) iterator.next();
       K key = getKey(result);
       cassandraResult.addResultElement(key, result);
+      count ++ ;
     }
+    cassandraResult.setLimit(count);
     return cassandraResult;
   }
 
@@ -128,19 +118,6 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra
     mapper = mappingManager.mapper(persistentClass);
   }
 
-  @Override
-  public boolean updateByQuery(Query query) {
-    List<Object> objectArrayList = new ArrayList<>();
-    String cqlQuery = CassandraQueryFactory.getUpdateByQuery(mapping, query, objectArrayList);
-    ResultSet results;
-    if (objectArrayList.size() == 0) {
-      results = client.getSession().execute(cqlQuery);
-    } else {
-      results = client.getSession().execute(cqlQuery, objectArrayList.toArray());
-    }
-    return results.wasApplied();
-  }
-
   private K getKey(T object) {
     String keyField = null;
     for (Field field : mapping.getFieldList()) {

http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
index 7b5d265..61b8d1e 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
@@ -35,10 +35,18 @@ public class CassandraMapping {
 
   private Map<String, String> tableProperties;
 
+  private Class keyClass;
+
+  private Class persistentClass;
+
   private KeySpace keySpace;
 
   private List<Field> fieldList;
 
+  private List<Field> inlinedDefinedPartitionKeys;
+
+  private static final String PRIMARY_KEY = "primarykey";
+
   private String coreName;
 
   public KeySpace getKeySpace() {
@@ -53,9 +61,9 @@ public class CassandraMapping {
     return fieldList;
   }
 
-  public Field getField(String field) {
+  public Field getFieldFromFieldName(String fieldName) {
     for (Field field1 : fieldList) {
-      if (field1.getFieldName().equals(field)) {
+      if (field1.getFieldName().equals(fieldName)) {
         return field1;
       }
     }
@@ -75,11 +83,12 @@ public class CassandraMapping {
     return cassandraKey;
   }
 
-  public void setCassandraKey(CassandraKey cassandraKey) {
+  void setCassandraKey(CassandraKey cassandraKey) {
     this.cassandraKey = cassandraKey;
+    this.fieldList.addAll(cassandraKey.getFieldList());
   }
 
-  public CassandraMapping() {
+  CassandraMapping() {
     this.fieldList = new ArrayList<>();
     this.tableProperties = new HashMap<>();
   }
@@ -103,4 +112,54 @@ public class CassandraMapping {
   public String getProperty(String key) {
     return this.tableProperties.get(key);
   }
+
+  public Field getDefaultCassandraKey() {
+    Field field = new Field();
+    field.setFieldName("defaultId");
+    field.setColumnName("defaultId");
+    field.setType("text");
+    return field;
+  }
+
+  public boolean isPartitionKeyDefined() {
+    if (cassandraKey == null) {
+      for (Field field : fieldList) {
+        if (Boolean.parseBoolean(field.getProperty(PRIMARY_KEY))) {
+          return true;
+        }
+      }
+      return false;
+    }
+    return true;
+  }
+
+  public Class getKeyClass() {
+    return keyClass;
+  }
+
+  public void setKeyClass(Class keyClass) {
+    this.keyClass = keyClass;
+  }
+
+  public Class getPersistentClass() {
+    return persistentClass;
+  }
+
+  public void setPersistentClass(Class persistentClass) {
+    this.persistentClass = persistentClass;
+  }
+
+  public List<Field> getInlinedDefinedPartitionKeys() {
+    if(inlinedDefinedPartitionKeys != null) {
+      return inlinedDefinedPartitionKeys;
+    } else {
+      inlinedDefinedPartitionKeys = new ArrayList<>();
+      for (Field field : fieldList) {
+        if (Boolean.parseBoolean(field.getProperty(PRIMARY_KEY))) {
+          inlinedDefinedPartitionKeys.add(field);
+        }
+      }
+      return inlinedDefinedPartitionKeys;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
index 0231ac3..d828d2f 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
@@ -46,7 +46,7 @@ public class CassandraMappingBuilder<K, T extends Persistent> {
    */
   @SuppressWarnings("all")
   public CassandraMapping readMapping(String filename) throws IOException {
-    CassandraMapping map = new CassandraMapping();
+    CassandraMapping cassandraMapping = new CassandraMapping();
     Class keyClass = dataStore.getKeyClass();
     Class persistentClass = dataStore. getPersistentClass();
     try {
@@ -66,14 +66,16 @@ public class CassandraMappingBuilder<K, T extends Persistent> {
 
           classMatched = true;
           String tableName = classElement.getAttributeValue("table");
-          map.setCoreName(tableName);
+          cassandraMapping.setCoreName(tableName);
+          cassandraMapping.setKeyClass(dataStore.getKeyClass());
+          cassandraMapping.setPersistentClass(dataStore.getPersistentClass());
 
           List classAttributes = classElement.getAttributes();
           for (Object anAttributeList : classAttributes) {
             Attribute attribute = (Attribute) anAttributeList;
             String attributeName = attribute.getName();
             String attributeValue = attribute.getValue();
-            map.addProperty(attributeName, attributeValue);
+            cassandraMapping.addProperty(attributeName, attributeValue);
           }
 
           List<Element> fields = classElement.getChildren("field");
@@ -83,7 +85,7 @@ public class CassandraMappingBuilder<K, T extends Persistent> {
 
             List fieldAttributes = field.getAttributes();
             processAttributes(fieldAttributes, cassandraField);
-            map.addCassandraField(cassandraField);
+            cassandraMapping.addCassandraField(cassandraField);
           }
           break;
         }
@@ -96,7 +98,7 @@ public class CassandraMappingBuilder<K, T extends Persistent> {
         LOG.error("Check that 'keyClass' and 'name' parameters in {} no mapping has been initialized for {} class mapping", filename, persistentClass);
       }
 
-      String keyspaceName = map.getProperty("keyspace");
+      String keyspaceName = cassandraMapping.getProperty("keyspace");
       if (keyspaceName != null) {
         KeySpace keyspace;
         for (Element keyspaceElement : keyspaces) {
@@ -135,13 +137,16 @@ public class CassandraMappingBuilder<K, T extends Persistent> {
                 }
                 break;
             }
-            map.setKeySpace(keyspace);
+            cassandraMapping.setKeySpace(keyspace);
             break;
           }
 
         }
 
       }
+      else {
+        throw new RuntimeException("KeySpace couldn't be found in the cassandra mapping. Please configure the cassandra mapping correctly.");
+      }
 
       for (Element key : keys) {
         if (keyClass.getName().equals(key.getAttributeValue("name"))) {
@@ -172,7 +177,7 @@ public class CassandraMappingBuilder<K, T extends Persistent> {
           }
 
           //process cluster keys
-          List<Element> clusterKeyFields = clusterKeys.getChildren("field");
+          List<Element> clusterKeyFields = clusterKeys.getChildren("key");
           for (Element clusterKeyField : clusterKeyFields) {
             ClusterKeyField keyField = new ClusterKeyField();
             List fieldAttributes = clusterKeyField.getAttributes();
@@ -181,32 +186,25 @@ public class CassandraMappingBuilder<K, T extends Persistent> {
               String attributeName = attribute.getName();
               String attributeValue = attribute.getValue();
               switch (attributeName) {
-                case "name":
-                  keyField.setFieldName(attributeValue);
-                  break;
                 case "column":
                   keyField.setColumnName(attributeValue);
                   break;
-                case "type":
-                  keyField.setType(attributeValue.replace("(","<").replace(")",">"));
-                  break;
                 case "order":
                   keyField.setOrder(ClusterKeyField.Order.valueOf(attributeValue.toUpperCase(Locale.ENGLISH)));
                   break;
                 default:
-                  keyField.addProperty(attributeName, attributeValue);
-                  break;
+                  throw new RuntimeException("Unsupported attribute '" + attributeName + "' in cluster key field of the cassandra mapping. Only 'column' and 'order' are supported.");
               }
             }
             cassandraKey.addClusterKeyField(keyField);
           }
-          map.setCassandraKey(cassandraKey);
+          cassandraMapping.setCassandraKey(cassandraKey);
         }
       }
     } catch (Exception ex) {
       throw new IOException(ex);
     }
-    return map;
+    return cassandraMapping;
   }
 
   private void processAttributes(List<Element> attributes, Field fieldKey) {

http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
index 5e209d9..8b04442 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
@@ -90,7 +90,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
       mapping = mappingBuilder.readMapping(mappingFile);
       CassandraClient cassandraClient = new CassandraClient();
       cassandraClient.initialize(properties, mapping);
-      cassandraSerializer = CassandraSerializer.getSerializer(cassandraClient, properties.getProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE), keyClass, persistentClass, mapping);
+      cassandraSerializer = CassandraSerializer.getSerializer(cassandraClient, properties.getProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE), this, mapping);
     } catch (Exception e) {
       throw new RuntimeException("Error while initializing Cassandra store: " + e.getMessage(), e);
     }

http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml b/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml
index 8c970dc..74e4b1c 100644
--- a/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml
+++ b/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml
@@ -17,38 +17,6 @@
    limitations under the License.
 -->
 
-<!--
-   The value of 'host' attribute of keyspace tag should match exactly what is in
-   gora.properties file. Essentially this means that if you are using port number, you should
-   use it every where regardless of whether it is the default port or not.
-   At runtime Gora will otherwise try to connect to localhost
-   https://issues.apache.org/jira/browse/GORA-269
-
-   The values of 'replication_factor' and 'placement_strategy' attribute of keyspace tag
-   only apply if gora create the kyespace. they have no effect if this is being used against 
-   an existing keyspace. the default value for 'replication_factor' is '1'
-   
-   The value of 'placement_strategy' should be a fully qualifed class name that is known to
-   the cassansra cluster, not the application or gora. As of this writing, the classes that ship
-   with cassandra are:
-   'org.apache.cassandra.locator.SimpleStrategy'
-   'org.apache.cassandra.locator.NetworkTopologyStrategy'
-   gora cassandra would use SimpleStrategy by default if no value for this attribute is specified
-   
-   The default value of 'gc_grace_seconds' is '0' which is ONLY VIABLE FOR SINGLE NODE
-   CLUSTER. you should update this value according to your cluster configuration. 
-   https://wiki.apache.org/cassandra/StorageConfiguration
-
-   The value of 'ttl' (time to live) attribute of field tag should most likely always
-   be zero unless you want Cassandra to create Tombstones and delete portions of your
-   data once this period expires. Any positive value is read and bound to the number
-   of seconds after which the value for that field will disappear. The default value of ttl
-   is '0'
-
-   More information on gora-cassandra configuration and mapping's can be found
-   at http://gora.apache.org/current/gora-cassandra.html
--->
-
 <gora-otd>
 
   <keyspace name="avroKeySpace" durableWrite="false">
@@ -56,18 +24,21 @@
   </keyspace>
   
   <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" table="WebPage" keyspace="avroKeySpace">
-    <field name="url" column="url" length="128" primarykey="true"/>
+    <field name="url" column="url" type="ascii" />
     <field name="content" column="content" type="blob"/>
-    <field name="parsedContent" column="parsedContent" type="list"/>
-    <field name="outlinks" column="outlinks" type="map"/>
+    <field name="parsedContent" column="parsedContent" type="list(ascii)"/>
+    <field name="outlinks" column="outlinks" type="map(text,text)"/>
+    <field name="headers" column="headers" type="map(text,text)"/>
+    <field name="byteData" column="byteData" type="map(text,blob)"/>
+    <field name="stringData" column="stringData" type="map(text,ascii)"/>
   </class>
 
   <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" keyspace="avroKeySpace"
          table="Employee" compactStorage="true" >
-    <field name="name" column="name" type="text" ttl="10"/>
-    <field name="dateOfBirth" column="dob" type="bigint" ttl="10"/>
-    <field name="ssn" column="ssn" type="text" ttl="10" primarykey="true"/>
-    <field name="salary" column="salary" type="int" ttl="10" />
+    <field name="name" column="name" type="text"/>
+    <field name="dateOfBirth" column="dob" type="bigint" />
+    <field name="ssn" column="ssn" type="text" />
+    <field name="salary" column="salary" type="int" />
    </class>
 
 </gora-otd>

http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/test/conf/compositeKey/gora-cassandra-mapping.xml
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/test/conf/compositeKey/gora-cassandra-mapping.xml b/gora-cassandra-cql/src/test/conf/compositeKey/gora-cassandra-mapping.xml
index 556d553..f3b3272 100644
--- a/gora-cassandra-cql/src/test/conf/compositeKey/gora-cassandra-mapping.xml
+++ b/gora-cassandra-cql/src/test/conf/compositeKey/gora-cassandra-mapping.xml
@@ -55,23 +55,30 @@
         <placementStrategy name="SimpleStrategy" replication_factor="1"/>
     </keyspace>
 
-    <class name="org.apache.gora.examples.generated.Employee" keyClass="org.apache.gora.examples.generated.WebPage" keyspace="EmployeeSpace"
-           table="Employee"  compactStorage="true" id="31323131">
-        <field name="name" column="name" type="text" ttl="10"/>
-        <field name="dateOfBirth" column="dob" type="timestamp" ttl="10"/>
+    <class name="org.apache.gora.cassandra.example.generated.avroSerialization.CassandraRecord" keyClass="org.apache.gora.cassandra.example.generated.avroSerialization.CassandraKey" keyspace="EmployeeSpace"
+           table="CassandraRecord" allowFiltering="true"  id="5a1c395e-b41f-11e5-9f22-ba0be0483c18">
+        <field name="dataString" column="name" type="text"/>
+        <field name="dataInt" column="age" type="int"/>
+        <field name="dataLong" column="salary" type="bigint"/>
+        <field name="dataDouble" column="testDouble" type="double"/>
+        <field name="dataBytes" column="quotes" type="blob"/>
+        <field name="arrayInt" column="listInt" type="list(int)"/>
+        <field name="arrayString" column="listString" type="list(text)"/>
+        <field name="arrayLong" column="listLong" type="list(bigint)"/>
+        <field name="arrayDouble" column="listDouble" type="list(double)"/>
+        <field name="mapInt" column="mapInt" type="map(text,int)"/>
+        <field name="mapString" column="mapString" type="map(text,text)"/>
+        <field name="mapLong" column="mapLong" type="map(text,bigint)"/>
+        <field name="mapDouble" column="mapDouble" type="map(text,double)"/>
     </class>
 
-    <cassandraKey name="org.apache.gora.examples.generated.WebPage">
+    <cassandraKey name="org.apache.gora.cassandra.example.generated.avroSerialization.CassandraKey">
         <partitionKey>
-            <compositeKey>
-                <field name="city" column="city" type="text"/>
-                <field name="country" column="country" type="text"/>
-            </compositeKey>
-            <field name="employerId" column="empID" type="int"/>
-            <field name="departmentId" column="deptID" type="int"/>
+                <field name="url" column="urlData" type="text"/>
+                <field name="timestamp" column="timestampData" type="bigint"/>
         </partitionKey>
         <clusterKey>
-            <field name="joinDate" column="joinDate" type="timestamp" order="desc"/>
+            <key column="timestampData" order="desc"/>
         </clusterKey>
     </cassandraKey>
 

http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/test/conf/gora-cassandra-mapping.xml
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/test/conf/gora-cassandra-mapping.xml b/gora-cassandra-cql/src/test/conf/gora-cassandra-mapping.xml
index 5f59ba8..90f5267 100644
--- a/gora-cassandra-cql/src/test/conf/gora-cassandra-mapping.xml
+++ b/gora-cassandra-cql/src/test/conf/gora-cassandra-mapping.xml
@@ -62,7 +62,7 @@
         </placementStrategy>
     </keyspace>
 
-    <class name="org.apache.gora.examples.generated.Employee1" keyClass="java.lang.String" keyspace="EmployeeSpace"
+    <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" keyspace="EmployeeSpace"
            table="Employee" compactStorage="true" id="31323131">
         <field name="lname" column="name" type="text" ttl="10" static="true"/>
         <field name="fname" column="name" type="text" ttl="10"/>

http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java
new file mode 100644
index 0000000..82f6f58
--- /dev/null
+++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java
@@ -0,0 +1,210 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.gora.cassandra.store;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.cassandra.GoraCassandraTestDriver;
+import org.apache.gora.cassandra.example.generated.avroSerialization.CassandraKey;
+import org.apache.gora.cassandra.example.generated.avroSerialization.CassandraRecord;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Properties;
+
+/**
+ * This class tests Cassandra Store functionality with CassandraKey.
+ */
+public class TestCassandraStoreWithCassandraKey {
+  private static GoraCassandraTestDriver testDriver = new GoraCassandraTestDriver();
+  private static DataStore<CassandraKey, CassandraRecord> cassandraRecordDataStore;
+  private static Properties parameter;
+
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    setProperties();
+    testDriver.setParameters(parameter);
+    testDriver.setUpClass();
+    cassandraRecordDataStore = testDriver.createDataStore(CassandraKey.class, CassandraRecord.class);
+  }
+
+  private static void setProperties() {
+    parameter = new Properties();
+    parameter.setProperty(CassandraStoreParameters.CASSANDRA_SERVERS, "localhost");
+    parameter.setProperty(CassandraStoreParameters.PORT, "9042");
+    parameter.setProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE, "avro");
+    parameter.setProperty(CassandraStoreParameters.PROTOCOL_VERSION, "3");
+    parameter.setProperty(CassandraStoreParameters.CLUSTER_NAME, "Test Cluster");
+    parameter.setProperty("gora.cassandrastore.mapping.file", "compositeKey/gora-cassandra-mapping.xml");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    testDriver.tearDown();
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    testDriver.tearDownClass();
+  }
+
+
+  /**
+   * This test case verifies the schema-exists behavior of the data store.
+   */
+  @Test
+  public void testSchemaRelatedBehaviour() {
+    cassandraRecordDataStore.createSchema();
+    Assert.assertTrue(cassandraRecordDataStore.schemaExists());
+    cassandraRecordDataStore.deleteSchema();
+    Assert.assertFalse(cassandraRecordDataStore.schemaExists());
+    cassandraRecordDataStore.createSchema();
+    Assert.assertTrue(cassandraRecordDataStore.schemaExists());
+  }
+
+  @Test
+  public void testSimplePutGet() {
+    cassandraRecordDataStore.createSchema();
+    CassandraRecord record = new CassandraRecord();
+    record.setDataLong(719411002L);
+    record.setDataString(new Utf8("M.K.H. Gunasekara"));
+    record.setDataInt(144);
+    record.setDataBytes(ByteBuffer.wrap("No 144, Gunasekara Mawatha, Mattumgala, Ragama".getBytes(Charset.defaultCharset())));
+    record.setDataDouble(3.14159d);
+    CassandraKey key = new CassandraKey();
+    key.setTimestamp(2027L);
+    key.setUrl("www.apache.org");
+    cassandraRecordDataStore.put(key, record);
+    CassandraRecord retrievedRecord = cassandraRecordDataStore.get(key);
+    Assert.assertEquals(record.getDataInt(), retrievedRecord.getDataInt());
+    Assert.assertEquals(record.getDataString(), retrievedRecord.getDataString());
+    Assert.assertEquals(record.getDataLong(), retrievedRecord.getDataLong());
+    Assert.assertEquals(record.getDataBytes(), retrievedRecord.getDataBytes());
+    Assert.assertEquals(record.getDataDouble(), retrievedRecord.getDataDouble());
+    cassandraRecordDataStore.delete(key);
+    Assert.assertNull(cassandraRecordDataStore.get(key));
+  }
+
+  @Test
+  public void testExecuteQuery() throws Exception {
+    Query query = cassandraRecordDataStore.newQuery();
+    cassandraRecordDataStore.truncateSchema();
+    CassandraKey key = new CassandraKey();
+    key.setTimestamp(2027L);
+    key.setUrl("www.apache.org");
+    query.setKey(key);
+    Result result = query.execute();
+    Assert.assertFalse(result.next());
+    CassandraRecord record = new CassandraRecord();
+    record.setDataLong(719411002L);
+    record.setDataString(new Utf8("M.K.H. Gunasekara"));
+    record.setDataInt(144);
+    record.setDataBytes(ByteBuffer.wrap("No 144, Gunasekara Mawatha, Mattumgala, Ragama".getBytes(Charset.defaultCharset())));
+    record.setDataDouble(3.14159d);
+    // test simple put and query with setKey
+    cassandraRecordDataStore.put(key, record);
+    CassandraRecord retrievedRecord = cassandraRecordDataStore.get(key);
+    Assert.assertEquals(record.getDataInt(), retrievedRecord.getDataInt());
+    Assert.assertEquals(record.getDataString(), retrievedRecord.getDataString());
+    Assert.assertEquals(record.getDataLong(), retrievedRecord.getDataLong());
+    Assert.assertEquals(record.getDataBytes(), retrievedRecord.getDataBytes());
+    Assert.assertEquals(record.getDataDouble(), retrievedRecord.getDataDouble());
+    result = query.execute();
+    Assert.assertTrue(result.next());
+    // verify data
+    retrievedRecord = (CassandraRecord) result.get();
+    Assert.assertEquals(record.getDataInt(), retrievedRecord.getDataInt());
+    Assert.assertEquals(record.getDataString(), retrievedRecord.getDataString());
+    Assert.assertEquals(record.getDataLong(), retrievedRecord.getDataLong());
+    Assert.assertEquals(record.getDataBytes(), retrievedRecord.getDataBytes());
+    Assert.assertEquals(record.getDataDouble(), retrievedRecord.getDataDouble());
+    // test delete by query
+    cassandraRecordDataStore.deleteByQuery(query);
+    result = query.execute();
+    Assert.assertFalse(result.next());
+    // test empty query
+    Query emptyQuery = cassandraRecordDataStore.newQuery();
+    result = emptyQuery.execute();
+    Assert.assertFalse(result.next());
+    cassandraRecordDataStore.put(key, record);
+    result = query.execute();
+    Assert.assertTrue(result.next());
+
+    // test Range with Query
+    cassandraRecordDataStore.truncateSchema();
+    //insert data
+    CassandraRecord record1 = new CassandraRecord();
+    CassandraRecord record2 = new CassandraRecord();
+    CassandraRecord record3 = new CassandraRecord();
+    CassandraRecord record4 = new CassandraRecord();
+    record1.setDataLong(719411002L);
+    record1.setDataString(new Utf8("Madawa"));
+    record1.setDataInt(100);
+    record2.setDataLong(712778588L);
+    record2.setDataString(new Utf8("Kasun"));
+    record2.setDataInt(101);
+    record3.setDataLong(716069539L);
+    record3.setDataString(new Utf8("Charith"));
+    record3.setDataInt(102);
+    record4.setDataLong(112956051L);
+    record4.setDataString(new Utf8("Bhanuka"));
+    record4.setDataInt(103);
+    CassandraKey key1 = new CassandraKey();
+    key1.setTimestamp(200L);
+    key1.setUrl("www.apache.org");
+    CassandraKey key2 = new CassandraKey();
+    key2.setTimestamp(205L);
+    key2.setUrl("www.apache.org");
+    CassandraKey key3 = new CassandraKey();
+    key3.setTimestamp(210L);
+    key3.setUrl("www.apache.org");
+    CassandraKey key4 = new CassandraKey();
+    key4.setTimestamp(215L);
+    key4.setUrl("www.apache.org");
+    cassandraRecordDataStore.put(key1,record1);
+    cassandraRecordDataStore.put(key2,record2);
+    cassandraRecordDataStore.put(key3,record3);
+    cassandraRecordDataStore.put(key4,record4);
+    Query rangeQuery = cassandraRecordDataStore.newQuery();
+    rangeQuery.setStartKey(key2);
+    rangeQuery.setEndKey(key2);
+    result = rangeQuery.execute();
+    int i = 0;
+    while (result.next()) {
+      i++;
+    }
+    Assert.assertEquals(1,i);
+
+    rangeQuery.setStartKey(key2);
+    rangeQuery.setEndKey(key3);
+    result = rangeQuery.execute();
+    i = 0;
+    while (result.next()) {
+      i++;
+    }
+    Assert.assertEquals(2,i);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java
index 88d6267..f3df5e4 100644
--- a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java
+++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java
@@ -81,7 +81,7 @@ public class TestCassandraStoreWithNativeSerialization {
    * In this test case, put and get behavior of the data store are testing.
    */
   @Test
-  public void testSimplePutandGet() {
+  public void testSimplePutAndGet() {
     UUID id = UUID.randomUUID();
     User user1 = new User(id, "madhawa", Date.from(Instant.now()));
     // storing data;
@@ -96,7 +96,7 @@ public class TestCassandraStoreWithNativeSerialization {
    * In this test case, put and delete behavior of the data store are testing.
    */
   @Test
-  public void testSimplePutDeleteandGet() {
+  public void testSimplePutDeleteAndGet() {
     UUID id = UUID.randomUUID();
     User user1 = new User(id, "kasun", Date.from(Instant.now()));
     // storing data;


Mime
View raw message