ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shro...@apache.org
Subject [16/50] [abbrv] ignite git commit: IGNITE-3609 Utilize Cassandra logged batches for transactions. - Fixes #1111.
Date Mon, 07 Nov 2016 04:30:03 GMT
IGNITE-3609 Utilize Cassandra logged batches for transactions. - Fixes #1111.

Signed-off-by: Alexey Kuznetsov <akuznetsov@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3b8aca64
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3b8aca64
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3b8aca64

Branch: refs/heads/ignite-2788
Commit: 3b8aca64b8ebe6ba21f5d02f50cf69ad46dbbc95
Parents: 00576d8
Author: Igor <irudyak@gmail.com>
Authored: Fri Sep 30 14:39:30 2016 +0700
Committer: Alexey Kuznetsov <akuznetsov@apache.org>
Committed: Fri Sep 30 14:39:30 2016 +0700

----------------------------------------------------------------------
 .../store/cassandra/CassandraCacheStore.java    | 112 ++++--
 .../store/cassandra/common/CassandraHelper.java |  29 +-
 .../store/cassandra/persistence/PojoField.java  |   9 +-
 .../cassandra/persistence/PojoValueField.java   |   2 -
 .../cassandra/session/CassandraSession.java     |  10 +
 .../cassandra/session/CassandraSessionImpl.java | 113 +++++-
 .../session/transaction/BaseMutation.java       |  68 ++++
 .../session/transaction/DeleteMutation.java     |  57 +++
 .../cassandra/session/transaction/Mutation.java |  64 +++
 .../session/transaction/WriteMutation.java      |  60 +++
 .../session/transaction/package-info.java       |  21 +
 .../tests/CassandraDirectPersistenceTest.java   | 396 ++++++++++++++++---
 .../ignite/tests/CassandraLocalServer.java      |  58 +++
 .../apache/ignite/tests/DDLGeneratorTest.java   |  35 +-
 .../ignite/tests/IgnitePersistentStoreTest.java | 265 +++++++++++++
 .../org/apache/ignite/tests/pojos/Product.java  | 123 ++++++
 .../apache/ignite/tests/pojos/ProductOrder.java | 148 +++++++
 .../ignite/tests/utils/CacheStoreHelper.java    |  19 +-
 .../ignite/tests/utils/TestCacheSession.java    |  12 +-
 .../ignite/tests/utils/TestTransaction.java     | 133 +++++++
 .../apache/ignite/tests/utils/TestsHelper.java  | 299 +++++++++++++-
 .../tests/persistence/pojo/ignite-config.xml    |  41 +-
 .../ignite/tests/persistence/pojo/order.xml     |  21 +
 .../ignite/tests/persistence/pojo/product.xml   |  21 +
 .../store/src/test/resources/tests.properties   |  15 +
 25 files changed, 2005 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
index 6aef0c4..aead39a 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
@@ -22,8 +22,10 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -41,6 +43,9 @@ import org.apache.ignite.cache.store.cassandra.session.CassandraSession;
 import org.apache.ignite.cache.store.cassandra.session.ExecutionAssistant;
 import org.apache.ignite.cache.store.cassandra.session.GenericBatchExecutionAssistant;
 import org.apache.ignite.cache.store.cassandra.session.LoadCacheCustomQueryWorker;
+import org.apache.ignite.cache.store.cassandra.session.transaction.DeleteMutation;
+import org.apache.ignite.cache.store.cassandra.session.transaction.Mutation;
+import org.apache.ignite.cache.store.cassandra.session.transaction.WriteMutation;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.logger.NullLogger;
@@ -54,14 +59,16 @@ import org.apache.ignite.resources.LoggerResource;
  * @param <V> Ignite cache value type.
  */
 public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
-    /** Connection attribute property name. */
-    private static final String ATTR_CONN_PROP = "CASSANDRA_STORE_CONNECTION";
+    /** Session property name of the buffer storing mutations performed within a transaction. */
+    private static final String TRANSACTION_BUFFER = "CASSANDRA_TRANSACTION_BUFFER";
 
     /** Auto-injected store session. */
+    @SuppressWarnings("unused")
     @CacheStoreSessionResource
     private CacheStoreSession storeSes;
 
     /** Auto-injected logger instance. */
+    @SuppressWarnings("unused")
     @LoggerResource
     private IgniteLogger log;
 
@@ -127,12 +134,22 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
 
     /** {@inheritDoc} */
     @Override public void sessionEnd(boolean commit) throws CacheWriterException {
-        if (storeSes == null || storeSes.transaction() == null)
+        if (!storeSes.isWithinTransaction())
             return;
 
-        CassandraSession cassandraSes = (CassandraSession) storeSes.properties().remove(ATTR_CONN_PROP);
+        List<Mutation> mutations = mutations();
+        if (mutations == null || mutations.isEmpty())
+            return;
 
-        U.closeQuiet(cassandraSes);
+        CassandraSession ses = getCassandraSession();
+
+        try {
+            ses.execute(mutations);
+        }
+        finally {
+            mutations.clear();
+            U.closeQuiet(ses);
+        }
     }
 
     /** {@inheritDoc} */
@@ -182,7 +199,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
             });
         }
         finally {
-            closeCassandraSession(ses);
+            U.closeQuiet(ses);
         }
     }
 
@@ -235,7 +252,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
             }, keys);
         }
         finally {
-            closeCassandraSession(ses);
+            U.closeQuiet(ses);
         }
     }
 
@@ -244,6 +261,11 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
         if (entry == null || entry.getKey() == null)
             return;
 
+        if (storeSes.isWithinTransaction()) {
+            accumulate(new WriteMutation(entry, cassandraTable(), controller));
+            return;
+        }
+
         CassandraSession ses = getCassandraSession();
 
         try {
@@ -285,7 +307,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
             });
         }
         finally {
-            closeCassandraSession(ses);
+            U.closeQuiet(ses);
         }
     }
 
@@ -294,6 +316,13 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
         if (entries == null || entries.isEmpty())
             return;
 
+        if (storeSes.isWithinTransaction()) {
+            for (Cache.Entry<?, ?> entry : entries)
+                accumulate(new WriteMutation(entry, cassandraTable(), controller));
+
+            return;
+        }
+
         CassandraSession ses = getCassandraSession();
 
         try {
@@ -331,7 +360,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
             }, entries);
         }
         finally {
-            closeCassandraSession(ses);
+            U.closeQuiet(ses);
         }
     }
 
@@ -340,6 +369,11 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
         if (key == null)
             return;
 
