ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptupit...@apache.org
Subject [19/35] ignite git commit: IGNITE-3172 Refactoring Ignite-Cassandra serializers. - Fixes #956.
Date Wed, 14 Sep 2016 10:53:24 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/pom.xml
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/pom.xml b/modules/cassandra/store/pom.xml
new file mode 100644
index 0000000..0b233fa
--- /dev/null
+++ b/modules/cassandra/store/pom.xml
@@ -0,0 +1,305 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-cassandra</artifactId>
+        <version>1.8.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>ignite-cassandra-store</artifactId>
+    <version>1.8.0-SNAPSHOT</version>
+    <url>http://ignite.apache.org</url>
+
+    <properties>
+        <commons-beanutils.version>1.8.3</commons-beanutils.version>
+        <cassandra-driver.version>3.0.0</cassandra-driver.version>
+        <cassandra-all.version>3.3</cassandra-all.version>
+        <netty.version>4.0.33.Final</netty.version>
+        <guava.version>19.0</guava.version>
+        <metrics-core.version>3.0.2</metrics-core.version>
+    </properties>
+
+    <dependencies>
+        <!-- Apache commons -->
+        <dependency>
+            <groupId>commons-beanutils</groupId>
+            <artifactId>commons-beanutils</artifactId>
+            <version>${commons-beanutils.version}</version>
+        </dependency>
+
+        <!-- Ignite -->
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-spring</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-log4j</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Cassandra and required dependencies -->
+        <dependency>
+            <groupId>com.datastax.cassandra</groupId>
+            <artifactId>cassandra-driver-core</artifactId>
+            <version>${cassandra-driver.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-handler</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-buffer</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-common</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.codahale.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>${metrics-core.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.cassandra</groupId>
+            <artifactId>cassandra-all</artifactId>
+            <version>${cassandra-all.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Apache log4j -->
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.2</version>
+                <configuration>
+                    <source>1.7</source>
+                    <target>1.7</target>
+                    <compilerVersion>1.7</compilerVersion>
+                    <encoding>UTF-8</encoding>
+                    <fork>true</fork>
+                    <debug>false</debug>
+                    <debuglevel>lines,vars,source</debuglevel>
+                    <meminitial>256</meminitial>
+                    <maxmem>512</maxmem>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>2.10</version>
+                <executions>
+                    <execution>
+                        <id>copy-all-dependencies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${project.build.directory}/tests-package/lib</outputDirectory>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>false</overWriteSnapshots>
+                            <overWriteIfNewer>true</overWriteIfNewer>
+                            <excludeArtifactIds>
+                                netty-all,cassandra-all,snappy-java,lz4,compress-lzf,commons-codec,commons-lang3,commons-math3,
+                                concurrentlinkedhashmap-lru,antlr,ST4,antlr-runtime,jcl-over-slf4j,jackson-core-asl,
+                                jackson-mapper-asl,json-simple,high-scale-lib,snakeyaml,jbcrypt,reporter-config3,
+                                reporter-config-base,hibernate-validator,validation-api,jboss-logging,thrift-server,
+                                disruptor,stream,fastutil,logback-core,logback-classic,libthrift,httpclient,httpcore,
+                                cassandra-thrift,jna,jamm,joda-time,sigar,ecj,tools
+                            </excludeArtifactIds>
+                        </configuration>
+                    </execution>
+<!-- -->
+                    <execution>
+                        <id>copy-main-dependencies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${project.build.directory}/libs</outputDirectory>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>false</overWriteSnapshots>
+                            <overWriteIfNewer>true</overWriteIfNewer>
+                            <excludeTransitive>true</excludeTransitive>
+                            <excludeGroupIds>
+                                org.apache.ignite,org.springframework,org.gridgain
+                            </excludeGroupIds>
+                            <excludeArtifactIds>
+                                commons-logging,slf4j-api,cache-api,slf4j-api,aopalliance
+                            </excludeArtifactIds>
+                            <includeScope>runtime</includeScope>
+                        </configuration>
+                    </execution>
+<!-- -->
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <version>1.8</version>
+                <dependencies>
+                    <dependency>
+                        <groupId>ant-contrib</groupId>
+                        <artifactId>ant-contrib</artifactId>
+                        <version>1.0b3</version>
+                        <exclusions>
+                            <exclusion>
+                                <groupId>ant</groupId>
+                                <artifactId>ant</artifactId>
+                            </exclusion>
+                        </exclusions>
+                    </dependency>
+                </dependencies>
+                <executions>
+                    <execution>
+                        <id>package-tests</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <target>
+                                <taskdef resource="net/sf/antcontrib/antlib.xml" />
+                                <if>
+                                    <available file="${project.build.directory}/test-classes" type="dir" />
+                                    <then>
+                                        <copy todir="${project.build.directory}/tests-package/lib">
+                                            <fileset dir="${project.build.directory}">
+                                                <include name="*.jar" />
+                                            </fileset>
+                                        </copy>
+
+                                        <jar destfile="${project.build.directory}/tests-package/lib/${project.artifactId}-${project.version}-tests.jar">
+                                            <fileset dir="${project.build.directory}/test-classes">
+                                                <include name="**/*.class" />
+                                            </fileset>
+                                        </jar>
+
+                                        <copy todir="${project.build.directory}/tests-package/settings">
+                                            <fileset dir="${project.build.directory}/test-classes">
+                                                <include name="**/*.properties" />
+                                                <include name="**/*.xml" />
+                                            </fileset>
+                                        </copy>
+
+                                        <copy todir="${project.build.directory}/tests-package">
+                                            <fileset dir="${project.build.testSourceDirectory}/../scripts">
+                                                <include name="**/*" />
+                                            </fileset>
+                                        </copy>
+
+                                        <fixcrlf srcdir="${project.build.directory}/tests-package" eol="lf" eof="remove">
+                                            <include name="*.sh" />
+                                        </fixcrlf>
+
+                                        <copy todir="${project.build.directory}/tests-package">
+                                            <fileset dir="${project.build.testSourceDirectory}/..">
+                                                <include name="bootstrap/**" />
+                                            </fileset>
+                                        </copy>
+
+                                        <fixcrlf srcdir="${project.build.directory}/tests-package/bootstrap" eol="lf" eof="remove">
+                                            <include name="**" />
+                                        </fixcrlf>
+
+                                        <zip destfile="${project.build.directory}/ignite-cassandra-tests-${project.version}.zip" compress="true" whenempty="create" level="9" encoding="UTF-8" useLanguageEncodingFlag="true" createUnicodeExtraFields="not-encodeable">
+
+                                            <zipfileset dir="${project.build.directory}/tests-package" prefix="ignite-cassandra-tests">
+                                                <exclude name="**/*.sh" />
+                                            </zipfileset>
+
+                                            <zipfileset dir="${project.build.directory}/tests-package" prefix="ignite-cassandra-tests" filemode="555">
+                                                <include name="**/*.sh" />
+                                            </zipfileset>
+                                        </zip>
+                                    </then>
+                                </if>
+                            </target>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
new file mode 100644
index 0000000..f7e7917
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import javax.cache.Cache;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreSession;
+import org.apache.ignite.cache.store.cassandra.datasource.DataSource;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController;
+import org.apache.ignite.cache.store.cassandra.session.CassandraSession;
+import org.apache.ignite.cache.store.cassandra.session.ExecutionAssistant;
+import org.apache.ignite.cache.store.cassandra.session.GenericBatchExecutionAssistant;
+import org.apache.ignite.cache.store.cassandra.session.LoadCacheCustomQueryWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.LoggerResource;
+
+/**
+ * Implementation of {@link CacheStore} backed by Cassandra database.
+ *
+ * @param <K> Ignite cache key type.
+ * @param <V> Ignite cache value type.
+ */
+public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
+    /** Connection attribute property name. */
+    private static final String ATTR_CONN_PROP = "CASSANDRA_STORE_CONNECTION";
+
+    /** Auto-injected store session. */
+    @CacheStoreSessionResource
+    private CacheStoreSession storeSes;
+
+    /** Auto-injected logger instance. */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** Cassandra data source. */
+    private DataSource dataSrc;
+
+    /** Max workers thread count. These threads are responsible for load cache. */
+    private int maxPoolSize = Runtime.getRuntime().availableProcessors();
+
+    /** Controller component responsible for serialization logic. */
+    private PersistenceController controller;
+
+    /**
+     * Store constructor.
+     *
+     * @param dataSrc Data source.
+     * @param settings Persistence settings for Ignite key and value objects.
+     * @param maxPoolSize Max workers thread count.
+     */
+    public CassandraCacheStore(DataSource dataSrc, KeyValuePersistenceSettings settings, int maxPoolSize) {
+        this.dataSrc = dataSrc;
+        this.controller = new PersistenceController(settings);
+        this.maxPoolSize = maxPoolSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void loadCache(IgniteBiInClosure<K, V> clo, Object... args) throws CacheLoaderException {
+        if (clo == null || args == null || args.length == 0)
+            return;
+
+        ExecutorService pool = null;
+
+        Collection<Future<?>> futs = new ArrayList<>(args.length);
+
+        try {
+            pool = Executors.newFixedThreadPool(maxPoolSize);
+
+            CassandraSession ses = getCassandraSession();
+
+            for (Object obj : args) {
+                if (obj == null || !(obj instanceof String) || !((String)obj).trim().toLowerCase().startsWith("select"))
+                    continue;
+
+                futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(ses, (String) obj, controller, log, clo)));
+            }
+
+            for (Future<?> fut : futs)
+                U.get(fut);
+
+            if (log != null && log.isDebugEnabled() && storeSes != null)
+                log.debug("Cache loaded from db: " + storeSes.cacheName());
+        }
+        catch (IgniteCheckedException e) {
+            if (storeSes != null)
+                throw new CacheLoaderException("Failed to load Ignite cache: " + storeSes.cacheName(), e.getCause());
+            else
+                throw new CacheLoaderException("Failed to load cache", e.getCause());
+        }
+        finally {
+            U.shutdownNow(getClass(), pool, log);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sessionEnd(boolean commit) throws CacheWriterException {
+        if (storeSes == null || storeSes.transaction() == null)
+            return;
+
+        CassandraSession cassandraSes = (CassandraSession) storeSes.properties().remove(ATTR_CONN_PROP);
+
+        U.closeQuiet(cassandraSes);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"unchecked"})
+    @Override public V load(final K key) throws CacheLoaderException {
+        if (key == null)
+            return null;
+
+        CassandraSession ses = getCassandraSession();
+
+        try {
+            return ses.execute(new ExecutionAssistant<V>() {
+                @Override public boolean tableExistenceRequired() {
+                    return false;
+                }
+
+                @Override public String getStatement() {
+                    return controller.getLoadStatement(false);
+                }
+
+                @Override public BoundStatement bindStatement(PreparedStatement statement) {
+                    return controller.bindKey(statement, key);
+                }
+
+                @Override public KeyValuePersistenceSettings getPersistenceSettings() {
+                    return controller.getPersistenceSettings();
+                }
+
+                @Override public String operationName() {
+                    return "READ";
+                }
+
+                @Override public V process(Row row) {
+                    return row == null ? null : (V)controller.buildValueObject(row);
+                }
+            });
+        }
+        finally {
+            closeCassandraSession(ses);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException {
+        if (keys == null || !keys.iterator().hasNext())
+            return new HashMap<>();
+
+        CassandraSession ses = getCassandraSession();
+
+        try {
+            return ses.execute(new GenericBatchExecutionAssistant<Map<K, V>, K>() {
+                private Map<K, V> data = new HashMap<>();
+
+                /** {@inheritDoc} */
+                @Override public String getStatement() {
+                    return controller.getLoadStatement(true);
+                }
+
+                /** {@inheritDoc} */
+                @Override  public BoundStatement bindStatement(PreparedStatement statement, K key) {
+                    return controller.bindKey(statement, key);
+                }
+
+                /** {@inheritDoc} */
+                @Override public KeyValuePersistenceSettings getPersistenceSettings() {
+                    return controller.getPersistenceSettings();
+                }
+
+                /** {@inheritDoc} */
+                @Override public String operationName() {
+                    return "BULK_READ";
+                }
+
+                /** {@inheritDoc} */
+                @Override public Map<K, V> processedData() {
+                    return data;
+                }
+
+                /** {@inheritDoc} */
+                @Override protected void process(Row row) {
+                    data.put((K)controller.buildKeyObject(row), (V)controller.buildValueObject(row));
+                }
+            }, keys);
+        }
+        finally {
+            closeCassandraSession(ses);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(final Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
+        if (entry == null || entry.getKey() == null)
+            return;
+
+        CassandraSession ses = getCassandraSession();
+
+        try {
+            ses.execute(new ExecutionAssistant<Void>() {
+                @Override public boolean tableExistenceRequired() {
+                    return true;
+                }
+
+                @Override public String getStatement() {
+                    return controller.getWriteStatement();
+                }
+
+                @Override public BoundStatement bindStatement(PreparedStatement statement) {
+                    return controller.bindKeyValue(statement, entry.getKey(), entry.getValue());
+                }
+
+                @Override public KeyValuePersistenceSettings getPersistenceSettings() {
+                    return controller.getPersistenceSettings();
+                }
+
+                @Override public String operationName() {
+                    return "WRITE";
+                }
+
+                @Override public Void process(Row row) {
+                    return null;
+                }
+            });
+        }
+        finally {
+            closeCassandraSession(ses);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) throws CacheWriterException {
+        if (entries == null || entries.isEmpty())
+            return;
+
+        CassandraSession ses = getCassandraSession();
+
+        try {
+            ses.execute(new GenericBatchExecutionAssistant<Void, Cache.Entry<? extends K, ? extends V>>() {
+                /** {@inheritDoc} */
+                @Override public String getStatement() {
+                    return controller.getWriteStatement();
+                }
+
+                /** {@inheritDoc} */
+                @Override public BoundStatement bindStatement(PreparedStatement statement,
+                    Cache.Entry<? extends K, ? extends V> entry) {
+                    return controller.bindKeyValue(statement, entry.getKey(), entry.getValue());
+                }
+
+                /** {@inheritDoc} */
+                @Override public KeyValuePersistenceSettings getPersistenceSettings() {
+                    return controller.getPersistenceSettings();
+                }
+
+                /** {@inheritDoc} */
+                @Override public String operationName() {
+                    return "BULK_WRITE";
+                }
+
+                /** {@inheritDoc} */
+                @Override public boolean tableExistenceRequired() {
+                    return true;
+                }
+            }, entries);
+        }
+        finally {
+            closeCassandraSession(ses);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void delete(final Object key) throws CacheWriterException {
+        if (key == null)
+            return;
+
+        CassandraSession ses = getCassandraSession();
+
+        try {
+            ses.execute(new ExecutionAssistant<Void>() {
+                @Override public boolean tableExistenceRequired() {
+                    return false;
+                }
+
+                @Override public String getStatement() {
+                    return controller.getDeleteStatement();
+                }
+
+                @Override public BoundStatement bindStatement(PreparedStatement statement) {
+                    return controller.bindKey(statement, key);
+                }
+
+
+                @Override public KeyValuePersistenceSettings getPersistenceSettings() {
+                    return controller.getPersistenceSettings();
+                }
+
+                @Override public String operationName() {
+                    return "DELETE";
+                }
+
+                @Override public Void process(Row row) {
+                    return null;
+                }
+            });
+        }
+        finally {
+            closeCassandraSession(ses);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
+        if (keys == null || keys.isEmpty())
+            return;
+
+        CassandraSession ses = getCassandraSession();
+
+        try {
+            ses.execute(new GenericBatchExecutionAssistant<Void, Object>() {
+                /** {@inheritDoc} */
+                @Override public String getStatement() {
+                    return controller.getDeleteStatement();
+                }
+
+                /** {@inheritDoc} */
+                @Override public BoundStatement bindStatement(PreparedStatement statement, Object key) {
+                    return controller.bindKey(statement, key);
+                }
+
+                /** {@inheritDoc} */
+                @Override public KeyValuePersistenceSettings getPersistenceSettings() {
+                    return controller.getPersistenceSettings();
+                }
+
+                @Override public String operationName() {
+                    return "BULK_DELETE";
+                }
+            }, keys);
+        }
+        finally {
+            closeCassandraSession(ses);
+        }
+    }
+
+    /**
+     * Gets Cassandra session wrapper or creates new if it doesn't exist.
+     * This wrapper hides all the low-level Cassandra interaction details by providing only high-level methods.
+     *
+     * @return Cassandra session wrapper.
+     */
+    private CassandraSession getCassandraSession() {
+        if (storeSes == null || storeSes.transaction() == null)
+            return dataSrc.session(log != null ? log : new NullLogger());
+
+        CassandraSession ses = (CassandraSession) storeSes.properties().get(ATTR_CONN_PROP);
+
+        if (ses == null) {
+            ses = dataSrc.session(log != null ? log : new NullLogger());
+            storeSes.properties().put(ATTR_CONN_PROP, ses);
+        }
+
+        return ses;
+    }
+
+    /**
+     * Releases Cassandra related resources.
+     *
+     * @param ses Cassandra session wrapper.
+     */
+    private void closeCassandraSession(CassandraSession ses) {
+        if (ses != null && (storeSes == null || storeSes.transaction() == null))
+            U.closeQuiet(ses);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStoreFactory.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStoreFactory.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStoreFactory.java
new file mode 100644
index 0000000..7584dfb
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStoreFactory.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra;
+
+import javax.cache.configuration.Factory;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.store.cassandra.datasource.DataSource;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+import org.apache.ignite.internal.IgniteComponentType;
+import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
+import org.apache.ignite.resources.SpringApplicationContextResource;
+
+/**
+ * Factory class to instantiate {@link CassandraCacheStore}.
+ *
+ * @param <K> Ignite cache key type
+ * @param <V> Ignite cache value type
+ */
+public class CassandraCacheStoreFactory<K, V> implements Factory<CassandraCacheStore<K, V>> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Auto-injected Spring ApplicationContext resource. */
+    @SpringApplicationContextResource
+    private Object appCtx;
+
+    /** Name of data source bean. */
+    private String dataSrcBean;
+
+    /** Name of persistence settings bean. */
+    private String persistenceSettingsBean;
+
+    /** Data source. */
+    private transient DataSource dataSrc;
+
+    /** Persistence settings. */
+    private KeyValuePersistenceSettings persistenceSettings;
+
+    /** Max workers thread count. These threads are responsible for load cache. */
+    private int maxPoolSize = Runtime.getRuntime().availableProcessors();
+
+    /** {@inheritDoc} */
+    @Override public CassandraCacheStore<K, V> create() {
+        return new CassandraCacheStore<>(getDataSource(), getPersistenceSettings(), getMaxPoolSize());
+    }
+
+    /**
+     * Sets data source.
+     *
+     * @param dataSrc Data source.
+     *
+     * @return {@code This} for chaining.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public CassandraCacheStoreFactory<K, V> setDataSource(DataSource dataSrc) {
+        this.dataSrc = dataSrc;
+
+        return this;
+    }
+
+    /**
+     * Sets data source bean name.
+     *
+     * @param beanName Data source bean name.
+     * @return {@code This} for chaining.
+     */
+    public CassandraCacheStoreFactory<K, V> setDataSourceBean(String beanName) {
+        this.dataSrcBean = beanName;
+
+        return this;
+    }
+
+    /**
+     * Sets persistence settings.
+     *
+     * @param settings Persistence settings.
+     * @return {@code This} for chaining.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public CassandraCacheStoreFactory<K, V> setPersistenceSettings(KeyValuePersistenceSettings settings) {
+        this.persistenceSettings = settings;
+
+        return this;
+    }
+
+    /**
+     * Sets persistence settings bean name.
+     *
+     * @param beanName Persistence settings bean name.
+     * @return {@code This} for chaining.
+     */
+    public CassandraCacheStoreFactory<K, V> setPersistenceSettingsBean(String beanName) {
+        this.persistenceSettingsBean = beanName;
+
+        return this;
+    }
+
+    /**
+     * @return Data source.
+     */
+    private DataSource getDataSource() {
+        if (dataSrc != null)
+            return dataSrc;
+
+        if (dataSrcBean == null)
+            throw new IllegalStateException("Either DataSource bean or DataSource itself should be specified");
+
+        if (appCtx == null) {
+            throw new IllegalStateException("Failed to get Cassandra DataSource cause Spring application " +
+                "context wasn't injected into CassandraCacheStoreFactory");
+        }
+
+        Object obj = loadSpringContextBean(appCtx, dataSrcBean);
+
+        if (!(obj instanceof DataSource))
+            throw new IllegalStateException("Incorrect connection bean '" + dataSrcBean + "' specified");
+
+        return dataSrc = (DataSource)obj;
+    }
+
+    /**
+     * @return Persistence settings.
+     */
+    private KeyValuePersistenceSettings getPersistenceSettings() {
+        if (persistenceSettings != null)
+            return persistenceSettings;
+
+        if (persistenceSettingsBean == null) {
+            throw new IllegalStateException("Either persistence settings bean or persistence settings itself " +
+                "should be specified");
+        }
+
+        if (appCtx == null) {
+            throw new IllegalStateException("Failed to get Cassandra persistence settings cause Spring application " +
+                "context wasn't injected into CassandraCacheStoreFactory");
+        }
+
+        Object obj = loadSpringContextBean(appCtx, persistenceSettingsBean);
+
+        if (!(obj instanceof KeyValuePersistenceSettings)) {
+            throw new IllegalStateException("Incorrect persistence settings bean '" +
+                persistenceSettingsBean + "' specified");
+        }
+
+        return persistenceSettings = (KeyValuePersistenceSettings)obj;
+    }
+
+    /**
+     * Get maximum workers thread count. These threads are responsible for queries execution.
+     *
+     * @return Maximum workers thread count.
+     */
+    public int getMaxPoolSize() {
+        return maxPoolSize;
+    }
+
+    /**
+     * Set Maximum workers thread count. These threads are responsible for queries execution.
+     *
+     * @param maxPoolSize Max workers thread count.
+     * @return {@code This} for chaining.
+     */
+    public CassandraCacheStoreFactory<K, V> setMaxPoolSize(int maxPoolSize) {
+        this.maxPoolSize = maxPoolSize;
+
+        return this;
+    }
+
+    /**
+     * Loads bean from Spring ApplicationContext.
+     *
+     * @param appCtx Application context.
+     * @param beanName Bean name to load.
+     * @return Loaded bean.
+     */
+    private Object loadSpringContextBean(Object appCtx, String beanName) {
+        try {
+            IgniteSpringHelper spring = IgniteComponentType.SPRING.create(false);
+            return spring.loadBeanFromAppContext(appCtx, beanName);
+        }
+        catch (Exception e) {
+            throw new IgniteException("Failed to load bean in application context [beanName=" + beanName + ", igniteConfig=" + appCtx + ']', e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
new file mode 100644
index 0000000..d3bff7f
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.common;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.ReadTimeoutException;
+import java.util.regex.Pattern;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Helper class providing methods to work with Cassandra session and exceptions
+ */
+public class CassandraHelper {
+    /** Cassandra error message if specified keyspace doesn't exist. */
+    private static final Pattern KEYSPACE_EXIST_ERROR1 = Pattern.compile("Keyspace [0-9a-zA-Z_]+ does not exist");
+
+    /** Cassandra error message if trying to create table inside nonexistent keyspace. */
+    private static final Pattern KEYSPACE_EXIST_ERROR2 = Pattern.compile("Cannot add table '[0-9a-zA-Z_]+' to non existing keyspace.*");
+
+    /** Cassandra error message if specified table doesn't exist. */
+    private static final Pattern TABLE_EXIST_ERROR = Pattern.compile("unconfigured table [0-9a-zA-Z_]+");
+
+    /** Cassandra error message if trying to use prepared statement created from another session. */
+    private static final String PREP_STATEMENT_CLUSTER_INSTANCE_ERROR = "You may have used a PreparedStatement that " +
+        "was created with another Cluster instance";
+
+    /** Closes Cassandra driver session. */
+    public static void closeSession(Session driverSes) {
+        if (driverSes == null)
+            return;
+
+        Cluster cluster = driverSes.getCluster();
+
+        if (!driverSes.isClosed())
+            U.closeQuiet(driverSes);
+
+        if (!cluster.isClosed())
+            U.closeQuiet(cluster);
+    }
+
+    /**
+     * Checks if Cassandra keyspace absence error occur.
+     *
+     * @param e Exception to check.
+     * @return {@code true} in case of keyspace absence error.
+     */
+    public static boolean isKeyspaceAbsenceError(Throwable e) {
+        while (e != null) {
+            if (e instanceof InvalidQueryException &&
+                (KEYSPACE_EXIST_ERROR1.matcher(e.getMessage()).matches() ||
+                    KEYSPACE_EXIST_ERROR2.matcher(e.getMessage()).matches()))
+                return true;
+
+            e = e.getCause();
+        }
+
+        return false;
+    }
+
+    /**
+     * Checks if Cassandra table absence error occur.
+     *
+     * @param e Exception to check.
+     * @return {@code true} in case of table absence error.
+     */
+    public static boolean isTableAbsenceError(Throwable e) {
+        while (e != null) {
+            if (e instanceof InvalidQueryException &&
+                (TABLE_EXIST_ERROR.matcher(e.getMessage()).matches() ||
+                    KEYSPACE_EXIST_ERROR1.matcher(e.getMessage()).matches() ||
+                    KEYSPACE_EXIST_ERROR2.matcher(e.getMessage()).matches()))
+                return true;
+
+            e = e.getCause();
+        }
+
+        return false;
+    }
+
+    /**
+     * Checks if Cassandra host availability error occur, thus host became unavailable.
+     *
+     * @param e Exception to check.
+     * @return {@code true} in case of host not available error.
+     */
+    public static boolean isHostsAvailabilityError(Throwable e) {
+        while (e != null) {
+            if (e instanceof NoHostAvailableException ||
+                e instanceof ReadTimeoutException)
+                return true;
+
+            e = e.getCause();
+        }
+
+        return false;
+    }
+
+    /**
+     * Checks if Cassandra error occur because of prepared statement created in one session was used in another session.
+     *
+     * @param e Exception to check.
+     * @return {@code true} in case of invalid usage of prepared statement.
+     */
+    public static boolean isPreparedStatementClusterError(Throwable e) {
+        while (e != null) {
+            if (e instanceof InvalidQueryException && e.getMessage().contains(PREP_STATEMENT_CLUSTER_INSTANCE_ERROR))
+                return true;
+
+            e = e.getCause();
+        }
+
+        return false;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/PropertyMappingHelper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/PropertyMappingHelper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/PropertyMappingHelper.java
new file mode 100644
index 0000000..9053a93
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/PropertyMappingHelper.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.common;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.Row;
+import java.beans.PropertyDescriptor;
+import java.lang.annotation.Annotation;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.beanutils.PropertyUtils;
+import org.apache.ignite.cache.store.cassandra.serializer.Serializer;
+
+/**
+ * Helper class providing bunch of methods to discover fields of POJO objects and
+ * map builtin Java types to appropriate Cassandra types.
+ */
+public class PropertyMappingHelper {
+    /** Bytes array Class type. */
+    private static final Class BYTES_ARRAY_CLASS = (new byte[] {}).getClass();
+
+    /** Mapping from Java to Cassandra types. */
+    private static final Map<Class, DataType.Name> JAVA_TO_CASSANDRA_MAPPING = new HashMap<Class, DataType.Name>() {{
+        put(String.class, DataType.Name.TEXT);
+        put(Integer.class, DataType.Name.INT);
+        put(int.class, DataType.Name.INT);
+        put(Short.class, DataType.Name.INT);
+        put(short.class, DataType.Name.INT);
+        put(Long.class, DataType.Name.BIGINT);
+        put(long.class, DataType.Name.BIGINT);
+        put(Double.class, DataType.Name.DOUBLE);
+        put(double.class, DataType.Name.DOUBLE);
+        put(Boolean.class, DataType.Name.BOOLEAN);
+        put(boolean.class, DataType.Name.BOOLEAN);
+        put(Float.class, DataType.Name.FLOAT);
+        put(float.class, DataType.Name.FLOAT);
+        put(ByteBuffer.class, DataType.Name.BLOB);
+        put(BYTES_ARRAY_CLASS, DataType.Name.BLOB);
+        put(BigDecimal.class, DataType.Name.DECIMAL);
+        put(InetAddress.class, DataType.Name.INET);
+        put(Date.class, DataType.Name.TIMESTAMP);
+        put(UUID.class, DataType.Name.UUID);
+        put(BigInteger.class, DataType.Name.VARINT);
+    }};
+
+    /**
+     * Maps Cassandra type to specified Java type.
+     *
+     * @param clazz java class.
+     *
+     * @return Cassandra type.
+     */
+    public static DataType.Name getCassandraType(Class clazz) {
+        return JAVA_TO_CASSANDRA_MAPPING.get(clazz);
+    }
+
+    /**
+     * Returns property descriptor by class property name.
+     *
+     * @param clazz class from which to get property descriptor.
+     * @param prop name of the property.
+     *
+     * @return property descriptor.
+     */
+    public static PropertyDescriptor getPojoPropertyDescriptor(Class clazz, String prop) {
+        List<PropertyDescriptor> descriptors = getPojoPropertyDescriptors(clazz, false);
+
+        if (descriptors == null || descriptors.isEmpty())
+            throw new IllegalArgumentException("POJO class " + clazz.getName() + " doesn't have '" + prop + "' property");
+
+        for (PropertyDescriptor descriptor : descriptors) {
+            if (descriptor.getName().equals(prop))
+                return descriptor;
+        }
+
+        throw new IllegalArgumentException("POJO class " + clazz.getName() + " doesn't have '" + prop + "' property");
+    }
+
+    /**
+     * Extracts all property descriptors from a class.
+     *
+     * @param clazz class which property descriptors should be extracted.
+     * @param primitive boolean flag indicating that only property descriptors for primitive properties should be extracted.
+     *
+     * @return list of class property descriptors
+     */
+    public static List<PropertyDescriptor> getPojoPropertyDescriptors(Class clazz, boolean primitive) {
+        return getPojoPropertyDescriptors(clazz, null, primitive);
+    }
+
+    /**
+     * Extracts all property descriptors having specific annotation from a class.
+     *
+     * @param clazz class which property descriptors should be extracted.
+     * @param annotation annotation to look for.
+     * @param primitive boolean flag indicating that only property descriptors for primitive properties should be extracted.
+     *
+     * @return list of class property descriptors
+     */
+    public static <T extends Annotation> List<PropertyDescriptor> getPojoPropertyDescriptors(Class clazz,
+        Class<T> annotation, boolean primitive) {
+        PropertyDescriptor[] descriptors = PropertyUtils.getPropertyDescriptors(clazz);
+
+        List<PropertyDescriptor> list = new ArrayList<>(descriptors == null ? 1 : descriptors.length);
+
+        if (descriptors == null || descriptors.length == 0)
+            return list;
+
+        for (PropertyDescriptor descriptor : descriptors) {
+            if (descriptor.getReadMethod() == null || descriptor.getWriteMethod() == null ||
+                (primitive && !isPrimitivePropertyDescriptor(descriptor)))
+                continue;
+
+            if (annotation == null || descriptor.getReadMethod().getAnnotation(annotation) != null)
+                list.add(descriptor);
+        }
+
+        return list;
+    }
+
+    /**
+     * Checks if property descriptor describes primitive property (int, boolean, long and etc.)
+     *
+     * @param desc property descriptor.
+     *
+     * @return {@code true} property is primitive
+     */
+    public static boolean isPrimitivePropertyDescriptor(PropertyDescriptor desc) {
+        return PropertyMappingHelper.JAVA_TO_CASSANDRA_MAPPING.containsKey(desc.getPropertyType());
+    }
+
+    /**
+     * Returns value of specific column in the row returned by CQL statement.
+     *
+     * @param row row returned by CQL statement.
+     * @param col column name.
+     * @param clazz java class to which column value should be casted.
+     * @param serializer serializer to use if column stores BLOB otherwise could be null.
+     *
+     * @return row column value.
+     */
+    public static Object getCassandraColumnValue(Row row, String col, Class clazz, Serializer serializer) {
+        if (String.class.equals(clazz))
+            return row.getString(col);
+
+        if (Integer.class.equals(clazz) || int.class.equals(clazz))
+            return row.getInt(col);
+
+        if (Short.class.equals(clazz) || short.class.equals(clazz))
+            return (short)row.getInt(col);
+
+        if (Long.class.equals(clazz) || long.class.equals(clazz))
+            return row.getLong(col);
+
+        if (Double.class.equals(clazz) || double.class.equals(clazz))
+            return row.getDouble(col);
+
+        if (Boolean.class.equals(clazz) || boolean.class.equals(clazz))
+            return row.getBool(col);
+
+        if (Float.class.equals(clazz) || float.class.equals(clazz))
+            return row.getFloat(col);
+
+        if (ByteBuffer.class.equals(clazz))
+            return row.getBytes(col);
+
+        if (PropertyMappingHelper.BYTES_ARRAY_CLASS.equals(clazz)) {
+            ByteBuffer buf = row.getBytes(col);
+
+            return buf == null ? null : buf.array();
+        }
+
+        if (BigDecimal.class.equals(clazz))
+            return row.getDecimal(col);
+
+        if (InetAddress.class.equals(clazz))
+            return row.getInet(col);
+
+        if (Date.class.equals(clazz))
+            return row.getTimestamp(col);
+
+        if (UUID.class.equals(clazz))
+            return row.getUUID(col);
+
+        if (BigInteger.class.equals(clazz))
+            return row.getVarint(col);
+
+        if (serializer == null) {
+            throw new IllegalStateException("Can't deserialize value from '" + col + "' Cassandra column, " +
+                "cause there is no BLOB serializer specified");
+        }
+
+        ByteBuffer buf = row.getBytes(col);
+
+        return buf == null ? null : serializer.deserialize(buf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java
new file mode 100644
index 0000000..6745a16
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.common;
+
+import java.util.Random;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+
+/**
+ * Provides sleep method with randomly selected sleep time from specified range and
+ * incrementally shifts sleep time range for each next sleep attempt
+ *
+ */
+public class RandomSleeper {
+    /** */
+    private int min;
+
+    /** */
+    private int max;
+
+    /** */
+    private int incr;
+
+    /** */
+    private IgniteLogger log;
+
+    /** */
+    private Random random = new Random(System.currentTimeMillis());
+
+    /** */
+    private int summary = 0;
+
+    /**
+     * Creates sleeper instance.
+     *
+     * @param min minimum sleep time (in milliseconds)
+     * @param max maximum sleep time (in milliseconds)
+     * @param incr time range shift increment (in milliseconds)
+     */
+    public RandomSleeper(int min, int max, int incr, IgniteLogger log) {
+        if (min <= 0)
+            throw new IllegalArgumentException("Incorrect min time specified: " + min);
+
+        if (max <= min)
+            throw new IllegalArgumentException("Incorrect max time specified: " + max);
+
+        if (incr < 10)
+            throw new IllegalArgumentException("Incorrect increment specified: " + incr);
+
+        this.min = min;
+        this.max = max;
+        this.incr = incr;
+        this.log = log;
+    }
+
+    /**
+     * Sleeps
+     */
+    public void sleep() {
+        try {
+            int timeout = random.nextInt(max - min + 1) + min;
+
+            if (log != null)
+                log.info("Sleeping for " + timeout + "ms");
+
+            Thread.sleep(timeout);
+
+            summary += timeout;
+
+            if (log != null)
+                log.info("Sleep completed");
+        }
+        catch (InterruptedException e) {
+            throw new IgniteException("Random sleep interrupted", e);
+        }
+
+        min += incr;
+        max += incr;
+    }
+
+    /**
+     * Returns summary sleep time.
+     *
+     * @return Summary sleep time in milliseconds.
+     */
+    public int getSleepSummary() {
+        return summary;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/SystemHelper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/SystemHelper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/SystemHelper.java
new file mode 100644
index 0000000..5d51488
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/SystemHelper.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.common;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Helper class providing system information about the host (ip, hostname, os and etc.)
+ */
+public class SystemHelper {
+    /** System line separator. */
+    public static final String LINE_SEPARATOR = System.getProperty("line.separator");
+
+    /** Host name. */
+    public static final String HOST_NAME;
+
+    /** Host IP address */
+    public static final String HOST_IP;
+
+    static {
+        try {
+            InetAddress addr = InetAddress.getLocalHost();
+            HOST_NAME = addr.getHostName();
+            HOST_IP = addr.getHostAddress();
+        }
+        catch (UnknownHostException e) {
+            throw new IllegalStateException("Failed to get host/ip of current computer", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/package-info.java
new file mode 100644
index 0000000..c4f5d3b
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains commonly used helper classes
+ */
+package org.apache.ignite.cache.store.cassandra.common;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java
new file mode 100644
index 0000000..a2358a6
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.datasource;
+
+import java.io.Serializable;
+
+/**
+ * Provides credentials for Cassandra (instead of specifying user/password directly in Spring context XML).
+ */
+public interface Credentials extends Serializable {
+    /**
+     * Returns user name
+     *
+     * @return user name
+     */
+    public String getUser();
+
+    /**
+     * Returns password
+     *
+     * @return password
+     */
+    public String getPassword();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
new file mode 100644
index 0000000..f582aac
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.datasource;
+
+import com.datastax.driver.core.AuthProvider;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.NettyOptions;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.ProtocolOptions;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.SSLOptions;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.policies.AddressTranslator;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.ReconnectionPolicy;
+import com.datastax.driver.core.policies.RetryPolicy;
+import com.datastax.driver.core.policies.SpeculativeExecutionPolicy;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.store.cassandra.session.CassandraSession;
+import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Data source abstraction to specify configuration of the Cassandra session to be used.
+ */
+public class DataSource implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Null object, used as a replacement for those Cassandra connection options which
+     * don't support serialization (RetryPolicy, LoadBalancingPolicy and etc).
+     */
+    private static final UUID NULL_OBJECT = UUID.fromString("45ffae47-3193-5910-84a2-048fe65735d9");
+
+    /** Number of rows to immediately fetch in CQL statement execution. */
+    private Integer fetchSize;
+
+    /** Consistency level for READ operations. */
+    private ConsistencyLevel readConsistency;
+
+    /** Consistency level for WRITE operations. */
+    private ConsistencyLevel writeConsistency;
+
+    /** Username to use for authentication. */
+    private String user;
+
+    /** Password to use for authentication. */
+    private String pwd;
+
+    /** Port to use for Cassandra connection. */
+    private Integer port;
+
+    /** List of contact points to connect to Cassandra cluster. */
+    private List<InetAddress> contactPoints;
+
+    /** List of contact points with ports to connect to Cassandra cluster. */
+    private List<InetSocketAddress> contactPointsWithPorts;
+
+    /** Maximum time to wait for schema agreement before returning from a DDL query. */
+    private Integer maxSchemaAgreementWaitSeconds;
+
+    /** The native protocol version to use. */
+    private Integer protoVer;
+
+    /** Compression to use for the transport. */
+    private String compression;
+
+    /** Use SSL for communications with Cassandra. */
+    private Boolean useSSL;
+
+    /** Enables metrics collection. */
+    private Boolean collectMetrix;
+
+    /** Enables JMX reporting of the metrics. */
+    private Boolean jmxReporting;
+
+    /** Credentials to use for authentication. */
+    private Credentials creds;
+
+    /** Load balancing policy to use. */
+    private LoadBalancingPolicy loadBalancingPlc;
+
+    /** Reconnection policy to use. */
+    private ReconnectionPolicy reconnectionPlc;
+
+    /** Retry policy to use. */
+    private RetryPolicy retryPlc;
+
+    /** Address translator to use. */
+    private AddressTranslator addrTranslator;
+
+    /** Speculative execution policy to use. */
+    private SpeculativeExecutionPolicy speculativeExecutionPlc;
+
+    /** Authentication provider to use. */
+    private AuthProvider authProvider;
+
+    /** SSL options to use. */
+    private SSLOptions sslOptions;
+
+    /** Connection pooling options to use. */
+    private PoolingOptions poolingOptions;
+
+    /** Socket options to use. */
+    private SocketOptions sockOptions;
+
+    /** Netty options to use for connection. */
+    private NettyOptions nettyOptions;
+
+    /** Cassandra session wrapper instance. */
+    private volatile CassandraSession ses;
+
+    /**
+     * Sets user name to use for authentication.
+     *
+     * @param user user name
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setUser(String user) {
+        this.user = user;
+
+        invalidate();
+    }
+
+    /**
+     * Sets password to use for authentication.
+     *
+     * @param pwd password
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setPassword(String pwd) {
+        this.pwd = pwd;
+
+        invalidate();
+    }
+
+    /**
+     * Sets port to use for Cassandra connection.
+     *
+     * @param port port
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setPort(int port) {
+        this.port = port;
+
+        invalidate();
+    }
+
+    /**
+     * Sets list of contact points to connect to Cassandra cluster.
+     *
+     * @param points contact points
+     */
+    public void setContactPoints(String... points) {
+        if (points == null || points.length == 0)
+            return;
+
+        for (String point : points) {
+            if (point.contains(":")) {
+                if (contactPointsWithPorts == null)
+                    contactPointsWithPorts = new LinkedList<>();
+
+                String[] chunks = point.split(":");
+
+                try {
+                    contactPointsWithPorts.add(InetSocketAddress.createUnresolved(chunks[0].trim(), Integer.parseInt(chunks[1].trim())));
+                }
+                catch (Throwable e) {
+                    throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e);
+                }
+            }
+            else {
+                if (contactPoints == null)
+                    contactPoints = new LinkedList<>();
+
+                try {
+                    contactPoints.add(InetAddress.getByName(point));
+                }
+                catch (Throwable e) {
+                    throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e);
+                }
+            }
+        }
+
+        invalidate();
+    }
+
+    /** Sets maximum time to wait for schema agreement before returning from a DDL query. */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setMaxSchemaAgreementWaitSeconds(int seconds) {
+        maxSchemaAgreementWaitSeconds = seconds;
+
+        invalidate();
+    }
+
+    /**
+     * Sets the native protocol version to use.
+     *
+     * @param ver version number
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setProtocolVersion(int ver) {
+        protoVer = ver;
+
+        invalidate();
+    }
+
+    /**
+     * Sets compression algorithm to use for the transport.
+     *
+     * @param compression Compression algorithm.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setCompression(String compression) {
+        this.compression = compression == null || compression.trim().isEmpty() ? null : compression.trim();
+
+        try {
+            if (this.compression != null)
+                ProtocolOptions.Compression.valueOf(this.compression);
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Incorrect compression '" + compression + "' specified for Cassandra connection", e);
+        }
+
+        invalidate();
+    }
+
+    /**
+     * Enables SSL for communications with Cassandra.
+     *
+     * @param use Flag to enable/disable SSL.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setUseSSL(boolean use) {
+        useSSL = use;
+
+        invalidate();
+    }
+
+    /**
+     * Enables metrics collection.
+     *
+     * @param collect Flag to enable/disable metrics collection.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setCollectMetrix(boolean collect) {
+        collectMetrix = collect;
+
+        invalidate();
+    }
+
+    /**
+     * Enables JMX reporting of the metrics.
+     *
+     * @param enableReporting Flag to enable/disable JMX reporting.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setJmxReporting(boolean enableReporting) {
+        jmxReporting = enableReporting;
+
+        invalidate();
+    }
+
+    /**
+     * Sets number of rows to immediately fetch in CQL statement execution.
+     *
+     * @param size Number of rows to fetch.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setFetchSize(int size) {
+        fetchSize = size;
+
+        invalidate();
+    }
+
+    /**
+     * Set consistency level for READ operations.
+     *
+     * @param level Consistency level.
+     */
+    public void setReadConsistency(String level) {
+        readConsistency = parseConsistencyLevel(level);
+
+        invalidate();
+    }
+
+    /**
+     * Set consistency level for WRITE operations.
+     *
+     * @param level Consistency level.
+     */
+    public void setWriteConsistency(String level) {
+        writeConsistency = parseConsistencyLevel(level);
+
+        invalidate();
+    }
+
+    /**
+     * Sets credentials to use for authentication.
+     *
+     * @param creds Credentials.
+     */
+    public void setCredentials(Credentials creds) {
+        this.creds = creds;
+
+        invalidate();
+    }
+
+    /**
+     * Sets load balancing policy.
+     *
+     * @param plc Load balancing policy.
+     */
+    public void setLoadBalancingPolicy(LoadBalancingPolicy plc) {
+        loadBalancingPlc = plc;
+
+        invalidate();
+    }
+
+    /**
+     * Sets reconnection policy.
+     *
+     * @param plc Reconnection policy.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setReconnectionPolicy(ReconnectionPolicy plc) {
+        reconnectionPlc = plc;
+
+        invalidate();
+    }
+
+    /**
+     * Sets retry policy.
+     *
+     * @param plc Retry policy.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setRetryPolicy(RetryPolicy plc) {
+        retryPlc = plc;
+
+        invalidate();
+    }
+
+    /**
+     * Sets address translator.
+     *
+     * @param translator Address translator.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setAddressTranslator(AddressTranslator translator) {
+        addrTranslator = translator;
+
+        invalidate();
+    }
+
+    /**
+     * Sets speculative execution policy.
+     *
+     * @param plc Speculative execution policy.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy plc) {
+        speculativeExecutionPlc = plc;
+
+        invalidate();
+    }
+
+    /**
+     * Sets authentication provider.
+     *
+     * @param provider Authentication provider.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setAuthProvider(AuthProvider provider) {
+        authProvider = provider;
+
+        invalidate();
+    }
+
+    /**
+     * Sets SSL options.
+     *
+     * @param options SSL options.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setSslOptions(SSLOptions options) {
+        sslOptions = options;
+
+        invalidate();
+    }
+
+    /**
+     * Sets pooling options.
+     *
+     * @param options pooling options to use.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setPoolingOptions(PoolingOptions options) {
+        poolingOptions = options;
+
+        invalidate();
+    }
+
+    /**
+     * Sets socket options to use.
+     *
+     * @param options Socket options.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setSocketOptions(SocketOptions options) {
+        sockOptions = options;
+
+        invalidate();
+    }
+
+    /**
+     * Sets netty options to use.
+     *
+     * @param options netty options.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setNettyOptions(NettyOptions options) {
+        nettyOptions = options;
+
+        invalidate();
+    }
+
+    /**
+     * Creates Cassandra session wrapper if it wasn't created yet and returns it
+     *
+     * @param log logger
+     * @return Cassandra session wrapper
+     */
+    @SuppressWarnings("deprecation")
+    public synchronized CassandraSession session(IgniteLogger log) {
+        if (ses != null)
+            return ses;
+
+        Cluster.Builder builder = Cluster.builder();
+
+        if (user != null)
+            builder = builder.withCredentials(user, pwd);
+
+        if (port != null)
+            builder = builder.withPort(port);
+
+        if (contactPoints != null)
+            builder = builder.addContactPoints(contactPoints);
+
+        if (contactPointsWithPorts != null)
+            builder = builder.addContactPointsWithPorts(contactPointsWithPorts);
+
+        if (maxSchemaAgreementWaitSeconds != null)
+            builder = builder.withMaxSchemaAgreementWaitSeconds(maxSchemaAgreementWaitSeconds);
+
+        if (protoVer != null)
+            builder = builder.withProtocolVersion(ProtocolVersion.fromInt(protoVer));
+
+        if (compression != null) {
+            try {
+                builder = builder.withCompression(ProtocolOptions.Compression.valueOf(compression.trim().toLowerCase()));
+            }
+            catch (IllegalArgumentException e) {
+                throw new IgniteException("Incorrect compression option '" + compression + "' specified for Cassandra connection", e);
+            }
+        }
+
+        if (useSSL != null && useSSL)
+            builder = builder.withSSL();
+
+        if (sslOptions != null)
+            builder = builder.withSSL(sslOptions);
+
+        if (collectMetrix != null && !collectMetrix)
+            builder = builder.withoutMetrics();
+
+        if (jmxReporting != null && !jmxReporting)
+            builder = builder.withoutJMXReporting();
+
+        if (creds != null)
+            builder = builder.withCredentials(creds.getUser(), creds.getPassword());
+
+        if (loadBalancingPlc != null)
+            builder = builder.withLoadBalancingPolicy(loadBalancingPlc);
+
+        if (reconnectionPlc != null)
+            builder = builder.withReconnectionPolicy(reconnectionPlc);
+
+        if (retryPlc != null)
+            builder = builder.withRetryPolicy(retryPlc);
+
+        if (addrTranslator != null)
+            builder = builder.withAddressTranslator(addrTranslator);
+
+        if (speculativeExecutionPlc != null)
+            builder = builder.withSpeculativeExecutionPolicy(speculativeExecutionPlc);
+
+        if (authProvider != null)
+            builder = builder.withAuthProvider(authProvider);
+
+        if (poolingOptions != null)
+            builder = builder.withPoolingOptions(poolingOptions);
+
+        if (sockOptions != null)
+            builder = builder.withSocketOptions(sockOptions);
+
+        if (nettyOptions != null)
+            builder = builder.withNettyOptions(nettyOptions);
+
+        return ses = new CassandraSessionImpl(builder, fetchSize, readConsistency, writeConsistency, log);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(fetchSize);
+        out.writeObject(readConsistency);
+        out.writeObject(writeConsistency);
+        U.writeString(out, user);
+        U.writeString(out, pwd);
+        out.writeObject(port);
+        out.writeObject(contactPoints);
+        out.writeObject(contactPointsWithPorts);
+        out.writeObject(maxSchemaAgreementWaitSeconds);
+        out.writeObject(protoVer);
+        U.writeString(out, compression);
+        out.writeObject(useSSL);
+        out.writeObject(collectMetrix);
+        out.writeObject(jmxReporting);
+        out.writeObject(creds);
+        writeObject(out, loadBalancingPlc);
+        writeObject(out, reconnectionPlc);
+        writeObject(out, addrTranslator);
+        writeObject(out, speculativeExecutionPlc);
+        writeObject(out, authProvider);
+        writeObject(out, sslOptions);
+        writeObject(out, poolingOptions);
+        writeObject(out, sockOptions);
+        writeObject(out, nettyOptions);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        fetchSize = (Integer)in.readObject();
+        readConsistency = (ConsistencyLevel)in.readObject();
+        writeConsistency = (ConsistencyLevel)in.readObject();
+        user = U.readString(in);
+        pwd = U.readString(in);
+        port = (Integer)in.readObject();
+        contactPoints = (List<InetAddress>)in.readObject();
+        contactPointsWithPorts = (List<InetSocketAddress>)in.readObject();
+        maxSchemaAgreementWaitSeconds = (Integer)in.readObject();
+        protoVer = (Integer)in.readObject();
+        compression = U.readString(in);
+        useSSL = (Boolean)in.readObject();
+        collectMetrix = (Boolean)in.readObject();
+        jmxReporting = (Boolean)in.readObject();
+        creds = (Credentials)in.readObject();
+        loadBalancingPlc = (LoadBalancingPolicy)readObject(in);
+        reconnectionPlc = (ReconnectionPolicy)readObject(in);
+        addrTranslator = (AddressTranslator)readObject(in);
+        speculativeExecutionPlc = (SpeculativeExecutionPolicy)readObject(in);
+        authProvider = (AuthProvider)readObject(in);
+        sslOptions = (SSLOptions)readObject(in);
+        poolingOptions = (PoolingOptions)readObject(in);
+        sockOptions = (SocketOptions)readObject(in);
+        nettyOptions = (NettyOptions)readObject(in);
+    }
+
+    /**
+     * Helper method used to serialize class members
+     * @param out the stream to write the object to
+     * @param obj the object to be written
+     * @throws IOException Includes any I/O exceptions that may occur
+     */
+    private void writeObject(ObjectOutput out, Object obj) throws IOException {
+        out.writeObject(obj == null || !(obj instanceof Serializable) ? NULL_OBJECT : obj);
+    }
+
+    /**
+     * Helper method used to deserialize class members
+     * @param in the stream to read data from in order to restore the object
+     * @throws IOException Includes any I/O exceptions that may occur
+     * @throws ClassNotFoundException If the class for an object being restored cannot be found
+     * @return deserialized object
+     */
+    private Object readObject(ObjectInput in) throws IOException, ClassNotFoundException {
+        Object obj = in.readObject();
+        return NULL_OBJECT.equals(obj) ? null : obj;
+    }
+
+    /**
+     * Parses consistency level provided as string.
+     *
+     * @param level consistency level string.
+     *
+     * @return consistency level.
+     */
+    private ConsistencyLevel parseConsistencyLevel(String level) {
+        if (level == null)
+            return null;
+
+        try {
+            return ConsistencyLevel.valueOf(level.trim().toUpperCase());
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Incorrect consistency level '" + level + "' specified for Cassandra connection", e);
+        }
+    }
+
+    /**
+     * Invalidates session.
+     */
+    private synchronized void invalidate() {
+        ses = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java
new file mode 100644
index 0000000..46ebdc5
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.datasource;
+
+/**
+ * Simple implementation of {@link Credentials} which just uses its constructor to hold user/password values.
+ */
+public class PlainCredentials implements Credentials {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** User name. */
+    private String user;
+
+    /** User password. */
+    private String pwd;
+
+    /**
+     * Creates credentials object.
+     *
+     * @param user User name.
+     * @param pwd User password.
+     */
+    public PlainCredentials(String user, String pwd) {
+        this.user = user;
+        this.pwd = pwd;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getUser() {
+        return user;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getPassword() {
+        return pwd;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/package-info.java
new file mode 100644
index 0000000..d5003ae
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains data source implementation
+ */
+package org.apache.ignite.cache.store.cassandra.datasource;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/package-info.java
new file mode 100644
index 0000000..46f5635
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains {@link org.apache.ignite.cache.store.CacheStore} implementation backed by Cassandra database
+ */
+package org.apache.ignite.cache.store.cassandra;
\ No newline at end of file


Mime
View raw message