ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [03/21] ignite git commit: ignite-1607 WIP
Date Tue, 13 Oct 2015 07:05:36 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index dfe82d4..a620ee5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -19,9 +19,12 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
@@ -30,6 +33,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.Cache;
 import javax.cache.configuration.Factory;
 import javax.cache.integration.CacheLoaderException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteTransactions;
@@ -42,17 +47,22 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionOptimisticException;
+import org.jsr166.ConcurrentHashMap8;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -67,6 +77,12 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
     /** */
+    private static final boolean FAST = true;
+
+    /** */
+    private static Map<Integer, Integer> storeMap = new ConcurrentHashMap8<>();
+
+    /** */
     private static final int SRVS = 4;
 
     /** */
@@ -79,42 +95,793 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(SRVS);
+
+        client = true;
+
+        startGridsMultiThreaded(SRVS, CLIENTS);
+
+        client = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 5 * 60_000;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxCommitReadOnly1() throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        final IgniteTransactions txs = ignite0.transactions();
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+                List<Integer> keys = testKeys(cache);
+
+                for (Integer key : keys) {
+                    log.info("Test key: " + key);
+
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        Integer val = cache.get(key);
+
+                        assertNull(val);
+
+                        tx.commit();
+                    }
+
+                    checkValue(key, null, cache.getName());
+
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        Integer val = cache.get(key);
+
+                        assertNull(val);
+
+                        tx.rollback();
+                    }
+
+                    checkValue(key, null, cache.getName());
+
+                    cache.put(key, 1);
+
+                    cache.remove(key);
+
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        Integer val = cache.get(key);
+
+                        assertNull(val);
+
+                        tx.commit();
+                    }
+                }
+            }
+            finally {
+                destroyCache(ignite0, ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxCommitReadOnly2() throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        final IgniteTransactions txs = ignite0.transactions();
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+                List<Integer> keys = testKeys(cache);
+
+                for (final Integer key : keys) {
+                    log.info("Test key: " + key);
+
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        Integer val = cache.get(key);
+
+                        assertNull(val);
+
+                        txAsync(cache, OPTIMISTIC, SERIALIZABLE,
+                            new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+                                @Override
+                                public Void apply(IgniteCache<Integer, Integer> cache) {
+                                    cache.get(key);
+
+                                    return null;
+                                }
+                            }
+                        );
+
+                        tx.commit();
+                    }
+
+                    checkValue(key, null, cache.getName());
+
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        Integer val = cache.get(key);
+
+                        assertNull(val);
+
+                        txAsync(cache, PESSIMISTIC, REPEATABLE_READ,
+                            new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+                                @Override
+                                public Void apply(IgniteCache<Integer, Integer> cache) {
+                                    cache.get(key);
+
+                                    return null;
+                                }
+                            }
+                        );
+
+                        tx.commit();
+                    }
+
+                    checkValue(key, null, cache.getName());
+                }
+            }
+            finally {
+                destroyCache(ignite0, ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxCommitReadOnlyGetAll() throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        final IgniteTransactions txs = ignite0.transactions();
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+                Set<Integer> keys = new HashSet<>();
+
+                for (int i = 0; i < 100; i++)
+                    keys.add(i);
+
+                try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                    Map<Integer, Integer> map = cache.getAll(keys);
+
+                    assertTrue(map.isEmpty());
+
+                    tx.commit();
+                }
+
+                for (Integer key : keys)
+                    checkValue(key, null, cache.getName());
+
+                try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                    Map<Integer, Integer> map = cache.getAll(keys);
+
+                    assertTrue(map.isEmpty());
+
+                    tx.rollback();
+                }
+
+                for (Integer key : keys)
+                    checkValue(key, null, cache.getName());
+            }
+            finally {
+                destroyCache(ignite0, ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxConflictRead1() throws Exception {
+        txConflictRead(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxConflictRead2() throws Exception {
+        txConflictRead(false);
+    }
+
+    /**
+     * @param noVal If {@code true} there is no cache value when read in tx.
+     * @throws Exception If failed.
+     */
+    private void txConflictRead(boolean noVal) throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        final IgniteTransactions txs = ignite0.transactions();
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+                List<Integer> keys = testKeys(cache);
+
+                for (Integer key : keys) {
+                    log.info("Test key: " + key);
+
+                    Integer expVal = null;
+
+                    if (!noVal) {
+                        expVal = -1;
+
+                        cache.put(key, expVal);
+                    }
+
+                    try {
+                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            Integer val = cache.get(key);
+
+                            assertEquals(expVal, val);
+
+                            updateKey(cache, key, 1);
+
+                            tx.commit();
+                        }
+
+                        fail();
+                    }
+                    catch (TransactionOptimisticException e) {
+                        log.info("Expected exception: " + e);
+                    }
+
+                    checkValue(key, 1, cache.getName());
+
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        Object val = cache.get(key);
+
+                        assertEquals(1, val);
+
+                        tx.commit();
+                    }
+
+                    checkValue(key, 1, cache.getName());
+                }
+            }
+            finally {
+                destroyCache(ignite0, ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxConflictReadWrite1() throws Exception {
+        txConflictReadWrite(true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxConflictReadWrite2() throws Exception {
+        txConflictReadWrite(false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxConflictReadRemove1() throws Exception {
+        txConflictReadWrite(true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxConflictReadRemove2() throws Exception {
+        txConflictReadWrite(false, true);
+    }
+
+    /**
+     * @param noVal If {@code true} there is no cache value when read in tx.
+     * @param rmv If {@code true} tests remove, otherwise put.
+     * @throws Exception If failed.
+     */
+    private void txConflictReadWrite(boolean noVal, boolean rmv) throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        final IgniteTransactions txs = ignite0.transactions();
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+                List<Integer> keys = testKeys(cache);
+
+                for (Integer key : keys) {
+                    log.info("Test key: " + key);
+
+                    Integer expVal = null;
+
+                    if (!noVal) {
+                        expVal = -1;
+
+                        cache.put(key, expVal);
+                    }
+
+                    try {
+                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            Integer val = cache.get(key);
+
+                            assertEquals(expVal, val);
+
+                            updateKey(cache, key, 1);
+
+                            if (rmv)
+                                cache.remove(key);
+                            else
+                                cache.put(key, 2);
+
+                            tx.commit();
+                        }
+
+                        fail();
+                    }
+                    catch (TransactionOptimisticException e) {
+                        log.info("Expected exception: " + e);
+                    }
+
+                    checkValue(key, 1, cache.getName());
+
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        Integer val = cache.get(key);
+
+                        assertEquals(1, (Object) val);
+
+                        if (rmv)
+                            cache.remove(key);
+                        else
+                            cache.put(key, 2);
+
+                        tx.commit();
+                    }
+
+                    checkValue(key, rmv ? null : 2, cache.getName());
+                }
+            }
+            finally {
+                destroyCache(ignite0, ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed
+     */
+    public void testTxConflictGetAndPut1() throws Exception {
+        txConflictGetAndPut(true, false);
+    }
+
+    /**
+     * @throws Exception If failed
+     */
+    public void testTxConflictGetAndPut2() throws Exception {
+        txConflictGetAndPut(false, false);
+    }
+
+    /**
+     * @throws Exception If failed
+     */
+    public void testTxConflictGetAndRemove1() throws Exception {
+        txConflictGetAndPut(true, true);
+    }
+
+    /**
+     * @throws Exception If failed
+     */
+    public void testTxConflictGetAndRemove2() throws Exception {
+        txConflictGetAndPut(false, true);
+    }
+
+    /**
+     * @param noVal If {@code true} there is no cache value when read in tx.
+     * @param rmv If {@code true} tests remove, otherwise put.
+     * @throws Exception If failed.
+     */
+    private void txConflictGetAndPut(boolean noVal, boolean rmv) throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        final IgniteTransactions txs = ignite0.transactions();
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+                List<Integer> keys = testKeys(cache);
+
+                for (Integer key : keys) {
+                    log.info("Test key: " + key);
+
+                    Integer expVal = null;
+
+                    if (!noVal) {
+                        expVal = -1;
+
+                        cache.put(key, expVal);
+                    }
+
+                    try {
+                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            Integer val = rmv ? cache.getAndRemove(key) : cache.getAndPut(key, 2);
+
+                            assertEquals(expVal, val);
+
+                            updateKey(cache, key, 1);
+
+                            tx.commit();
+                        }
+
+                        fail();
+                    }
+                    catch (TransactionOptimisticException e) {
+                        log.info("Expected exception: " + e);
+                    }
+
+                    checkValue(key, 1, cache.getName());
+
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        Object val = rmv ? cache.getAndRemove(key) : cache.getAndPut(key, 2);
+
+                        assertEquals(1, val);
+
+                        tx.commit();
+                    }
+
+                    checkValue(key, rmv ? null : 2, cache.getName());
+                }
+            }
+            finally {
+                destroyCache(ignite0, ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed
+     */
+    public void testTxConflictInvoke1() throws Exception {
+        txConflictInvoke(true, false);
+    }
+
+    /**
+     * @throws Exception If failed
+     */
+    public void testTxConflictInvoke2() throws Exception {
+        txConflictInvoke(false, false);
+    }
+
+    /**
+     * @throws Exception If failed
+     */
+    public void testTxConflictInvoke3() throws Exception {
+        txConflictInvoke(true, true);
+    }
+
+    /**
+     * @throws Exception If failed
+     */
+    public void testTxConflictInvoke4() throws Exception {
+        txConflictInvoke(false, true);
+    }
+
+    /**
+     * @param noVal If {@code true} there is no cache value when read in tx.
+     * @param rmv If {@code true} invoke does remove value, otherwise put.
+     * @throws Exception If failed.
+     */
+    private void txConflictInvoke(boolean noVal, boolean rmv) throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        final IgniteTransactions txs = ignite0.transactions();
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+                List<Integer> keys = testKeys(cache);
+
+                for (Integer key : keys) {
+                    log.info("Test key: " + key);
+
+                    Integer expVal = null;
+
+                    if (!noVal) {
+                        expVal = -1;
+
+                        cache.put(key, expVal);
+                    }
+
+                    try {
+                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            Integer val = cache.invoke(key, new SetValueProcessor(rmv ? null : 2));
+
+                            assertEquals(expVal, val);
+
+                            updateKey(cache, key, 1);
+
+                            tx.commit();
+                        }
+
+                        fail();
+                    }
+                    catch (TransactionOptimisticException e) {
+                        log.info("Expected exception: " + e);
+                    }
+
+                    checkValue(key, 1, cache.getName());
+
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        Object val = cache.invoke(key, new SetValueProcessor(rmv ? null : 2));
+
+                        assertEquals(1, val);
+
+                        tx.commit();
+                    }
+
+                    checkValue(key, rmv ? null : 2, cache.getName());
+                }
+            }
+            finally {
+                destroyCache(ignite0, ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxConflictPutIfAbsent() throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        final IgniteTransactions txs = ignite0.transactions();
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+                List<Integer> keys = testKeys(cache);
+
+                for (Integer key : keys) {
+                    log.info("Test key: " + key);
+
+                    try {
+                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            boolean put = cache.putIfAbsent(key, 2);
+
+                            assertTrue(put);
+
+                            updateKey(cache, key, 1);
+
+                            tx.commit();
+                        }
+
+                        fail();
+                    }
+                    catch (TransactionOptimisticException e) {
+                        log.info("Expected exception: " + e);
+                    }
+
+                    checkValue(key, 1, cache.getName());
+
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        boolean put = cache.putIfAbsent(key, 2);
+
+                        assertFalse(put);
+
+                        tx.commit();
+                    }
+
+                    checkValue(key, 1, cache.getName());
+
+                    cache.remove(key);
+
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        boolean put = cache.putIfAbsent(key, 2);
+
+                        assertTrue(put);
+
+                        tx.commit();
+                    }
+
+                    checkValue(key, 2, cache.getName());
+
+                    try {
+                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            boolean put = cache.putIfAbsent(key, 2);
+
+                            assertFalse(put);
+
+                            updateKey(cache, key, 3);
+
+                            tx.commit();
+                        }
+
+                        fail();
+                    }
+                    catch (TransactionOptimisticException e) {
+                        log.info("Expected exception: " + e);
+                    }
+
+                    checkValue(key, 3, cache.getName());
+                }
+            }
+            finally {
+                destroyCache(ignite0, ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxConflictReplace() throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        final IgniteTransactions txs = ignite0.transactions();
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+                List<Integer> keys = testKeys(cache);
+
+                for (final Integer key : keys) {
+                    log.info("Test key: " + key);
+
+                    try {
+                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            boolean replace = cache.replace(key, 2);
+
+                            assertFalse(replace);
+
+                            updateKey(cache, key, 1);
+
+                            tx.commit();
+                        }
+
+                        fail();
+                    }
+                    catch (TransactionOptimisticException e) {
+                        log.info("Expected exception: " + e);
+                    }
+
+                    checkValue(key, 1, cache.getName());
+
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        boolean replace = cache.replace(key, 2);
+
+                        assertTrue(replace);
+
+                        tx.commit();
+                    }
+
+                    checkValue(key, 2, cache.getName());
+
+                    cache.remove(key);
+
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        boolean replace = cache.replace(key, 2);
+
+                        assertFalse(replace);
+
+                        tx.commit();
+                    }
+
+                    checkValue(key, null, cache.getName());
+
+                    try {
+                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            boolean replace = cache.replace(key, 2);
+
+                            assertFalse(replace);
+
+                            updateKey(cache, key, 3);
+
+                            tx.commit();
+                        }
+
+                        fail();
+                    }
+                    catch (TransactionOptimisticException e) {
+                        log.info("Expected exception: " + e);
+                    }
+
+                    checkValue(key, 3, cache.getName());
+
+                    try {
+                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            boolean replace = cache.replace(key, 2);
+
+                            assertTrue(replace);
 
-        cfg.setClientMode(client);
+                            txAsync(cache, OPTIMISTIC, SERIALIZABLE,
+                                new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+                                    @Override public Void apply(IgniteCache<Integer, Integer> cache) {
+                                        cache.remove(key);
 
-        return cfg;
-    }
+                                        return null;
+                                    }
+                                }
+                            );
 
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
+                            tx.commit();
+                        }
 
