ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [04/41] incubator-ignite git commit: IGNITE-891 - Cache store improvements
Date Tue, 02 Jun 2015 10:37:34 GMT
IGNITE-891 - Cache store improvements


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/16f045f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/16f045f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/16f045f8

Branch: refs/heads/ignite-218
Commit: 16f045f8d5e8ca4950bc5b0ec55a83db2b7164d1
Parents: 79258ba
Author: Valentin Kulichenko <vkulichenko@gridgain.com>
Authored: Mon May 18 17:18:07 2015 -0700
Committer: Valentin Kulichenko <vkulichenko@gridgain.com>
Committed: Mon May 18 17:18:07 2015 -0700

----------------------------------------------------------------------
 .../cache/store/CacheStoreManager.java          |   2 +-
 .../store/GridCacheStoreManagerAdapter.java     |  25 +++-
 .../transactions/IgniteTxLocalAdapter.java      |  59 ++++++---
 ...cheStoreSessionListenerAbstractSelfTest.java | 111 ++++++++++++++++
 .../CacheStoreSessionJdbcListenerSelfTest.java  |  39 +++++-
 .../IgniteCrossCacheTxStoreSelfTest.java        | 131 +++++++++++++++----
 ...heStoreSessionHibernateListenerSelfTest.java |   6 +-
 ...CacheStoreSessionSpringListenerSelfTest.java |  27 +++-
 8 files changed, 337 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
index d9f50ac..327b879 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
@@ -160,7 +160,7 @@ public interface CacheStoreManager<K, V> extends GridCacheManager<K, V> {
      * @param commit Commit.
      * @throws IgniteCheckedException If failed.
      */
-    public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException;
+    public void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last) throws IgniteCheckedException;
 
     /**
      * End session initiated by write-behind store.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index a9ea2c0..aeca58f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -637,8 +637,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
     }
 
     /** {@inheritDoc} */
-    @Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys)
-        throws IgniteCheckedException {
+    @Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys) throws
IgniteCheckedException {
         if (F.isEmpty(keys))
             return true;
 
@@ -700,7 +699,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
     }
 
     /** {@inheritDoc} */
-    @Override public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException
{
+    @Override public void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last) throws
IgniteCheckedException {
         assert store != null;
 
         sessionInit0(tx);
@@ -711,10 +710,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                     lsnr.onSessionEnd(locSes, commit);
             }
 
-            store.sessionEnd(commit);
+            if (!sesHolder.get().storeEnded(store))
+                store.sessionEnd(commit);
         }
         finally {
-            if (sesHolder != null) {
+            if (last && sesHolder != null) {
                 sesHolder.set(null);
 
                 tx.removeMeta(SES_ATTR);
@@ -752,7 +752,6 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
      */
     private void sessionInit0(@Nullable IgniteInternalTx tx) {
         assert sesHolder != null;
-        assert sesHolder.get() == null;
 
         SessionData ses;
 
@@ -794,7 +793,8 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                         lsnr.onSessionEnd(locSes, !threwEx);
                 }
 
-                store.sessionEnd(!threwEx);
+                if (!sesHolder.get().storeEnded(store))
+                    store.sessionEnd(!threwEx);
             }
         }
         catch (Exception e) {
@@ -840,6 +840,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
         /** */
         private boolean started;
 
+        /** */
+        private final Set<CacheStore> endedStores = new GridSetWrapper<>(new
IdentityHashMap<CacheStore, Object>());
+
         /**
          * @param tx Current transaction.
          * @param cacheName Cache name.
@@ -893,6 +896,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
             return started;
         }
 
+        /**
+         * @param store Cache store.
+         * @return Whether session already ended on this store instance.
+         */
+        private boolean storeEnded(CacheStore store) {
+            return !endedStores.add(store);
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(SessionData.class, this, "tx", CU.txString(tx));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index fa64e12..854448d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -531,11 +531,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                             }
 
                             // Batch-process puts if cache ID has changed.
-                            if (writeStore != null && writeStore != cacheCtx.store()
&& putMap != null && !putMap.isEmpty()) {
-                                writeStore.putAll(this, putMap);
+                            if (writeStore != null && writeStore != cacheCtx.store())
{
+                                if (putMap != null && !putMap.isEmpty()) {
+                                    writeStore.putAll(this, putMap);
 
-                                // Reset.
-                                putMap.clear();
+                                    // Reset.
+                                    putMap.clear();
+                                }
 
                                 writeStore = null;
                             }
@@ -574,11 +576,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                 writeStore = null;
                             }
 
-                            if (writeStore != null && writeStore != cacheCtx.store()
&& rmvCol != null && !rmvCol.isEmpty()) {
-                                writeStore.removeAll(this, rmvCol);
+                            if (writeStore != null && writeStore != cacheCtx.store())
{
+                                if (rmvCol != null && !rmvCol.isEmpty()) {
+                                    writeStore.removeAll(this, rmvCol);
 
-                                // Reset.
-                                rmvCol.clear();
+                                    // Reset.
+                                    rmvCol.clear();
+                                }
 
                                 writeStore = null;
                             }
@@ -623,8 +627,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 }
 
                 // Commit while locks are held.
-                for (CacheStoreManager store : stores)
-                    store.sessionEnd(this, true);
+                sessionEnd(stores, true);
             }
             catch (IgniteCheckedException ex) {
                 commitError(ex);
@@ -649,6 +652,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                 throw new IgniteCheckedException("Failed to commit transaction to database:
" + this, ex);
             }
+            finally {
+                if (isRollbackOnly())
+                    sessionEnd(stores, false);
+            }
         }
     }
 
