Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0AA3D17478 for ; Tue, 2 Jun 2015 09:01:37 +0000 (UTC) Received: (qmail 80624 invoked by uid 500); 2 Jun 2015 09:01:36 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 80592 invoked by uid 500); 2 Jun 2015 09:01:36 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 80583 invoked by uid 99); 2 Jun 2015 09:01:36 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Jun 2015 09:01:36 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 708B2CAB01 for ; Tue, 2 Jun 2015 09:01:36 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id olcIMIsz3RdH for ; Tue, 2 Jun 2015 09:01:30 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 10BB5455D8 for ; Tue, 2 Jun 2015 09:01:23 +0000 (UTC) Received: (qmail 79657 invoked by uid 99); 2 Jun 2015 09:01:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Jun 2015 09:01:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5E1F5E0283; Tue, 2 Jun 2015 09:01:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 02 Jun 2015 09:01:26 -0000 Message-Id: <1060097c4cdd43488cd7f637542c4867@git.apache.org> In-Reply-To: <5bc6b57086ff4adea5372220b9b8a4ad@git.apache.org> References: <5bc6b57086ff4adea5372220b9b8a4ad@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [04/38] incubator-ignite git commit: IGNITE-891 - Cache store improvements IGNITE-891 - Cache store improvements Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/16f045f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/16f045f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/16f045f8 Branch: refs/heads/ignite-883_1 Commit: 16f045f8d5e8ca4950bc5b0ec55a83db2b7164d1 Parents: 79258ba Author: Valentin Kulichenko Authored: Mon May 18 17:18:07 2015 -0700 Committer: Valentin Kulichenko Committed: Mon May 18 17:18:07 2015 -0700 ---------------------------------------------------------------------- .../cache/store/CacheStoreManager.java | 2 +- .../store/GridCacheStoreManagerAdapter.java | 25 +++- .../transactions/IgniteTxLocalAdapter.java | 59 ++++++--- ...cheStoreSessionListenerAbstractSelfTest.java | 111 ++++++++++++++++ .../CacheStoreSessionJdbcListenerSelfTest.java | 39 +++++- .../IgniteCrossCacheTxStoreSelfTest.java | 131 +++++++++++++++---- ...heStoreSessionHibernateListenerSelfTest.java | 6 +- ...CacheStoreSessionSpringListenerSelfTest.java | 27 +++- 8 files changed, 337 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java index d9f50ac..327b879 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java @@ -160,7 +160,7 @@ public interface CacheStoreManager extends GridCacheManager { * @param commit Commit. * @throws IgniteCheckedException If failed. */ - public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException; + public void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last) throws IgniteCheckedException; /** * End session initiated by write-behind store. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index a9ea2c0..aeca58f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -637,8 +637,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } /** {@inheritDoc} */ - @Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys) - throws IgniteCheckedException { + @Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys) throws IgniteCheckedException { if (F.isEmpty(keys)) return true; @@ -700,7 +699,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } /** {@inheritDoc} */ - @Override public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException { + @Override public void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last) throws IgniteCheckedException { assert store != null; sessionInit0(tx); @@ -711,10 +710,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt lsnr.onSessionEnd(locSes, commit); } - store.sessionEnd(commit); + if (!sesHolder.get().storeEnded(store)) + store.sessionEnd(commit); } finally { - if (sesHolder != null) { + if (last && sesHolder != null) { sesHolder.set(null); tx.removeMeta(SES_ATTR); @@ -752,7 +752,6 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt */ private void sessionInit0(@Nullable IgniteInternalTx tx) { assert sesHolder != null; - assert sesHolder.get() == null; SessionData ses; @@ -794,7 +793,8 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt lsnr.onSessionEnd(locSes, !threwEx); } - store.sessionEnd(!threwEx); + if (!sesHolder.get().storeEnded(store)) + store.sessionEnd(!threwEx); } } catch (Exception e) { @@ -840,6 +840,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt /** */ private boolean started; + /** */ + private final Set endedStores = new GridSetWrapper<>(new IdentityHashMap()); + /** * @param tx Current transaction. * @param cacheName Cache name. @@ -893,6 +896,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt return started; } + /** + * @param store Cache store. + * @return Whether session already ended on this store instance. + */ + private boolean storeEnded(CacheStore store) { + return !endedStores.add(store); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(SessionData.class, this, "tx", CU.txString(tx)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index fa64e12..854448d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -531,11 +531,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } // Batch-process puts if cache ID has changed. - if (writeStore != null && writeStore != cacheCtx.store() && putMap != null && !putMap.isEmpty()) { - writeStore.putAll(this, putMap); + if (writeStore != null && writeStore != cacheCtx.store()) { + if (putMap != null && !putMap.isEmpty()) { + writeStore.putAll(this, putMap); - // Reset. - putMap.clear(); + // Reset. + putMap.clear(); + } writeStore = null; } @@ -574,11 +576,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter writeStore = null; } - if (writeStore != null && writeStore != cacheCtx.store() && rmvCol != null && !rmvCol.isEmpty()) { - writeStore.removeAll(this, rmvCol); + if (writeStore != null && writeStore != cacheCtx.store()) { + if (rmvCol != null && !rmvCol.isEmpty()) { + writeStore.removeAll(this, rmvCol); - // Reset. - rmvCol.clear(); + // Reset. + rmvCol.clear(); + } writeStore = null; } @@ -623,8 +627,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } // Commit while locks are held. - for (CacheStoreManager store : stores) - store.sessionEnd(this, true); + sessionEnd(stores, true); } catch (IgniteCheckedException ex) { commitError(ex); @@ -649,6 +652,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter throw new IgniteCheckedException("Failed to commit transaction to database: " + this, ex); } + finally { + if (isRollbackOnly()) + sessionEnd(stores, false); + } } } @@ -984,13 +991,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cctx.tm().resetContext(); } } - else { + else if (!internal()) { Collection stores = stores(); - if (stores != null && !stores.isEmpty() && !internal()) { + if (stores != null && !stores.isEmpty()) { try { - for (CacheStoreManager store : stores) - store.sessionEnd(this, true); + sessionEnd(stores, true); } catch (IgniteCheckedException e) { commitError(e); @@ -1091,13 +1097,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cctx.tm().rollbackTx(this); - Collection stores = stores(); + if (!internal()) { + Collection stores = stores(); - if (stores != null && !stores.isEmpty() && (near() || F.first(stores).isWriteToStoreFromDht())) { - if (!internal()) { - for (CacheStoreManager store : stores) - store.sessionEnd(this, false); - } + if (stores != null && !stores.isEmpty() && (near() || F.first(stores).isWriteToStoreFromDht())) + sessionEnd(stores, false); } } catch (Error | IgniteCheckedException | RuntimeException e) { @@ -1109,6 +1113,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** + * @param stores Store managers. + * @param commit Commit flag. + * @throws IgniteCheckedException In case of error. + */ + private void sessionEnd(Collection stores, boolean commit) throws IgniteCheckedException { + Iterator it = stores.iterator(); + + while (it.hasNext()) { + CacheStoreManager store = it.next(); + + store.sessionEnd(this, commit, !it.hasNext()); + } + } + + /** * Checks if there is a cached or swapped value for * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean, boolean, boolean)} method. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java index 5a01c2d..5df8f68 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.cache.store; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; @@ -27,7 +28,9 @@ import org.apache.ignite.testframework.junits.common.*; import org.apache.ignite.transactions.*; import javax.cache.configuration.*; +import javax.cache.integration.*; import java.io.*; +import java.sql.*; import java.util.concurrent.atomic.*; /** @@ -38,6 +41,9 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** */ + protected static final String URL = "jdbc:h2:mem:example;DB_CLOSE_DELAY=-1"; + + /** */ protected static final AtomicInteger loadCacheCnt = new AtomicInteger(); /** */ @@ -52,6 +58,12 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm /** */ protected static final AtomicInteger reuseCnt = new AtomicInteger(); + /** */ + protected static final AtomicBoolean write = new AtomicBoolean(); + + /** */ + protected static final AtomicBoolean fail = new AtomicBoolean(); + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -77,11 +89,22 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { + try (Connection conn = DriverManager.getConnection(URL)) { + conn.createStatement().executeUpdate("DROP TABLE IF EXISTS Table1"); + conn.createStatement().executeUpdate("DROP TABLE IF EXISTS Table2"); + + conn.createStatement().executeUpdate("CREATE TABLE Table1 (key INT, value INT)"); + conn.createStatement().executeUpdate("CREATE TABLE Table2 (key INT, value INT)"); + } + loadCacheCnt.set(0); loadCnt.set(0); writeCnt.set(0); deleteCnt.set(0); reuseCnt.set(0); + + write.set(false); + fail.set(false); } /** @@ -174,6 +197,94 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm } /** + * @throws Exception If failed. + */ + public void testCommit() throws Exception { + write.set(true); + + CacheConfiguration cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL); + CacheConfiguration cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL); + + try ( + IgniteCache cache1 = ignite(0).createCache(cfg1); + IgniteCache cache2 = ignite(0).createCache(cfg2) + ) { + try (Transaction tx = ignite(0).transactions().txStart()) { + cache1.put(1, 1); + cache2.put(2, 2); + + tx.commit(); + } + } + + try (Connection conn = DriverManager.getConnection(URL)) { + checkTable(conn, 1, false); + checkTable(conn, 2, false); + } + } + + /** + * @throws Exception If failed. + */ + public void testRollback() throws Exception { + write.set(true); + fail.set(true); + + CacheConfiguration cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL); + CacheConfiguration cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL); + + try ( + IgniteCache cache1 = ignite(0).createCache(cfg1); + IgniteCache cache2 = ignite(0).createCache(cfg2) + ) { + try (Transaction tx = ignite(0).transactions().txStart()) { + cache1.put(1, 1); + cache2.put(2, 2); + + tx.commit(); + + assert false : "Exception was not thrown."; + } + catch (IgniteException e) { + CacheWriterException we = X.cause(e, CacheWriterException.class); + + assertNotNull(we); + + assertEquals("Expected failure.", we.getMessage()); + } + } + + try (Connection conn = DriverManager.getConnection(URL)) { + checkTable(conn, 1, true); + checkTable(conn, 2, true); + } + } + + /** + * @param conn Connection. + * @param idx Table index. + * @param empty If table expected to be empty. + * @throws Exception In case of error. + */ + private void checkTable(Connection conn, int idx, boolean empty) throws Exception { + ResultSet rs = conn.createStatement().executeQuery("SELECT key, value FROM Table" + idx); + + int cnt = 0; + + while (rs.next()) { + int key = rs.getInt(1); + int val = rs.getInt(2); + + assertEquals(idx, key); + assertEquals(idx, val); + + cnt++; + } + + assertEquals(empty ? 0 : 1, cnt); + } + + /** * @param name Cache name. * @param atomicity Atomicity mode. * @return Cache configuration. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java index 9020e0d..e4dac88 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheStoreSessionJdbcListenerSelfTest.java @@ -47,7 +47,7 @@ public class CacheStoreSessionJdbcListenerSelfTest extends CacheStoreSessionList @Override public CacheStoreSessionListener create() { CacheStoreSessionJdbcListener lsnr = new CacheStoreSessionJdbcListener(); - lsnr.setDataSource(JdbcConnectionPool.create("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1", "", "")); + lsnr.setDataSource(JdbcConnectionPool.create(URL, "", "")); return lsnr; } @@ -86,6 +86,43 @@ public class CacheStoreSessionJdbcListenerSelfTest extends CacheStoreSessionList writeCnt.incrementAndGet(); checkConnection(); + + if (write.get()) { + Connection conn = connection(); + + try { + String table; + + switch (ses.cacheName()) { + case "cache1": + table = "Table1"; + + break; + + case "cache2": + if (fail.get()) + throw new CacheWriterException("Expected failure."); + + table = "Table2"; + + break; + + default: + throw new CacheWriterException("Wring cache: " + ses.cacheName()); + } + + PreparedStatement stmt = conn.prepareStatement( + "INSERT INTO " + table + " (key, value) VALUES (?, ?)"); + + stmt.setInt(1, entry.getKey()); + stmt.setInt(2, entry.getValue()); + + stmt.executeUpdate(); + } + catch (SQLException e) { + throw new CacheWriterException(e); + } + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java index cb32b13..f72ea47 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java @@ -101,19 +101,28 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest { grid(0).cache("cacheA").removeAll(); grid(0).cache("cacheB").removeAll(); grid(0).cache("cacheC").removeAll(); + + for (CacheStore store : firstStores.values()) + ((TestStore)store).clear(); + + for (CacheStore store : secondStores.values()) + ((TestStore)store).clear(); } /** * @throws Exception If failed. */ - public void testWriteThrough() throws Exception { + public void testSameStore() throws Exception { IgniteEx grid = grid(0); TestStore firstStore = (TestStore)firstStores.get(grid.name()); + TestStore secondStore = (TestStore)secondStores.get(grid.name()); assertNotNull(firstStore); + assertNotNull(secondStore); - Collection evts = firstStore.events(); + Collection firstStoreEvts = firstStore.events(); + Collection secondStoreEvts = secondStore.events(); try (Transaction tx = grid.transactions().txStart()) { IgniteCache cacheA = grid.cache("cacheA"); @@ -138,58 +147,122 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest { } assertEqualsCollections(F.asList( - "writeAll cacheA 2", - "writeAll cacheB 2", - "deleteAll cacheA 2", - "deleteAll cacheB 2", - "write cacheA", - "delete cacheA", - "write cacheB", - "sessionEnd true" - ), - evts); + "writeAll cacheA 2", + "writeAll cacheB 2", + "deleteAll cacheA 2", + "deleteAll cacheB 2", + "write cacheA", + "delete cacheA", + "write cacheB", + "sessionEnd true" + ), + firstStoreEvts); + + assertEquals(0, secondStoreEvts.size()); } /** * @throws Exception If failed. */ - public void testIncompatibleCaches1() throws Exception { + public void testDifferentStores() throws Exception { IgniteEx grid = grid(0); - try (Transaction ignored = grid.transactions().txStart()) { + TestStore firstStore = (TestStore)firstStores.get(grid.name()); + TestStore secondStore = (TestStore)secondStores.get(grid.name()); + + assertNotNull(firstStore); + assertNotNull(secondStore); + + Collection firstStoreEvts = firstStore.events(); + Collection secondStoreEvts = secondStore.events(); + + try (Transaction tx = grid.transactions().txStart()) { IgniteCache cacheA = grid.cache("cacheA"); IgniteCache cacheC = grid.cache("cacheC"); - cacheA.put("1", "2"); + cacheA.put("1", "1"); + cacheA.put("2", "2"); + cacheC.put("1", "1"); + cacheC.put("2", "2"); + + cacheA.remove("3"); + cacheA.remove("4"); + cacheC.remove("3"); + cacheC.remove("4"); + + cacheA.put("5", "5"); + cacheA.remove("6"); - cacheC.put("1", "2"); + cacheC.put("7", "7"); - fail("Must not allow to enlist caches with different stores to one transaction"); - } - catch (CacheException e) { - assertTrue(e.getMessage().contains("Failed to enlist new cache to existing transaction")); + tx.commit(); } + + assertEqualsCollections(F.asList( + "writeAll cacheA 2", + "deleteAll cacheA 2", + "write cacheA", + "delete cacheA", + "sessionEnd true" + ), + firstStoreEvts); + + assertEqualsCollections(F.asList( + "writeAll cacheC 2", + "deleteAll cacheC 2", + "write cacheC", + "sessionEnd true" + ), + secondStoreEvts); } /** * @throws Exception If failed. */ - public void testIncompatibleCaches2() throws Exception { + public void testNonPersistentCache() throws Exception { IgniteEx grid = grid(0); - try (Transaction ignored = grid.transactions().txStart()) { + TestStore firstStore = (TestStore)firstStores.get(grid.name()); + TestStore secondStore = (TestStore)secondStores.get(grid.name()); + + assertNotNull(firstStore); + assertNotNull(secondStore); + + Collection firstStoreEvts = firstStore.events(); + Collection secondStoreEvts = secondStore.events(); + + try (Transaction tx = grid.transactions().txStart()) { IgniteCache cacheA = grid.cache("cacheA"); - IgniteCache cacheC = grid.cache("cacheD"); + IgniteCache cacheD = grid.cache("cacheD"); + + cacheA.put("1", "1"); + cacheA.put("2", "2"); + cacheD.put("1", "1"); + cacheD.put("2", "2"); - cacheA.put("1", "2"); + cacheA.remove("3"); + cacheA.remove("4"); + cacheD.remove("3"); + cacheD.remove("4"); + + cacheA.put("5", "5"); + cacheA.remove("6"); - cacheC.put("1", "2"); + cacheD.put("7", "7"); - fail("Must not allow to enlist caches with different stores to one transaction"); - } - catch (CacheException e) { - assertTrue(e.getMessage().contains("Failed to enlist new cache to existing transaction")); + tx.commit(); } + + assertEqualsCollections(F.asList( + "writeAll cacheA 2", + "deleteAll cacheA 2", + "write cacheA", + "delete cacheA", + "sessionEnd true" + ), + firstStoreEvts); + + assertEquals(0, secondStoreEvts.size()); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java b/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java index 85b0b95..d631393 100644 --- a/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java +++ b/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheStoreSessionHibernateListenerSelfTest.java @@ -23,7 +23,6 @@ import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; import org.hibernate.*; import org.hibernate.cfg.Configuration; -import org.hibernate.service.*; import javax.cache.Cache; import javax.cache.configuration.*; @@ -50,10 +49,9 @@ public class CacheStoreSessionHibernateListenerSelfTest extends CacheStoreSessio CacheStoreSessionHibernateListener lsnr = new CacheStoreSessionHibernateListener(); Configuration cfg = new Configuration(). - setProperty("hibernate.dialect", "org.hibernate.dialect.H2Dialect"). - setProperty("hibernate.connection.datasource", "jdbc:h2:mem:example;DB_CLOSE_DELAY=-1"); + setProperty("hibernate.connection.url", URL); - lsnr.setSessionFactory(cfg.buildSessionFactory(new ServiceRegistryBuilder().buildServiceRegistry())); + lsnr.setSessionFactory(cfg.buildSessionFactory()); return lsnr; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f045f8/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java index a7ca317..79d5b5e 100644 --- a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java +++ b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java @@ -37,7 +37,7 @@ import java.util.*; */ public class CacheStoreSessionSpringListenerSelfTest extends CacheStoreSessionListenerAbstractSelfTest { /** */ - private static final DataSource DATA_SRC = new DriverManagerDataSource("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1"); + private static final DataSource DATA_SRC = new DriverManagerDataSource(URL); /** {@inheritDoc} */ @Override protected Factory> storeFactory() { @@ -106,6 +106,31 @@ public class CacheStoreSessionSpringListenerSelfTest extends CacheStoreSessionLi checkTransaction(); checkConnection(); + + if (write.get()) { + String table; + + switch (ses.cacheName()) { + case "cache1": + table = "Table1"; + + break; + + case "cache2": + if (fail.get()) + throw new CacheWriterException("Expected failure."); + + table = "Table2"; + + break; + + default: + throw new CacheWriterException("Wring cache: " + ses.cacheName()); + } + + jdbc.update("INSERT INTO " + table + " (key, value) VALUES (?, ?)", + entry.getKey(), entry.getValue()); + } } /** {@inheritDoc} */