-        startGridsMultiThreaded(SRVS);
+                        fail();
+                    }
+                    catch (TransactionOptimisticException e) {
+                        log.info("Expected exception: " + e);
+                    }
 
-        client = true;
+                    checkValue(key, null, cache.getName());
 
-        startGridsMultiThreaded(SRVS, CLIENTS);
+                    cache.put(key, 1);
 
-        client = false;
-    }
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        boolean replace = cache.replace(key, 2);
 
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        super.afterTestsStopped();
+                        assertTrue(replace);
 
-        stopAllGrids();
-    }
+                        tx.commit();
+                    }
 
-    /** {@inheritDoc} */
-    @Override protected long getTestTimeout() {
-        return 5 * 60_000;
+                    checkValue(key, 2, cache.getName());
+                }
+            }
+            finally {
+                destroyCache(ignite0, ccfg.getName());
+            }
+        }
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testTxCommitReadOnly() throws Exception {
+    public void testTxConflictGetAndReplace() throws Exception {
         Ignite ignite0 = ignite(0);
 
         final IgniteTransactions txs = ignite0.transactions();
@@ -125,99 +892,86 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
             try {
                 IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
 
-                List<Integer> keys = new ArrayList<>();
+                List<Integer> keys = testKeys(cache);
 
-                keys.add(nearKey(cache));
-                keys.add(primaryKey(cache));
+                for (final Integer key : keys) {
+                    log.info("Test key: " + key);
 
-                if (ccfg.getBackups() != 0)
-                    keys.add(backupKey(cache));
+                    try {
+                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            Object old = cache.getAndReplace(key, 2);
 
-                for (Integer key : keys) {
-                    log.info("Test key: " + key);
+                            assertNull(old);
 
-                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
-                        Integer val = cache.get(key);
+                            updateKey(cache, key, 1);
 
-                        assertNull(val);
+                            tx.commit();
+                        }
 
-                        tx.commit();
+                        fail();
+                    }
+                    catch (TransactionOptimisticException e) {
+                        log.info("Expected exception: " + e);
                     }
 
-                    checkValue(key, null, ccfg.getName());
+                    checkValue(key, 1, cache.getName());
 
                     try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
-                        Integer val = cache.get(key);
+                        Object old = cache.getAndReplace(key, 2);
 
-                        assertNull(val);
+                        assertEquals(1, old);
 
-                        tx.rollback();
+                        tx.commit();
                     }
 
-                    checkValue(key, null, ccfg.getName());
-                }
-            }
-            finally {
-                ignite0.destroyCache(ccfg.getName());
-            }
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxRollbackRead1() throws Exception {
-        txRollbackRead(true);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxRollbackRead2() throws Exception {
-        txRollbackRead(false);
-    }
-
-    /**
-     * @param noVal If {@code true} there is no cache value when read in tx.
-     * @throws Exception If failed.
-     */
-    private void txRollbackRead(boolean noVal) throws Exception {
-        Ignite ignite0 = ignite(0);
-
-        final IgniteTransactions txs = ignite0.transactions();
+                    checkValue(key, 2, cache.getName());
 
-        List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>();
+                    cache.remove(key);
 
-        ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false));
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        Object old = cache.getAndReplace(key, 2);
 
-        for (CacheConfiguration<Integer, Integer> ccfg : ccfgs) {
-            logCacheInfo(ccfg);
+                        assertNull(old);
 
-            try {
-                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+                        tx.commit();
+                    }
 
-                List<Integer> keys = new ArrayList<>();
+                    checkValue(key, null, cache.getName());
 
-                keys.add(nearKey(cache));
+                    try {
+                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            Object old = cache.getAndReplace(key, 2);
 
-                for (Integer key : keys) {
-                    log.info("Test key: " + key);
+                            assertNull(old);
 
-                    Integer expVal = null;
+                            updateKey(cache, key, 3);
 
-                    if (!noVal) {
-                        expVal = -1;
+                            tx.commit();
+                        }
 
-                        cache.put(key, expVal);
+                        fail();
+                    }
+                    catch (TransactionOptimisticException e) {
+                        log.info("Expected exception: " + e);
                     }
 
+                    checkValue(key, 3, cache.getName());
+
                     try {
                         try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
-                            Integer val = cache.get(key);
+                            Object old = cache.getAndReplace(key, 2);
 
-                            assertEquals(expVal, val);
+                            assertEquals(3, old);
 
-                            updateKey(cache, key, 1);
+                            txAsync(cache, OPTIMISTIC, SERIALIZABLE,
+                                new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+                                    @Override public Void apply(IgniteCache<Integer, Integer> cache) {
+                                        cache.remove(key);
+
+                                        return null;
+                                    }
+                                }
+                            );
 
                             tx.commit();
                         }
@@ -228,21 +982,23 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
                         log.info("Expected exception: " + e);
                     }
 
-                    assertEquals(1, (Object)cache.get(key));
+                    checkValue(key, null, cache.getName());
+
+                    cache.put(key, 1);
 
                     try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
-                        Object val = cache.get(key);
+                        Object old = cache.getAndReplace(key, 2);
 
-                        assertEquals(1, val);
+                        assertEquals(1, old);
 
                         tx.commit();
                     }
 