@@ -984,13 +991,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 cctx.tm().resetContext();
             }
         }
-        else {
+        else if (!internal()) {
             Collection<CacheStoreManager> stores = stores();
 
-            if (stores != null && !stores.isEmpty() && !internal()) {
+            if (stores != null && !stores.isEmpty()) {
                 try {
-                    for (CacheStoreManager store : stores)
-                        store.sessionEnd(this, true);
+                    sessionEnd(stores, true);
                 }
                 catch (IgniteCheckedException e) {
                     commitError(e);
@@ -1091,13 +1097,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                 cctx.tm().rollbackTx(this);
 
-                Collection<CacheStoreManager> stores = stores();
+                if (!internal()) {
+                    Collection<CacheStoreManager> stores = stores();
 
-                if (stores != null && !stores.isEmpty() && (near() || F.first(stores).isWriteToStoreFromDht()))
{
-                    if (!internal()) {
-                        for (CacheStoreManager store : stores)
-                            store.sessionEnd(this, false);
-                    }
+                    if (stores != null && !stores.isEmpty() && (near() ||
F.first(stores).isWriteToStoreFromDht()))
+                        sessionEnd(stores, false);
                 }
             }
             catch (Error | IgniteCheckedException | RuntimeException e) {
@@ -1109,6 +1113,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
+     * @param stores Store managers.
+     * @param commit Commit flag.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private void sessionEnd(Collection<CacheStoreManager> stores, boolean commit) throws
IgniteCheckedException {
+        Iterator<CacheStoreManager> it = stores.iterator();
+
+        while (it.hasNext()) {
+            CacheStoreManager store = it.next();
+
+            store.sessionEnd(this, commit, !it.hasNext());
+        }
+    }
+
+    /**
      * Checks if there is a cached or swapped value for
      * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean,
boolean, boolean)} method.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
index 5a01c2d..5df8f68 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.cache.store;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -27,7 +28,9 @@ import org.apache.ignite.testframework.junits.common.*;
 import org.apache.ignite.transactions.*;
 
 import javax.cache.configuration.*;
+import javax.cache.integration.*;
 import java.io.*;
+import java.sql.*;
 import java.util.concurrent.atomic.*;
 
 /**
@@ -38,6 +41,9 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
     /** */
+    protected static final String URL = "jdbc:h2:mem:example;DB_CLOSE_DELAY=-1";
+
+    /** */
     protected static final AtomicInteger loadCacheCnt = new AtomicInteger();
 
     /** */
@@ -52,6 +58,12 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
     /** */
     protected static final AtomicInteger reuseCnt = new AtomicInteger();
 
+    /** */
+    protected static final AtomicBoolean write = new AtomicBoolean();
+
+    /** */
+    protected static final AtomicBoolean fail = new AtomicBoolean();
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -77,11 +89,22 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
+        try (Connection conn = DriverManager.getConnection(URL)) {
+            conn.createStatement().executeUpdate("DROP TABLE IF EXISTS Table1");
+            conn.createStatement().executeUpdate("DROP TABLE IF EXISTS Table2");
+
+            conn.createStatement().executeUpdate("CREATE TABLE Table1 (key INT, value INT)");
+            conn.createStatement().executeUpdate("CREATE TABLE Table2 (key INT, value INT)");
+        }
+
         loadCacheCnt.set(0);
         loadCnt.set(0);
         writeCnt.set(0);
         deleteCnt.set(0);
         reuseCnt.set(0);
+
+        write.set(false);
+        fail.set(false);
     }
 
     /**
@@ -174,6 +197,94 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testCommit() throws Exception {
+        write.set(true);
+
+        CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL);
+        CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL);
+
+        try (
+            IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
+            IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2)
+        ) {
+            try (Transaction tx = ignite(0).transactions().txStart()) {
+                cache1.put(1, 1);
+                cache2.put(2, 2);
+
+                tx.commit();
+            }
+        }
+
+        try (Connection conn = DriverManager.getConnection(URL)) {
+            checkTable(conn, 1, false);
+            checkTable(conn, 2, false);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRollback() throws Exception {
+        write.set(true);
+        fail.set(true);
+
+        CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL);
+        CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL);
+
+        try (
+            IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
+            IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2)
+        ) {
+            try (Transaction tx = ignite(0).transactions().txStart()) {
+                cache1.put(1, 1);
+                cache2.put(2, 2);
+
+                tx.commit();
+
+                assert false : "Exception was not thrown.";
+            }
+            catch (IgniteException e) {
+                CacheWriterException we = X.cause(e, CacheWriterException.class);
+
+                assertNotNull(we);
+
+                assertEquals("Expected failure.", we.getMessage());
+            }
+        }
+
+        try (Connection conn = DriverManager.getConnection(URL)) {
+            checkTable(conn, 1, true);
+            checkTable(conn, 2, true);
+        }
+    }
+
+    /**
+     * @param conn Connection.
+     * @param idx Table index.
+     * @param empty If table expected to be empty.
+     * @throws Exception In case of error.
+     */
+    private void checkTable(Connection conn, int idx, boolean empty) throws Exception {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT key, value FROM Table"
+ idx);
+
+        int cnt = 0;
+
+        while (rs.next()) {
+            int key = rs.getInt(1);
+            int val = rs.getInt(2);
+
+            assertEquals(idx, key);
+            assertEquals(idx, val);
+
+            cnt++;
+        }
+
+        assertEquals(empty ? 0 : 1, cnt);
+    }
+
+    /**
      * @param name Cache name.
      * @param atomicity Atomicity mode.
      * @return Cache configuration.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java
index 9020e0d..e4dac88 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java
@@ -47,7 +47,7 @@ public class CacheStoreSessionJdbcListenerSelfTest extends CacheStoreSessionList
             @Override public CacheStoreSessionListener create() {
                 CacheStoreSessionJdbcListener lsnr = new CacheStoreSessionJdbcListener();
 
-                lsnr.setDataSource(JdbcConnectionPool.create("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1",
"", ""));
+                lsnr.setDataSource(JdbcConnectionPool.create(URL, "", ""));
 
                 return lsnr;
             }
@@ -86,6 +86,43 @@ public class CacheStoreSessionJdbcListenerSelfTest extends CacheStoreSessionList
             writeCnt.incrementAndGet();
 
             checkConnection();
+
+            if (write.get()) {
+                Connection conn = connection();
+
+                try {
+                    String table;
+
+                    switch (ses.cacheName()) {
+                        case "cache1":
+                            table = "Table1";
+
+                            break;
+
+                        case "cache2":
+                            if (fail.get())
+                                throw new CacheWriterException("Expected failure.");
+
+                            table = "Table2";
+
+                            break;
+
+                        default:
+                            throw new CacheWriterException("Wring cache: " + ses.cacheName());
+                    }
+
+                    PreparedStatement stmt = conn.prepareStatement(
+                        "INSERT INTO " + table + " (key, value) VALUES (?, ?)");
+
+                    stmt.setInt(1, entry.getKey());
+                    stmt.setInt(2, entry.getValue());
+
+                    stmt.executeUpdate();
+                }
+                catch (SQLException e) {
+                    throw new CacheWriterException(e);
+                }
+            }
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
index cb32b13..f72ea47 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
@@ -101,19 +101,28 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
         grid(0).cache("cacheA").removeAll();
         grid(0).cache("cacheB").removeAll();
         grid(0).cache("cacheC").removeAll();
+
+        for (CacheStore store : firstStores.values())
+            ((TestStore)store).clear();
+
+        for (CacheStore store : secondStores.values())
+            ((TestStore)store).clear();
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testWriteThrough() throws Exception {
+    public void testSameStore() throws Exception {
         IgniteEx grid = grid(0);
 
         TestStore firstStore = (TestStore)firstStores.get(grid.name());
+        TestStore secondStore = (TestStore)secondStores.get(grid.name());
 
         assertNotNull(firstStore);
+        assertNotNull(secondStore);
 
-        Collection<String> evts = firstStore.events();
+        Collection<String> firstStoreEvts = firstStore.events();
+        Collection<String> secondStoreEvts = secondStore.events();
 
         try (Transaction tx = grid.transactions().txStart()) {
             IgniteCache<Object, Object> cacheA = grid.cache("cacheA");
@@ -138,58 +147,122 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
         }
 
         assertEqualsCollections(F.asList(
-                "writeAll cacheA 2",
-                "writeAll cacheB 2",
-                "deleteAll cacheA 2",
-                "deleteAll cacheB 2",
-                "write cacheA",
-                "delete cacheA",
-                "write cacheB",
-                "sessionEnd true"
-            ),
-            evts);
+            "writeAll cacheA 2",
+            "writeAll cacheB 2",
+            "deleteAll cacheA 2",
+            "deleteAll cacheB 2",
+            "write cacheA",
+            "delete cacheA",
+            "write cacheB",
+            "sessionEnd true"
+        ),
+        firstStoreEvts);
+
+        assertEquals(0, secondStoreEvts.size());
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testIncompatibleCaches1() throws Exception {
+    public void testDifferentStores() throws Exception {
         IgniteEx grid = grid(0);
 
-        try (Transaction ignored = grid.transactions().txStart()) {
+        TestStore firstStore = (TestStore)firstStores.get(grid.name());
+        TestStore secondStore = (TestStore)secondStores.get(grid.name());
+
+        assertNotNull(firstStore);
+        assertNotNull(secondStore);
+
+        Collection<String> firstStoreEvts = firstStore.events();
+        Collection<String> secondStoreEvts = secondStore.events();
+
+        try (Transaction tx = grid.transactions().txStart()) {
             IgniteCache<Object, Object> cacheA = grid.cache("cacheA");
             IgniteCache<Object, Object> cacheC = grid.cache("cacheC");
 
-            cacheA.put("1", "2");
+            cacheA.put("1", "1");
+            cacheA.put("2", "2");
+            cacheC.put("1", "1");
+            cacheC.put("2", "2");
+
+            cacheA.remove("3");
+            cacheA.remove("4");
+            cacheC.remove("3");
+            cacheC.remove("4");
+
+            cacheA.put("5", "5");
+            cacheA.remove("6");
 
-            cacheC.put("1", "2");
+            cacheC.put("7", "7");
 
-            fail("Must not allow to enlist caches with different stores to one transaction");
-        }
-        catch (CacheException e) {
-            assertTrue(e.getMessage().contains("Failed to enlist new cache to existing transaction"));
+            tx.commit();
         }
+
+        assertEqualsCollections(F.asList(
+            "writeAll cacheA 2",
+            "deleteAll cacheA 2",
+            "write cacheA",
+            "delete cacheA",
+            "sessionEnd true"
+        ),
+        firstStoreEvts);
+
+        assertEqualsCollections(F.asList(
+            "writeAll cacheC 2",
+            "deleteAll cacheC 2",
+            "write cacheC",
+            "sessionEnd true"
+        ),
+        secondStoreEvts);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testIncompatibleCaches2() throws Exception {
+    public void testNonPersistentCache() throws Exception {
         IgniteEx grid = grid(0);
 
-        try (Transaction ignored = grid.transactions().txStart()) {
+        TestStore firstStore = (TestStore)firstStores.get(grid.name());
+        TestStore secondStore = (TestStore)secondStores.get(grid.name());
+
+        assertNotNull(firstStore);
+        assertNotNull(secondStore);
+
+        Collection<String> firstStoreEvts = firstStore.events();
+        Collection<String> secondStoreEvts = secondStore.events();
+
+        try (Transaction tx = grid.transactions().txStart()) {
             IgniteCache<Object, Object> cacheA = grid.cache("cacheA");
-            IgniteCache<Object, Object> cacheC = grid.cache("cacheD");
+            IgniteCache<Object, Object> cacheD = grid.cache("cacheD");
+
+            cacheA.put("1", "1");
+            cacheA.put("2", "2");
+            cacheD.put("1", "1");
+            cacheD.put("2", "2");
 
-            cacheA.put("1", "2");
+            cacheA.remove("3");
+            cacheA.remove("4");
+            cacheD.remove("3");
+            cacheD.remove("4");
+
+            cacheA.put("5", "5");
+            cacheA.remove("6");
 
-            cacheC.put("1", "2");
+            cacheD.put("7", "7");
 
-            fail("Must not allow to enlist caches with different stores to one transaction");
-        }
-        catch (CacheException e) {
-            assertTrue(e.getMessage().contains("Failed to enlist new cache to existing transaction"));
+            tx.commit();
         }
+
+        assertEqualsCollections(F.asList(
+            "writeAll cacheA 2",
+            "deleteAll cacheA 2",
+            "write cacheA",
+            "delete cacheA",
+            "sessionEnd true"
+        ),
+        firstStoreEvts);
+
+        assertEquals(0, secondStoreEvts.size());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java b/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java
index 85b0b95..d631393 100644
--- a/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java
+++ b/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java
@@ -23,7 +23,6 @@ import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
 import org.hibernate.*;
 import org.hibernate.cfg.Configuration;
-import org.hibernate.service.*;
 
 import javax.cache.Cache;
 import javax.cache.configuration.*;
@@ -50,10 +49,9 @@ public class CacheStoreSessionHibernateListenerSelfTest extends CacheStoreSessio
                 CacheStoreSessionHibernateListener lsnr = new CacheStoreSessionHibernateListener();
 
                 Configuration cfg = new Configuration().
-                    setProperty("hibernate.dialect", "org.hibernate.dialect.H2Dialect").
-                    setProperty("hibernate.connection.datasource", "jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
+                    setProperty("hibernate.connection.url", URL);
 
-                lsnr.setSessionFactory(cfg.buildSessionFactory(new ServiceRegistryBuilder().buildServiceRegistry()));
+                lsnr.setSessionFactory(cfg.buildSessionFactory());
 
                 return lsnr;
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
index a7ca317..79d5b5e 100644
--- a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
+++ b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
@@ -37,7 +37,7 @@ import java.util.*;
  */
 public class CacheStoreSessionSpringListenerSelfTest extends CacheStoreSessionListenerAbstractSelfTest
{
     /** */
-    private static final DataSource DATA_SRC = new DriverManagerDataSource("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
+    private static final DataSource DATA_SRC = new DriverManagerDataSource(URL);
 
     /** {@inheritDoc} */
     @Override protected Factory<? extends CacheStore<Integer, Integer>> storeFactory()
{
@@ -106,6 +106,31 @@ public class CacheStoreSessionSpringListenerSelfTest extends CacheStoreSessionLi
 
             checkTransaction();
             checkConnection();
+
+            if (write.get()) {
+                String table;
+
+                switch (ses.cacheName()) {
+                    case "cache1":
+                        table = "Table1";
+
+                        break;
+
+                    case "cache2":
+                        if (fail.get())
+                            throw new CacheWriterException("Expected failure.");
+
+                        table = "Table2";
+
+                        break;
+
+                    default:
+                        throw new CacheWriterException("Wring cache: " + ses.cacheName());
+                }
+
+                jdbc.update("INSERT INTO " + table + " (key, value) VALUES (?, ?)",
+                    entry.getKey(), entry.getValue());
+            }
         }
 
         /** {@inheritDoc} */


Mime
View raw message