ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [08/20] ignite git commit: IGNITE-3172 Refactoring Ignite-Cassandra serializers. - Fixes #956.
Date Wed, 14 Sep 2016 01:37:54 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java
new file mode 100644
index 0000000..393dbe4
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import java.beans.PropertyDescriptor;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+/**
+ * Stores persistence settings for Ignite cache key
+ */
+public class KeyPersistenceSettings extends PersistenceSettings {
+    /** Partition key XML tag. */
+    private static final String PARTITION_KEY_ELEMENT = "partitionKey";
+
+    /** Cluster key XML tag. */
+    private static final String CLUSTER_KEY_ELEMENT = "clusterKey";
+
+    /** POJO field XML tag. */
+    private static final String FIELD_ELEMENT = "field";
+
+    /** POJO fields. */
+    private List<PojoField> fields = new LinkedList<>();
+
+    /** Partition key fields. */
+    private List<PojoField> partKeyFields = new LinkedList<>();
+
+    /** Cluster key fields. */
+    private List<PojoField> clusterKeyFields = new LinkedList<>();
+
+    /**
+     * Creates key persistence settings object based on it's XML configuration.
+     *
+     * @param el XML element storing key persistence settings
+     */
+    public KeyPersistenceSettings(Element el) {
+        super(el);
+
+        if (!PersistenceStrategy.POJO.equals(getStrategy()))
+            return;
+
+        NodeList keyElem = el.getElementsByTagName(PARTITION_KEY_ELEMENT);
+
+        Element partKeysNode = keyElem != null ? (Element) keyElem.item(0) : null;
+
+        Element clusterKeysNode = el.getElementsByTagName(CLUSTER_KEY_ELEMENT) != null ?
+            (Element)el.getElementsByTagName(CLUSTER_KEY_ELEMENT).item(0) : null;
+
+        if (partKeysNode == null && clusterKeysNode != null) {
+            throw new IllegalArgumentException("It's not allowed to specify cluster key fields mapping, but " +
+                "doesn't specify partition key mappings");
+        }
+
+        partKeyFields = detectFields(partKeysNode, getPartitionKeyDescriptors());
+
+        if (partKeyFields == null || partKeyFields.isEmpty()) {
+            throw new IllegalStateException("Failed to initialize partition key fields for class '" +
+                getJavaClass().getName() + "'");
+        }
+
+        clusterKeyFields = detectFields(clusterKeysNode, getClusterKeyDescriptors(partKeyFields));
+
+        fields = new LinkedList<>();
+        fields.addAll(partKeyFields);
+        fields.addAll(clusterKeyFields);
+
+        checkDuplicates(fields);
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<PojoField> getFields() {
+        return fields;
+    }
+
+    /**
+     * Returns Cassandra DDL for primary key.
+     *
+     * @return DDL statement.
+     */
+    public String getPrimaryKeyDDL() {
+        StringBuilder partKey = new StringBuilder();
+
+        List<String> cols = getPartitionKeyColumns();
+        for (String column : cols) {
+            if (partKey.length() != 0)
+                partKey.append(", ");
+
+            partKey.append(column);
+        }
+
+        StringBuilder clusterKey = new StringBuilder();
+
+        cols = getClusterKeyColumns();
+        if (cols != null) {
+            for (String column : cols) {
+                if (clusterKey.length() != 0)
+                    clusterKey.append(", ");
+
+                clusterKey.append(column);
+            }
+        }
+
+        return clusterKey.length() == 0 ?
+            "  primary key ((" + partKey.toString() + "))" :
+            "  primary key ((" + partKey.toString() + "), " + clusterKey.toString() + ")";
+    }
+
+    /**
+     * Returns Cassandra DDL for cluster key.
+     *
+     * @return Cluster key DDL.
+     */
+    public String getClusteringDDL() {
+        StringBuilder builder = new StringBuilder();
+
+        for (PojoField field : clusterKeyFields) {
+            PojoKeyField.SortOrder sortOrder = ((PojoKeyField)field).getSortOrder();
+
+            if (sortOrder == null)
+                continue;
+
+            if (builder.length() != 0)
+                builder.append(", ");
+
+            boolean asc = PojoKeyField.SortOrder.ASC.equals(sortOrder);
+
+            builder.append(field.getColumn()).append(" ").append(asc ? "asc" : "desc");
+        }
+
+        return builder.length() == 0 ? null : "clustering order by (" + builder.toString() + ")";
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String defaultColumnName() {
+        return "key";
+    }
+
+    /**
+     * Returns partition key columns of Cassandra table.
+     *
+     * @return List of column names.
+     */
+    private List<String> getPartitionKeyColumns() {
+        List<String> cols = new LinkedList<>();
+
+        if (PersistenceStrategy.BLOB.equals(getStrategy()) || PersistenceStrategy.PRIMITIVE.equals(getStrategy())) {
+            cols.add(getColumn());
+            return cols;
+        }
+
+        if (partKeyFields != null) {
+            for (PojoField field : partKeyFields)
+                cols.add(field.getColumn());
+        }
+
+        return cols;
+    }
+
+    /**
+     * Returns cluster key columns of Cassandra table.
+     *
+     * @return List of column names.
+     */
+    private List<String> getClusterKeyColumns() {
+        List<String> cols = new LinkedList<>();
+
+        if (clusterKeyFields != null) {
+            for (PojoField field : clusterKeyFields)
+                cols.add(field.getColumn());
+        }
+
+        return cols;
+    }
+
+    /**
+     * Extracts POJO fields specified in XML element.
+     *
+     * @param el XML element describing fields.
+     * @param descriptors POJO fields descriptors.
+     * @return List of {@code This} fields.
+     */
+    private List<PojoField> detectFields(Element el, List<PropertyDescriptor> descriptors) {
+        List<PojoField> list = new LinkedList<>();
+
+        if (el == null && (descriptors == null || descriptors.isEmpty()))
+            return list;
+
+        if (el == null) {
+            for (PropertyDescriptor descriptor : descriptors)
+                list.add(new PojoKeyField(descriptor));
+
+            return list;
+        }
+
+        NodeList nodes = el.getElementsByTagName(FIELD_ELEMENT);
+
+        int cnt = nodes == null ? 0 : nodes.getLength();
+
+        if (cnt == 0) {
+            throw new IllegalArgumentException("Incorrect configuration of Cassandra key persistence settings, " +
+                "no cluster key fields specified inside '" + PARTITION_KEY_ELEMENT + "/" +
+                CLUSTER_KEY_ELEMENT + "' element");
+        }
+
+        for (int i = 0; i < cnt; i++) {
+            PojoKeyField field = new PojoKeyField((Element)nodes.item(i), getJavaClass());
+
+            PropertyDescriptor desc = findPropertyDescriptor(descriptors, field.getName());
+
+            if (desc == null) {
+                throw new IllegalArgumentException("Specified POJO field '" + field.getName() +
+                    "' doesn't exist in '" + getJavaClass().getName() + "' class");
+            }
+
+            list.add(field);
+        }
+
+        return list;
+    }
+
+    /**
+     * @return POJO field descriptors for partition key.
+     */
+    private List<PropertyDescriptor> getPartitionKeyDescriptors() {
+        List<PropertyDescriptor> primitivePropDescriptors = PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(),
+            AffinityKeyMapped.class, true);
+
+        return primitivePropDescriptors != null && !primitivePropDescriptors.isEmpty() ?
+            primitivePropDescriptors :
+            PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), true);
+    }
+
+    /**
+     * @return POJO field descriptors for cluster key.
+     */
+    private List<PropertyDescriptor> getClusterKeyDescriptors(List<PojoField> partKeyFields) {
+        List<PropertyDescriptor> primitivePropDescriptors =
+            PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), true);
+
+        if (primitivePropDescriptors == null || primitivePropDescriptors.isEmpty() ||
+            partKeyFields.size() == primitivePropDescriptors.size())
+            return null;
+
+        for (PojoField field : partKeyFields) {
+            for (int i = 0; i < primitivePropDescriptors.size(); i++) {
+                if (primitivePropDescriptors.get(i).getName().equals(field.getName())) {
+                    primitivePropDescriptors.remove(i);
+                    break;
+                }
+            }
+        }
+
+        return primitivePropDescriptors;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java
new file mode 100644
index 0000000..2c43ed4
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.store.cassandra.common.SystemHelper;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.springframework.core.io.Resource;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+
+/**
+ * Stores persistence settings for Ignite cache key and value
+ */
+public class KeyValuePersistenceSettings implements Serializable {
+    /**
+     * Default Cassandra keyspace options which should be used to create new keyspace.
+     * <ul>
+     * <li> <b>SimpleStrategy</b> for replication work well for single data center Cassandra cluster.<br/>
+     *      If your Cassandra cluster deployed across multiple data centers it's better to use <b>NetworkTopologyStrategy</b>.
+     * </li>
+     * <li> Three replicas will be created for each data block. </li>
+     * <li> Setting DURABLE_WRITES to true specifies that all data should be written to commit log. </li>
+     * </ul>
+     */
+    private static final String DFLT_KEYSPACE_OPTIONS = "replication = {'class' : 'SimpleStrategy', " +
+            "'replication_factor' : 3} and durable_writes = true";
+
+    /** Xml attribute specifying Cassandra keyspace to use. */
+    private static final String KEYSPACE_ATTR = "keyspace";
+
+    /** Xml attribute specifying Cassandra table to use. */
+    private static final String TABLE_ATTR = "table";
+
+    /** Xml attribute specifying ttl (time to leave) for rows inserted in Cassandra. */
+    private static final String TTL_ATTR = "ttl";
+
+    /** Root xml element containing persistence settings specification. */
+    private static final String PERSISTENCE_NODE = "persistence";
+
+    /** Xml element specifying Cassandra keyspace options. */
+    private static final String KEYSPACE_OPTIONS_NODE = "keyspaceOptions";
+
+    /** Xml element specifying Cassandra table options. */
+    private static final String TABLE_OPTIONS_NODE = "tableOptions";
+
+    /** Xml element specifying Ignite cache key persistence settings. */
+    private static final String KEY_PERSISTENCE_NODE = "keyPersistence";
+
+    /** Xml element specifying Ignite cache value persistence settings. */
+    private static final String VALUE_PERSISTENCE_NODE = "valuePersistence";
+
+    /** TTL (time to leave) for rows inserted into Cassandra table {@link <a href="https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_c.html">Expiring data</a>}. */
+    private Integer ttl;
+
+    /** Cassandra keyspace (analog of tablespace in relational databases). */
+    private String keyspace;
+
+    /** Cassandra table. */
+    private String tbl;
+
+    /** Cassandra table creation options {@link <a href="https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_table_r.html">CREATE TABLE</a>}. */
+    private String tblOptions;
+
+    /** Cassandra keyspace creation options {@link <a href="https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_keyspace_r.html">CREATE KEYSPACE</a>}. */
+    private String keyspaceOptions = DFLT_KEYSPACE_OPTIONS;
+
+    /** Persistence settings for Ignite cache keys. */
+    private KeyPersistenceSettings keyPersistenceSettings;
+
+    /** Persistence settings for Ignite cache values. */
+    private ValuePersistenceSettings valPersistenceSettings;
+
+    /**
+     * Constructs Ignite cache key/value persistence settings.
+     *
+     * @param settings string containing xml with persistence settings for Ignite cache key/value
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public KeyValuePersistenceSettings(String settings) {
+        init(settings);
+    }
+
+    /**
+     * Constructs Ignite cache key/value persistence settings.
+     *
+     * @param settingsFile xml file with persistence settings for Ignite cache key/value
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public KeyValuePersistenceSettings(File settingsFile) {
+        InputStream in;
+
+        try {
+            in = new FileInputStream(settingsFile);
+        }
+        catch (IOException e) {
+            throw new IgniteException("Failed to get input stream for Cassandra persistence settings file: " +
+                    settingsFile.getAbsolutePath(), e);
+        }
+
+        init(loadSettings(in));
+    }
+
+    /**
+     * Constructs Ignite cache key/value persistence settings.
+     *
+     * @param settingsRsrc resource containing xml with persistence settings for Ignite cache key/value
+     */
+    public KeyValuePersistenceSettings(Resource settingsRsrc) {
+        InputStream in;
+
+        try {
+            in = settingsRsrc.getInputStream();
+        }
+        catch (IOException e) {
+            throw new IgniteException("Failed to get input stream for Cassandra persistence settings resource: " + settingsRsrc, e);
+        }
+
+        init(loadSettings(in));
+    }
+
+    /**
+     * Returns ttl to use for while inserting new rows into Cassandra table.
+     *
+     * @return ttl
+     */
+    public Integer getTTL() {
+        return ttl;
+    }
+
+    /**
+     * Returns Cassandra keyspace to use.
+     *
+     * @return keyspace.
+     */
+    public String getKeyspace() {
+        return keyspace;
+    }
+
+    /**
+     * Returns Cassandra table to use.
+     *
+     * @return table.
+     */
+    public String getTable() {
+        return tbl;
+    }
+
+    /**
+     * Returns full name of Cassandra table to use (including keyspace).
+     *
+     * @return full table name in format "keyspace.table".
+     */
+    public String getTableFullName()
+    {
+        return keyspace + "." + tbl;
+    }
+
+    /**
+     * Returns persistence settings for Ignite cache keys.
+     *
+     * @return keys persistence settings.
+     */
+    public KeyPersistenceSettings getKeyPersistenceSettings() {
+        return keyPersistenceSettings;
+    }
+
+    /**
+     * Returns persistence settings for Ignite cache values.
+     *
+     * @return values persistence settings.
+     */
+    public ValuePersistenceSettings getValuePersistenceSettings() {
+        return valPersistenceSettings;
+    }
+
+    /**
+     * Returns list of POJO fields to be mapped to Cassandra table columns.
+     *
+     * @return POJO fields list.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public List<PojoField> getFields() {
+        List<PojoField> fields = new LinkedList<>();
+
+        for (PojoField field : keyPersistenceSettings.getFields())
+            fields.add(field);
+
+        for (PojoField field : valPersistenceSettings.getFields())
+            fields.add(field);
+
+        return fields;
+    }
+
+    /**
+     * Returns list of Ignite cache key POJO fields to be mapped to Cassandra table columns.
+     *
+     * @return POJO fields list.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public List<PojoField> getKeyFields() {
+        return keyPersistenceSettings.getFields();
+    }
+
+    /**
+     * Returns list of Ignite cache value POJO fields to be mapped to Cassandra table columns.
+     *
+     * @return POJO fields list.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public List<PojoField> getValueFields() {
+        return valPersistenceSettings.getFields();
+    }
+
+    /**
+     * Returns DDL statement to create Cassandra keyspace.
+     *
+     * @return Keyspace DDL statement.
+     */
+    public String getKeyspaceDDLStatement() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("create keyspace if not exists ").append(keyspace);
+
+        if (keyspaceOptions != null) {
+            if (!keyspaceOptions.trim().toLowerCase().startsWith("with"))
+                builder.append("\nwith");
+
+            builder.append(" ").append(keyspaceOptions);
+        }
+
+        String statement = builder.toString().trim().replaceAll(" +", " ");
+
+        return statement.endsWith(";") ? statement : statement + ";";
+    }
+
+    /**
+     * Returns DDL statement to create Cassandra table.
+     *
+     * @return Table DDL statement.
+     */
+    public String getTableDDLStatement() {
+        String colsDDL = keyPersistenceSettings.getTableColumnsDDL() + ",\n" + valPersistenceSettings.getTableColumnsDDL();
+
+        String primaryKeyDDL = keyPersistenceSettings.getPrimaryKeyDDL();
+
+        String clusteringDDL = keyPersistenceSettings.getClusteringDDL();
+
+        String optionsDDL = tblOptions != null && !tblOptions.trim().isEmpty() ? tblOptions.trim() : "";
+
+        if (clusteringDDL != null && !clusteringDDL.isEmpty())
+            optionsDDL = optionsDDL.isEmpty() ? clusteringDDL : optionsDDL + " and " + clusteringDDL;
+
+        if (!optionsDDL.trim().isEmpty())
+            optionsDDL = optionsDDL.trim().toLowerCase().startsWith("with") ? optionsDDL.trim() : "with " + optionsDDL.trim();
+
+        StringBuilder builder = new StringBuilder();
+
+        builder.append("create table if not exists ").append(keyspace).append(".").append(tbl);
+        builder.append("\n(\n").append(colsDDL).append(",\n").append(primaryKeyDDL).append("\n)");
+
+        if (!optionsDDL.isEmpty())
+            builder.append(" \n").append(optionsDDL);
+
+        String tblDDL = builder.toString().trim().replaceAll(" +", " ");
+
+        return tblDDL.endsWith(";") ? tblDDL : tblDDL + ";";
+    }
+
+    /**
+     * Returns DDL statements to create Cassandra table secondary indexes.
+     *
+     * @return DDL statements to create secondary indexes.
+     */
+    public List<String> getIndexDDLStatements() {
+        List<String> idxDDLs = new LinkedList<>();
+
+        List<PojoField> fields = valPersistenceSettings.getFields();
+
+        for (PojoField field : fields) {
+            if (((PojoValueField)field).isIndexed())
+                idxDDLs.add(((PojoValueField)field).getIndexDDL(keyspace, tbl));
+        }
+
+        return idxDDLs;
+    }
+
+    /**
+     * Loads Ignite cache persistence settings from resource.
+     *
+     * @param in Input stream.
+     * @return String containing xml with Ignite cache persistence settings.
+     */
+    private String loadSettings(InputStream in) {
+        StringBuilder settings = new StringBuilder();
+        BufferedReader reader = null;
+
+        try {
+            reader = new BufferedReader(new InputStreamReader(in));
+
+            String line = reader.readLine();
+
+            while (line != null) {
+                if (settings.length() != 0)
+                    settings.append(SystemHelper.LINE_SEPARATOR);
+
+                settings.append(line);
+
+                line = reader.readLine();
+            }
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Failed to read input stream for Cassandra persistence settings", e);
+        }
+        finally {
+            U.closeQuiet(reader);
+            U.closeQuiet(in);
+        }
+
+        return settings.toString();
+    }
+
+    /**
+     * @param elem Element with data.
+     * @param attr Attribute name.
+     * @return Numeric value for specified attribute.
+     */
+    private int extractIntAttribute(Element elem, String attr) {
+        String val = elem.getAttribute(attr).trim();
+
+        try {
+            return Integer.parseInt(val);
+        }
+        catch (NumberFormatException e) {
+            throw new IllegalArgumentException("Incorrect value '" + val + "' specified for '" + attr + "' attribute");
+        }
+    }
+
+    /**
+     * Initializes persistence settings from XML string.
+     *
+     * @param settings XML string containing Ignite cache persistence settings configuration.
+     */
+    @SuppressWarnings("IfCanBeSwitch")
+    private void init(String settings) {
+        Document doc;
+
+        try {
+            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+            DocumentBuilder builder = factory.newDocumentBuilder();
+            doc = builder.parse(new InputSource(new StringReader(settings)));
+        }
+        catch (Throwable e) {
+            throw new IllegalArgumentException("Failed to parse persistence settings:" +
+                SystemHelper.LINE_SEPARATOR + settings, e);
+        }
+
+        Element root = doc.getDocumentElement();
+
+        if (!PERSISTENCE_NODE.equals(root.getNodeName())) {
+            throw new IllegalArgumentException("Incorrect persistence settings specified. " +
+                "Root XML element should be 'persistence'");
+        }
+
+        if (!root.hasAttribute(KEYSPACE_ATTR)) {
+            throw new IllegalArgumentException("Incorrect persistence settings '" + KEYSPACE_ATTR +
+                "' attribute should be specified");
+        }
+
+        if (!root.hasAttribute(TABLE_ATTR)) {
+            throw new IllegalArgumentException("Incorrect persistence settings '" + TABLE_ATTR +
+                "' attribute should be specified");
+        }
+
+        keyspace = root.getAttribute(KEYSPACE_ATTR).trim();
+        tbl = root.getAttribute(TABLE_ATTR).trim();
+
+        if (root.hasAttribute(TTL_ATTR))
+            ttl = extractIntAttribute(root, TTL_ATTR);
+
+        if (!root.hasChildNodes()) {
+            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+                "there are no key and value persistence settings specified");
+        }
+
+        NodeList children = root.getChildNodes();
+        int cnt = children.getLength();
+
+        for (int i = 0; i < cnt; i++) {
+            Node node = children.item(i);
+
+            if (node.getNodeType() != Node.ELEMENT_NODE)
+                continue;
+
+            Element el = (Element)node;
+            String nodeName = el.getNodeName();
+
+            if (nodeName.equals(TABLE_OPTIONS_NODE)) {
+                tblOptions = el.getTextContent();
+                tblOptions = tblOptions.replace("\n", " ").replace("\r", "").replace("\t", " ");
+            }
+            else if (nodeName.equals(KEYSPACE_OPTIONS_NODE)) {
+                keyspaceOptions = el.getTextContent();
+                keyspaceOptions = keyspaceOptions.replace("\n", " ").replace("\r", "").replace("\t", " ");
+            }
+            else if (nodeName.equals(KEY_PERSISTENCE_NODE))
+                keyPersistenceSettings = new KeyPersistenceSettings(el);
+            else if (nodeName.equals(VALUE_PERSISTENCE_NODE))
+                valPersistenceSettings = new ValuePersistenceSettings(el);
+        }
+
+        if (keyPersistenceSettings == null) {
+            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+                "there are no key persistence settings specified");
+        }
+
+        if (valPersistenceSettings == null) {
+            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+                "there are no value persistence settings specified");
+        }
+
+        List<PojoField> keyFields = keyPersistenceSettings.getFields();
+        List<PojoField> valFields = valPersistenceSettings.getFields();
+
+        if (PersistenceStrategy.POJO.equals(keyPersistenceSettings.getStrategy()) &&
+            (keyFields == null || keyFields.isEmpty())) {
+            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+                "there are no key fields found");
+        }
+
+        if (PersistenceStrategy.POJO.equals(valPersistenceSettings.getStrategy()) &&
+            (valFields == null || valFields.isEmpty())) {
+            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+                "there are no value fields found");
+        }
+
+        if (keyFields == null || keyFields.isEmpty() || valFields == null || valFields.isEmpty())
+            return;
+
+        for (PojoField keyField : keyFields) {
+            for (PojoField valField : valFields) {
+                if (keyField.getColumn().equals(valField.getColumn())) {
+                    throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+                        "key column '" + keyField.getColumn() + "' also specified as a value column");
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java
new file mode 100644
index 0000000..e734ca3
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
+import org.apache.ignite.cache.store.cassandra.serializer.Serializer;
+
+/**
+ * Intermediate layer between persistent store (Cassandra) and Ignite cache key/value classes.
+ * Handles  all the mappings to/from Java classes into Cassandra and responsible for all the details
+ * of how Java objects should be written/loaded to/from Cassandra.
+ */
+public class PersistenceController {
+    /** Ignite cache key/value persistence settings. */
+    private KeyValuePersistenceSettings persistenceSettings;
+
+    /** CQL statement to insert row into Cassandra table. */
+    private String writeStatement;
+
+    /** CQL statement to delete row from Cassandra table. */
+    private String delStatement;
+
+    /** CQL statement to select value fields from Cassandra table. */
+    private String loadStatement;
+
+    /** CQL statement to select key/value fields from Cassandra table. */
+    private String loadStatementWithKeyFields;
+
+    /**
+     * Constructs persistence controller from Ignite cache persistence settings.
+     *
+     * @param settings persistence settings.
+     */
+    public PersistenceController(KeyValuePersistenceSettings settings) {
+        if (settings == null)
+            throw new IllegalArgumentException("Persistent settings can't be null");
+
+        this.persistenceSettings = settings;
+    }
+
+    /**
+     * Returns Ignite cache persistence settings.
+     *
+     * @return persistence settings.
+     */
+    public KeyValuePersistenceSettings getPersistenceSettings() {
+        return persistenceSettings;
+    }
+
+    /**
+     * Returns Cassandra keyspace to use.
+     *
+     * @return keyspace.
+     */
+    public String getKeyspace() {
+        return persistenceSettings.getKeyspace();
+    }
+
+    /**
+     * Returns Cassandra table to use.
+     *
+     * @return table.
+     */
+    public String getTable() {
+        return persistenceSettings.getTable();
+    }
+
+    /**
+     * Returns CQL statement to insert row into Cassandra table.
+     *
+     * @return CQL statement.
+     */
+    public String getWriteStatement() {
+        if (writeStatement != null)
+            return writeStatement;
+
+        List<String> cols = getKeyValueColumns();
+
+        StringBuilder colsList = new StringBuilder();
+        StringBuilder questionsList = new StringBuilder();
+
+        for (String column : cols) {
+            if (colsList.length() != 0) {
+                colsList.append(", ");
+                questionsList.append(",");
+            }
+
+            colsList.append(column);
+            questionsList.append("?");
+        }
+
+        writeStatement = "insert into " + persistenceSettings.getKeyspace() + "." + persistenceSettings.getTable() + " (" +
+            colsList.toString() + ") values (" + questionsList.toString() + ")";
+
+        if (persistenceSettings.getTTL() != null)
+            writeStatement += " using ttl " + persistenceSettings.getTTL();
+
+        writeStatement += ";";
+
+        return writeStatement;
+    }
+
+    /**
+     * Returns CQL statement to delete row from Cassandra table.
+     *
+     * @return CQL statement.
+     */
+    public String getDeleteStatement() {
+        if (delStatement != null)
+            return delStatement;
+
+        List<String> cols = getKeyColumns();
+
+        StringBuilder statement = new StringBuilder();
+
+        for (String column : cols) {
+            if (statement.length() != 0)
+                statement.append(" and ");
+
+            statement.append(column).append("=?");
+        }
+
+        statement.append(";");
+
+        delStatement = "delete from " +
+            persistenceSettings.getKeyspace() + "." +
+            persistenceSettings.getTable() + " where " +
+            statement.toString();
+
+        return delStatement;
+    }
+
+    /**
+     * Returns CQL statement to select key/value fields from Cassandra table.
+     *
+     * @param includeKeyFields whether to include/exclude key fields from the returned row.
+     *
+     * @return CQL statement.
+     */
+    public String getLoadStatement(boolean includeKeyFields) {
+        if (loadStatement != null && loadStatementWithKeyFields != null)
+            return includeKeyFields ? loadStatementWithKeyFields : loadStatement;
+
+        List<String> valCols = getValueColumns();
+
+        List<String> keyCols = getKeyColumns();
+
+        StringBuilder hdrWithKeyFields = new StringBuilder("select ");
+
+        for (int i = 0; i < keyCols.size(); i++) {
+            if (i > 0)
+                hdrWithKeyFields.append(", ");
+
+            hdrWithKeyFields.append(keyCols.get(i));
+        }
+
+        StringBuilder hdr = new StringBuilder("select ");
+
+        for (int i = 0; i < valCols.size(); i++) {
+            if (i > 0)
+                hdr.append(", ");
+
+            hdrWithKeyFields.append(",");
+
+            hdr.append(valCols.get(i));
+            hdrWithKeyFields.append(valCols.get(i));
+        }
+
+        StringBuilder statement = new StringBuilder();
+
+        statement.append(" from ");
+        statement.append(persistenceSettings.getKeyspace());
+        statement.append(".").append(persistenceSettings.getTable());
+        statement.append(" where ");
+
+        for (int i = 0; i < keyCols.size(); i++) {
+            if (i > 0)
+                statement.append(" and ");
+
+            statement.append(keyCols.get(i)).append("=?");
+        }
+
+        statement.append(";");
+
+        loadStatement = hdr.toString() + statement.toString();
+        loadStatementWithKeyFields = hdrWithKeyFields.toString() + statement.toString();
+
+        return includeKeyFields ? loadStatementWithKeyFields : loadStatement;
+    }
+
+    /**
+     * Binds Ignite cache key object to {@link com.datastax.driver.core.PreparedStatement}.
+     *
+     * @param statement statement to which key object should be bind.
+     * @param key key object.
+     *
+     * @return statement with bounded key.
+     */
+    public BoundStatement bindKey(PreparedStatement statement, Object key) {
+        KeyPersistenceSettings settings = persistenceSettings.getKeyPersistenceSettings();
+
+        Object[] values = getBindingValues(settings.getStrategy(),
+            settings.getSerializer(), settings.getFields(), key);
+
+        return statement.bind(values);
+    }
+
+    /**
+     * Binds Ignite cache key and value object to {@link com.datastax.driver.core.PreparedStatement}.
+     *
+     * @param statement statement to which key and value object should be bind.
+     * @param key key object.
+     * @param val value object.
+     *
+     * @return statement with bounded key and value.
+     */
+    public BoundStatement bindKeyValue(PreparedStatement statement, Object key, Object val) {
+        KeyPersistenceSettings keySettings = persistenceSettings.getKeyPersistenceSettings();
+        Object[] keyValues = getBindingValues(keySettings.getStrategy(),
+            keySettings.getSerializer(), keySettings.getFields(), key);
+
+        ValuePersistenceSettings valSettings = persistenceSettings.getValuePersistenceSettings();
+        Object[] valValues = getBindingValues(valSettings.getStrategy(),
+            valSettings.getSerializer(), valSettings.getFields(), val);
+
+        Object[] values = new Object[keyValues.length + valValues.length];
+
+        int i = 0;
+
+        for (Object keyVal : keyValues) {
+            values[i] = keyVal;
+            i++;
+        }
+
+        for (Object valVal : valValues) {
+            values[i] = valVal;
+            i++;
+        }
+
+        return statement.bind(values);
+    }
+
+    /**
+     * Builds Ignite cache key object from returned Cassandra table row.
+     *
+     * @param row Cassandra table row.
+     *
+     * @return key object.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public Object buildKeyObject(Row row) {
+        return buildObject(row, persistenceSettings.getKeyPersistenceSettings());
+    }
+
+    /**
+     * Builds Ignite cache value object from Cassandra table row .
+     *
+     * @param row Cassandra table row.
+     *
+     * @return value object.
+     */
+    public Object buildValueObject(Row row) {
+        return buildObject(row, persistenceSettings.getValuePersistenceSettings());
+    }
+
+    /**
+     * Builds object from Cassandra table row.
+     *
+     * @param row Cassandra table row.
+     * @param settings persistence settings to use.
+     *
+     * @return object.
+     */
+    private Object buildObject(Row row, PersistenceSettings settings) {
+        if (row == null)
+            return null;
+
+        PersistenceStrategy stgy = settings.getStrategy();
+
+        Class clazz = settings.getJavaClass();
+
+        String col = settings.getColumn();
+
+        List<PojoField> fields = settings.getFields();
+
+        if (PersistenceStrategy.PRIMITIVE.equals(stgy))
+            return PropertyMappingHelper.getCassandraColumnValue(row, col, clazz, null);
+
+        if (PersistenceStrategy.BLOB.equals(stgy))
+            return settings.getSerializer().deserialize(row.getBytes(col));
+
+        Object obj;
+
+        try {
+            obj = clazz.newInstance();
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Failed to instantiate object of type '" + clazz.getName() + "' using reflection", e);
+        }
+
+        for (PojoField field : fields)
+            field.setValueFromRow(row, obj, settings.getSerializer());
+
+        return obj;
+    }
+
+    /**
+     * Extracts field values from POJO object and converts them into Java types
+     * which could be mapped to Cassandra types.
+     *
+     * @param stgy persistence strategy to use.
+     * @param serializer serializer to use for BLOBs.
+     * @param fields fields who's values should be extracted.
+     * @param obj object instance who's field values should be extracted.
+     *
+     * @return array of object field values converted into Java object instances having Cassandra compatible types
+     */
+    private Object[] getBindingValues(PersistenceStrategy stgy, Serializer serializer, List<PojoField> fields, Object obj) {
+        if (PersistenceStrategy.PRIMITIVE.equals(stgy)) {
+            if (PropertyMappingHelper.getCassandraType(obj.getClass()) == null ||
+                obj.getClass().equals(ByteBuffer.class) || obj instanceof byte[]) {
+                throw new IllegalArgumentException("Couldn't deserialize instance of class '" +
+                    obj.getClass().getName() + "' using PRIMITIVE strategy. Please use BLOB strategy for this case.");
+            }
+
+            return new Object[] {obj};
+        }
+
+        if (PersistenceStrategy.BLOB.equals(stgy))
+            return new Object[] {serializer.serialize(obj)};
+
+        Object[] values = new Object[fields.size()];
+
+        int i = 0;
+
+        for (PojoField field : fields) {
+            Object val = field.getValueFromObject(obj, serializer);
+
+            if (val instanceof byte[])
+                val = ByteBuffer.wrap((byte[]) val);
+
+            values[i] = val;
+
+            i++;
+        }
+
+        return values;
+    }
+
+    /**
+     * Returns list of Cassandra table columns mapped to Ignite cache key and value fields
+     *
+     * @return list of column names
+     */
+    private List<String> getKeyValueColumns() {
+        List<String> cols = getKeyColumns();
+
+        cols.addAll(getValueColumns());
+
+        return cols;
+    }
+
+    /**
+     * Returns list of Cassandra table columns mapped to Ignite cache key fields
+     *
+     * @return list of column names
+     */
+    private List<String> getKeyColumns() {
+        return getColumns(persistenceSettings.getKeyPersistenceSettings());
+    }
+
+    /**
+     * Returns list of Cassandra table columns mapped to Ignite cache value fields
+     *
+     * @return list of column names
+     */
+    private List<String> getValueColumns() {
+        return getColumns(persistenceSettings.getValuePersistenceSettings());
+    }
+
+    /**
+     * Returns list of Cassandra table columns based on persistence strategy to use
+     *
+     * @return list of column names
+     */
+    private List<String> getColumns(PersistenceSettings settings) {
+        List<String> cols = new LinkedList<>();
+
+        if (!PersistenceStrategy.POJO.equals(settings.getStrategy())) {
+            cols.add(settings.getColumn());
+            return cols;
+        }
+
+        for (PojoField field : settings.getFields())
+            cols.add(field.getColumn());
+
+        return cols;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java
new file mode 100644
index 0000000..20d790a
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import com.datastax.driver.core.DataType;
+import java.beans.PropertyDescriptor;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
+import org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer;
+import org.apache.ignite.cache.store.cassandra.serializer.Serializer;
+import org.w3c.dom.Element;
+
+/**
+ * Stores persistence settings, which describes how particular key/value
+ * from Ignite cache should be stored in Cassandra.
+ */
+public abstract class PersistenceSettings implements Serializable {
+    /** Xml attribute specifying persistence strategy. */
+    private static final String STRATEGY_ATTR = "strategy";
+
+    /** Xml attribute specifying Cassandra column name. */
+    private static final String COLUMN_ATTR = "column";
+
+    /** Xml attribute specifying BLOB serializer to use. */
+    private static final String SERIALIZER_ATTR = "serializer";
+
+    /** Xml attribute specifying java class of the object to be persisted. */
+    private static final String CLASS_ATTR = "class";
+
+    /** Persistence strategy to use. */
+    private PersistenceStrategy stgy;
+
+    /** Java class of the object to be persisted. */
+    private Class javaCls;
+
+    /** Cassandra table column name where object should be persisted in
+     *  case of using BLOB or PRIMITIVE persistence strategy. */
+    private String col;
+
+    /** Serializer for BLOBs. */
+    private Serializer serializer = new JavaSerializer();
+
+    /**
+     * Extracts property descriptor from the descriptors list by its name.
+     *
+     * @param descriptors descriptors list.
+     * @param propName property name.
+     *
+     * @return property descriptor.
+     */
+    public static PropertyDescriptor findPropertyDescriptor(List<PropertyDescriptor> descriptors, String propName) {
+        if (descriptors == null || descriptors.isEmpty() || propName == null || propName.trim().isEmpty())
+            return null;
+
+        for (PropertyDescriptor descriptor : descriptors) {
+            if (descriptor.getName().equals(propName))
+                return descriptor;
+        }
+
+        return null;
+    }
+
+    /**
+     * Constructs persistence settings from corresponding XML element.
+     *
+     * @param el xml element containing persistence settings configuration.
+     */
+    @SuppressWarnings("unchecked")
+    public PersistenceSettings(Element el) {
+        if (el == null)
+            throw new IllegalArgumentException("DOM element representing key/value persistence object can't be null");
+
+        if (!el.hasAttribute(STRATEGY_ATTR)) {
+            throw new IllegalArgumentException("DOM element representing key/value persistence object should have '" +
+                STRATEGY_ATTR + "' attribute");
+        }
+
+        try {
+            stgy = PersistenceStrategy.valueOf(el.getAttribute(STRATEGY_ATTR).trim().toUpperCase());
+        }
+        catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Incorrect persistence strategy specified: " + el.getAttribute(STRATEGY_ATTR));
+        }
+
+        if (!el.hasAttribute(CLASS_ATTR) && !PersistenceStrategy.BLOB.equals(stgy)) {
+            throw new IllegalArgumentException("DOM element representing key/value persistence object should have '" +
+                CLASS_ATTR + "' attribute or have BLOB persistence strategy");
+        }
+
+        try {
+            javaCls = el.hasAttribute(CLASS_ATTR) ? getClassInstance(el.getAttribute(CLASS_ATTR).trim()) : null;
+        }
+        catch (Throwable e) {
+            throw new IllegalArgumentException("Incorrect java class specified '" + el.getAttribute(CLASS_ATTR) + "' " +
+                "for Cassandra persistence", e);
+        }
+
+        if (!PersistenceStrategy.BLOB.equals(stgy) &&
+            (ByteBuffer.class.equals(javaCls) || byte[].class.equals(javaCls))) {
+            throw new IllegalArgumentException("Java class '" + el.getAttribute(CLASS_ATTR) + "' " +
+                "specified could only be persisted using BLOB persistence strategy");
+        }
+
+        if (PersistenceStrategy.PRIMITIVE.equals(stgy) &&
+            PropertyMappingHelper.getCassandraType(javaCls) == null) {
+            throw new IllegalArgumentException("Current implementation doesn't support persisting '" +
+                javaCls.getName() + "' object using PRIMITIVE strategy");
+        }
+
+        if (PersistenceStrategy.POJO.equals(stgy)) {
+            if (javaCls == null)
+                throw new IllegalStateException("Object java class should be specified for POJO persistence strategy");
+
+            try {
+                javaCls.getConstructor();
+            }
+            catch (Throwable e) {
+                throw new IllegalArgumentException("Java class '" + javaCls.getName() + "' couldn't be used as POJO " +
+                    "cause it doesn't have no arguments constructor", e);
+            }
+        }
+
+        if (el.hasAttribute(COLUMN_ATTR)) {
+            if (!PersistenceStrategy.BLOB.equals(stgy) && !PersistenceStrategy.PRIMITIVE.equals(stgy)) {
+                throw new IllegalArgumentException("Incorrect configuration of Cassandra key/value persistence settings, " +
+                    "'" + COLUMN_ATTR + "' attribute is only applicable for PRIMITIVE or BLOB strategy");
+            }
+
+            col = el.getAttribute(COLUMN_ATTR).trim();
+        }
+
+        if (el.hasAttribute(SERIALIZER_ATTR)) {
+            if (!PersistenceStrategy.BLOB.equals(stgy) && !PersistenceStrategy.POJO.equals(stgy)) {
+                throw new IllegalArgumentException("Incorrect configuration of Cassandra key/value persistence settings, " +
+                    "'" + SERIALIZER_ATTR + "' attribute is only applicable for BLOB and POJO strategies");
+            }
+
+            Object obj = newObjectInstance(el.getAttribute(SERIALIZER_ATTR).trim());
+
+            if (!(obj instanceof Serializer)) {
+                throw new IllegalArgumentException("Incorrect configuration of Cassandra key/value persistence settings, " +
+                    "serializer class '" + el.getAttribute(SERIALIZER_ATTR) + "' doesn't implement '" +
+                    Serializer.class.getName() + "' interface");
+            }
+
+            serializer = (Serializer)obj;
+        }
+
+        if ((PersistenceStrategy.BLOB.equals(stgy) || PersistenceStrategy.PRIMITIVE.equals(stgy)) && col == null)
+            col = defaultColumnName();
+    }
+
+    /**
+     * Returns java class of the object to be persisted.
+     *
+     * @return java class.
+     */
+    public Class getJavaClass() {
+        return javaCls;
+    }
+
+    /**
+     * Returns persistence strategy to use.
+     *
+     * @return persistence strategy.
+     */
+    public PersistenceStrategy getStrategy() {
+        return stgy;
+    }
+
+    /**
+     * Returns Cassandra table column name where object should be persisted in
+     * case of using BLOB or PRIMITIVE persistence strategy.
+     *
+     * @return column name.
+     */
+    public String getColumn() {
+        return col;
+    }
+
+    /**
+     * Returns serializer to be used for BLOBs.
+     *
+     * @return serializer.
+     */
+    public Serializer getSerializer() {
+        return serializer;
+    }
+
+    /**
+     * Returns list of POJO fields to be persisted.
+     *
+     * @return list of fields.
+     */
+    public abstract List<PojoField> getFields();
+
+    /**
+     * Returns Cassandra table columns DDL, corresponding to POJO fields which should be persisted.
+     *
+     * @return DDL statement for Cassandra table fields
+     */
+    public String getTableColumnsDDL() {
+        if (PersistenceStrategy.BLOB.equals(stgy))
+            return "  " + col + " " + DataType.Name.BLOB.toString();
+
+        if (PersistenceStrategy.PRIMITIVE.equals(stgy))
+            return "  " + col + " " + PropertyMappingHelper.getCassandraType(javaCls);
+
+        StringBuilder builder = new StringBuilder();
+
+        for (PojoField field : getFields()) {
+            if (builder.length() > 0)
+                builder.append(",\n");
+
+            builder.append("  ").append(field.getColumnDDL());
+        }
+
+        if (builder.length() == 0) {
+            throw new IllegalStateException("There are no POJO fields found for '" + javaCls.toString()
+                + "' class to be presented as a Cassandra primary key");
+        }
+
+        return builder.toString();
+    }
+
+    /**
+     * Returns default name for Cassandra column (if it's not specified explicitly).
+     *
+     * @return column name
+     */
+    protected abstract String defaultColumnName();
+
+    /**
+     * Checks if there are POJO filed with the same name or same Cassandra column specified in persistence settings
+     *
+     * @param fields list of fields to be persisted into Cassandra
+     */
+    protected void checkDuplicates(List<PojoField> fields) {
+        if (fields == null || fields.isEmpty())
+            return;
+
+        for (PojoField field1 : fields) {
+            boolean sameNames = false;
+            boolean sameCols = false;
+
+            for (PojoField field2 : fields) {
+                if (field1.getName().equals(field2.getName())) {
+                    if (sameNames) {
+                        throw new IllegalArgumentException("Incorrect Cassandra key persistence settings, " +
+                            "two POJO fields with the same name '" + field1.getName() + "' specified");
+                    }
+
+                    sameNames = true;
+                }
+
+                if (field1.getColumn().equals(field2.getColumn())) {
+                    if (sameCols) {
+                        throw new IllegalArgumentException("Incorrect Cassandra persistence settings, " +
+                            "two POJO fields with the same column '" + field1.getColumn() + "' specified");
+                    }
+
+                    sameCols = true;
+                }
+            }
+        }
+    }
+
+    /**
+     * Instantiates Class object for particular class
+     *
+     * @param clazz class name
+     * @return Class object
+     */
+    private Class getClassInstance(String clazz) {
+        try {
+            return Class.forName(clazz);
+        }
+        catch (ClassNotFoundException ignored) {
+        }
+
+        try {
+            return Class.forName(clazz, true, Thread.currentThread().getContextClassLoader());
+        }
+        catch (ClassNotFoundException ignored) {
+        }
+
+        try {
+            return Class.forName(clazz, true, PersistenceSettings.class.getClassLoader());
+        }
+        catch (ClassNotFoundException ignored) {
+        }
+
+        try {
+            return Class.forName(clazz, true, ClassLoader.getSystemClassLoader());
+        }
+        catch (ClassNotFoundException ignored) {
+        }
+
+        throw new IgniteException("Failed to load class '" + clazz + "' using reflection");
+    }
+
+    /**
+     * Creates new object instance of particular class
+     *
+     * @param clazz class name
+     * @return object
+     */
+    private Object newObjectInstance(String clazz) {
+        try {
+            return getClassInstance(clazz).newInstance();
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Failed to instantiate class '" + clazz + "' using default constructor", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java
new file mode 100644
index 0000000..4b1e2d8
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+/**
+ * Describes persistence strategy to be used to persist object data into Cassandra.
+ */
+public enum PersistenceStrategy {
+    /**
+     * Stores object value as is, by mapping its value to Cassandra table column with corresponding type.
+     * <p>
+     * Could be used for primitive java type (like Integer, String, Long and etc) which could be directly mapped
+     * to appropriate Cassandra types.
+     */
+    PRIMITIVE,
+
+    /**
+     * Stores object value as BLOB, by mapping its value to Cassandra table column with blob type.
+     * Could be used for any java type. Conversion of java object to BLOB is handled by specified serializer.
+     * <p>
+     * Available serializer implementations:
+     * <ul>
+     *     <li>
+     *         org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer - uses standard Java
+     *         serialization framework.
+     *     </li>
+     *     <li>
+     *        org.apache.ignite.cache.store.cassandra.serializer.KryoSerializer - uses Kryo serialization
+     *        framework.
+     *     </li>
+     * </ul>
+     */
+    BLOB,
+
+    /**
+     * Stores each field of an object as a column having corresponding type in Cassandra table.
+     * Provides ability to utilize Cassandra secondary indexes for object fields.
+     * <p>
+     * Could be used for objects which follow JavaBeans convention and having empty public constructor.
+     * Object fields should be:
+     * <ul>
+     *     <li>Primitive java types like int, long, String and etc.</li>
+     *     <li>Collections of primitive java types like List<Integer>, Map<Integer, String>, Set<Long></li>
+     * </ul>
+     */
+    POJO
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
new file mode 100644
index 0000000..af569fd
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.Row;
+import java.beans.PropertyDescriptor;
+import java.io.Serializable;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
+import org.apache.ignite.cache.store.cassandra.serializer.Serializer;
+import org.w3c.dom.Element;
+
+/**
+ * Descriptor for particular field in a POJO object, specifying how this field
+ * should be written to or loaded from Cassandra.
+ */
+public abstract class PojoField implements Serializable {
+    /** Name attribute of XML element describing Pojo field. */
+    private static final String NAME_ATTR = "name";
+
+    /** Column attribute of XML element describing Pojo field. */
+    private static final String COLUMN_ATTR = "column";
+
+    /** Field name. */
+    private String name;
+
+    /** Java class to which the field belongs. */
+    private Class javaCls;
+
+    /** Field column name in Cassandra table. */
+    private String col;
+
+    /** Field column DDL.  */
+    private String colDDL;
+
+    /** Field property descriptor. */
+    private transient PropertyDescriptor desc;
+
+    /**
+     * Creates instance of {@link PojoField} based on it's description in XML element.
+     *
+     * @param el XML element describing Pojo field
+     * @param pojoCls Pojo java class.
+     */
+    public PojoField(Element el, Class<?> pojoCls) {
+        if (el == null)
+            throw new IllegalArgumentException("DOM element representing POJO field object can't be null");
+
+        if (!el.hasAttribute(NAME_ATTR)) {
+            throw new IllegalArgumentException("DOM element representing POJO field object should have '"
+                + NAME_ATTR + "' attribute");
+        }
+
+        this.name = el.getAttribute(NAME_ATTR).trim();
+        this.col = el.hasAttribute(COLUMN_ATTR) ? el.getAttribute(COLUMN_ATTR).trim() : name.toLowerCase();
+
+        init(PropertyMappingHelper.getPojoPropertyDescriptor(pojoCls, name));
+    }
+
+    /**
+     * Creates instance of {@link PojoField}  from its property descriptor.
+     *
+     * @param desc Field property descriptor.
+     */
+    public PojoField(PropertyDescriptor desc) {
+        this.name = desc.getName();
+
+        QuerySqlField sqlField = desc.getReadMethod() != null ?
+            desc.getReadMethod().getAnnotation(QuerySqlField.class) :
+            desc.getWriteMethod() == null ?
+                null :
+                desc.getWriteMethod().getAnnotation(QuerySqlField.class);
+
+        this.col = sqlField != null && sqlField.name() != null ? sqlField.name() : name.toLowerCase();
+
+        init(desc);
+
+        if (sqlField != null)
+            init(sqlField);
+    }
+
+    /**
+     * @return field name.
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * @return Cassandra table column name.
+     */
+    public String getColumn() {
+        return col;
+    }
+
+    /**
+     * @return Cassandra table column DDL statement.
+     */
+    public String getColumnDDL() {
+        return colDDL;
+    }
+
+    /**
+     * Gets field value as an object having Cassandra compatible type.
+     * This it could be stored directly into Cassandra without any conversions.
+     *
+     * @param obj Object instance.
+     * @param serializer {@link org.apache.ignite.cache.store.cassandra.serializer.Serializer} to use.
+     * @return Object to store in Cassandra table column.
+     */
+    public Object getValueFromObject(Object obj, Serializer serializer) {
+        try {
+            Object val = propDesc().getReadMethod().invoke(obj);
+
+            if (val == null)
+                return null;
+
+            DataType.Name cassandraType = PropertyMappingHelper.getCassandraType(val.getClass());
+
+            if (cassandraType != null)
+                return val;
+
+            if (serializer == null) {
+                throw new IllegalStateException("Can't serialize value from object '" +
+                    val.getClass().getName() + "' field '" + name + "', cause there is no BLOB serializer specified");
+            }
+
+            return serializer.serialize(val);
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Failed to get value of the field '" + name + "' from the instance " +
+                " of '" + obj.getClass().toString() + "' class", e);
+        }
+    }
+
+    /**
+     * Sets object field value from a {@link com.datastax.driver.core.Row} returned by Cassandra CQL statement.
+     *
+     * @param row {@link com.datastax.driver.core.Row}
+     * @param obj object which field should be populated from {@link com.datastax.driver.core.Row}
+     * @param serializer {@link org.apache.ignite.cache.store.cassandra.serializer.Serializer} to use.
+     */
+    public void setValueFromRow(Row row, Object obj, Serializer serializer) {
+        Object val = PropertyMappingHelper.getCassandraColumnValue(row, col, propDesc().getPropertyType(), serializer);
+
+        try {
+            propDesc().getWriteMethod().invoke(obj, val);
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Failed to set value of the field '" + name + "' of the instance " +
+                " of '" + obj.getClass().toString() + "' class", e);
+        }
+    }
+
+    /**
+     * Initializes field info from annotation.
+     *
+     * @param sqlField {@link QuerySqlField} annotation.
+     */
+    protected abstract void init(QuerySqlField sqlField);
+
+    /**
+     * Initializes field info from property descriptor.
+     *
+     * @param desc {@link PropertyDescriptor} descriptor.
+     */
+    protected void init(PropertyDescriptor desc) {
+        if (desc.getReadMethod() == null) {
+            throw new IllegalArgumentException("Field '" + desc.getName() +
+                "' of the class instance '" + desc.getPropertyType().getName() +
+                "' doesn't provide getter method");
+        }
+
+        if (desc.getWriteMethod() == null) {
+            throw new IllegalArgumentException("Field '" + desc.getName() +
+                "' of POJO object instance of the class '" + desc.getPropertyType().getName() +
+                "' doesn't provide write method");
+        }
+
+        if (!desc.getReadMethod().isAccessible())
+            desc.getReadMethod().setAccessible(true);
+
+        if (!desc.getWriteMethod().isAccessible())
+            desc.getWriteMethod().setAccessible(true);
+
+        DataType.Name cassandraType = PropertyMappingHelper.getCassandraType(desc.getPropertyType());
+        cassandraType = cassandraType == null ? DataType.Name.BLOB : cassandraType;
+
+        this.javaCls = desc.getReadMethod().getDeclaringClass();
+        this.desc = desc;
+        this.colDDL = col + " " + cassandraType.toString();
+    }
+
+    /**
+     * Returns property descriptor of the POJO field
+     *
+     * @return Property descriptor
+     */
+    private PropertyDescriptor propDesc() {
+        return desc != null ? desc : (desc = PropertyMappingHelper.getPojoPropertyDescriptor(javaCls, name));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java
new file mode 100644
index 0000000..4e86d74
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import java.beans.PropertyDescriptor;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.w3c.dom.Element;
+
+/**
+ * Descriptor for Ignite key POJO class
+ */
+public class PojoKeyField extends PojoField {
+
+    /**
+     * Specifies sort order for POJO key field
+     */
+    public enum SortOrder {
+        /** Ascending sort order. */
+        ASC,
+        /** Descending sort order. */
+        DESC
+    }
+
+    /** Xml attribute specifying sort order. */
+    private static final String SORT_ATTR = "sort";
+
+    /** Sort order. */
+    private SortOrder sortOrder = null;
+
+    /**
+     * Constructs Ignite cache key POJO object descriptor.
+     *
+     * @param el xml configuration element.
+     * @param pojoCls java class of key POJO field.
+     */
+    public PojoKeyField(Element el, Class pojoCls) {
+        super(el, pojoCls);
+
+        if (el.hasAttribute(SORT_ATTR)) {
+            try {
+                sortOrder = SortOrder.valueOf(el.getAttribute(SORT_ATTR).trim().toUpperCase());
+            }
+            catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException("Incorrect sort order '" + el.getAttribute(SORT_ATTR) + "' specified");
+            }
+        }
+    }
+
+    /**
+     * Constructs Ignite cache key POJO object descriptor.
+     *
+     * @param desc property descriptor.
+     */
+    public PojoKeyField(PropertyDescriptor desc) {
+        super(desc);
+    }
+
+    /**
+     * Returns sort order for the field.
+     *
+     * @return sort order.
+     */
+    public SortOrder getSortOrder() {
+        return sortOrder;
+    }
+
+    /**
+     * Initializes descriptor from {@link QuerySqlField} annotation.
+     *
+     * @param sqlField {@link QuerySqlField} annotation.
+     */
+    protected void init(QuerySqlField sqlField) {
+        if (sqlField.descending())
+            sortOrder = SortOrder.DESC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
new file mode 100644
index 0000000..c29f1db
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import java.beans.PropertyDescriptor;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.w3c.dom.Element;
+
+/**
+ * Descriptor for Ignite value POJO class
+ */
+public class PojoValueField extends PojoField {
+    /** Xml attribute specifying that Cassandra column is static. */
+    private static final String STATIC_ATTR = "static";
+
+    /** Xml attribute specifying that secondary index should be created for Cassandra column. */
+    private static final String INDEX_ATTR = "index";
+
+    /** Xml attribute specifying secondary index custom class. */
+    private static final String INDEX_CLASS_ATTR = "indexClass";
+
+    /** Xml attribute specifying secondary index options. */
+    private static final String INDEX_OPTIONS_ATTR = "indexOptions";
+
+    /** Indicates if Cassandra column should be indexed. */
+    private Boolean isIndexed;
+
+    /** Custom java class for Cassandra secondary index. */
+    private String idxCls;
+
+    /** Secondary index options. */
+    private String idxOptions;
+
+    /** Indicates if Cassandra column is static. */
+    private Boolean isStatic;
+
+    /**
+     * Constructs Ignite cache value field descriptor.
+     *
+     * @param el field descriptor xml configuration element.
+     * @param pojoCls field java class
+     */
+    public PojoValueField(Element el, Class pojoCls) {
+        super(el, pojoCls);
+
+        if (el.hasAttribute(STATIC_ATTR))
+            isStatic = Boolean.parseBoolean(el.getAttribute(STATIC_ATTR).trim().toLowerCase());
+
+        if (el.hasAttribute(INDEX_ATTR))
+            isIndexed = Boolean.parseBoolean(el.getAttribute(INDEX_ATTR).trim().toLowerCase());
+
+        if (el.hasAttribute(INDEX_CLASS_ATTR))
+            idxCls = el.getAttribute(INDEX_CLASS_ATTR).trim();
+
+        if (el.hasAttribute(INDEX_OPTIONS_ATTR)) {
+            idxOptions = el.getAttribute(INDEX_OPTIONS_ATTR).trim();
+
+            if (!idxOptions.toLowerCase().startsWith("with")) {
+                idxOptions = idxOptions.toLowerCase().startsWith("options") ?
+                    "with " + idxOptions :
+                    "with options = " + idxOptions;
+            }
+        }
+    }
+
+    /**
+     * Constructs Ignite cache value field descriptor.
+     *
+     * @param desc field property descriptor.
+     */
+    public PojoValueField(PropertyDescriptor desc) {
+        super(desc);
+    }
+
+    /**
+     * Returns DDL for Cassandra columns corresponding to POJO field.
+     *
+     * @return columns DDL.
+     */
+    public String getColumnDDL() {
+        String colDDL = super.getColumnDDL();
+
+        if (isStatic != null && isStatic)
+            colDDL = colDDL + " static";
+
+        return colDDL;
+    }
+
+    /**
+     * Indicates if secondary index should be created for the field.
+     *
+     * @return true/false if secondary index should/shouldn't be created for the field.
+     */
+    public boolean isIndexed() {
+        return isIndexed != null && isIndexed;
+    }
+
+    /**
+     * Returns DDL for the field secondary index.
+     *
+     * @param keyspace Cassandra keyspace where index should be created.
+     * @param tbl Cassandra table for which secondary index should be created.
+     *
+     * @return secondary index DDL.
+     */
+    public String getIndexDDL(String keyspace, String tbl) {
+        if (isIndexed == null || !isIndexed)
+            return null;
+
+        StringBuilder builder = new StringBuilder();
+
+        if (idxCls != null)
+            builder.append("create custom index if not exists on ").append(keyspace).append(".").append(tbl);
+        else
+            builder.append("create index if not exists on ").append(keyspace).append(".").append(tbl);
+
+        builder.append(" (").append(getColumn()).append(")");
+
+        if (idxCls != null)
+            builder.append(" using '").append(idxCls).append("'");
+
+        if (idxOptions != null)
+            builder.append(" ").append(idxOptions);
+
+        return builder.append(";").toString();
+    }
+
+    /**
+     * Initializes descriptor from {@link QuerySqlField} annotation.
+     *
+     * @param sqlField {@link QuerySqlField} annotation.
+     */
+    protected void init(QuerySqlField sqlField) {
+        if (sqlField.index())
+            isIndexed = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java
new file mode 100644
index 0000000..877167d
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import java.beans.PropertyDescriptor;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+/**
+ * Stores persistence settings for Ignite cache value
+ */
+public class ValuePersistenceSettings extends PersistenceSettings {
+    /** XML element describing value field settings. */
+    private static final String FIELD_ELEMENT = "field";
+
+    /** Value fields. */
+    private List<PojoField> fields = new LinkedList<>();
+
+    /**
+     * Creates class instance from XML configuration.
+     *
+     * @param el XML element describing value persistence settings.
+     */
+    public ValuePersistenceSettings(Element el) {
+        super(el);
+
+        if (!PersistenceStrategy.POJO.equals(getStrategy()))
+            return;
+
+        NodeList nodes = el.getElementsByTagName(FIELD_ELEMENT);
+
+        fields = detectFields(nodes);
+
+        if (fields.isEmpty())
+            throw new IllegalStateException("Failed to initialize value fields for class '" + getJavaClass().getName() + "'");
+
+        checkDuplicates(fields);
+    }
+
+    /**
+     * @return List of value fields.
+     */
+    public List<PojoField> getFields() {
+        return fields == null ? null : Collections.unmodifiableList(fields);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String defaultColumnName() {
+        return "value";
+    }
+
+    /**
+     * Extracts POJO fields from a list of corresponding XML field nodes.
+     *
+     * @param fieldNodes Field nodes to process.
+     * @return POJO fields list.
+     */
+    private List<PojoField> detectFields(NodeList fieldNodes) {
+        List<PojoField> list = new LinkedList<>();
+
+        if (fieldNodes == null || fieldNodes.getLength() == 0) {
+            List<PropertyDescriptor> primitivePropDescriptors = PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), true);
+            for (PropertyDescriptor descriptor : primitivePropDescriptors)
+                list.add(new PojoValueField(descriptor));
+
+            return list;
+        }
+
+        List<PropertyDescriptor> allPropDescriptors = PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), false);
+
+        int cnt = fieldNodes.getLength();
+
+        for (int i = 0; i < cnt; i++) {
+            PojoValueField field = new PojoValueField((Element)fieldNodes.item(i), getJavaClass());
+
+            PropertyDescriptor desc = findPropertyDescriptor(allPropDescriptors, field.getName());
+
+            if (desc == null) {
+                throw new IllegalArgumentException("Specified POJO field '" + field.getName() +
+                    "' doesn't exist in '" + getJavaClass().getName() + "' class");
+            }
+
+            list.add(field);
+        }
+
+        return list;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/package-info.java
new file mode 100644
index 0000000..76d32fb
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains persistent settings configuration
+ */
+package org.apache.ignite.cache.store.cassandra.persistence;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java
new file mode 100644
index 0000000..44d2d47
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.serializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Serializer based on standard Java serialization.
+ */
+public class JavaSerializer implements Serializer {
+    /** */
+    private static final int DFLT_BUFFER_SIZE = 4096;
+
+    /** {@inheritDoc} */
+    @Override public ByteBuffer serialize(Object obj) {
+        if (obj == null)
+            return null;
+
+        ByteArrayOutputStream stream = null;
+        ObjectOutputStream out = null;
+
+        try {
+            stream = new ByteArrayOutputStream(DFLT_BUFFER_SIZE);
+
+            out = new ObjectOutputStream(stream);
+            out.writeObject(obj);
+            out.flush();
+
+            return ByteBuffer.wrap(stream.toByteArray());
+        }
+        catch (IOException e) {
+            throw new IllegalStateException("Failed to serialize object of the class '" + obj.getClass().getName() + "'", e);
+        }
+        finally {
+            U.closeQuiet(out);
+            U.closeQuiet(stream);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object deserialize(ByteBuffer buf) {
+        ByteArrayInputStream stream = null;
+        ObjectInputStream in = null;
+
+        try {
+            stream = new ByteArrayInputStream(buf.array());
+            in = new ObjectInputStream(stream);
+
+            return in.readObject();
+        }
+        catch (Throwable e) {
+            throw new IllegalStateException("Failed to deserialize object from byte stream", e);
+        }
+        finally {
+            U.closeQuiet(in);
+            U.closeQuiet(stream);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/Serializer.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/Serializer.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/Serializer.java
new file mode 100644
index 0000000..5b8d542
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/Serializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.serializer;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * Interface which should be implemented by all serializers responsible
+ * for writing/loading data to/from Cassandra in binary (BLOB) format.
+ */
+public interface Serializer extends Serializable {
+    /**
+     * Serializes object into byte buffer.
+     *
+     * @param obj Object to serialize.
+     * @return Byte buffer with binary data.
+     */
+    public ByteBuffer serialize(Object obj);
+
+    /**
+     * Deserializes object from byte buffer.
+     *
+     * @param buf Byte buffer.
+     * @return Deserialized object.
+     */
+    public Object deserialize(ByteBuffer buf);
+}


Mime
View raw message