-                    assertEquals(1, (Object)cache.get(key));
+                    checkValue(key, 2, cache.getName());
                 }
             }
             finally {
-                ignite0.destroyCache(ccfg.getName());
+                destroyCache(ignite0, ccfg.getName());
             }
         }
     }
@@ -250,46 +1006,113 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void _testTxRollbackReadWrite() throws Exception {
+    public void testTxNoConflictPut1() throws Exception {
+        txNoConflictUpdate(true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxNoConflictPut2() throws Exception {
+        txNoConflictUpdate(false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxNoConflictRemove1() throws Exception {
+        txNoConflictUpdate(true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxNoConflictRemove2() throws Exception {
+        txNoConflictUpdate(false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     * @param noVal If {@code true} there is no cache value when do update in tx.
+     * @param rmv If {@code true} tests remove, otherwise put.
+     */
+    private void txNoConflictUpdate(boolean noVal, boolean rmv) throws Exception {
         Ignite ignite0 = ignite(0);
 
         final IgniteTransactions txs = ignite0.transactions();
 
-        final IgniteCache<Integer, Integer> cache =
-            ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
 
-        final Integer key = nearKey(cache);
+            try {
+                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
 
-        try {
-            try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
-                Integer val = cache.get(key);
+                List<Integer> keys = testKeys(cache);
 
-                assertNull(val);
+                for (Integer key : keys) {
+                    log.info("Test key: " + key);
 
-                updateKey(cache, key, 1);
+                    if (!noVal)
+                        cache.put(key, -1);
 
-                cache.put(key, 2);
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        if (rmv)
+                            cache.remove(key);
+                        else
+                            cache.put(key, 2);
 
-                log.info("Commit");
+                        updateKey(cache, key, 1);
 
-                tx.commit();
-            }
+                        tx.commit();
+                    }
 
-            fail();
-        }
-        catch (TransactionOptimisticException e) {
-            log.info("Expected exception: " + e);
-        }
+                    checkValue(key, rmv ? null : 2, cache.getName());
 
-        assertEquals(1, (Object)cache.get(key));
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        cache.put(key, 3);
 
-        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
-            cache.put(key, 2);
+                        tx.commit();
+                    }
 
-            tx.commit();
-        }
+                    checkValue(key, 3, cache.getName());
+                }
+
+                Map<Integer, Integer> map = new HashMap<>();
+
+                for (int i = 0; i < 100; i++)
+                    map.put(i, i);
+
+                try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                    if (rmv)
+                        cache.removeAll(map.keySet());
+                    else
+                        cache.putAll(map);
+
+                    txAsync(cache, PESSIMISTIC, REPEATABLE_READ,
+                        new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+                            @Override public Void apply(IgniteCache<Integer, Integer> cache) {
+                                Map<Integer, Integer> map = new HashMap<>();
+
+                                for (int i = 0; i < 100; i++)
+                                    map.put(i, -1);
+
+                                cache.putAll(map);
 
-        assertEquals(2, (Object) cache.get(key));
+                                return null;
+                            }
+                        }
+                    );
+
+                    tx.commit();
+                }
+
+                for (int i = 0; i < 100; i++)
+                    checkValue(i, rmv ? null : i, cache.getName());
+            }
+            finally {
+                destroyCache(ignite0, ccfg.getName());
+            }
+        }
     }
 
     /**
@@ -306,43 +1129,47 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
             try {
                 IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
 
-                final Integer key = nearKey(cache);
+                List<Integer> keys = testKeys(cache);
 
-                CountDownLatch latch = new CountDownLatch(1);
+                for (Integer key : keys) {
+                    log.info("Test key: " + key);
 
-                IgniteInternalFuture<?> fut = lockKey(latch, cache, key);
+                    CountDownLatch latch = new CountDownLatch(1);
 
-                try {
-                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
-                        cache.put(key, 2);
+                    IgniteInternalFuture<?> fut = lockKey(latch, cache, key);
 
-                        log.info("Commit");
+                    try {
+                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            cache.put(key, 2);
 
-                        tx.commit();
+                            log.info("Commit");
+
+                            tx.commit();
+                        }
+
+                        fail();
+                    }
+                    catch (TransactionOptimisticException e) {
+                        log.info("Expected exception: " + e);
                     }
 
-                    fail();
-                }
-                catch (TransactionOptimisticException e) {
-                    log.info("Expected exception: " + e);
-                }
+                    latch.countDown();
 
-                latch.countDown();
+                    fut.get();
 
-                fut.get();
+                    checkValue(key, 1, cache.getName());
 
-                assertEquals(1, (Object)cache.get(key));
+                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        cache.put(key, 2);
 
-                try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
-                    cache.put(key, 2);
+                        tx.commit();
+                    }
 
-                    tx.commit();
+                    checkValue(key, 2, cache.getName());
                 }
-
-                assertEquals(2, (Object)cache.get(key));
             }
             finally {
-                ignite0.destroyCache(ccfg.getName());
+                destroyCache(ignite0, ccfg.getName());
             }
         }
     }
@@ -365,7 +1192,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
      * @param locKey If {@code true} gets lock for local key.
      * @throws Exception If failed.
      */
-    public void rollbackIfLockedPartialLock(boolean locKey) throws Exception {
+    private void rollbackIfLockedPartialLock(boolean locKey) throws Exception {
         Ignite ignite0 = ignite(0);
 
         final IgniteTransactions txs = ignite0.transactions();
@@ -388,8 +1215,6 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
                         cache.put(key1, 2);
                         cache.put(key2, 2);
 
-                        log.info("Commit2");
-
                         tx.commit();
                     }
 
@@ -403,23 +1228,21 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
 
                 fut.get();
 
-                assertEquals(1, (Object) cache.get(key1));
-                assertNull(cache.get(key2));
+                checkValue(key1, 1, cache.getName());
+                checkValue(key2, null, cache.getName());
 
                 try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
                     cache.put(key1, 2);
                     cache.put(key2, 2);
 
-                    log.info("Commit3");
-
                     tx.commit();
                 }
 
-                assertEquals(2, (Object) cache.get(key2));
-                assertEquals(2, (Object) cache.get(key2));
+                checkValue(key1, 2, cache.getName());
+                checkValue(key2, 2, cache.getName());
             }
             finally {
-                ignite0.destroyCache(ccfg.getName());
+                destroyCache(ignite0, ccfg.getName());
             }
         }
     }
