ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [8/9] ignite git commit: IGNITE-1371 Implemented Cassandra cache store.
Date Wed, 18 May 2016 09:50:05 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/b33eb027/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
new file mode 100644
index 0000000..1ecb28f
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.datasource;
+
+import com.datastax.driver.core.AuthProvider;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.NettyOptions;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.ProtocolOptions;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.SSLOptions;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.policies.AddressTranslator;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.ReconnectionPolicy;
+import com.datastax.driver.core.policies.RetryPolicy;
+import com.datastax.driver.core.policies.SpeculativeExecutionPolicy;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.store.cassandra.session.CassandraSession;
+import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
+
+/**
+ * Data source abstraction to specify configuration of the Cassandra session to be used.
+ */
+public class DataSource {
+    /** Number of rows to immediately fetch in CQL statement execution. */
+    private Integer fetchSize;
+
+    /** Consistency level for READ operations. */
+    private ConsistencyLevel readConsistency;
+
+    /** Consistency level for WRITE operations. */
+    private ConsistencyLevel writeConsistency;
+
+    /** Username to use for authentication. */
+    private String user;
+
+    /** Password to use for authentication. */
+    private String pwd;
+
+    /** Port to use for Cassandra connection. */
+    private Integer port;
+
+    /** List of contact points to connect to Cassandra cluster. */
+    private List<InetAddress> contactPoints;
+
+    /** List of contact points with ports to connect to Cassandra cluster. */
+    private List<InetSocketAddress> contactPointsWithPorts;
+
+    /** Maximum time to wait for schema agreement before returning from a DDL query. */
+    private Integer maxSchemaAgreementWaitSeconds;
+
+    /** The native protocol version to use. */
+    private Integer protoVer;
+
+    /** Compression to use for the transport. */
+    private String compression;
+
+    /** Use SSL for communications with Cassandra. */
+    private Boolean useSSL;
+
+    /** Enables metrics collection. */
+    private Boolean collectMetrix;
+
+    /** Enables JMX reporting of the metrics. */
+    private Boolean jmxReporting;
+
+    /** Credentials to use for authentication. */
+    private Credentials creds;
+
+    /** Load balancing policy to use. */
+    private LoadBalancingPolicy loadBalancingPlc;
+
+    /** Reconnection policy to use. */
+    private ReconnectionPolicy reconnectionPlc;
+
+    /** Retry policy to use. */
+    private RetryPolicy retryPlc;
+
+    /** Address translator to use. */
+    private AddressTranslator addrTranslator;
+
+    /** Speculative execution policy to use. */
+    private SpeculativeExecutionPolicy speculativeExecutionPlc;
+
+    /** Authentication provider to use. */
+    private AuthProvider authProvider;
+
+    /** SSL options to use. */
+    private SSLOptions sslOptions;
+
+    /** Connection pooling options to use. */
+    private PoolingOptions poolingOptions;
+
+    /** Socket options to use. */
+    private SocketOptions sockOptions;
+
+    /** Netty options to use for connection. */
+    private NettyOptions nettyOptions;
+
+    /** Cassandra session wrapper instance. */
+    private volatile CassandraSession ses;
+
+    /**
+     * Sets user name to use for authentication.
+     *
+     * @param user user name
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setUser(String user) {
+        this.user = user;
+
+        invalidate();
+    }
+
+    /**
+     * Sets password to use for authentication.
+     *
+     * @param pwd password
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setPassword(String pwd) {
+        this.pwd = pwd;
+
+        invalidate();
+    }
+
+    /**
+     * Sets port to use for Cassandra connection.
+     *
+     * @param port port
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setPort(int port) {
+        this.port = port;
+
+        invalidate();
+    }
+
+    /**
+     * Sets list of contact points to connect to Cassandra cluster.
+     *
+     * @param points contact points
+     */
+    public void setContactPoints(String... points) {
+        if (points == null || points.length == 0)
+            return;
+
+        for (String point : points) {
+            if (point.contains(":")) {
+                if (contactPointsWithPorts == null)
+                    contactPointsWithPorts = new LinkedList<>();
+
+                String[] chunks = point.split(":");
+
+                try {
+                    contactPointsWithPorts.add(InetSocketAddress.createUnresolved(chunks[0].trim(), Integer.parseInt(chunks[1].trim())));
+                }
+                catch (Throwable e) {
+                    throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e);
+                }
+            }
+            else {
+                if (contactPoints == null)
+                    contactPoints = new LinkedList<>();
+
+                try {
+                    contactPoints.add(InetAddress.getByName(point));
+                }
+                catch (Throwable e) {
+                    throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e);
+                }
+            }
+        }
+
+        invalidate();
+    }
+
+    /** Sets maximum time to wait for schema agreement before returning from a DDL query. */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setMaxSchemaAgreementWaitSeconds(int seconds) {
+        maxSchemaAgreementWaitSeconds = seconds;
+
+        invalidate();
+    }
+
+    /**
+     * Sets the native protocol version to use.
+     *
+     * @param ver version number
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setProtocolVersion(int ver) {
+        protoVer = ver;
+
+        invalidate();
+    }
+
+    /**
+     * Sets compression algorithm to use for the transport.
+     *
+     * @param compression Compression algorithm.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setCompression(String compression) {
+        this.compression = compression == null || compression.trim().isEmpty() ? null : compression.trim();
+
+        try {
+            if (this.compression != null)
+                ProtocolOptions.Compression.valueOf(this.compression);
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Incorrect compression '" + compression + "' specified for Cassandra connection", e);
+        }
+
+        invalidate();
+    }
+
+    /**
+     * Enables SSL for communications with Cassandra.
+     *
+     * @param use Flag to enable/disable SSL.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setUseSSL(boolean use) {
+        useSSL = use;
+
+        invalidate();
+    }
+
+    /**
+     * Enables metrics collection.
+     *
+     * @param collect Flag to enable/disable metrics collection.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setCollectMetrix(boolean collect) {
+        collectMetrix = collect;
+
+        invalidate();
+    }
+
+    /**
+     * Enables JMX reporting of the metrics.
+     *
+     * @param enableReporting Flag to enable/disable JMX reporting.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setJmxReporting(boolean enableReporting) {
+        jmxReporting = enableReporting;
+
+        invalidate();
+    }
+
+    /**
+     * Sets number of rows to immediately fetch in CQL statement execution.
+     *
+     * @param size Number of rows to fetch.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setFetchSize(int size) {
+        fetchSize = size;
+
+        invalidate();
+    }
+
+    /**
+     * Set consistency level for READ operations.
+     *
+     * @param level Consistency level.
+     */
+    public void setReadConsistency(String level) {
+        readConsistency = parseConsistencyLevel(level);
+
+        invalidate();
+    }
+
+    /**
+     * Set consistency level for WRITE operations.
+     *
+     * @param level Consistency level.
+     */
+    public void setWriteConsistency(String level) {
+        writeConsistency = parseConsistencyLevel(level);
+
+        invalidate();
+    }
+
+    /**
+     * Sets credentials to use for authentication.
+     *
+     * @param creds Credentials.
+     */
+    public void setCredentials(Credentials creds) {
+        this.creds = creds;
+
+        invalidate();
+    }
+
+    /**
+     * Sets load balancing policy.
+     *
+     * @param plc Load balancing policy.
+     */
+    public void setLoadBalancingPolicy(LoadBalancingPolicy plc) {
+        this.loadBalancingPlc = plc;
+
+        invalidate();
+    }
+
+    /**
+     * Sets reconnection policy.
+     *
+     * @param plc Reconnection policy.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setReconnectionPolicy(ReconnectionPolicy plc) {
+        this.reconnectionPlc = plc;
+
+        invalidate();
+    }
+
+    /**
+     * Sets retry policy.
+     *
+     * @param plc Retry policy.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setRetryPolicy(RetryPolicy plc) {
+        this.retryPlc = plc;
+
+        invalidate();
+    }
+
+    /**
+     * Sets address translator.
+     *
+     * @param translator Address translator.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setAddressTranslator(AddressTranslator translator) {
+        this.addrTranslator = translator;
+
+        invalidate();
+    }
+
+    /**
+     * Sets speculative execution policy.
+     *
+     * @param plc Speculative execution policy.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy plc) {
+        this.speculativeExecutionPlc = plc;
+
+        invalidate();
+    }
+
+    /**
+     * Sets authentication provider.
+     *
+     * @param provider Authentication provider.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setAuthProvider(AuthProvider provider) {
+        this.authProvider = provider;
+
+        invalidate();
+    }
+
+    /**
+     * Sets SSL options.
+     *
+     * @param options SSL options.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setSslOptions(SSLOptions options) {
+        this.sslOptions = options;
+
+        invalidate();
+    }
+
+    /**
+     * Sets pooling options.
+     *
+     * @param options pooling options to use.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setPoolingOptions(PoolingOptions options) {
+        this.poolingOptions = options;
+
+        invalidate();
+    }
+
+    /**
+     * Sets socket options to use.
+     *
+     * @param options Socket options.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setSocketOptions(SocketOptions options) {
+        this.sockOptions = options;
+
+        invalidate();
+    }
+
+    /**
+     * Sets netty options to use.
+     *
+     * @param options netty options.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setNettyOptions(NettyOptions options) {
+        this.nettyOptions = options;
+
+        invalidate();
+    }
+
+    /**
+     * Creates Cassandra session wrapper if it wasn't created yet and returns it
+     *
+     * @param log logger
+     * @return Cassandra session wrapper
+     */
+    @SuppressWarnings("deprecation")
+    public synchronized CassandraSession session(IgniteLogger log) {
+        if (ses != null)
+            return ses;
+
+        Cluster.Builder builder = Cluster.builder();
+
+        if (user != null)
+            builder = builder.withCredentials(user, pwd);
+
+        if (port != null)
+            builder = builder.withPort(port);
+
+        if (contactPoints != null)
+            builder = builder.addContactPoints(contactPoints);
+
+        if (contactPointsWithPorts != null)
+            builder = builder.addContactPointsWithPorts(contactPointsWithPorts);
+
+        if (maxSchemaAgreementWaitSeconds != null)
+            builder = builder.withMaxSchemaAgreementWaitSeconds(maxSchemaAgreementWaitSeconds);
+
+        if (protoVer != null)
+            builder = builder.withProtocolVersion(ProtocolVersion.fromInt(protoVer));
+
+        if (compression != null) {
+            try {
+                builder = builder.withCompression(ProtocolOptions.Compression.valueOf(compression.trim().toLowerCase()));
+            }
+            catch (IllegalArgumentException e) {
+                throw new IgniteException("Incorrect compression option '" + compression + "' specified for Cassandra connection", e);
+            }
+        }
+
+        if (useSSL != null && useSSL)
+            builder = builder.withSSL();
+
+        if (sslOptions != null)
+            builder = builder.withSSL(sslOptions);
+
+        if (collectMetrix != null && !collectMetrix)
+            builder = builder.withoutMetrics();
+
+        if (jmxReporting != null && !jmxReporting)
+            builder = builder.withoutJMXReporting();
+
+        if (creds != null)
+            builder = builder.withCredentials(creds.getUser(), creds.getPassword());
+
+        if (loadBalancingPlc != null)
+            builder = builder.withLoadBalancingPolicy(loadBalancingPlc);
+
+        if (reconnectionPlc != null)
+            builder = builder.withReconnectionPolicy(reconnectionPlc);
+
+        if (retryPlc != null)
+            builder = builder.withRetryPolicy(retryPlc);
+
+        if (addrTranslator != null)
+            builder = builder.withAddressTranslator(addrTranslator);
+
+        if (speculativeExecutionPlc != null)
+            builder = builder.withSpeculativeExecutionPolicy(speculativeExecutionPlc);
+
+        if (authProvider != null)
+            builder = builder.withAuthProvider(authProvider);
+
+        if (poolingOptions != null)
+            builder = builder.withPoolingOptions(poolingOptions);
+
+        if (sockOptions != null)
+            builder = builder.withSocketOptions(sockOptions);
+
+        if (nettyOptions != null)
+            builder = builder.withNettyOptions(nettyOptions);
+
+        return ses = new CassandraSessionImpl(builder, fetchSize, readConsistency, writeConsistency, log);
+    }
+
+    /**
+     * Parses consistency level provided as string.
+     *
+     * @param level consistency level string.
+     *
+     * @return consistency level.
+     */
+    private ConsistencyLevel parseConsistencyLevel(String level) {
+        if (level == null)
+            return null;
+
+        try {
+            return ConsistencyLevel.valueOf(level.trim().toUpperCase());
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Incorrect consistency level '" + level + "' specified for Cassandra connection", e);
+        }
+    }
+
+    /**
+     * Invalidates session.
+     */
+    private synchronized void invalidate() {
+        ses = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b33eb027/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java
new file mode 100644
index 0000000..9d0710e
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.datasource;
+
+/**
+ * Simple implementation of {@link Credentials} which just uses its constructor to hold user/password values.
+ */
+public class PlainCredentials implements Credentials {
+    /** User name. */
+    private String user;
+
+    /** User password. */
+    private String pwd;
+
+    /**
+     * Creates credentials object.
+     *
+     * @param user User name.
+     * @param pwd User password.
+     */
+    public PlainCredentials(String user, String pwd) {
+        this.user = user;
+        this.pwd = pwd;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getUser() {
+        return user;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getPassword() {
+        return pwd;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b33eb027/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java
new file mode 100644
index 0000000..393dbe4
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import java.beans.PropertyDescriptor;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+/**
+ * Stores persistence settings for Ignite cache key
+ */
+public class KeyPersistenceSettings extends PersistenceSettings {
+    /** Partition key XML tag. */
+    private static final String PARTITION_KEY_ELEMENT = "partitionKey";
+
+    /** Cluster key XML tag. */
+    private static final String CLUSTER_KEY_ELEMENT = "clusterKey";
+
+    /** POJO field XML tag. */
+    private static final String FIELD_ELEMENT = "field";
+
+    /** POJO fields. */
+    private List<PojoField> fields = new LinkedList<>();
+
+    /** Partition key fields. */
+    private List<PojoField> partKeyFields = new LinkedList<>();
+
+    /** Cluster key fields. */
+    private List<PojoField> clusterKeyFields = new LinkedList<>();
+
+    /**
+     * Creates key persistence settings object based on it's XML configuration.
+     *
+     * @param el XML element storing key persistence settings
+     */
+    public KeyPersistenceSettings(Element el) {
+        super(el);
+
+        if (!PersistenceStrategy.POJO.equals(getStrategy()))
+            return;
+
+        NodeList keyElem = el.getElementsByTagName(PARTITION_KEY_ELEMENT);
+
+        Element partKeysNode = keyElem != null ? (Element) keyElem.item(0) : null;
+
+        Element clusterKeysNode = el.getElementsByTagName(CLUSTER_KEY_ELEMENT) != null ?
+            (Element)el.getElementsByTagName(CLUSTER_KEY_ELEMENT).item(0) : null;
+
+        if (partKeysNode == null && clusterKeysNode != null) {
+            throw new IllegalArgumentException("It's not allowed to specify cluster key fields mapping, but " +
+                "doesn't specify partition key mappings");
+        }
+
+        partKeyFields = detectFields(partKeysNode, getPartitionKeyDescriptors());
+
+        if (partKeyFields == null || partKeyFields.isEmpty()) {
+            throw new IllegalStateException("Failed to initialize partition key fields for class '" +
+                getJavaClass().getName() + "'");
+        }
+
+        clusterKeyFields = detectFields(clusterKeysNode, getClusterKeyDescriptors(partKeyFields));
+
+        fields = new LinkedList<>();
+        fields.addAll(partKeyFields);
+        fields.addAll(clusterKeyFields);
+
+        checkDuplicates(fields);
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<PojoField> getFields() {
+        return fields;
+    }
+
+    /**
+     * Returns Cassandra DDL for primary key.
+     *
+     * @return DDL statement.
+     */
+    public String getPrimaryKeyDDL() {
+        StringBuilder partKey = new StringBuilder();
+
+        List<String> cols = getPartitionKeyColumns();
+        for (String column : cols) {
+            if (partKey.length() != 0)
+                partKey.append(", ");
+
+            partKey.append(column);
+        }
+
+        StringBuilder clusterKey = new StringBuilder();
+
+        cols = getClusterKeyColumns();
+        if (cols != null) {
+            for (String column : cols) {
+                if (clusterKey.length() != 0)
+                    clusterKey.append(", ");
+
+                clusterKey.append(column);
+            }
+        }
+
+        return clusterKey.length() == 0 ?
+            "  primary key ((" + partKey.toString() + "))" :
+            "  primary key ((" + partKey.toString() + "), " + clusterKey.toString() + ")";
+    }
+
+    /**
+     * Returns Cassandra DDL for cluster key.
+     *
+     * @return Cluster key DDL.
+     */
+    public String getClusteringDDL() {
+        StringBuilder builder = new StringBuilder();
+
+        for (PojoField field : clusterKeyFields) {
+            PojoKeyField.SortOrder sortOrder = ((PojoKeyField)field).getSortOrder();
+
+            if (sortOrder == null)
+                continue;
+
+            if (builder.length() != 0)
+                builder.append(", ");
+
+            boolean asc = PojoKeyField.SortOrder.ASC.equals(sortOrder);
+
+            builder.append(field.getColumn()).append(" ").append(asc ? "asc" : "desc");
+        }
+
+        return builder.length() == 0 ? null : "clustering order by (" + builder.toString() + ")";
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String defaultColumnName() {
+        return "key";
+    }
+
+    /**
+     * Returns partition key columns of Cassandra table.
+     *
+     * @return List of column names.
+     */
+    private List<String> getPartitionKeyColumns() {
+        List<String> cols = new LinkedList<>();
+
+        if (PersistenceStrategy.BLOB.equals(getStrategy()) || PersistenceStrategy.PRIMITIVE.equals(getStrategy())) {
+            cols.add(getColumn());
+            return cols;
+        }
+
+        if (partKeyFields != null) {
+            for (PojoField field : partKeyFields)
+                cols.add(field.getColumn());
+        }
+
+        return cols;
+    }
+
+    /**
+     * Returns cluster key columns of Cassandra table.
+     *
+     * @return List of column names.
+     */
+    private List<String> getClusterKeyColumns() {
+        List<String> cols = new LinkedList<>();
+
+        if (clusterKeyFields != null) {
+            for (PojoField field : clusterKeyFields)
+                cols.add(field.getColumn());
+        }
+
+        return cols;
+    }
+
+    /**
+     * Extracts POJO fields specified in XML element.
+     *
+     * @param el XML element describing fields.
+     * @param descriptors POJO fields descriptors.
+     * @return List of {@code This} fields.
+     */
+    private List<PojoField> detectFields(Element el, List<PropertyDescriptor> descriptors) {
+        List<PojoField> list = new LinkedList<>();
+
+        if (el == null && (descriptors == null || descriptors.isEmpty()))
+            return list;
+
+        if (el == null) {
+            for (PropertyDescriptor descriptor : descriptors)
+                list.add(new PojoKeyField(descriptor));
+
+            return list;
+        }
+
+        NodeList nodes = el.getElementsByTagName(FIELD_ELEMENT);
+
+        int cnt = nodes == null ? 0 : nodes.getLength();
+
+        if (cnt == 0) {
+            throw new IllegalArgumentException("Incorrect configuration of Cassandra key persistence settings, " +
+                "no cluster key fields specified inside '" + PARTITION_KEY_ELEMENT + "/" +
+                CLUSTER_KEY_ELEMENT + "' element");
+        }
+
+        for (int i = 0; i < cnt; i++) {
+            PojoKeyField field = new PojoKeyField((Element)nodes.item(i), getJavaClass());
+
+            PropertyDescriptor desc = findPropertyDescriptor(descriptors, field.getName());
+
+            if (desc == null) {
+                throw new IllegalArgumentException("Specified POJO field '" + field.getName() +
+                    "' doesn't exist in '" + getJavaClass().getName() + "' class");
+            }
+
+            list.add(field);
+        }
+
+        return list;
+    }
+
+    /**
+     * @return POJO field descriptors for partition key.
+     */
+    private List<PropertyDescriptor> getPartitionKeyDescriptors() {
+        List<PropertyDescriptor> primitivePropDescriptors = PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(),
+            AffinityKeyMapped.class, true);
+
+        return primitivePropDescriptors != null && !primitivePropDescriptors.isEmpty() ?
+            primitivePropDescriptors :
+            PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), true);
+    }
+
+    /**
+     * @return POJO field descriptors for cluster key.
+     */
+    private List<PropertyDescriptor> getClusterKeyDescriptors(List<PojoField> partKeyFields) {
+        List<PropertyDescriptor> primitivePropDescriptors =
+            PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), true);
+
+        if (primitivePropDescriptors == null || primitivePropDescriptors.isEmpty() ||
+            partKeyFields.size() == primitivePropDescriptors.size())
+            return null;
+
+        for (PojoField field : partKeyFields) {
+            for (int i = 0; i < primitivePropDescriptors.size(); i++) {
+                if (primitivePropDescriptors.get(i).getName().equals(field.getName())) {
+                    primitivePropDescriptors.remove(i);
+                    break;
+                }
+            }
+        }
+
+        return primitivePropDescriptors;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b33eb027/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java
new file mode 100644
index 0000000..2c43ed4
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.store.cassandra.common.SystemHelper;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.springframework.core.io.Resource;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+
+/**
+ * Stores persistence settings for Ignite cache key and value
+ */
+public class KeyValuePersistenceSettings implements Serializable {
+    /**
+     * Default Cassandra keyspace options which should be used to create new keyspace.
+     * <ul>
+     * <li> <b>SimpleStrategy</b> for replication work well for single data center Cassandra cluster.<br/>
+     *      If your Cassandra cluster deployed across multiple data centers it's better to use <b>NetworkTopologyStrategy</b>.
+     * </li>
+     * <li> Three replicas will be created for each data block. </li>
+     * <li> Setting DURABLE_WRITES to true specifies that all data should be written to commit log. </li>
+     * </ul>
+     */
+    private static final String DFLT_KEYSPACE_OPTIONS = "replication = {'class' : 'SimpleStrategy', " +
+            "'replication_factor' : 3} and durable_writes = true";
+
+    /** Xml attribute specifying Cassandra keyspace to use. */
+    private static final String KEYSPACE_ATTR = "keyspace";
+
+    /** Xml attribute specifying Cassandra table to use. */
+    private static final String TABLE_ATTR = "table";
+
+    /** Xml attribute specifying ttl (time to leave) for rows inserted in Cassandra. */
+    private static final String TTL_ATTR = "ttl";
+
+    /** Root xml element containing persistence settings specification. */
+    private static final String PERSISTENCE_NODE = "persistence";
+
+    /** Xml element specifying Cassandra keyspace options. */
+    private static final String KEYSPACE_OPTIONS_NODE = "keyspaceOptions";
+
+    /** Xml element specifying Cassandra table options. */
+    private static final String TABLE_OPTIONS_NODE = "tableOptions";
+
+    /** Xml element specifying Ignite cache key persistence settings. */
+    private static final String KEY_PERSISTENCE_NODE = "keyPersistence";
+
+    /** Xml element specifying Ignite cache value persistence settings. */
+    private static final String VALUE_PERSISTENCE_NODE = "valuePersistence";
+
+    /** TTL (time to leave) for rows inserted into Cassandra table {@link <a href="https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_c.html">Expiring data</a>}. */
+    private Integer ttl;
+
+    /** Cassandra keyspace (analog of tablespace in relational databases). */
+    private String keyspace;
+
+    /** Cassandra table. */
+    private String tbl;
+
+    /** Cassandra table creation options {@link <a href="https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_table_r.html">CREATE TABLE</a>}. */
+    private String tblOptions;
+
+    /** Cassandra keyspace creation options {@link <a href="https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_keyspace_r.html">CREATE KEYSPACE</a>}. */
+    private String keyspaceOptions = DFLT_KEYSPACE_OPTIONS;
+
+    /** Persistence settings for Ignite cache keys. */
+    private KeyPersistenceSettings keyPersistenceSettings;
+
+    /** Persistence settings for Ignite cache values. */
+    private ValuePersistenceSettings valPersistenceSettings;
+
+    /**
+     * Constructs Ignite cache key/value persistence settings.
+     *
+     * @param settings string containing xml with persistence settings for Ignite cache key/value
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public KeyValuePersistenceSettings(String settings) {
+        init(settings);
+    }
+
+    /**
+     * Constructs Ignite cache key/value persistence settings.
+     *
+     * @param settingsFile xml file with persistence settings for Ignite cache key/value
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public KeyValuePersistenceSettings(File settingsFile) {
+        InputStream in;
+
+        try {
+            in = new FileInputStream(settingsFile);
+        }
+        catch (IOException e) {
+            throw new IgniteException("Failed to get input stream for Cassandra persistence settings file: " +
+                    settingsFile.getAbsolutePath(), e);
+        }
+
+        init(loadSettings(in));
+    }
+
+    /**
+     * Constructs Ignite cache key/value persistence settings.
+     *
+     * @param settingsRsrc resource containing xml with persistence settings for Ignite cache key/value
+     */
+    public KeyValuePersistenceSettings(Resource settingsRsrc) {
+        InputStream in;
+
+        try {
+            in = settingsRsrc.getInputStream();
+        }
+        catch (IOException e) {
+            throw new IgniteException("Failed to get input stream for Cassandra persistence settings resource: " + settingsRsrc, e);
+        }
+
+        init(loadSettings(in));
+    }
+
+    /**
+     * Returns ttl to use for while inserting new rows into Cassandra table.
+     *
+     * @return ttl
+     */
+    public Integer getTTL() {
+        return ttl;
+    }
+
+    /**
+     * Returns Cassandra keyspace to use.
+     *
+     * @return keyspace.
+     */
+    public String getKeyspace() {
+        return keyspace;
+    }
+
+    /**
+     * Returns Cassandra table to use.
+     *
+     * @return table.
+     */
+    public String getTable() {
+        return tbl;
+    }
+
+    /**
+     * Returns full name of Cassandra table to use (including keyspace).
+     *
+     * @return full table name in format "keyspace.table".
+     */
+    public String getTableFullName()
+    {
+        return keyspace + "." + tbl;
+    }
+
+    /**
+     * Returns persistence settings for Ignite cache keys.
+     *
+     * @return keys persistence settings.
+     */
+    public KeyPersistenceSettings getKeyPersistenceSettings() {
+        return keyPersistenceSettings;
+    }
+
+    /**
+     * Returns persistence settings for Ignite cache values.
+     *
+     * @return values persistence settings.
+     */
+    public ValuePersistenceSettings getValuePersistenceSettings() {
+        return valPersistenceSettings;
+    }
+
+    /**
+     * Returns list of POJO fields to be mapped to Cassandra table columns.
+     *
+     * @return POJO fields list.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public List<PojoField> getFields() {
+        List<PojoField> fields = new LinkedList<>();
+
+        for (PojoField field : keyPersistenceSettings.getFields())
+            fields.add(field);
+
+        for (PojoField field : valPersistenceSettings.getFields())
+            fields.add(field);
+
+        return fields;
+    }
+
+    /**
+     * Returns list of Ignite cache key POJO fields to be mapped to Cassandra table columns.
+     *
+     * @return POJO fields list.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public List<PojoField> getKeyFields() {
+        return keyPersistenceSettings.getFields();
+    }
+
+    /**
+     * Returns list of Ignite cache value POJO fields to be mapped to Cassandra table columns.
+     *
+     * @return POJO fields list.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public List<PojoField> getValueFields() {
+        return valPersistenceSettings.getFields();
+    }
+
+    /**
+     * Returns DDL statement to create Cassandra keyspace.
+     *
+     * @return Keyspace DDL statement.
+     */
+    public String getKeyspaceDDLStatement() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("create keyspace if not exists ").append(keyspace);
+
+        if (keyspaceOptions != null) {
+            if (!keyspaceOptions.trim().toLowerCase().startsWith("with"))
+                builder.append("\nwith");
+
+            builder.append(" ").append(keyspaceOptions);
+        }
+
+        String statement = builder.toString().trim().replaceAll(" +", " ");
+
+        return statement.endsWith(";") ? statement : statement + ";";
+    }
+
+    /**
+     * Returns DDL statement to create Cassandra table.
+     *
+     * @return Table DDL statement.
+     */
+    public String getTableDDLStatement() {
+        String colsDDL = keyPersistenceSettings.getTableColumnsDDL() + ",\n" + valPersistenceSettings.getTableColumnsDDL();
+
+        String primaryKeyDDL = keyPersistenceSettings.getPrimaryKeyDDL();
+
+        String clusteringDDL = keyPersistenceSettings.getClusteringDDL();
+
+        String optionsDDL = tblOptions != null && !tblOptions.trim().isEmpty() ? tblOptions.trim() : "";
+
+        if (clusteringDDL != null && !clusteringDDL.isEmpty())
+            optionsDDL = optionsDDL.isEmpty() ? clusteringDDL : optionsDDL + " and " + clusteringDDL;
+
+        if (!optionsDDL.trim().isEmpty())
+            optionsDDL = optionsDDL.trim().toLowerCase().startsWith("with") ? optionsDDL.trim() : "with " + optionsDDL.trim();
+
+        StringBuilder builder = new StringBuilder();
+
+        builder.append("create table if not exists ").append(keyspace).append(".").append(tbl);
+        builder.append("\n(\n").append(colsDDL).append(",\n").append(primaryKeyDDL).append("\n)");
+
+        if (!optionsDDL.isEmpty())
+            builder.append(" \n").append(optionsDDL);
+
+        String tblDDL = builder.toString().trim().replaceAll(" +", " ");
+
+        return tblDDL.endsWith(";") ? tblDDL : tblDDL + ";";
+    }
+
+    /**
+     * Returns DDL statements to create Cassandra table secondary indexes.
+     *
+     * @return DDL statements to create secondary indexes.
+     */
+    public List<String> getIndexDDLStatements() {
+        List<String> idxDDLs = new LinkedList<>();
+
+        List<PojoField> fields = valPersistenceSettings.getFields();
+
+        for (PojoField field : fields) {
+            if (((PojoValueField)field).isIndexed())
+                idxDDLs.add(((PojoValueField)field).getIndexDDL(keyspace, tbl));
+        }
+
+        return idxDDLs;
+    }
+
+    /**
+     * Loads Ignite cache persistence settings from resource.
+     *
+     * @param in Input stream.
+     * @return String containing xml with Ignite cache persistence settings.
+     */
+    private String loadSettings(InputStream in) {
+        StringBuilder settings = new StringBuilder();
+        BufferedReader reader = null;
+
+        try {
+            reader = new BufferedReader(new InputStreamReader(in));
+
+            String line = reader.readLine();
+
+            while (line != null) {
+                if (settings.length() != 0)
+                    settings.append(SystemHelper.LINE_SEPARATOR);
+
+                settings.append(line);
+
+                line = reader.readLine();
+            }
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Failed to read input stream for Cassandra persistence settings", e);
+        }
+        finally {
+            U.closeQuiet(reader);
+            U.closeQuiet(in);
+        }
+
+        return settings.toString();
+    }
+
+    /**
+     * @param elem Element with data.
+     * @param attr Attribute name.
+     * @return Numeric value for specified attribute.
+     */
+    private int extractIntAttribute(Element elem, String attr) {
+        String val = elem.getAttribute(attr).trim();
+
+        try {
+            return Integer.parseInt(val);
+        }
+        catch (NumberFormatException e) {
+            throw new IllegalArgumentException("Incorrect value '" + val + "' specified for '" + attr + "' attribute");
+        }
+    }
+
+    /**
+     * Initializes persistence settings from XML string.
+     *
+     * @param settings XML string containing Ignite cache persistence settings configuration.
+     */
+    @SuppressWarnings("IfCanBeSwitch")
+    private void init(String settings) {
+        Document doc;
+
+        try {
+            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+            DocumentBuilder builder = factory.newDocumentBuilder();
+            doc = builder.parse(new InputSource(new StringReader(settings)));
+        }
+        catch (Throwable e) {
+            throw new IllegalArgumentException("Failed to parse persistence settings:" +
+                SystemHelper.LINE_SEPARATOR + settings, e);
+        }
+
+        Element root = doc.getDocumentElement();
+
+        if (!PERSISTENCE_NODE.equals(root.getNodeName())) {
+            throw new IllegalArgumentException("Incorrect persistence settings specified. " +
+                "Root XML element should be 'persistence'");
+        }
+
+        if (!root.hasAttribute(KEYSPACE_ATTR)) {
+            throw new IllegalArgumentException("Incorrect persistence settings '" + KEYSPACE_ATTR +
+                "' attribute should be specified");
+        }
+
+        if (!root.hasAttribute(TABLE_ATTR)) {
+            throw new IllegalArgumentException("Incorrect persistence settings '" + TABLE_ATTR +
+                "' attribute should be specified");
+        }
+
+        keyspace = root.getAttribute(KEYSPACE_ATTR).trim();
+        tbl = root.getAttribute(TABLE_ATTR).trim();
+
+        if (root.hasAttribute(TTL_ATTR))
+            ttl = extractIntAttribute(root, TTL_ATTR);
+
+        if (!root.hasChildNodes()) {
+            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+                "there are no key and value persistence settings specified");
+        }
+
+        NodeList children = root.getChildNodes();
+        int cnt = children.getLength();
+
+        for (int i = 0; i < cnt; i++) {
+            Node node = children.item(i);
+
+            if (node.getNodeType() != Node.ELEMENT_NODE)
+                continue;
+
+            Element el = (Element)node;
+            String nodeName = el.getNodeName();
+
+            if (nodeName.equals(TABLE_OPTIONS_NODE)) {
+                tblOptions = el.getTextContent();
+                tblOptions = tblOptions.replace("\n", " ").replace("\r", "").replace("\t", " ");
+            }
+            else if (nodeName.equals(KEYSPACE_OPTIONS_NODE)) {
+                keyspaceOptions = el.getTextContent();
+                keyspaceOptions = keyspaceOptions.replace("\n", " ").replace("\r", "").replace("\t", " ");
+            }
+            else if (nodeName.equals(KEY_PERSISTENCE_NODE))
+                keyPersistenceSettings = new KeyPersistenceSettings(el);
+            else if (nodeName.equals(VALUE_PERSISTENCE_NODE))
+                valPersistenceSettings = new ValuePersistenceSettings(el);
+        }
+
+        if (keyPersistenceSettings == null) {
+            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+                "there are no key persistence settings specified");
+        }
+
+        if (valPersistenceSettings == null) {
+            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+                "there are no value persistence settings specified");
+        }
+
+        List<PojoField> keyFields = keyPersistenceSettings.getFields();
+        List<PojoField> valFields = valPersistenceSettings.getFields();
+
+        if (PersistenceStrategy.POJO.equals(keyPersistenceSettings.getStrategy()) &&
+            (keyFields == null || keyFields.isEmpty())) {
+            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+                "there are no key fields found");
+        }
+
+        if (PersistenceStrategy.POJO.equals(valPersistenceSettings.getStrategy()) &&
+            (valFields == null || valFields.isEmpty())) {
+            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+                "there are no value fields found");
+        }
+
+        if (keyFields == null || keyFields.isEmpty() || valFields == null || valFields.isEmpty())
+            return;
+
+        for (PojoField keyField : keyFields) {
+            for (PojoField valField : valFields) {
+                if (keyField.getColumn().equals(valField.getColumn())) {
+                    throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+                        "key column '" + keyField.getColumn() + "' also specified as a value column");
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b33eb027/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java
new file mode 100644
index 0000000..e734ca3
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
+import org.apache.ignite.cache.store.cassandra.serializer.Serializer;
+
+/**
+ * Intermediate layer between persistent store (Cassandra) and Ignite cache key/value classes.
+ * Handles  all the mappings to/from Java classes into Cassandra and responsible for all the details
+ * of how Java objects should be written/loaded to/from Cassandra.
+ */
+public class PersistenceController {
+    /** Ignite cache key/value persistence settings. */
+    private KeyValuePersistenceSettings persistenceSettings;
+
+    /** CQL statement to insert row into Cassandra table. */
+    private String writeStatement;
+
+    /** CQL statement to delete row from Cassandra table. */
+    private String delStatement;
+
+    /** CQL statement to select value fields from Cassandra table. */
+    private String loadStatement;
+
+    /** CQL statement to select key/value fields from Cassandra table. */
+    private String loadStatementWithKeyFields;
+
+    /**
+     * Constructs persistence controller from Ignite cache persistence settings.
+     *
+     * @param settings persistence settings.
+     */
+    public PersistenceController(KeyValuePersistenceSettings settings) {
+        if (settings == null)
+            throw new IllegalArgumentException("Persistent settings can't be null");
+
+        this.persistenceSettings = settings;
+    }
+
+    /**
+     * Returns Ignite cache persistence settings.
+     *
+     * @return persistence settings.
+     */
+    public KeyValuePersistenceSettings getPersistenceSettings() {
+        return persistenceSettings;
+    }
+
+    /**
+     * Returns Cassandra keyspace to use.
+     *
+     * @return keyspace.
+     */
+    public String getKeyspace() {
+        return persistenceSettings.getKeyspace();
+    }
+
+    /**
+     * Returns Cassandra table to use.
+     *
+     * @return table.
+     */
+    public String getTable() {
+        return persistenceSettings.getTable();
+    }
+
+    /**
+     * Returns CQL statement to insert row into Cassandra table.
+     *
+     * @return CQL statement.
+     */
+    public String getWriteStatement() {
+        if (writeStatement != null)
+            return writeStatement;
+
+        List<String> cols = getKeyValueColumns();
+
+        StringBuilder colsList = new StringBuilder();
+        StringBuilder questionsList = new StringBuilder();
+
+        for (String column : cols) {
+            if (colsList.length() != 0) {
+                colsList.append(", ");
+                questionsList.append(",");
+            }
+
+            colsList.append(column);
+            questionsList.append("?");
+        }
+
+        writeStatement = "insert into " + persistenceSettings.getKeyspace() + "." + persistenceSettings.getTable() + " (" +
+            colsList.toString() + ") values (" + questionsList.toString() + ")";
+
+        if (persistenceSettings.getTTL() != null)
+            writeStatement += " using ttl " + persistenceSettings.getTTL();
+
+        writeStatement += ";";
+
+        return writeStatement;
+    }
+
+    /**
+     * Returns CQL statement to delete row from Cassandra table.
+     *
+     * @return CQL statement.
+     */
+    public String getDeleteStatement() {
+        if (delStatement != null)
+            return delStatement;
+
+        List<String> cols = getKeyColumns();
+
+        StringBuilder statement = new StringBuilder();
+
+        for (String column : cols) {
+            if (statement.length() != 0)
+                statement.append(" and ");
+
+            statement.append(column).append("=?");
+        }
+
+        statement.append(";");
+
+        delStatement = "delete from " +
+            persistenceSettings.getKeyspace() + "." +
+            persistenceSettings.getTable() + " where " +
+            statement.toString();
+
+        return delStatement;
+    }
+
+    /**
+     * Returns CQL statement to select key/value fields from Cassandra table.
+     *
+     * @param includeKeyFields whether to include/exclude key fields from the returned row.
+     *
+     * @return CQL statement.
+     */
+    public String getLoadStatement(boolean includeKeyFields) {
+        if (loadStatement != null && loadStatementWithKeyFields != null)
+            return includeKeyFields ? loadStatementWithKeyFields : loadStatement;
+
+        List<String> valCols = getValueColumns();
+
+        List<String> keyCols = getKeyColumns();
+
+        StringBuilder hdrWithKeyFields = new StringBuilder("select ");
+
+        for (int i = 0; i < keyCols.size(); i++) {
+            if (i > 0)
+                hdrWithKeyFields.append(", ");
+
+            hdrWithKeyFields.append(keyCols.get(i));
+        }
+
+        StringBuilder hdr = new StringBuilder("select ");
+
+        for (int i = 0; i < valCols.size(); i++) {
+            if (i > 0)
+                hdr.append(", ");
+
+            hdrWithKeyFields.append(",");
+
+            hdr.append(valCols.get(i));
+            hdrWithKeyFields.append(valCols.get(i));
+        }
+
+        StringBuilder statement = new StringBuilder();
+
+        statement.append(" from ");
+        statement.append(persistenceSettings.getKeyspace());
+        statement.append(".").append(persistenceSettings.getTable());
+        statement.append(" where ");
+
+        for (int i = 0; i < keyCols.size(); i++) {
+            if (i > 0)
+                statement.append(" and ");
+
+            statement.append(keyCols.get(i)).append("=?");
+        }
+
+        statement.append(";");
+
+        loadStatement = hdr.toString() + statement.toString();
+        loadStatementWithKeyFields = hdrWithKeyFields.toString() + statement.toString();
+
+        return includeKeyFields ? loadStatementWithKeyFields : loadStatement;
+    }
+
+    /**
+     * Binds Ignite cache key object to {@link com.datastax.driver.core.PreparedStatement}.
+     *
+     * @param statement statement to which key object should be bind.
+     * @param key key object.
+     *
+     * @return statement with bounded key.
+     */
+    public BoundStatement bindKey(PreparedStatement statement, Object key) {
+        KeyPersistenceSettings settings = persistenceSettings.getKeyPersistenceSettings();
+
+        Object[] values = getBindingValues(settings.getStrategy(),
+            settings.getSerializer(), settings.getFields(), key);
+
+        return statement.bind(values);
+    }
+
+    /**
+     * Binds Ignite cache key and value object to {@link com.datastax.driver.core.PreparedStatement}.
+     *
+     * @param statement statement to which key and value object should be bind.
+     * @param key key object.
+     * @param val value object.
+     *
+     * @return statement with bounded key and value.
+     */
+    public BoundStatement bindKeyValue(PreparedStatement statement, Object key, Object val) {
+        KeyPersistenceSettings keySettings = persistenceSettings.getKeyPersistenceSettings();
+        Object[] keyValues = getBindingValues(keySettings.getStrategy(),
+            keySettings.getSerializer(), keySettings.getFields(), key);
+
+        ValuePersistenceSettings valSettings = persistenceSettings.getValuePersistenceSettings();
+        Object[] valValues = getBindingValues(valSettings.getStrategy(),
+            valSettings.getSerializer(), valSettings.getFields(), val);
+
+        Object[] values = new Object[keyValues.length + valValues.length];
+
+        int i = 0;
+
+        for (Object keyVal : keyValues) {
+            values[i] = keyVal;
+            i++;
+        }
+
+        for (Object valVal : valValues) {
+            values[i] = valVal;
+            i++;
+        }
+
+        return statement.bind(values);
+    }
+
+    /**
+     * Builds Ignite cache key object from returned Cassandra table row.
+     *
+     * @param row Cassandra table row.
+     *
+     * @return key object.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public Object buildKeyObject(Row row) {
+        return buildObject(row, persistenceSettings.getKeyPersistenceSettings());
+    }
+
+    /**
+     * Builds Ignite cache value object from Cassandra table row .
+     *
+     * @param row Cassandra table row.
+     *
+     * @return value object.
+     */
+    public Object buildValueObject(Row row) {
+        return buildObject(row, persistenceSettings.getValuePersistenceSettings());
+    }
+
+    /**
+     * Builds object from Cassandra table row.
+     *
+     * @param row Cassandra table row.
+     * @param settings persistence settings to use.
+     *
+     * @return object.
+     */
+    private Object buildObject(Row row, PersistenceSettings settings) {
+        if (row == null)
+            return null;
+
+        PersistenceStrategy stgy = settings.getStrategy();
+
+        Class clazz = settings.getJavaClass();
+
+        String col = settings.getColumn();
+
+        List<PojoField> fields = settings.getFields();
+
+        if (PersistenceStrategy.PRIMITIVE.equals(stgy))
+            return PropertyMappingHelper.getCassandraColumnValue(row, col, clazz, null);
+
+        if (PersistenceStrategy.BLOB.equals(stgy))
+            return settings.getSerializer().deserialize(row.getBytes(col));
+
+        Object obj;
+
+        try {
+            obj = clazz.newInstance();
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Failed to instantiate object of type '" + clazz.getName() + "' using reflection", e);
+        }
+
+        for (PojoField field : fields)
+            field.setValueFromRow(row, obj, settings.getSerializer());
+
+        return obj;
+    }
+
+    /**
+     * Extracts field values from POJO object and converts them into Java types
+     * which could be mapped to Cassandra types.
+     *
+     * @param stgy persistence strategy to use.
+     * @param serializer serializer to use for BLOBs.
+     * @param fields fields who's values should be extracted.
+     * @param obj object instance who's field values should be extracted.
+     *
+     * @return array of object field values converted into Java object instances having Cassandra compatible types
+     */
+    private Object[] getBindingValues(PersistenceStrategy stgy, Serializer serializer, List<PojoField> fields, Object obj) {
+        if (PersistenceStrategy.PRIMITIVE.equals(stgy)) {
+            if (PropertyMappingHelper.getCassandraType(obj.getClass()) == null ||
+                obj.getClass().equals(ByteBuffer.class) || obj instanceof byte[]) {
+                throw new IllegalArgumentException("Couldn't deserialize instance of class '" +
+                    obj.getClass().getName() + "' using PRIMITIVE strategy. Please use BLOB strategy for this case.");
+            }
+
+            return new Object[] {obj};
+        }
+
+        if (PersistenceStrategy.BLOB.equals(stgy))
+            return new Object[] {serializer.serialize(obj)};
+
+        Object[] values = new Object[fields.size()];
+
+        int i = 0;
+
+        for (PojoField field : fields) {
+            Object val = field.getValueFromObject(obj, serializer);
+
+            if (val instanceof byte[])
+                val = ByteBuffer.wrap((byte[]) val);
+
+            values[i] = val;
+
+            i++;
+        }
+
+        return values;
+    }
+
+    /**
+     * Returns list of Cassandra table columns mapped to Ignite cache key and value fields
+     *
+     * @return list of column names
+     */
+    private List<String> getKeyValueColumns() {
+        List<String> cols = getKeyColumns();
+
+        cols.addAll(getValueColumns());
+
+        return cols;
+    }
+
+    /**
+     * Returns list of Cassandra table columns mapped to Ignite cache key fields
+     *
+     * @return list of column names
+     */
+    private List<String> getKeyColumns() {
+        return getColumns(persistenceSettings.getKeyPersistenceSettings());
+    }
+
+    /**
+     * Returns list of Cassandra table columns mapped to Ignite cache value fields
+     *
+     * @return list of column names
+     */
+    private List<String> getValueColumns() {
+        return getColumns(persistenceSettings.getValuePersistenceSettings());
+    }
+
+    /**
+     * Returns list of Cassandra table columns based on persistence strategy to use
+     *
+     * @return list of column names
+     */
+    private List<String> getColumns(PersistenceSettings settings) {
+        List<String> cols = new LinkedList<>();
+
+        if (!PersistenceStrategy.POJO.equals(settings.getStrategy())) {
+            cols.add(settings.getColumn());
+            return cols;
+        }
+
+        for (PojoField field : settings.getFields())
+            cols.add(field.getColumn());
+
+        return cols;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b33eb027/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java
new file mode 100644
index 0000000..20d790a
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import com.datastax.driver.core.DataType;
+import java.beans.PropertyDescriptor;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
+import org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer;
+import org.apache.ignite.cache.store.cassandra.serializer.Serializer;
+import org.w3c.dom.Element;
+
+/**
+ * Stores persistence settings, which describes how particular key/value
+ * from Ignite cache should be stored in Cassandra.
+ */
+public abstract class PersistenceSettings implements Serializable {
+    /** Xml attribute specifying persistence strategy. */
+    private static final String STRATEGY_ATTR = "strategy";
+
+    /** Xml attribute specifying Cassandra column name. */
+    private static final String COLUMN_ATTR = "column";
+
+    /** Xml attribute specifying BLOB serializer to use. */
+    private static final String SERIALIZER_ATTR = "serializer";
+
+    /** Xml attribute specifying java class of the object to be persisted. */
+    private static final String CLASS_ATTR = "class";
+
+    /** Persistence strategy to use. */
+    private PersistenceStrategy stgy;
+
+    /** Java class of the object to be persisted. */
+    private Class javaCls;
+
+    /** Cassandra table column name where object should be persisted in
+     *  case of using BLOB or PRIMITIVE persistence strategy. */
+    private String col;
+
+    /** Serializer for BLOBs. */
+    private Serializer serializer = new JavaSerializer();
+
+    /**
+     * Extracts property descriptor from the descriptors list by its name.
+     *
+     * @param descriptors descriptors list.
+     * @param propName property name.
+     *
+     * @return property descriptor.
+     */
+    public static PropertyDescriptor findPropertyDescriptor(List<PropertyDescriptor> descriptors, String propName) {
+        if (descriptors == null || descriptors.isEmpty() || propName == null || propName.trim().isEmpty())
+            return null;
+
+        for (PropertyDescriptor descriptor : descriptors) {
+            if (descriptor.getName().equals(propName))
+                return descriptor;
+        }
+
+        return null;
+    }
+
+    /**
+     * Constructs persistence settings from corresponding XML element.
+     *
+     * @param el xml element containing persistence settings configuration.
+     */
+    @SuppressWarnings("unchecked")
+    public PersistenceSettings(Element el) {
+        if (el == null)
+            throw new IllegalArgumentException("DOM element representing key/value persistence object can't be null");
+
+        if (!el.hasAttribute(STRATEGY_ATTR)) {
+            throw new IllegalArgumentException("DOM element representing key/value persistence object should have '" +
+                STRATEGY_ATTR + "' attribute");
+        }
+
+        try {
+            stgy = PersistenceStrategy.valueOf(el.getAttribute(STRATEGY_ATTR).trim().toUpperCase());
+        }
+        catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Incorrect persistence strategy specified: " + el.getAttribute(STRATEGY_ATTR));
+        }
+
+        if (!el.hasAttribute(CLASS_ATTR) && !PersistenceStrategy.BLOB.equals(stgy)) {
+            throw new IllegalArgumentException("DOM element representing key/value persistence object should have '" +
+                CLASS_ATTR + "' attribute or have BLOB persistence strategy");
+        }
+
+        try {
+            javaCls = el.hasAttribute(CLASS_ATTR) ? getClassInstance(el.getAttribute(CLASS_ATTR).trim()) : null;
+        }
+        catch (Throwable e) {
+            throw new IllegalArgumentException("Incorrect java class specified '" + el.getAttribute(CLASS_ATTR) + "' " +
+                "for Cassandra persistence", e);
+        }
+
+        if (!PersistenceStrategy.BLOB.equals(stgy) &&
+            (ByteBuffer.class.equals(javaCls) || byte[].class.equals(javaCls))) {
+            throw new IllegalArgumentException("Java class '" + el.getAttribute(CLASS_ATTR) + "' " +
+                "specified could only be persisted using BLOB persistence strategy");
+        }
+
+        if (PersistenceStrategy.PRIMITIVE.equals(stgy) &&
+            PropertyMappingHelper.getCassandraType(javaCls) == null) {
+            throw new IllegalArgumentException("Current implementation doesn't support persisting '" +
+                javaCls.getName() + "' object using PRIMITIVE strategy");
+        }
+
+        if (PersistenceStrategy.POJO.equals(stgy)) {
+            if (javaCls == null)
+                throw new IllegalStateException("Object java class should be specified for POJO persistence strategy");
+
+            try {
+                javaCls.getConstructor();
+            }
+            catch (Throwable e) {
+                throw new IllegalArgumentException("Java class '" + javaCls.getName() + "' couldn't be used as POJO " +
+                    "cause it doesn't have no arguments constructor", e);
+            }
+        }
+
+        if (el.hasAttribute(COLUMN_ATTR)) {
+            if (!PersistenceStrategy.BLOB.equals(stgy) && !PersistenceStrategy.PRIMITIVE.equals(stgy)) {
+                throw new IllegalArgumentException("Incorrect configuration of Cassandra key/value persistence settings, " +
+                    "'" + COLUMN_ATTR + "' attribute is only applicable for PRIMITIVE or BLOB strategy");
+            }
+
+            col = el.getAttribute(COLUMN_ATTR).trim();
+        }
+
+        if (el.hasAttribute(SERIALIZER_ATTR)) {
+            if (!PersistenceStrategy.BLOB.equals(stgy) && !PersistenceStrategy.POJO.equals(stgy)) {
+                throw new IllegalArgumentException("Incorrect configuration of Cassandra key/value persistence settings, " +
+                    "'" + SERIALIZER_ATTR + "' attribute is only applicable for BLOB and POJO strategies");
+            }
+
+            Object obj = newObjectInstance(el.getAttribute(SERIALIZER_ATTR).trim());
+
+            if (!(obj instanceof Serializer)) {
+                throw new IllegalArgumentException("Incorrect configuration of Cassandra key/value persistence settings, " +
+                    "serializer class '" + el.getAttribute(SERIALIZER_ATTR) + "' doesn't implement '" +
+                    Serializer.class.getName() + "' interface");
+            }
+
+            serializer = (Serializer)obj;
+        }
+
+        if ((PersistenceStrategy.BLOB.equals(stgy) || PersistenceStrategy.PRIMITIVE.equals(stgy)) && col == null)
+            col = defaultColumnName();
+    }
+
+    /**
+     * Returns java class of the object to be persisted.
+     *
+     * @return java class.
+     */
+    public Class getJavaClass() {
+        return javaCls;
+    }
+
+    /**
+     * Returns persistence strategy to use.
+     *
+     * @return persistence strategy.
+     */
+    public PersistenceStrategy getStrategy() {
+        return stgy;
+    }
+
+    /**
+     * Returns Cassandra table column name where object should be persisted in
+     * case of using BLOB or PRIMITIVE persistence strategy.
+     *
+     * @return column name.
+     */
+    public String getColumn() {
+        return col;
+    }
+
+    /**
+     * Returns serializer to be used for BLOBs.
+     *
+     * @return serializer.
+     */
+    public Serializer getSerializer() {
+        return serializer;
+    }
+
+    /**
+     * Returns list of POJO fields to be persisted.
+     *
+     * @return list of fields.
+     */
+    public abstract List<PojoField> getFields();
+
+    /**
+     * Returns Cassandra table columns DDL, corresponding to POJO fields which should be persisted.
+     *
+     * @return DDL statement for Cassandra table fields
+     */
+    public String getTableColumnsDDL() {
+        if (PersistenceStrategy.BLOB.equals(stgy))
+            return "  " + col + " " + DataType.Name.BLOB.toString();
+
+        if (PersistenceStrategy.PRIMITIVE.equals(stgy))
+            return "  " + col + " " + PropertyMappingHelper.getCassandraType(javaCls);
+
+        StringBuilder builder = new StringBuilder();
+
+        for (PojoField field : getFields()) {
+            if (builder.length() > 0)
+                builder.append(",\n");
+
+            builder.append("  ").append(field.getColumnDDL());
+        }
+
+        if (builder.length() == 0) {
+            throw new IllegalStateException("There are no POJO fields found for '" + javaCls.toString()
+                + "' class to be presented as a Cassandra primary key");
+        }
+
+        return builder.toString();
+    }
+
+    /**
+     * Returns default name for Cassandra column (if it's not specified explicitly).
+     *
+     * @return column name
+     */
+    protected abstract String defaultColumnName();
+
+    /**
+     * Checks if there are POJO filed with the same name or same Cassandra column specified in persistence settings
+     *
+     * @param fields list of fields to be persisted into Cassandra
+     */
+    protected void checkDuplicates(List<PojoField> fields) {
+        if (fields == null || fields.isEmpty())
+            return;
+
+        for (PojoField field1 : fields) {
+            boolean sameNames = false;
+            boolean sameCols = false;
+
+            for (PojoField field2 : fields) {
+                if (field1.getName().equals(field2.getName())) {
+                    if (sameNames) {
+                        throw new IllegalArgumentException("Incorrect Cassandra key persistence settings, " +
+                            "two POJO fields with the same name '" + field1.getName() + "' specified");
+                    }
+
+                    sameNames = true;
+                }
+
+                if (field1.getColumn().equals(field2.getColumn())) {
+                    if (sameCols) {
+                        throw new IllegalArgumentException("Incorrect Cassandra persistence settings, " +
+                            "two POJO fields with the same column '" + field1.getColumn() + "' specified");
+                    }
+
+                    sameCols = true;
+                }
+            }
+        }
+    }
+
+    /**
+     * Instantiates Class object for particular class
+     *
+     * @param clazz class name
+     * @return Class object
+     */
+    private Class getClassInstance(String clazz) {
+        try {
+            return Class.forName(clazz);
+        }
+        catch (ClassNotFoundException ignored) {
+        }
+
+        try {
+            return Class.forName(clazz, true, Thread.currentThread().getContextClassLoader());
+        }
+        catch (ClassNotFoundException ignored) {
+        }
+
+        try {
+            return Class.forName(clazz, true, PersistenceSettings.class.getClassLoader());
+        }
+        catch (ClassNotFoundException ignored) {
+        }
+
+        try {
+            return Class.forName(clazz, true, ClassLoader.getSystemClassLoader());
+        }
+        catch (ClassNotFoundException ignored) {
+        }
+
+        throw new IgniteException("Failed to load class '" + clazz + "' using reflection");
+    }
+
+    /**
+     * Creates new object instance of particular class
+     *
+     * @param clazz class name
+     * @return object
+     */
+    private Object newObjectInstance(String clazz) {
+        try {
+            return getClassInstance(clazz).newInstance();
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Failed to instantiate class '" + clazz + "' using default constructor", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b33eb027/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java
new file mode 100644
index 0000000..4b1e2d8
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+/**
+ * Describes persistence strategy to be used to persist object data into Cassandra.
+ */
+public enum PersistenceStrategy {
+    /**
+     * Stores object value as is, by mapping its value to Cassandra table column with corresponding type.
+     * <p>
+     * Could be used for primitive java type (like Integer, String, Long and etc) which could be directly mapped
+     * to appropriate Cassandra types.
+     */
+    PRIMITIVE,
+
+    /**
+     * Stores object value as BLOB, by mapping its value to Cassandra table column with blob type.
+     * Could be used for any java type. Conversion of java object to BLOB is handled by specified serializer.
+     * <p>
+     * Available serializer implementations:
+     * <ul>
+     *     <li>
+     *         org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer - uses standard Java
+     *         serialization framework.
+     *     </li>
+     *     <li>
+     *        org.apache.ignite.cache.store.cassandra.serializer.KryoSerializer - uses Kryo serialization
+     *        framework.
+     *     </li>
+     * </ul>
+     */
+    BLOB,
+
+    /**
+     * Stores each field of an object as a column having corresponding type in Cassandra table.
+     * Provides ability to utilize Cassandra secondary indexes for object fields.
+     * <p>
+     * Could be used for objects which follow JavaBeans convention and having empty public constructor.
+     * Object fields should be:
+     * <ul>
+     *     <li>Primitive java types like int, long, String and etc.</li>
+     *     <li>Collections of primitive java types like List<Integer>, Map<Integer, String>, Set<Long></li>
+     * </ul>
+     */
+    POJO
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b33eb027/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
new file mode 100644
index 0000000..af569fd
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.Row;
+import java.beans.PropertyDescriptor;
+import java.io.Serializable;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
+import org.apache.ignite.cache.store.cassandra.serializer.Serializer;
+import org.w3c.dom.Element;
+
+/**
+ * Descriptor for particular field in a POJO object, specifying how this field
+ * should be written to or loaded from Cassandra.
+ */
+public abstract class PojoField implements Serializable {
+    /** Name attribute of XML element describing Pojo field. */
+    private static final String NAME_ATTR = "name";
+
+    /** Column attribute of XML element describing Pojo field. */
+    private static final String COLUMN_ATTR = "column";
+
+    /** Field name. */
+    private String name;
+
+    /** Java class to which the field belongs. */
+    private Class javaCls;
+
+    /** Field column name in Cassandra table. */
+    private String col;
+
+    /** Field column DDL.  */
+    private String colDDL;
+
+    /** Field property descriptor. */
+    private transient PropertyDescriptor desc;
+
+    /**
+     * Creates instance of {@link PojoField} based on it's description in XML element.
+     *
+     * @param el XML element describing Pojo field
+     * @param pojoCls Pojo java class.
+     */
+    public PojoField(Element el, Class<?> pojoCls) {
+        if (el == null)
+            throw new IllegalArgumentException("DOM element representing POJO field object can't be null");
+
+        if (!el.hasAttribute(NAME_ATTR)) {
+            throw new IllegalArgumentException("DOM element representing POJO field object should have '"
+                + NAME_ATTR + "' attribute");
+        }
+
+        this.name = el.getAttribute(NAME_ATTR).trim();
+        this.col = el.hasAttribute(COLUMN_ATTR) ? el.getAttribute(COLUMN_ATTR).trim() : name.toLowerCase();
+
+        init(PropertyMappingHelper.getPojoPropertyDescriptor(pojoCls, name));
+    }
+
+    /**
+     * Creates instance of {@link PojoField}  from its property descriptor.
+     *
+     * @param desc Field property descriptor.
+     */
+    public PojoField(PropertyDescriptor desc) {
+        this.name = desc.getName();
+
+        QuerySqlField sqlField = desc.getReadMethod() != null ?
+            desc.getReadMethod().getAnnotation(QuerySqlField.class) :
+            desc.getWriteMethod() == null ?
+                null :
+                desc.getWriteMethod().getAnnotation(QuerySqlField.class);
+
+        this.col = sqlField != null && sqlField.name() != null ? sqlField.name() : name.toLowerCase();
+
+        init(desc);
+
+        if (sqlField != null)
+            init(sqlField);
+    }
+
+    /**
+     * @return field name.
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * @return Cassandra table column name.
+     */
+    public String getColumn() {
+        return col;
+    }
+
+    /**
+     * @return Cassandra table column DDL statement.
+     */
+    public String getColumnDDL() {
+        return colDDL;
+    }
+
+    /**
+     * Gets field value as an object having Cassandra compatible type.
+     * This it could be stored directly into Cassandra without any conversions.
+     *
+     * @param obj Object instance.
+     * @param serializer {@link org.apache.ignite.cache.store.cassandra.serializer.Serializer} to use.
+     * @return Object to store in Cassandra table column.
+     */
+    public Object getValueFromObject(Object obj, Serializer serializer) {
+        try {
+            Object val = propDesc().getReadMethod().invoke(obj);
+
+            if (val == null)
+                return null;
+
+            DataType.Name cassandraType = PropertyMappingHelper.getCassandraType(val.getClass());
+
+            if (cassandraType != null)
+                return val;
+
+            if (serializer == null) {
+                throw new IllegalStateException("Can't serialize value from object '" +
+                    val.getClass().getName() + "' field '" + name + "', cause there is no BLOB serializer specified");
+            }
+
+            return serializer.serialize(val);
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Failed to get value of the field '" + name + "' from the instance " +
+                " of '" + obj.getClass().toString() + "' class", e);
+        }
+    }
+
+    /**
+     * Sets object field value from a {@link com.datastax.driver.core.Row} returned by Cassandra CQL statement.
+     *
+     * @param row {@link com.datastax.driver.core.Row}
+     * @param obj object which field should be populated from {@link com.datastax.driver.core.Row}
+     * @param serializer {@link org.apache.ignite.cache.store.cassandra.serializer.Serializer} to use.
+     */
+    public void setValueFromRow(Row row, Object obj, Serializer serializer) {
+        Object val = PropertyMappingHelper.getCassandraColumnValue(row, col, propDesc().getPropertyType(), serializer);
+
+        try {
+            propDesc().getWriteMethod().invoke(obj, val);
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Failed to set value of the field '" + name + "' of the instance " +
+                " of '" + obj.getClass().toString() + "' class", e);
+        }
+    }
+
+    /**
+     * Initializes field info from annotation.
+     *
+     * @param sqlField {@link QuerySqlField} annotation.
+     */
+    protected abstract void init(QuerySqlField sqlField);
+
+    /**
+     * Initializes field info from property descriptor.
+     *
+     * @param desc {@link PropertyDescriptor} descriptor.
+     */
+    protected void init(PropertyDescriptor desc) {
+        if (desc.getReadMethod() == null) {
+            throw new IllegalArgumentException("Field '" + desc.getName() +
+                "' of the class instance '" + desc.getPropertyType().getName() +
+                "' doesn't provide getter method");
+        }
+
+        if (desc.getWriteMethod() == null) {
+            throw new IllegalArgumentException("Field '" + desc.getName() +
+                "' of POJO object instance of the class '" + desc.getPropertyType().getName() +
+                "' doesn't provide write method");
+        }
+
+        if (!desc.getReadMethod().isAccessible())
+            desc.getReadMethod().setAccessible(true);
+
+        if (!desc.getWriteMethod().isAccessible())
+            desc.getWriteMethod().setAccessible(true);
+
+        DataType.Name cassandraType = PropertyMappingHelper.getCassandraType(desc.getPropertyType());
+        cassandraType = cassandraType == null ? DataType.Name.BLOB : cassandraType;
+
+        this.javaCls = desc.getReadMethod().getDeclaringClass();
+        this.desc = desc;
+        this.colDDL = col + " " + cassandraType.toString();
+    }
+
+    /**
+     * Returns property descriptor of the POJO field
+     *
+     * @return Property descriptor
+     */
+    private PropertyDescriptor propDesc() {
+        return desc != null ? desc : (desc = PropertyMappingHelper.getPojoPropertyDescriptor(javaCls, name));
+    }
+}


Mime
View raw message