+        if (storeSes.isWithinTransaction()) {
+            accumulate(new DeleteMutation(key, cassandraTable(), controller));
+            return;
+        }
+
         CassandraSession ses = getCassandraSession();
 
         try {
@@ -382,7 +416,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
             });
         }
         finally {
-            closeCassandraSession(ses);
+            U.closeQuiet(ses);
         }
     }
 
@@ -391,6 +425,13 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
         if (keys == null || keys.isEmpty())
             return;
 
+        if (storeSes.isWithinTransaction()) {
+            for (Object key : keys)
+                accumulate(new DeleteMutation(key, cassandraTable(), controller));
+
+            return;
+        }
+
         CassandraSession ses = getCassandraSession();
 
         try {
@@ -422,7 +463,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
             }, keys);
         }
         finally {
-            closeCassandraSession(ses);
+            U.closeQuiet(ses);
         }
     }
 
@@ -433,36 +474,43 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
      * @return Cassandra session wrapper.
      */
     private CassandraSession getCassandraSession() {
-        if (storeSes == null || storeSes.transaction() == null)
-            return dataSrc.session(log != null ? log : new NullLogger());
-
-        CassandraSession ses = (CassandraSession) storeSes.properties().get(ATTR_CONN_PROP);
-
-        if (ses == null) {
-            ses = dataSrc.session(log != null ? log : new NullLogger());
-            storeSes.properties().put(ATTR_CONN_PROP, ses);
-        }
+        return dataSrc.session(log != null ? log : new NullLogger());
+    }
 
-        return ses;
+    /**
+     * Returns table name to use for all Cassandra based operations (READ/WRITE/DELETE).
+     *
+     * @return Table name.
+     */
+    private String cassandraTable() {
+        return controller.getPersistenceSettings().getTable() != null ?
+            controller.getPersistenceSettings().getTable() : storeSes.cacheName().trim().toLowerCase();
     }
 
     /**
-     * Releases Cassandra related resources.
+     * Accumulates mutation in the transaction buffer.
      *
-     * @param ses Cassandra session wrapper.
+     * @param mutation Mutation operation.
      */