@@ -511,7 +1334,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
                     });
                 }
 
-                for (int i = 0; i < 10; i++) {
+                int ITERS = FAST ? 1 : 10;
+
+                for (int i = 0; i < ITERS; i++) {
                     log.info("Iteration: " + i);
 
                     final long stopTime = U.currentTimeMillis() + 10_000;
@@ -594,7 +1419,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
             }
         }
         finally {
-            ignite(1).destroyCache(cacheName);
+            destroyCache(ignite(1), cacheName);
         }
     }
 
@@ -608,11 +1433,13 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
         ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false));
         ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
         ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, false, false));
+        ccfgs.add(cacheConfiguration(REPLICATED, FULL_SYNC, 0, false, false));
 
         // Store, no near.
         ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, true, false));
         ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, false));
         ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, true, false));
+        ccfgs.add(cacheConfiguration(REPLICATED, FULL_SYNC, 0, true, false));
 
         // No store, near.
         ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, true));
@@ -640,32 +1467,74 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
 
     /**
      * @param cache Cache.
-     * @param key Key.
-     * @param val Value.
+     * @return Test keys.
      * @throws Exception If failed.
      */
-    private void updateKey(
-        final IgniteCache<Integer, Integer> cache,
-        final Integer key,
-        final Integer val) throws Exception {
+    private List<Integer> testKeys(IgniteCache<Integer, Integer> cache) throws Exception {
+        CacheConfiguration ccfg = cache.getConfiguration(CacheConfiguration.class);
+
+        List<Integer> keys = new ArrayList<>();
+
+        if (ccfg.getCacheMode() == PARTITIONED)
+            keys.add(nearKey(cache));
+
+        keys.add(primaryKey(cache));
+
+        if (ccfg.getBackups() != 0)
+            keys.add(backupKey(cache));
+
+        return keys;
+    }
+
+    /**
+     * @param cache Cache.
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolcation.
+     * @param c Closure to run in transaction.
+     * @throws Exception If failed.
+     */
+    private void txAsync(final IgniteCache<Integer, Integer> cache,
+        final TransactionConcurrency concurrency,
+        final TransactionIsolation isolation,
+        final IgniteClosure<IgniteCache<Integer, Integer>, Void> c) throws Exception {
         IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
-            @Override public Void call() throws Exception {
+            @Override
+            public Void call() throws Exception {
                 IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
 
-                try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                    cache.put(key, val);
+                try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                    c.apply(cache);
 
                     tx.commit();
                 }
 
                 return null;
             }
-        }, "update-thread");
+        }, "async-thread");
 
         fut.get();
     }
 
     /**
+     * @param cache Cache.
+     * @param key Key.
+     * @param val Value.
+     * @throws Exception If failed.
+     */
+    private void updateKey(
+        final IgniteCache<Integer, Integer> cache,
+        final Integer key,
+        final Integer val) throws Exception {
+        txAsync(cache, PESSIMISTIC, REPEATABLE_READ, new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+            @Override public Void apply(IgniteCache<Integer, Integer> cache) {
+                cache.put(key, val);
+
+                return null;
+            }
+        });
+    }
+
+    /**
      * @param key Key.
      * @param expVal Expected value.
      * @param cacheName Cache name.
@@ -719,6 +1588,16 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param ignite Node.
+     * @param cacheName Cache name.
+     */
+    private void destroyCache(Ignite ignite, String cacheName) {
+        storeMap.clear();
+
+        ignite.destroyCache(cacheName);
+    }
+
+    /**
      * @param cacheMode Cache mode.
      * @param syncMode Write synchronization mode.
      * @param backups Number of backups.
@@ -736,9 +1615,11 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
 
         ccfg.setCacheMode(cacheMode);
         ccfg.setAtomicityMode(TRANSACTIONAL);
-        ccfg.setBackups(backups);
         ccfg.setWriteSynchronizationMode(syncMode);
 
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
         if (storeEnabled) {
             ccfg.setCacheStoreFactory(new TestStoreFactory());
             ccfg.setWriteThrough(true);
@@ -759,17 +1640,44 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
         @Override public CacheStore<Integer, Integer> create() {
             return new CacheStoreAdapter<Integer, Integer>() {
                 @Override public Integer load(Integer key) throws CacheLoaderException {
-                    return null;
+                    return storeMap.get(key);
                 }
 
                 @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) {
-                    // No-op.
+                    storeMap.put(entry.getKey(), entry.getValue());
                 }
 
                 @Override public void delete(Object key) {
-                    // No-op.
+                    storeMap.remove(key);
                 }
             };
         }
     }
+
+    /**
+     * Sets given value, returns old value.
+     */
+    public static final class SetValueProcessor implements EntryProcessor<Integer, Integer, Integer> {
+        /** */
+        private Integer newVal;
+
+        /**
+         * @param newVal New value to set.
+         */
+        SetValueProcessor(Integer newVal) {
+            this.newVal = newVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments) {
+            Integer val = entry.getValue();
+
+            if (newVal == null)
+                entry.remove();
+            else
+                entry.setValue(newVal);
+
+            return val;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index e46c139..6b2a6c4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -442,7 +442,9 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
     }
 
     /** @inheritDoc */
-    @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(boolean readSwap,
+    @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+        IgniteInternalTx tx,
+        boolean readSwap,
         boolean unmarshal,
         boolean updateMetrics,
         boolean evt,


Mime
View raw message