Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8B9A8175A7 for ; Thu, 29 Oct 2015 09:02:11 +0000 (UTC) Received: (qmail 3392 invoked by uid 500); 29 Oct 2015 09:02:11 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 3307 invoked by uid 500); 29 Oct 2015 09:02:11 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 2249 invoked by uid 99); 29 Oct 2015 09:02:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 Oct 2015 09:02:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 99362E13D3; Thu, 29 Oct 2015 09:02:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Thu, 29 Oct 2015 09:02:42 -0000 Message-Id: <85beed1ce3db478c8f90b21d306a8752@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [34/49] ignite git commit: ignite-1607 Implemented deadlock-free optimistic serializable tx mode http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java new file mode 100644 index 0000000..70ddfa0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -0,0 +1,4295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import javax.cache.CacheException; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorResult; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.transactions.TransactionOptimisticException; +import org.jsr166.ConcurrentHashMap8; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.testframework.GridTestUtils.TestMemoryMode; +import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * + */ +public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final boolean FAST = false; + + /** */ + private static Map storeMap = new ConcurrentHashMap8<>(); + + /** */ + private static final int SRVS = 4; + + /** */ + private static final int CLIENTS = 3; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPeerClassLoadingEnabled(false); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(SRVS); + + client = true; + + startGridsMultiThreaded(SRVS, CLIENTS); + + client = false; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60_000; + } + + /** + * @throws Exception If failed. + */ + public void testTxStreamerLoad() throws Exception { + txStreamerLoad(false); + } + + /** + * @throws Exception If failed. + */ + public void testTxStreamerLoadAllowOverwrite() throws Exception { + txStreamerLoad(true); + } + + /** + * @param allowOverwrite Streamer flag. + * @throws Exception If failed. + */ + private void txStreamerLoad(boolean allowOverwrite) throws Exception { + Ignite ignite0 = ignite(0); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + if (ccfg.getCacheStoreFactory() == null) + continue; + + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (Integer key : keys) + txStreamerLoad(ignite0, key, cache.getName(), allowOverwrite); + + txStreamerLoad(ignite(SRVS), 10_000, cache.getName(), allowOverwrite); + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @param ignite Node. + * @param key Key. + * @param cacheName Cache name. + * @param allowOverwrite Streamer flag. + * @throws Exception If failed. + */ + private void txStreamerLoad(Ignite ignite, + Integer key, + String cacheName, + boolean allowOverwrite) throws Exception { + IgniteCache cache = ignite.cache(cacheName); + + log.info("Test key: " + key); + + Integer loadVal = -1; + + IgniteTransactions txs = ignite.transactions(); + + try (IgniteDataStreamer streamer = ignite.dataStreamer(cache.getName())) { + streamer.allowOverwrite(allowOverwrite); + + streamer.addData(key, loadVal); + } + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertEquals(loadVal, val); + + tx.commit(); + } + + checkValue(key, loadVal, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertEquals(loadVal, val); + + cache.put(key, 0); + + tx.commit(); + } + + checkValue(key, 0, cache.getName()); + } + + /** + * @throws Exception If failed. + */ + public void testTxLoadFromStore() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + if (ccfg.getCacheStoreFactory() == null) + continue; + + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + Integer storeVal = -1; + + storeMap.put(key, storeVal); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertEquals(storeVal, val); + + tx.commit(); + } + + checkValue(key, storeVal, cache.getName()); + + cache.remove(key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertNull(val); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxCommitReadOnly1() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertNull(val); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertNull(val); + + tx.rollback(); + } + + checkValue(key, null, cache.getName()); + + cache.put(key, 1); + + cache.remove(key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertNull(val); + + tx.commit(); + } + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxCommitReadOnly2() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (final Integer key : keys) { + log.info("Test key: " + key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertNull(val); + + txAsync(cache, OPTIMISTIC, SERIALIZABLE, + new IgniteClosure, Void>() { + @Override public Void apply(IgniteCache cache) { + cache.get(key); + + return null; + } + } + ); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertNull(val); + + txAsync(cache, PESSIMISTIC, REPEATABLE_READ, + new IgniteClosure, Void>() { + @Override public Void apply(IgniteCache cache) { + cache.get(key); + + return null; + } + } + ); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxCommit() throws Exception { + Ignite ignite0 = ignite(0); + Ignite ignite1 = ignite(1); + + final IgniteTransactions txs0 = ignite0.transactions(); + final IgniteTransactions txs1 = ignite1.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache0 = ignite0.createCache(ccfg); + IgniteCache cache1 = ignite1.cache(ccfg.getName()); + + List keys = testKeys(cache0); + + for (Integer key : keys) { + log.info("Test key: " + key); + + Integer expVal = null; + + for (int i = 0; i < 100; i++) { + try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache0.get(key); + + assertEquals(expVal, val); + + cache0.put(key, i); + + tx.commit(); + + expVal = i; + } + + try (Transaction tx = txs1.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache1.get(key); + + assertEquals(expVal, val); + + cache1.put(key, val); + + tx.commit(); + } + + try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache0.get(key); + + assertEquals(expVal, val); + + cache0.put(key, val); + + tx.commit(); + } + } + + checkValue(key, expVal, cache0.getName()); + + cache0.remove(key); + + try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache0.get(key); + + assertNull(val); + + cache0.put(key, expVal + 1); + + tx.commit(); + } + + checkValue(key, expVal + 1, cache0.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxRollback() throws Exception { + Ignite ignite0 = ignite(0); + Ignite ignite1 = ignite(1); + + final IgniteTransactions txs0 = ignite0.transactions(); + final IgniteTransactions txs1 = ignite1.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache0 = ignite0.createCache(ccfg); + IgniteCache cache1 = ignite1.cache(ccfg.getName()); + + List keys = testKeys(cache0); + + for (Integer key : keys) { + log.info("Test key: " + key); + + Integer expVal = null; + + for (int i = 0; i < 100; i++) { + try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache0.get(key); + + assertEquals(expVal, val); + + cache0.put(key, i); + + tx.rollback(); + } + + try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache0.get(key); + + assertEquals(expVal, val); + + cache0.put(key, i); + + tx.commit(); + + expVal = i; + } + + try (Transaction tx = txs1.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache1.get(key); + + assertEquals(expVal, val); + + cache1.put(key, val); + + tx.commit(); + } + } + + checkValue(key, expVal, cache0.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxCommitReadOnlyGetAll() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + Set keys = new HashSet<>(); + + for (int i = 0; i < 100; i++) + keys.add(i); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Map map = cache.getAll(keys); + + assertTrue(map.isEmpty()); + + tx.commit(); + } + + for (Integer key : keys) + checkValue(key, null, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Map map = cache.getAll(keys); + + assertTrue(map.isEmpty()); + + tx.rollback(); + } + + for (Integer key : keys) + checkValue(key, null, cache.getName()); + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxCommitReadWriteTwoNodes() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + Integer key0 = primaryKey(ignite(0).cache(null)); + Integer key1 = primaryKey(ignite(1).cache(null)); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key0, key0); + + cache.get(key1); + + tx.commit(); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictRead1() throws Exception { + txConflictRead(true); + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictRead2() throws Exception { + txConflictRead(false); + } + + /** + * @param noVal If {@code true} there is no cache value when read in tx. + * @throws Exception If failed. + */ + private void txConflictRead(boolean noVal) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + Integer expVal = null; + + if (!noVal) { + expVal = -1; + + cache.put(key, expVal); + } + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertEquals(expVal, val); + + updateKey(cache, key, 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val = cache.get(key); + + assertEquals(1, val); + + tx.commit(); + } + + checkValue(key, 1, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictReadWrite1() throws Exception { + txConflictReadWrite(true, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictReadWrite2() throws Exception { + txConflictReadWrite(false, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictReadRemove1() throws Exception { + txConflictReadWrite(true, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictReadRemove2() throws Exception { + txConflictReadWrite(false, true); + } + + /** + * @param noVal If {@code true} there is no cache value when read in tx. + * @param rmv If {@code true} tests remove, otherwise put. + * @throws Exception If failed. + */ + private void txConflictReadWrite(boolean noVal, boolean rmv) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + Integer expVal = null; + + if (!noVal) { + expVal = -1; + + cache.put(key, expVal); + } + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertEquals(expVal, val); + + updateKey(cache, key, 1); + + if (rmv) + cache.remove(key); + else + cache.put(key, 2); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertEquals(1, (Object) val); + + if (rmv) + cache.remove(key); + else + cache.put(key, 2); + + tx.commit(); + } + + checkValue(key, rmv ? null : 2, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed + */ + public void testTxConflictGetAndPut1() throws Exception { + txConflictGetAndPut(true, false); + } + + /** + * @throws Exception If failed + */ + public void testTxConflictGetAndPut2() throws Exception { + txConflictGetAndPut(false, false); + } + + /** + * @throws Exception If failed + */ + public void testTxConflictGetAndRemove1() throws Exception { + txConflictGetAndPut(true, true); + } + + /** + * @throws Exception If failed + */ + public void testTxConflictGetAndRemove2() throws Exception { + txConflictGetAndPut(false, true); + } + + /** + * @param noVal If {@code true} there is no cache value when read in tx. + * @param rmv If {@code true} tests remove, otherwise put. + * @throws Exception If failed. + */ + private void txConflictGetAndPut(boolean noVal, boolean rmv) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + Integer expVal = null; + + if (!noVal) { + expVal = -1; + + cache.put(key, expVal); + } + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = rmv ? cache.getAndRemove(key) : cache.getAndPut(key, 2); + + assertEquals(expVal, val); + + updateKey(cache, key, 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val = rmv ? cache.getAndRemove(key) : cache.getAndPut(key, 2); + + assertEquals(1, val); + + tx.commit(); + } + + checkValue(key, rmv ? null : 2, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed + */ + public void testTxConflictInvoke1() throws Exception { + txConflictInvoke(true, false); + } + + /** + * @throws Exception If failed + */ + public void testTxConflictInvoke2() throws Exception { + txConflictInvoke(false, false); + } + + /** + * @throws Exception If failed + */ + public void testTxConflictInvoke3() throws Exception { + txConflictInvoke(true, true); + } + + /** + * @throws Exception If failed + */ + public void testTxConflictInvoke4() throws Exception { + txConflictInvoke(false, true); + } + + /** + * @param noVal If {@code true} there is no cache value when read in tx. + * @param rmv If {@code true} invoke does remove value, otherwise put. + * @throws Exception If failed. + */ + private void txConflictInvoke(boolean noVal, boolean rmv) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + Integer expVal = null; + + if (!noVal) { + expVal = -1; + + cache.put(key, expVal); + } + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.invoke(key, new SetValueProcessor(rmv ? null : 2)); + + assertEquals(expVal, val); + + updateKey(cache, key, 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val = cache.invoke(key, new SetValueProcessor(rmv ? null : 2)); + + assertEquals(1, val); + + tx.commit(); + } + + checkValue(key, rmv ? null : 2, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed + */ + public void testTxConflictInvokeAll() throws Exception { + Ignite ignite0 = ignite(0); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache0 = ignite0.createCache(ccfg); + + final Integer key1 = primaryKey(ignite(0).cache(cache0.getName())); + final Integer key2 = primaryKey(ignite(1).cache(cache0.getName())); + + Map vals = new HashMap<>(); + + int newVal = 0; + + for (Ignite ignite : G.allGrids()) { + log.info("Test node: " + ignite.name()); + + IgniteTransactions txs = ignite.transactions(); + + IgniteCache cache = ignite.cache(cache0.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Map> res = + cache.invokeAll(F.asSet(key1, key2), new SetValueProcessor(newVal)); + + if (!vals.isEmpty()) { + EntryProcessorResult res1 = res.get(key1); + + assertNotNull(res1); + assertEquals(vals.get(key1), res1.get()); + + EntryProcessorResult res2 = res.get(key2); + + assertNotNull(res2); + assertEquals(vals.get(key2), res2.get()); + } + else + assertTrue(res.isEmpty()); + + tx.commit(); + } + + checkValue(key1, newVal, cache.getName()); + checkValue(key2, newVal, cache.getName()); + + vals.put(key1, newVal); + vals.put(key2, newVal); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Map> res = + cache.invokeAll(F.asSet(key1, key2), new SetValueProcessor(newVal + 1)); + + EntryProcessorResult res1 = res.get(key1); + + assertNotNull(res1); + assertEquals(vals.get(key1), res1.get()); + + EntryProcessorResult res2 = res.get(key2); + + assertNotNull(res2); + assertEquals(vals.get(key2), res2.get()); + + updateKey(cache0, key1, -1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key1, -1, cache.getName()); + checkValue(key2, newVal, cache.getName()); + + vals.put(key1, -1); + vals.put(key2, newVal); + + newVal++; + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictPutIfAbsent() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean put = cache.putIfAbsent(key, 2); + + assertTrue(put); + + updateKey(cache, key, 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean put = cache.putIfAbsent(key, 2); + + assertFalse(put); + + tx.commit(); + } + + checkValue(key, 1, cache.getName()); + + cache.remove(key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean put = cache.putIfAbsent(key, 2); + + assertTrue(put); + + tx.commit(); + } + + checkValue(key, 2, cache.getName()); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean put = cache.putIfAbsent(key, 2); + + assertFalse(put); + + updateKey(cache, key, 3); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 3, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictGetAndPutIfAbsent() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object old = cache.getAndPutIfAbsent(key, 2); + + assertNull(old); + + updateKey(cache, key, 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object old = cache.getAndPutIfAbsent(key, 2); + + assertEquals(1, old); + + tx.commit(); + } + + checkValue(key, 1, cache.getName()); + + cache.remove(key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object old = cache.getAndPutIfAbsent(key, 2); + + assertNull(old); + + tx.commit(); + } + + checkValue(key, 2, cache.getName()); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object old = cache.getAndPutIfAbsent(key, 4); + + assertEquals(2, old); + + updateKey(cache, key, 3); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 3, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictReplace() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (final Integer key : keys) { + log.info("Test key: " + key); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 2); + + assertFalse(replace); + + updateKey(cache, key, 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 2); + + assertTrue(replace); + + tx.commit(); + } + + checkValue(key, 2, cache.getName()); + + cache.remove(key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 2); + + assertFalse(replace); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 2); + + assertFalse(replace); + + updateKey(cache, key, 3); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 3, cache.getName()); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 2); + + assertTrue(replace); + + txAsync(cache, OPTIMISTIC, SERIALIZABLE, + new IgniteClosure, Void>() { + @Override public Void apply(IgniteCache cache) { + cache.remove(key); + + return null; + } + } + ); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, null, cache.getName()); + + cache.put(key, 1); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 2); + + assertTrue(replace); + + tx.commit(); + } + + checkValue(key, 2, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictGetAndReplace() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (final Integer key : keys) { + log.info("Test key: " + key); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object old = cache.getAndReplace(key, 2); + + assertNull(old); + + updateKey(cache, key, 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object old = cache.getAndReplace(key, 2); + + assertEquals(1, old); + + tx.commit(); + } + + checkValue(key, 2, cache.getName()); + + cache.remove(key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object old = cache.getAndReplace(key, 2); + + assertNull(old); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object old = cache.getAndReplace(key, 2); + + assertNull(old); + + updateKey(cache, key, 3); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 3, cache.getName()); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object old = cache.getAndReplace(key, 2); + + assertEquals(3, old); + + txAsync(cache, OPTIMISTIC, SERIALIZABLE, + new IgniteClosure, Void>() { + @Override public Void apply(IgniteCache cache) { + cache.remove(key); + + return null; + } + } + ); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, null, cache.getName()); + + cache.put(key, 1); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object old = cache.getAndReplace(key, 2); + + assertEquals(1, old); + + tx.commit(); + } + + checkValue(key, 2, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictRemoveWithOldValue() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (final Integer key : keys) { + log.info("Test key: " + key); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean rmv = cache.remove(key, 2); + + assertFalse(rmv); + + updateKey(cache, key, 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean rmv = cache.remove(key, 1); + + assertTrue(rmv); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + cache.remove(key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean rmv = cache.remove(key, 2); + + assertFalse(rmv); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + cache.put(key, 2); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean rmv = cache.remove(key, 2); + + assertTrue(rmv); + + updateKey(cache, key, 3); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 3, cache.getName()); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean rmv = cache.remove(key, 3); + + assertTrue(rmv); + + txAsync(cache, OPTIMISTIC, SERIALIZABLE, + new IgniteClosure, Void>() { + @Override public Void apply(IgniteCache cache) { + cache.remove(key); + + return null; + } + } + ); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, null, cache.getName()); + + cache.put(key, 1); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean rmv = cache.remove(key, 2); + + assertFalse(rmv); + + tx.commit(); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean rmv = cache.remove(key, 1); + + assertTrue(rmv); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictCasReplace() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (final Integer key : keys) { + log.info("Test key: " + key); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 1, 2); + + assertFalse(replace); + + updateKey(cache, key, 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 1, 2); + + assertTrue(replace); + + tx.commit(); + } + + checkValue(key, 2, cache.getName()); + + cache.remove(key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 1, 2); + + assertFalse(replace); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + cache.put(key, 2); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 2, 1); + + assertTrue(replace); + + updateKey(cache, key, 3); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 3, cache.getName()); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 3, 4); + + assertTrue(replace); + + txAsync(cache, OPTIMISTIC, SERIALIZABLE, + new IgniteClosure, Void>() { + @Override public Void apply(IgniteCache cache) { + cache.remove(key); + + return null; + } + } + ); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, null, cache.getName()); + + cache.put(key, 1); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 2, 3); + + assertFalse(replace); + + tx.commit(); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 1, 3); + + assertTrue(replace); + + tx.commit(); + } + + checkValue(key, 3, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictRemoveReturnBoolean1() throws Exception { + txConflictRemoveReturnBoolean(false); + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictRemoveReturnBoolean2() throws Exception { + txConflictRemoveReturnBoolean(true); + } + + /** + * @param noVal If {@code true} there is no cache value when do update in tx. + * @throws Exception If failed. + */ + private void txConflictRemoveReturnBoolean(boolean noVal) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (final Integer key : keys) { + log.info("Test key: " + key); + + if (!noVal) + cache.put(key, -1); + + if (noVal) { + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean res = cache.remove(key); + + assertFalse(res); + + updateKey(cache, key, -1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, -1, cache.getName()); + } + else { + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean res = cache.remove(key); + + assertTrue(res); + + txAsync(cache, PESSIMISTIC, REPEATABLE_READ, + new IgniteClosure, Void>() { + @Override public Void apply(IgniteCache cache) { + cache.remove(key); + + return null; + } + } + ); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, null, cache.getName()); + + cache.put(key, -1); + } + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean res = cache.remove(key); + + assertTrue(res); + + updateKey(cache, key, 2); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + // Check no conflict for removeAll with single key. + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.removeAll(Collections.singleton(key)); + + txAsync(cache, PESSIMISTIC, REPEATABLE_READ, + new IgniteClosure, Void>() { + @Override public Void apply(IgniteCache cache) { + cache.remove(key); + + return null; + } + } + ); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + cache.put(key, 2); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean res = cache.remove(key); + + assertTrue(res); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean res = cache.remove(key); + + assertFalse(res); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + try { + cache.put(key, 1); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val = cache.get(key); + + assertEquals(1, val); + + boolean res = cache.remove(key); + + assertTrue(res); + + updateKey(cache, key, 2); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictPut1() throws Exception { + txNoConflictUpdate(true, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictPut2() throws Exception { + txNoConflictUpdate(false, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictPut3() throws Exception { + txNoConflictUpdate(false, false, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictRemove1() throws Exception { + txNoConflictUpdate(true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictRemove2() throws Exception { + txNoConflictUpdate(false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictRemove3() throws Exception { + txNoConflictUpdate(false, true, true); + } + + /** + * @throws Exception If failed. + * @param noVal If {@code true} there is no cache value when do update in tx. + * @param rmv If {@code true} tests remove, otherwise put. + * @param getAfterUpdate If {@code true} tries to get value in tx after update. + */ + private void txNoConflictUpdate(boolean noVal, boolean rmv, boolean getAfterUpdate) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + if (!noVal) + cache.put(key, -1); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + if (rmv) + cache.remove(key); + else + cache.put(key, 2); + + if (getAfterUpdate) { + Object val = cache.get(key); + + if (rmv) + assertNull(val); + else + assertEquals(2, val); + } + + if (!rmv) + updateKey(cache, key, 1); + + tx.commit(); + } + + checkValue(key, rmv ? null : 2, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key, 3); + + tx.commit(); + } + + checkValue(key, 3, cache.getName()); + } + + Map map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, i); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + if (rmv) + cache.removeAll(map.keySet()); + else + cache.putAll(map); + + if (getAfterUpdate) { + Map res = cache.getAll(map.keySet()); + + if (rmv) { + for (Integer key : map.keySet()) + assertNull(res.get(key)); + } + else { + for (Integer key : map.keySet()) + assertEquals(map.get(key), res.get(key)); + } + } + + txAsync(cache, PESSIMISTIC, REPEATABLE_READ, + new IgniteClosure, Void>() { + @Override public Void apply(IgniteCache cache) { + Map map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, -1); + + cache.putAll(map); + + return null; + } + } + ); + + tx.commit(); + } + + for (int i = 0; i < 100; i++) + checkValue(i, rmv ? null : i, cache.getName()); + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictContainsKey1() throws Exception { + txNoConflictContainsKey(false); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictContainsKey2() throws Exception { + txNoConflictContainsKey(true); + } + + /** + * @param noVal If {@code true} there is no cache value when do update in tx. + * @throws Exception If failed. + */ + private void txNoConflictContainsKey(boolean noVal) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + if (!noVal) + cache.put(key, -1); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean res = cache.containsKey(key); + + assertEquals(!noVal, res); + + updateKey(cache, key, 1); + + tx.commit(); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean res = cache.containsKey(key); + + assertTrue(res); + + updateKey(cache, key, 2); + + tx.commit(); + } + + checkValue(key, 2, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean res = cache.containsKey(key); + + assertTrue(res); + + tx.commit(); + } + + cache.remove(key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean res = cache.containsKey(key); + + assertFalse(res); + + updateKey(cache, key, 3); + + tx.commit(); + } + + checkValue(key, 3, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxRollbackIfLocked1() throws Exception { + Ignite ignite0 = ignite(0); + + IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + List keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + CountDownLatch latch = new CountDownLatch(1); + + IgniteInternalFuture fut = lockKey(latch, cache, key); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key, 2); + + log.info("Commit"); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + latch.countDown(); + + fut.get(); + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key, 2); + + tx.commit(); + } + + checkValue(key, 2, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxRollbackIfLocked2() throws Exception { + rollbackIfLockedPartialLock(false); + } + + /** + * @throws Exception If failed. + */ + public void testTxRollbackIfLocked3() throws Exception { + rollbackIfLockedPartialLock(true); + } + + /** + * @param locKey If {@code true} gets lock for local key. + * @throws Exception If failed. + */ + private void rollbackIfLockedPartialLock(boolean locKey) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache cache = ignite0.createCache(ccfg); + + final Integer key1 = primaryKey(ignite(1).cache(cache.getName())); + final Integer key2 = locKey ? primaryKey(cache) : primaryKey(ignite(2).cache(cache.getName())); + + CountDownLatch latch = new CountDownLatch(1); + + IgniteInternalFuture fut = lockKey(latch, cache, key1); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key1, 2); + cache.put(key2, 2); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + latch.countDown(); + + fut.get(); + + checkValue(key1, 1, cache.getName()); + checkValue(key2, null, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key1, 2); + cache.put(key2, 2); + + tx.commit(); + } + + checkValue(key1, 2, cache.getName()); + checkValue(key2, 2, cache.getName()); + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testNearCacheReaderUpdate() throws Exception { + Ignite ignite0 = ignite(0); + + IgniteCache cache0 = + ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)); + + final String cacheName = cache0.getName(); + + try { + Ignite client1 = ignite(SRVS); + Ignite client2 = ignite(SRVS + 1); + + IgniteCache cache1 = client1.createNearCache(cacheName, + new NearCacheConfiguration()); + IgniteCache cache2 = client2.createNearCache(cacheName, + new NearCacheConfiguration()); + + Integer key = primaryKey(ignite(0).cache(cacheName)); + + try (Transaction tx = client1.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { + assertNull(cache1.get(key)); + cache1.put(key, 1); + + tx.commit(); + } + + try (Transaction tx = client2.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { + assertEquals(1, (Object) cache2.get(key)); + cache2.put(key, 2); + + tx.commit(); + } + + try (Transaction tx = client1.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { + assertEquals(2, (Object)cache1.get(key)); + cache1.put(key, 3); + + tx.commit(); + } + } + finally { + ignite0.destroyCache(cacheName); + } + } + + /** + * @throws Exception If failed. + */ + public void testRollbackNearCache1() throws Exception { + rollbackNearCacheWrite(true); + } + + /** + * @throws Exception If failed. + */ + public void testRollbackNearCache2() throws Exception { + rollbackNearCacheWrite(false); + } + + /** + * @param near If {@code true} locks entry using the same near cache. + * @throws Exception If failed. + */ + private void rollbackNearCacheWrite(boolean near) throws Exception { + Ignite ignite0 = ignite(0); + + IgniteCache cache0 = + ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)); + + final String cacheName = cache0.getName(); + + try { + Ignite ignite = ignite(SRVS); + + IgniteCache cache = ignite.createNearCache(cacheName, + new NearCacheConfiguration()); + + IgniteTransactions txs = ignite.transactions(); + + Integer key1 = primaryKey(ignite(0).cache(cacheName)); + Integer key2 = primaryKey(ignite(1).cache(cacheName)); + Integer key3 = primaryKey(ignite(2).cache(cacheName)); + + CountDownLatch latch = new CountDownLatch(1); + + IgniteInternalFuture fut = null; + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key1, key1); + cache.put(key2, key2); + cache.put(key3, key3); + + fut = lockKey(latch, near ? cache : cache0, key2); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + latch.countDown(); + + assert fut != null; + + fut.get(); + + checkValue(key1, null, cacheName); + checkValue(key2, 1, cacheName); + checkValue(key3, null, cacheName); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key1, key1); + cache.put(key2, key2); + cache.put(key3, key3); + + tx.commit(); + } + + checkValue(key1, key1, cacheName); + checkValue(key2, key2, cacheName); + checkValue(key3, key3, cacheName); + } + finally { + ignite0.destroyCache(cacheName); + } + } + + /** + * @throws Exception If failed. + */ + public void testRollbackNearCache3() throws Exception { + rollbackNearCacheRead(true); + } + + /** + * @throws Exception If failed. + */ + public void testRollbackNearCache4() throws Exception { + rollbackNearCacheRead(false); + } + + /** + * @param near If {@code true} updates entry using the same near cache. + * @throws Exception If failed. + */ + private void rollbackNearCacheRead(boolean near) throws Exception { + Ignite ignite0 = ignite(0); + + IgniteCache cache0 = + ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)); + + final String cacheName = cache0.getName(); + + try { + Ignite ignite = ignite(SRVS); + + IgniteCache cache = ignite.createNearCache(cacheName, + new NearCacheConfiguration()); + + IgniteTransactions txs = ignite.transactions(); + + Integer key1 = primaryKey(ignite(0).cache(cacheName)); + Integer key2 = primaryKey(ignite(1).cache(cacheName)); + Integer key3 = primaryKey(ignite(2).cache(cacheName)); + + cache0.put(key1, -1); + cache0.put(key2, -1); + cache0.put(key3, -1); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.get(key1); + cache.get(key2); + cache.get(key3); + + updateKey(near ? cache : cache0, key2, -2); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key1, -1, cacheName); + checkValue(key2, -2, cacheName); + checkValue(key3, -1, cacheName); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key1, key1); + cache.put(key2, key2); + cache.put(key3, key3); + + tx.commit(); + } + + checkValue(key1, key1, cacheName); + checkValue(key2, key2, cacheName); + checkValue(key3, key3, cacheName); + } + finally { + ignite0.destroyCache(cacheName); + } + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheTx() throws Exception { + Ignite ignite0 = ignite(0); + + final String CACHE1 = "cache1"; + final String CACHE2 = "cache2"; + + try { + CacheConfiguration ccfg1 = + cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false); + + ccfg1.setName(CACHE1); + + ignite0.createCache(ccfg1); + + CacheConfiguration ccfg2= + cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false); + + ccfg2.setName(CACHE2); + + ignite0.createCache(ccfg2); + + Integer newVal = 0; + + List keys = testKeys(ignite0.cache(CACHE1)); + + for (Ignite ignite : G.allGrids()) { + log.info("Test node: " + ignite.name()); + + IgniteCache cache1 = ignite.cache(CACHE1); + IgniteCache cache2 = ignite.cache(CACHE2); + + IgniteTransactions txs = ignite.transactions(); + + for (Integer key : keys) { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache1.put(key, newVal); + cache2.put(key, newVal); + + tx.commit(); + } + + checkValue(key, newVal, CACHE1); + checkValue(key, newVal, CACHE2); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val1 = cache1.get(key); + Object val2 = cache2.get(key); + + assertEquals(newVal, val1); + assertEquals(newVal, val2); + + tx.commit(); + } + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache1.put(key, newVal + 1); + cache2.put(key, newVal + 1); + + tx.rollback(); + } + + checkValue(key, newVal, CACHE1); + checkValue(key, newVal, CACHE2); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val1 = cache1.get(key); + Object val2 = cache2.get(key); + + assertEquals(newVal, val1); + assertEquals(newVal, val2); + + cache1.put(key, newVal + 1); + cache2.put(key, newVal + 1); + + tx.commit(); + } + + newVal++; + + checkValue(key, newVal, CACHE1); + checkValue(key, newVal, CACHE2); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache1.put(key, newVal); + cache2.put(-key, newVal); + + tx.commit(); + } + + checkValue(key, newVal, CACHE1); + checkValue(-key, null, CACHE1); + + checkValue(key, newVal, CACHE2); + checkValue(-key, newVal, CACHE2); + } + + newVal++; + + Integer key1 = primaryKey(ignite(0).cache(CACHE1)); + Integer key2 = primaryKey(ignite(1).cache(CACHE1)); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache1.put(key1, newVal); + cache1.put(key2, newVal); + + cache2.put(key1, newVal); + cache2.put(key2, newVal); + + tx.commit(); + } + + checkValue(key1, newVal, CACHE1); + checkValue(key2, newVal, CACHE1); + checkValue(key1, newVal, CACHE2); + checkValue(key2, newVal, CACHE2); + + CountDownLatch latch = new CountDownLatch(1); + + IgniteInternalFuture fut = lockKey(latch, cache1, key1); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache1.put(key1, newVal + 1); + cache2.put(key1, newVal + 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + latch.countDown(); + + fut.get(); + + checkValue(key1, 1, CACHE1); + checkValue(key1, newVal, CACHE2); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache1.put(key1, newVal + 1); + cache2.put(key1, newVal + 1); + + tx.commit(); + } + + newVal++; + + cache1.put(key2, newVal); + cache2.put(key2, newVal); + + checkValue(key1, newVal, CACHE1); + checkValue(key1, newVal, CACHE2); + + latch = new CountDownLatch(1); + + fut = lockKey(latch, cache1, key1); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache1.put(key1, newVal + 1); + cache2.put(key2, newVal + 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + latch.countDown(); + + fut.get(); + + checkValue(key1, 1, CACHE1); + checkValue(key2, newVal, CACHE2); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val1 = cache1.get(key1); +