-    private void closeCassandraSession(CassandraSession ses) {
-        if (ses != null && (storeSes == null || storeSes.transaction() == null))
-            U.closeQuiet(ses);
+    private void accumulate(Mutation mutation) {
+        //noinspection unchecked
+        List<Mutation> mutations = (List<Mutation>)storeSes.properties().get(TRANSACTION_BUFFER);
+
+        if (mutations == null) {
+            mutations = new LinkedList<>();
+            storeSes.properties().put(TRANSACTION_BUFFER, mutations);
+        }
+
+        mutations.add(mutation);
     }
 
     /**
-     * Returns table name to use for all Cassandra based operations (READ/WRITE/DELETE).
+     * Returns all the mutations performed within the transaction.
      *
-     * @return Table name.
+     * @return Mutations
      */
-    private String cassandraTable() {
-        return controller.getPersistenceSettings().getTable() != null ?
-            controller.getPersistenceSettings().getTable() : storeSes.cacheName().toLowerCase();
+    private List<Mutation> mutations() {
+        //noinspection unchecked
+        return (List<Mutation>)storeSes.properties().get(TRANSACTION_BUFFER);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
index 9066112..badd5df 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
@@ -20,9 +20,13 @@ package org.apache.ignite.cache.store.cassandra.common;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.DriverException;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.core.exceptions.ReadTimeoutException;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
 import java.util.regex.Pattern;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
@@ -36,8 +40,15 @@ public class CassandraHelper {
     /** Cassandra error message if trying to create table inside nonexistent keyspace. */
     private static final Pattern KEYSPACE_EXIST_ERROR2 = Pattern.compile("Cannot add table '[0-9a-zA-Z_]+' to non existing keyspace.*");
 
+    /** Cassandra error message if trying to prepare a query against a nonexistent keyspace. */
+    private static final Pattern KEYSPACE_EXIST_ERROR3 = Pattern.compile("Error preparing query, got ERROR INVALID: " +
+            "Keyspace [0-9a-zA-Z_]+ does not exist");
+
+    /** Cassandra error message if specified table doesn't exist. */
+    private static final Pattern TABLE_EXIST_ERROR1 = Pattern.compile("unconfigured table [0-9a-zA-Z_]+");
+
     /** Cassandra error message if specified table doesn't exist. */
-    private static final Pattern TABLE_EXIST_ERROR = Pattern.compile("unconfigured table [0-9a-zA-Z_]+");
+    private static final String TABLE_EXIST_ERROR2 = "Error preparing query, got ERROR INVALID: unconfigured table";
 
     /** Cassandra error message if trying to use prepared statement created from another session. */
     private static final String PREP_STATEMENT_CLUSTER_INSTANCE_ERROR = "You may have used a PreparedStatement that " +
@@ -85,11 +96,25 @@ public class CassandraHelper {
     public static boolean isTableAbsenceError(Throwable e) {
         while (e != null) {
             if (e instanceof InvalidQueryException &&
-                (TABLE_EXIST_ERROR.matcher(e.getMessage()).matches() ||
+                (TABLE_EXIST_ERROR1.matcher(e.getMessage()).matches() ||
                     KEYSPACE_EXIST_ERROR1.matcher(e.getMessage()).matches() ||
                     KEYSPACE_EXIST_ERROR2.matcher(e.getMessage()).matches()))
                 return true;
 
+            if (e instanceof NoHostAvailableException && ((NoHostAvailableException) e).getErrors() != null) {
+                NoHostAvailableException ex = (NoHostAvailableException)e;
+
+                for (Map.Entry<InetSocketAddress, Throwable> entry : ex.getErrors().entrySet()) {
+                    //noinspection ThrowableResultOfMethodCallIgnored
+                    Throwable error = entry.getValue();
+
+                    if (error instanceof DriverException &&
+                        (error.getMessage().contains(TABLE_EXIST_ERROR2) ||
+                             KEYSPACE_EXIST_ERROR3.matcher(error.getMessage()).matches()))
+                        return true;
+                }
+            }
+
             e = e.getCause();
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
index d708a34..78e75a9 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
@@ -85,11 +85,10 @@ public abstract class PojoField implements Serializable {
     public PojoField(PropertyDescriptor desc) {
         this.name = desc.getName();
 
-        QuerySqlField sqlField = desc.getReadMethod() != null ?
-            desc.getReadMethod().getAnnotation(QuerySqlField.class) :
-            desc.getWriteMethod() == null ?
-                null :
-                desc.getWriteMethod().getAnnotation(QuerySqlField.class);
+        QuerySqlField sqlField = desc.getReadMethod() != null &&
+                desc.getReadMethod().getAnnotation(QuerySqlField.class) != null ?
+                desc.getReadMethod().getAnnotation(QuerySqlField.class) :
+                    desc.getWriteMethod() == null ? null : desc.getWriteMethod().getAnnotation(QuerySqlField.class);
 
         col = sqlField != null && sqlField.name() != null &&
             !sqlField.name().trim().isEmpty() ? sqlField.name() : name.toLowerCase();

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
index c3512c3..3e636c0 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
@@ -146,7 +146,5 @@ public class PojoValueField extends PojoField {
      * @param sqlField {@link QuerySqlField} annotation.
      */
     protected void init(QuerySqlField sqlField) {
-        if (sqlField.index())
-            isIndexed = true;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
index 506982f..b0e50ec 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
@@ -17,7 +17,10 @@
 
 package org.apache.ignite.cache.store.cassandra.session;
 
+import org.apache.ignite.cache.store.cassandra.session.transaction.Mutation;
+
 import java.io.Closeable;
+import java.util.List;
 
 /**
  * Wrapper around Cassandra driver session, to automatically handle:
@@ -57,4 +60,11 @@ public interface CassandraSession extends Closeable {
      * @param assistant execution assistance to perform the main operation logic.
      */
     public void execute(BatchLoaderAssistant assistant);
+
+    /**
+     * Executes all the mutations performed within an Ignite transaction against Cassandra database.
+     *
+     * @param mutations Mutations.
+     */
+    public void execute(List<Mutation> mutations);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
index d2c9e97..4857fa4 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
@@ -43,6 +43,7 @@ import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
 import org.apache.ignite.cache.store.cassandra.common.RandomSleeper;
 import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
 import org.apache.ignite.cache.store.cassandra.session.pool.SessionPool;
+import org.apache.ignite.cache.store.cassandra.session.transaction.Mutation;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 
 /**
@@ -162,7 +163,8 @@ public class CassandraSessionImpl implements CassandraSession {
                         throw new IgniteException(errorMsg, e);
                 }
 
-                sleeper.sleep();
+                if (!CassandraHelper.isTableAbsenceError(error))
+                    sleeper.sleep();
 
                 attempt++;
             }
@@ -320,7 +322,8 @@ public class CassandraSessionImpl implements CassandraSession {
                     handlePreparedStatementClusterError(prepStatEx);
                 }
 
-                sleeper.sleep();
+                if (!CassandraHelper.isTableAbsenceError(error))
+                    sleeper.sleep();
 
                 attempt++;
             }
@@ -402,6 +405,103 @@ public class CassandraSessionImpl implements CassandraSession {
     }
 
     /** {@inheritDoc} */
+    @Override public void execute(List<Mutation> mutations) {
+        if (mutations == null || mutations.isEmpty())
+            return;
+
+        Throwable error = null;
+        String errorMsg = "Failed to apply " + mutations.size() + " mutations performed withing Ignite " +
+                "transaction into Cassandra";
+
+        int attempt = 0;
+        boolean tableExistenceRequired = false;
+        Map<String, PreparedStatement> statements = new HashMap<>();
+        Map<String, KeyValuePersistenceSettings> tableSettings = new HashMap<>();
+        RandomSleeper sleeper = newSleeper();
+
+        incrementSessionRefs();
+
+        try {
+            while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+                error = null;
+
+                if (attempt != 0) {
+                    log.warning("Trying " + (attempt + 1) + " attempt to apply " + mutations.size() + " mutations " +
+                            "performed withing Ignite transaction into Cassandra");
+                }
+
+                try {
+                    BatchStatement batch = new BatchStatement();
+
+                    // accumulating all the mutations into one Cassandra logged batch
+                    for (Mutation mutation : mutations) {
+                        String key = mutation.getTable() + mutation.getClass().getName();
+                        PreparedStatement st = statements.get(key);
+
+                        if (st == null) {
+                            st = prepareStatement(mutation.getTable(), mutation.getStatement(),
+                                    mutation.getPersistenceSettings(), mutation.tableExistenceRequired());
+
+                            if (st != null)
+                                statements.put(key, st);
+                        }
+
+                        if (st != null)
+                            batch.add(mutation.bindStatement(st));
+
+                        if (attempt == 0) {
+                            if (mutation.tableExistenceRequired()) {
+                                tableExistenceRequired = true;
+
+                                if (!tableSettings.containsKey(mutation.getTable()))
+                                    tableSettings.put(mutation.getTable(), mutation.getPersistenceSettings());
+                            }
+                        }
+                    }
+
+                    // committing logged batch into Cassandra
+                    if (batch.size() > 0)
+                        session().execute(tuneStatementExecutionOptions(batch));
+
+                    return;
+                } catch (Throwable e) {
+                    error = e;
+
+                    if (CassandraHelper.isTableAbsenceError(e)) {
+                        if (tableExistenceRequired) {
+                            for (Map.Entry<String, KeyValuePersistenceSettings> entry : tableSettings.entrySet())
+                                handleTableAbsenceError(entry.getKey(), entry.getValue());
+                        }
+                        else
+                            return;
+                    } else if (CassandraHelper.isHostsAvailabilityError(e)) {
+                        if (handleHostsAvailabilityError(e, attempt, errorMsg))
+                            statements.clear();
+                    } else if (CassandraHelper.isPreparedStatementClusterError(e)) {
+                        handlePreparedStatementClusterError(e);
+                        statements.clear();
+                    } else {
+                        // For an error which we don't know how to handle, we will not try next attempts and terminate.
+                        throw new IgniteException(errorMsg, e);
+                    }
+                }
+
+                if (!CassandraHelper.isTableAbsenceError(error))
+                    sleeper.sleep();
+
+                attempt++;
+            }
+        } catch (Throwable e) {
+            error = e;
+        } finally {
+            decrementSessionRefs();
+        }
+
+        log.error(errorMsg, error);
+        throw new IgniteException(errorMsg, error);
+    }
+
+    /** {@inheritDoc} */
     @Override public synchronized void close() throws IOException {
         if (decrementSessionRefs() == 0 && ses != null) {
             SessionPool.put(this, ses);
@@ -517,7 +617,8 @@ public class CassandraSessionImpl implements CassandraSession {
                     error = e;
                 }
 
-                sleeper.sleep();
+                if (!CassandraHelper.isTableAbsenceError(error))
+                    sleeper.sleep();
 
                 attempt++;
             }
@@ -585,7 +686,7 @@ public class CassandraSessionImpl implements CassandraSession {
                 log.info("-----------------------------------------------------------------------");
                 log.info("Creating Cassandra table '" + tableFullName + "'");
                 log.info("-----------------------------------------------------------------------\n\n" +
-                        tableFullName + "\n");
+                        settings.getTableDDLStatement(table) + "\n");
                 log.info("-----------------------------------------------------------------------");
                 session().execute(settings.getTableDDLStatement(table));
                 log.info("Cassandra table '" + tableFullName + "' was successfully created");
@@ -634,10 +735,14 @@ public class CassandraSessionImpl implements CassandraSession {
 
         while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
             try {
+                log.info("-----------------------------------------------------------------------");
                 log.info("Creating indexes for Cassandra table '" + tableFullName + "'");
+                log.info("-----------------------------------------------------------------------");
 
                 for (String statement : indexDDLStatements) {
                     try {
+                        log.info(statement);
+                        log.info("-----------------------------------------------------------------------");
                         session().execute(statement);
                     }
                     catch (AlreadyExistsException ignored) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/BaseMutation.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/BaseMutation.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/BaseMutation.java
new file mode 100644
index 0000000..2625e87
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/BaseMutation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.transaction;
+
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController;
+
+/**
+ * Base class for specific mutation operations; keeps the target table and persistence controller.
+ */
+public abstract class BaseMutation implements Mutation {
+    /** Cassandra table to use. */
+    private final String table;
+
+    /** Persistence controller to be utilized for mutation. */
+    private final PersistenceController ctrl;
+
+    /**
+     * Creates instance of mutation operation, rejecting invalid arguments with IllegalArgumentException.
+     *
+     * @param table Cassandra table which should be used for the mutation (must not be null or blank).
+     * @param ctrl Persistence controller to use (must not be null).
+     */
+    public BaseMutation(String table, PersistenceController ctrl) {
+        if (table == null || table.trim().isEmpty())
+            throw new IllegalArgumentException("Table name should be specified");
+
+        if (ctrl == null)
+            throw new IllegalArgumentException("Persistence controller should be specified");
+
+        this.table = table;
+        this.ctrl = ctrl;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getTable() {
+        return table;
+    }
+
+    /** {@inheritDoc} */
+    @Override public KeyValuePersistenceSettings getPersistenceSettings() {
+        return ctrl.getPersistenceSettings();
+    }
+
+    /**
+     * Service method giving subclasses access to the persistence controller instance.
+     *
+     * @return Persistence controller passed to the constructor.
+     */
+    protected PersistenceController controller() {
+        return ctrl;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/DeleteMutation.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/DeleteMutation.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/DeleteMutation.java
new file mode 100644
index 0000000..79c0bfe
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/DeleteMutation.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.transaction;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController;
+
+/**
+ * Mutation which deletes an object from Cassandra.
+ */
+public class DeleteMutation extends BaseMutation {
+    /** Ignite cache key of the object which should be deleted. */
+    private final Object key;
+
+    /**
+     * Creates instance of delete mutation operation.
+     *
+     * @param key Ignite cache key of the object which should be deleted.
+     * @param table Cassandra table which should be used for the mutation.
+     * @param ctrl Persistence controller to use.
+     */
+    public DeleteMutation(Object key, String table, PersistenceController ctrl) {
+        super(table, ctrl);
+        this.key = key;
+    }
+
+    /** {@inheritDoc} Always {@code false} for a delete mutation. */
+    @Override public boolean tableExistenceRequired() {
+        return false;
+    }
+
+    /** {@inheritDoc} Returns the controller's delete statement for the mutation table. */
+    @Override public String getStatement() {
+        return controller().getDeleteStatement(getTable());
+    }
+
+    /** {@inheritDoc} Binds only the key held by this mutation. */
+    @Override public BoundStatement bindStatement(PreparedStatement statement) {
+        return controller().bindKey(statement, key);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java
new file mode 100644
index 0000000..cb014f8
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.transaction;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+
+/**
+ * Provides information about a particular mutation operation performed within a transaction.
+ */
+public interface Mutation {
+    /**
+     * Cassandra table to use for an operation.
+     *
+     * @return Table name.
+     */
+    public String getTable();
+
+    /**
+     * Indicates if Cassandra table existence is required for this operation.
+     *
+     * @return {@code true} if table existence is required.
+     */
+    public boolean tableExistenceRequired();
+
+    /**
+     * Returns Ignite cache key/value persistence settings.
+     *
+     * @return Persistence settings.
+     */
+    public KeyValuePersistenceSettings getPersistenceSettings();
+
+    /**
+     * Returns unbound CQL statement to be executed.
+     *
+     * @return Unbound CQL statement.
+     */
+    public String getStatement();
+
+    /**
+     * Binds the prepared statement with the data carried by this mutation
+     * (the visible implementations bind a cache key or a key/value pair).
+     *
+     * @param statement Statement.
+     * @return Bound statement.
+     */
+    public BoundStatement bindStatement(PreparedStatement statement);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/WriteMutation.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/WriteMutation.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/WriteMutation.java
new file mode 100644
index 0000000..3c74378
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/WriteMutation.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.transaction;
+
+import javax.cache.Cache;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+
+import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController;
+
+/**
+ * Mutation which writes (inserts) an object into Cassandra.
+ */
+public class WriteMutation extends BaseMutation {
+    /** Ignite cache entry to be inserted into Cassandra. */
+    private final Cache.Entry entry;
+
+    /**
+     * Creates instance of write mutation operation.
+     *
+     * @param entry Ignite cache entry to be inserted into Cassandra.
+     * @param table Cassandra table which should be used for the mutation.
+     * @param ctrl Persistence controller to use.
+     */
+    public WriteMutation(Cache.Entry entry, String table, PersistenceController ctrl) {
+        super(table, ctrl);
+        this.entry = entry;
+    }
+
+    /** {@inheritDoc} Always {@code true} for a write mutation. */
+    @Override public boolean tableExistenceRequired() {
+        return true;
+    }
+
+    /** {@inheritDoc} Returns the controller's write statement for the mutation table. */
+    @Override public String getStatement() {
+        return controller().getWriteStatement(getTable());
+    }
+
+    /** {@inheritDoc} Binds both the key and the value of the cache entry. */
+    @Override public BoundStatement bindStatement(PreparedStatement statement) {
+        return controller().bindKeyValue(statement, entry.getKey(), entry.getValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/package-info.java
new file mode 100644
index 0000000..7141845
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains mutation implementations used to store changes made inside an Ignite transaction.
+ */
+package org.apache.ignite.cache.store.cassandra.session.transaction;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java
index 9974898..f9e9649 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java
@@ -18,14 +18,21 @@
 package org.apache.ignite.tests;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.tests.pojos.Person;
 import org.apache.ignite.tests.pojos.PersonId;
+import org.apache.ignite.tests.pojos.Product;
+import org.apache.ignite.tests.pojos.ProductOrder;
 import org.apache.ignite.tests.utils.CacheStoreHelper;
 import org.apache.ignite.tests.utils.CassandraHelper;
+import org.apache.ignite.tests.utils.TestCacheSession;
+import org.apache.ignite.tests.utils.TestTransaction;
 import org.apache.ignite.tests.utils.TestsHelper;
+import org.apache.ignite.transactions.Transaction;
 import org.apache.log4j.Logger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -113,31 +120,31 @@ public class CassandraDirectPersistenceTest {
 
         LOGGER.info("Running PRIMITIVE strategy write tests");
 
-        LOGGER.info("Running single operation write tests");
+        LOGGER.info("Running single write operation tests");
         store1.write(longEntries.iterator().next());
         store2.write(strEntries.iterator().next());
-        LOGGER.info("Single operation write tests passed");
+        LOGGER.info("Single write operation tests passed");
 
-        LOGGER.info("Running bulk operation write tests");
+        LOGGER.info("Running bulk write operation tests");
         store1.writeAll(longEntries);
         store2.writeAll(strEntries);
-        LOGGER.info("Bulk operation write tests passed");
+        LOGGER.info("Bulk write operation tests passed");
 
         LOGGER.info("PRIMITIVE strategy write tests passed");
 
         LOGGER.info("Running PRIMITIVE strategy read tests");
 
-        LOGGER.info("Running single operation read tests");
+        LOGGER.info("Running single read operation tests");
 
         LOGGER.info("Running real keys read tests");
 
         Long longVal = (Long)store1.load(longEntries.iterator().next().getKey());
         if (!longEntries.iterator().next().getValue().equals(longVal))
-            throw new RuntimeException("Long values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Long values were incorrectly deserialized from Cassandra");
 
         String strVal = (String)store2.load(strEntries.iterator().next().getKey());
         if (!strEntries.iterator().next().getValue().equals(strVal))
-            throw new RuntimeException("String values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("String values were incorrectly deserialized from Cassandra");
 
         LOGGER.info("Running fake keys read tests");
 
@@ -149,31 +156,31 @@ public class CassandraDirectPersistenceTest {
         if (strVal != null)
             throw new RuntimeException("String value with fake key '-1' was found in Cassandra");
 
-        LOGGER.info("Single operation read tests passed");
+        LOGGER.info("Single read operation tests passed");
 
-        LOGGER.info("Running bulk operation read tests");
+        LOGGER.info("Running bulk read operation tests");
 
         LOGGER.info("Running real keys read tests");
 
         Map longValues = store1.loadAll(TestsHelper.getKeys(longEntries));
         if (!TestsHelper.checkCollectionsEqual(longValues, longEntries))
-            throw new RuntimeException("Long values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Long values were incorrectly deserialized from Cassandra");
 
         Map strValues = store2.loadAll(TestsHelper.getKeys(strEntries));
         if (!TestsHelper.checkCollectionsEqual(strValues, strEntries))
-            throw new RuntimeException("String values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("String values were incorrectly deserialized from Cassandra");
 
         LOGGER.info("Running fake keys read tests");
 
         longValues = store1.loadAll(fakeLongKeys);
         if (!TestsHelper.checkCollectionsEqual(longValues, longEntries))
-            throw new RuntimeException("Long values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Long values were incorrectly deserialized from Cassandra");
 
         strValues = store2.loadAll(fakeStrKeys);
         if (!TestsHelper.checkCollectionsEqual(strValues, strEntries))
-            throw new RuntimeException("String values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("String values were incorrectly deserialized from Cassandra");
 
-        LOGGER.info("Bulk operation read tests passed");
+        LOGGER.info("Bulk read operation tests passed");
 
         LOGGER.info("PRIMITIVE strategy read tests passed");
 
@@ -219,53 +226,53 @@ public class CassandraDirectPersistenceTest {
 
         LOGGER.info("Running BLOB strategy write tests");
 
-        LOGGER.info("Running single operation write tests");
+        LOGGER.info("Running single write operation tests");
         store1.write(longEntries.iterator().next());
         store2.write(personEntries.iterator().next());
         store3.write(personEntries.iterator().next());
-        LOGGER.info("Single operation write tests passed");
+        LOGGER.info("Single write operation tests passed");
 
-        LOGGER.info("Running bulk operation write tests");
+        LOGGER.info("Running bulk write operation tests");
         store1.writeAll(longEntries);
         store2.writeAll(personEntries);
         store3.writeAll(personEntries);
-        LOGGER.info("Bulk operation write tests passed");
+        LOGGER.info("Bulk write operation tests passed");
 
         LOGGER.info("BLOB strategy write tests passed");
 
         LOGGER.info("Running BLOB strategy read tests");
 
-        LOGGER.info("Running single operation read tests");
+        LOGGER.info("Running single read operation tests");
 
         Long longVal = (Long)store1.load(longEntries.iterator().next().getKey());
         if (!longEntries.iterator().next().getValue().equals(longVal))
-            throw new RuntimeException("Long values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Long values were incorrectly deserialized from Cassandra");
 
         Person personVal = (Person)store2.load(personEntries.iterator().next().getKey());
         if (!personEntries.iterator().next().getValue().equals(personVal))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
         personVal = (Person)store3.load(personEntries.iterator().next().getKey());
         if (!personEntries.iterator().next().getValue().equals(personVal))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
-        LOGGER.info("Single operation read tests passed");
+        LOGGER.info("Single read operation tests passed");
 
-        LOGGER.info("Running bulk operation read tests");
+        LOGGER.info("Running bulk read operation tests");
 
         Map longValues = store1.loadAll(TestsHelper.getKeys(longEntries));
         if (!TestsHelper.checkCollectionsEqual(longValues, longEntries))
-            throw new RuntimeException("Long values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Long values were incorrectly deserialized from Cassandra");
 
         Map personValues = store2.loadAll(TestsHelper.getKeys(personEntries));
         if (!TestsHelper.checkPersonCollectionsEqual(personValues, personEntries, false))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
         personValues = store3.loadAll(TestsHelper.getKeys(personEntries));
         if (!TestsHelper.checkPersonCollectionsEqual(personValues, personEntries, false))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
-        LOGGER.info("Bulk operation read tests passed");
+        LOGGER.info("Bulk read operation tests passed");
 
         LOGGER.info("BLOB strategy read tests passed");
 
@@ -303,69 +310,99 @@ public class CassandraDirectPersistenceTest {
             new ClassPathResource("org/apache/ignite/tests/persistence/pojo/persistence-settings-4.xml"),
             CassandraHelper.getAdminDataSrc());
 
+        CacheStore productStore = CacheStoreHelper.createCacheStore("product",
+            new ClassPathResource("org/apache/ignite/tests/persistence/pojo/product.xml"),
+            CassandraHelper.getAdminDataSrc());
+
+        CacheStore orderStore = CacheStoreHelper.createCacheStore("order",
+            new ClassPathResource("org/apache/ignite/tests/persistence/pojo/order.xml"),
+            CassandraHelper.getAdminDataSrc());
+
         Collection<CacheEntryImpl<Long, Person>> entries1 = TestsHelper.generateLongsPersonsEntries();
         Collection<CacheEntryImpl<PersonId, Person>> entries2 = TestsHelper.generatePersonIdsPersonsEntries();
         Collection<CacheEntryImpl<PersonId, Person>> entries3 = TestsHelper.generatePersonIdsPersonsEntries();
+        Collection<CacheEntryImpl<Long, Product>> productEntries = TestsHelper.generateProductEntries();
+        Collection<CacheEntryImpl<Long, ProductOrder>> orderEntries = TestsHelper.generateOrderEntries();
 
         LOGGER.info("Running POJO strategy write tests");
 
-        LOGGER.info("Running single operation write tests");
+        LOGGER.info("Running single write operation tests");
         store1.write(entries1.iterator().next());
         store2.write(entries2.iterator().next());
         store3.write(entries3.iterator().next());
         store4.write(entries3.iterator().next());
-        LOGGER.info("Single operation write tests passed");
+        productStore.write(productEntries.iterator().next());
+        orderStore.write(orderEntries.iterator().next());
+        LOGGER.info("Single write operation tests passed");
 
-        LOGGER.info("Running bulk operation write tests");
+        LOGGER.info("Running bulk write operation tests");
         store1.writeAll(entries1);
         store2.writeAll(entries2);
         store3.writeAll(entries3);
         store4.writeAll(entries3);
-        LOGGER.info("Bulk operation write tests passed");
+        productStore.writeAll(productEntries);
+        orderStore.writeAll(orderEntries);
+        LOGGER.info("Bulk write operation tests passed");
 
         LOGGER.info("POJO strategy write tests passed");
 
         LOGGER.info("Running POJO strategy read tests");
 
-        LOGGER.info("Running single operation read tests");
+        LOGGER.info("Running single read operation tests");
 
         Person person = (Person)store1.load(entries1.iterator().next().getKey());
         if (!entries1.iterator().next().getValue().equalsPrimitiveFields(person))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
         person = (Person)store2.load(entries2.iterator().next().getKey());
         if (!entries2.iterator().next().getValue().equalsPrimitiveFields(person))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
         person = (Person)store3.load(entries3.iterator().next().getKey());
         if (!entries3.iterator().next().getValue().equals(person))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
         person = (Person)store4.load(entries3.iterator().next().getKey());
         if (!entries3.iterator().next().getValue().equals(person))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
-        LOGGER.info("Single operation read tests passed");
+        Product product = (Product)productStore.load(productEntries.iterator().next().getKey());
+        if (!productEntries.iterator().next().getValue().equals(product))
+            throw new RuntimeException("Product values were incorrectly deserialized from Cassandra");
 
-        LOGGER.info("Running bulk operation read tests");
+        ProductOrder order = (ProductOrder)orderStore.load(orderEntries.iterator().next().getKey());
+        if (!orderEntries.iterator().next().getValue().equals(order))
+            throw new RuntimeException("Order values were incorrectly deserialized from Cassandra");
+
+        LOGGER.info("Single read operation tests passed");
+
+        LOGGER.info("Running bulk read operation tests");
 
         Map persons = store1.loadAll(TestsHelper.getKeys(entries1));
         if (!TestsHelper.checkPersonCollectionsEqual(persons, entries1, true))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
         persons = store2.loadAll(TestsHelper.getKeys(entries2));
         if (!TestsHelper.checkPersonCollectionsEqual(persons, entries2, true))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
         persons = store3.loadAll(TestsHelper.getKeys(entries3));
         if (!TestsHelper.checkPersonCollectionsEqual(persons, entries3, false))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
         persons = store4.loadAll(TestsHelper.getKeys(entries3));
         if (!TestsHelper.checkPersonCollectionsEqual(persons, entries3, false))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
+
+        Map products = productStore.loadAll(TestsHelper.getKeys(productEntries));
+        if (!TestsHelper.checkProductCollectionsEqual(products, productEntries))
+            throw new RuntimeException("Product values were incorrectly deserialized from Cassandra");
+
+        Map orders = orderStore.loadAll(TestsHelper.getKeys(orderEntries));
+        if (!TestsHelper.checkOrderCollectionsEqual(orders, orderEntries))
+            throw new RuntimeException("Order values were incorrectly deserialized from Cassandra");
 
-        LOGGER.info("Bulk operation read tests passed");
+        LOGGER.info("Bulk read operation tests passed");
 
         LOGGER.info("POJO strategy read tests passed");
 
@@ -383,6 +420,277 @@ public class CassandraDirectPersistenceTest {
         store4.delete(entries3.iterator().next().getKey());
         store4.deleteAll(TestsHelper.getKeys(entries3));
 
+        productStore.delete(productEntries.iterator().next().getKey());
+        productStore.deleteAll(TestsHelper.getKeys(productEntries));
+
+        orderStore.delete(orderEntries.iterator().next().getKey());
+        orderStore.deleteAll(TestsHelper.getKeys(orderEntries));
+
         LOGGER.info("POJO strategy delete tests passed");
     }
+
+    /** Checks that POJO store writes and deletes reach Cassandra only after {@code sessionEnd(true)}. */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void pojoStrategyTransactionTest() {
+        Map<Object, Object> sessionProps = U.newHashMap(1);
+        Transaction sessionTx = new TestTransaction(); // one transaction object shared by both stores below
+
+        CacheStore productStore = CacheStoreHelper.createCacheStore("product",
+            new ClassPathResource("org/apache/ignite/tests/persistence/pojo/product.xml"),
+            CassandraHelper.getAdminDataSrc(), new TestCacheSession("product", sessionTx, sessionProps));
+
+        CacheStore orderStore = CacheStoreHelper.createCacheStore("order",
+            new ClassPathResource("org/apache/ignite/tests/persistence/pojo/order.xml"),
+            CassandraHelper.getAdminDataSrc(), new TestCacheSession("order", sessionTx, sessionProps));
+
+        List<CacheEntryImpl<Long, Product>> productEntries = TestsHelper.generateProductEntries();
+        Map<Long, List<CacheEntryImpl<Long, ProductOrder>>> ordersPerProduct =
+                TestsHelper.generateOrdersPerProductEntries(productEntries, 2);
+
+        Collection<Long> productIds =  TestsHelper.getProductIds(productEntries);
+        Collection<Long> orderIds =  TestsHelper.getOrderIds(ordersPerProduct);
+
+        LOGGER.info("Running POJO strategy transaction write tests");
+
+        LOGGER.info("Running single write operation tests");
+
+        CassandraHelper.dropTestKeyspaces();
+
+        Product product = productEntries.iterator().next().getValue();
+        ProductOrder order = ordersPerProduct.get(product.getId()).iterator().next().getValue();
+
+        productStore.write(productEntries.iterator().next());
+        orderStore.write(ordersPerProduct.get(product.getId()).iterator().next());
+        // Before sessionEnd(true) neither store may return the objects just written.
+        if (productStore.load(product.getId()) != null || orderStore.load(order.getId()) != null) {
+            throw new RuntimeException("Single write operation test failed. Transaction wasn't committed yet, but " +
+                    "objects were already persisted into Cassandra");
+        }
+
+        Map<Long, Product> products = (Map<Long, Product>)productStore.loadAll(productIds);
+        Map<Long, ProductOrder> orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+        if ((products != null && !products.isEmpty()) || (orders != null && !orders.isEmpty())) {
+            throw new RuntimeException("Single write operation test failed. Transaction wasn't committed yet, but " +
+                    "objects were already persisted into Cassandra");
+        }
+
+        //noinspection deprecation
+        orderStore.sessionEnd(true); // the checks below treat this as the transaction commit
+        //noinspection deprecation
+        productStore.sessionEnd(true);
+
+        Product product1 = (Product)productStore.load(product.getId());
+        ProductOrder order1 = (ProductOrder)orderStore.load(order.getId());
+
+        if (product1 == null || order1 == null) {
+            throw new RuntimeException("Single write operation test failed. Transaction was committed, but " +
+                    "no objects were persisted into Cassandra");
+        }
+
+        if (!product.equals(product1) || !order.equals(order1)) {
+            throw new RuntimeException("Single write operation test failed. Transaction was committed, but " +
+                    "objects were incorrectly persisted/loaded to/from Cassandra");
+        }
+
+        products = (Map<Long, Product>)productStore.loadAll(productIds);
+        orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+        if (products == null || products.isEmpty() || orders == null || orders.isEmpty()) {
+            throw new RuntimeException("Single write operation test failed. Transaction was committed, but " +
+                    "no objects were persisted into Cassandra");
+        }
+
+        if (products.size() > 1 || orders.size() > 1) {
+            throw new RuntimeException("Single write operation test failed. There were committed more objects " +
+                    "into Cassandra than expected");
+        }
+
+        product1 = products.entrySet().iterator().next().getValue();
+        order1 = orders.entrySet().iterator().next().getValue();
+
+        if (!product.equals(product1) || !order.equals(order1)) {
+            throw new RuntimeException("Single write operation test failed. Transaction was committed, but " +
+                    "objects were incorrectly persisted/loaded to/from Cassandra");
+        }
+
+        LOGGER.info("Single write operation tests passed");
+
+        LOGGER.info("Running bulk write operation tests");
+
+        CassandraHelper.dropTestKeyspaces();
+        sessionProps.clear(); // reset the session properties shared by both stores
+
+        productStore.writeAll(productEntries);
+
+        for (Long productId : ordersPerProduct.keySet())
+            orderStore.writeAll(ordersPerProduct.get(productId));
+
+        for (Long productId : productIds) {
+            if (productStore.load(productId) != null) {
+                throw new RuntimeException("Bulk write operation test failed. Transaction wasn't committed yet, but " +
+                        "objects were already persisted into Cassandra");
+            }
+        }
+
+        for (Long orderId : orderIds) {
+            if (orderStore.load(orderId) != null) {
+                throw new RuntimeException("Bulk write operation test failed. Transaction wasn't committed yet, but " +
+                        "objects were already persisted into Cassandra");
+            }
+        }
+
+        products = (Map<Long, Product>)productStore.loadAll(productIds);
+        orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+        if ((products != null && !products.isEmpty()) || (orders != null && !orders.isEmpty())) {
+            throw new RuntimeException("Bulk write operation test failed. Transaction wasn't committed yet, but " +
+                    "objects were already persisted into Cassandra");
+        }
+
+        //noinspection deprecation
+        productStore.sessionEnd(true);
+        //noinspection deprecation
+        orderStore.sessionEnd(true);
+
+        for (CacheEntryImpl<Long, Product> entry : productEntries) {
+            product = (Product)productStore.load(entry.getKey());
+
+            if (!entry.getValue().equals(product)) {
+                throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+                        "not all objects were persisted into Cassandra");
+            }
+        }
+
+        for (Long productId : ordersPerProduct.keySet()) {
+            for (CacheEntryImpl<Long, ProductOrder> entry : ordersPerProduct.get(productId)) {
+                order = (ProductOrder)orderStore.load(entry.getKey());
+
+                if (!entry.getValue().equals(order)) {
+                    throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+                            "not all objects were persisted into Cassandra");
+                }
+            }
+        }
+
+        products = (Map<Long, Product>)productStore.loadAll(productIds);
+        orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+        if (products == null || products.isEmpty() || orders == null || orders.isEmpty()) {
+            throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+                    "no objects were persisted into Cassandra");
+        }
+
+        if (products.size() < productIds.size() || orders.size() < orderIds.size()) {
+            throw new RuntimeException("Bulk write operation test failed. There were committed less objects " +
+                    "into Cassandra than expected");
+        }
+
+        if (products.size() > productIds.size() || orders.size() > orderIds.size()) {
+            throw new RuntimeException("Bulk write operation test failed. There were committed more objects " +
+                    "into Cassandra than expected");
+        }
+
+        for (CacheEntryImpl<Long, Product> entry : productEntries) {
+            product = products.get(entry.getKey());
+
+            if (!entry.getValue().equals(product)) {
+                throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+                        "some objects were incorrectly persisted/loaded to/from Cassandra");
+            }
+        }
+
+        for (Long productId : ordersPerProduct.keySet()) {
+            for (CacheEntryImpl<Long, ProductOrder> entry : ordersPerProduct.get(productId)) {
+                order = orders.get(entry.getKey());
+
+                if (!entry.getValue().equals(order)) {
+                    throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+                            "some objects were incorrectly persisted/loaded to/from Cassandra");
+                }
+            }
+        }
+
+        LOGGER.info("Bulk write operation tests passed");
+
+        LOGGER.info("POJO strategy transaction write tests passed");
+
+        LOGGER.info("Running POJO strategy transaction delete tests");
+
+        LOGGER.info("Running single delete tests");
+
+        sessionProps.clear();
+
+        Product deletedProduct = productEntries.remove(0).getValue();
+        ProductOrder deletedOrder = ordersPerProduct.get(deletedProduct.getId()).remove(0).getValue();
+
+        productStore.delete(deletedProduct.getId());
+        orderStore.delete(deletedOrder.getId());
+        // Deletes must stay invisible too: both objects should still load before sessionEnd(true).
+        if (productStore.load(deletedProduct.getId()) == null || orderStore.load(deletedOrder.getId()) == null) {
+            throw new RuntimeException("Single delete operation test failed. Transaction wasn't committed yet, but " +
+                    "objects were already deleted from Cassandra");
+        }
+
+        products = (Map<Long, Product>)productStore.loadAll(productIds);
+        orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+        if (products.size() != productIds.size() || orders.size() != orderIds.size()) {
+            throw new RuntimeException("Single delete operation test failed. Transaction wasn't committed yet, but " +
+                    "objects were already deleted from Cassandra");
+        }
+
+        //noinspection deprecation
+        productStore.sessionEnd(true);
+        //noinspection deprecation
+        orderStore.sessionEnd(true);
+
+        if (productStore.load(deletedProduct.getId()) != null || orderStore.load(deletedOrder.getId()) != null) {
+            throw new RuntimeException("Single delete operation test failed. Transaction was committed, but " +
+                    "objects were not deleted from Cassandra");
+        }
+
+        products = (Map<Long, Product>)productStore.loadAll(productIds);
+        orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+        if (products.get(deletedProduct.getId()) != null || orders.get(deletedOrder.getId()) != null) {
+            throw new RuntimeException("Single delete operation test failed. Transaction was committed, but " +
+                    "objects were not deleted from Cassandra");
+        }
+
+        LOGGER.info("Single delete tests passed");
+
+        LOGGER.info("Running bulk delete tests");
+
+        sessionProps.clear();
+
+        productStore.deleteAll(productIds);
+        orderStore.deleteAll(orderIds);
+
+        products = (Map<Long, Product>)productStore.loadAll(productIds);
+        orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+        if (products == null || products.isEmpty() || orders == null || orders.isEmpty()) {
+            throw new RuntimeException("Bulk delete operation test failed. Transaction wasn't committed yet, but " +
+                    "objects were already deleted from Cassandra");
+        }
+
+        //noinspection deprecation
+        orderStore.sessionEnd(true);
+        //noinspection deprecation
+        productStore.sessionEnd(true);
+
+        products = (Map<Long, Product>)productStore.loadAll(productIds);
+        orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+        if ((products != null && !products.isEmpty()) || (orders != null && !orders.isEmpty())) {
+            throw new RuntimeException("Bulk delete operation test failed. Transaction was committed, but " +
+                    "objects were not deleted from Cassandra");
+        }
+
+        LOGGER.info("Bulk delete tests passed");
+
+        LOGGER.info("POJO strategy transaction delete tests passed");
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java
new file mode 100644
index 0000000..fc54e5b
--- /dev/null
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests;
+
+import org.apache.ignite.tests.utils.CassandraHelper;
+import org.apache.log4j.Logger;
+
+/**
+ * Simple helper class to run embedded Cassandra on localhost; its {@code main} method never returns normally.
+ */
+public class CassandraLocalServer {
+    /** Logger handed to the embedded Cassandra startup helper and used for progress messages. */
+    private static final Logger LOGGER = Logger.getLogger(CassandraLocalServer.class.getName());
+
+    /** Starts embedded Cassandra, tests both connections, drops test keyspaces, then idles forever. */
+    public static void main(String[] args) {
+        try {
+            CassandraHelper.startEmbeddedCassandra(LOGGER);
+        }
+        catch (Throwable e) {
+            throw new RuntimeException("Failed to start embedded Cassandra instance", e);
+        }
+
+        LOGGER.info("Testing admin connection to Cassandra");
+        CassandraHelper.testAdminConnection();
+
+        LOGGER.info("Testing regular connection to Cassandra");
+        CassandraHelper.testRegularConnection();
+
+        LOGGER.info("Dropping all artifacts from previous tests execution session");
+        CassandraHelper.dropTestKeyspaces();
+
+        while (true) { // no exit condition: the loop ends only when something inside it throws
+            try {
+                System.out.println("Cassandra server running");
+                Thread.sleep(10000); // 10 seconds between the status messages printed above
+            }
+            catch (Throwable e) {
+                throw new RuntimeException("Cassandra server terminated", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java
index 43b6d3c..6465580 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java
@@ -25,33 +25,30 @@ import org.junit.Test;
  * DDLGenerator test.
  */
 public class DDLGeneratorTest {
-    private static final String URL1 = "org/apache/ignite/tests/persistence/primitive/persistence-settings-1.xml";
-    private static final String URL2 = "org/apache/ignite/tests/persistence/pojo/persistence-settings-3.xml";
-    private static final String URL3 = "org/apache/ignite/tests/persistence/pojo/persistence-settings-4.xml";
+    private static final String[] RESOURCES = new String[] { // classpath resources resolved in generatorTest()
+        "org/apache/ignite/tests/persistence/primitive/persistence-settings-1.xml",
+        "org/apache/ignite/tests/persistence/pojo/persistence-settings-3.xml",
+        "org/apache/ignite/tests/persistence/pojo/persistence-settings-4.xml",
+        "org/apache/ignite/tests/persistence/pojo/product.xml",
+        "org/apache/ignite/tests/persistence/pojo/order.xml"
+    };
 
     @Test
     @SuppressWarnings("unchecked")
     /** */
     public void generatorTest() {
-        ClassLoader clsLdr = DDLGeneratorTest.class.getClassLoader();
-
-        URL url1 = clsLdr.getResource(URL1);
-        if (url1 == null)
-            throw new IllegalStateException("Failed to find resource: " + URL1);
+        String[] files = new String[RESOURCES.length];
 
-        URL url2 = clsLdr.getResource(URL2);
-        if (url2 == null)
-            throw new IllegalStateException("Failed to find resource: " + URL2);
+        ClassLoader clsLdr = DDLGeneratorTest.class.getClassLoader();
 
-        URL url3 = clsLdr.getResource(URL3);
-        if (url3 == null)
-            throw new IllegalStateException("Failed to find resource: " + URL3);
+        for (int i = 0; i < RESOURCES.length; i++) {
+            URL url = clsLdr.getResource(RESOURCES[i]);
+            if (url == null)
+                throw new IllegalStateException("Failed to find resource: " + RESOURCES[i]);
 
-        String file1 = url1.getFile();
-        String file2 = url2.getFile();
-        String file3 = url3.getFile();
+            files[i] = url.getFile();
+        }
 
-        DDLGenerator.main(new String[]{file1, file2, file3});
+        DDLGenerator.main(files); // receives the resolved file paths as its arguments
     }
-
 }


Mime
View raw message