ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [16/46] incubator-ignite git commit: GG-9141 - Renaming
Date Sun, 21 Dec 2014 23:04:09 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java
new file mode 100644
index 0000000..58c3333
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java
@@ -0,0 +1,484 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.cache.affinity.*;
+import org.gridgain.grid.kernal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
+import static org.apache.ignite.transactions.GridCacheTxIsolation.*;
+
+/**
+ * Tests for local transactions.
+ */
+@SuppressWarnings( {"BusyWait"})
+abstract class IgniteTxAbstractTest extends GridCommonAbstractTest {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Execution count. */
+    private static final AtomicInteger cntr = new AtomicInteger();
+
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /**
+     * Creates the test without auto-starting a grid: {@code false} is passed as the base class
+     * "start grid" flag, and the grids are started explicitly in {@link #beforeTestsStarted()}.
+     */
+    protected IgniteTxAbstractTest() {
+        super(false /*start grid. */);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // Every grid of this test uses the same static IP finder for discovery.
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        return cfg;
+    }
+
+    /**
+     * @return Grid count.
+     */
+    protected abstract int gridCount();
+
+    /**
+     * @return Key count.
+     */
+    protected abstract int keyCount();
+
+    /**
+     * @return Maximum key value.
+     */
+    protected abstract int maxKeyValue();
+
+    /**
+     * @return Thread iterations.
+     */
+    protected abstract int iterations();
+
+    /**
+     * @return True if in-test logging is enabled.
+     */
+    protected abstract boolean isTestDebug();
+
+    /**
+     * @return {@code True} if memory stats should be printed.
+     */
+    protected abstract boolean printMemoryStats();
+
+    /**
+     * Logs the message through {@code info(...)} only when {@link #isTestDebug()} returns {@code true}.
+     *
+     * @param msg Message to log.
+     */
+    private void debug(String msg) {
+        if (isTestDebug())
+            info(msg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Override protected void beforeTestsStarted() throws Exception {
+        for (int i = 0; i < gridCount(); i++)
+            startGrid(i);
+    }
+
+    /**
+     * Stops all grids by delegating to {@code stopAllGrids()}.
+     *
+     * @throws Exception If failed.
+     */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @param i Grid index.
+     * @return Cache.
+     */
+    @SuppressWarnings("unchecked")
+    @Override protected GridCache<Integer, String> cache(int i) {
+        return grid(i).cache(null);
+    }
+
+    /**
+     * Generates {@link #keyCount()} random keys in the range {@code [1, maxKeyValue()]}
+     * (duplicates are possible) and returns them sorted in ascending order.
+     *
+     * @return Unmodifiable sorted list of keys.
+     */
+    protected Iterable<Integer> getKeys() {
+        List<Integer> sorted = new ArrayList<>(keyCount());
+
+        int added = 0;
+
+        while (added < keyCount()) {
+            sorted.add(1 + RAND.nextInt(maxKeyValue()));
+
+            added++;
+        }
+
+        Collections.sort(sorted);
+
+        return Collections.unmodifiableList(sorted);
+    }
+
+    /**
+     * Picks one of {@code READ}, {@code WRITE} or {@code REMOVE} based on {@code RAND.nextInt(3)}.
+     *
+     * @return Random cache operation.
+     */
+    protected OP getOp() {
+        int idx = RAND.nextInt(3);
+
+        if (idx == 0)
+            return OP.READ;
+
+        if (idx == 1)
+            return OP.WRITE;
+
+        if (idx == 2)
+            return OP.REMOVE;
+
+        // Should never be reached.
+        assert false;
+
+        return null;
+    }
+
+    /**
+     * Runs {@link #iterations()} transactions on a randomly chosen grid. Each transaction applies a random
+     * operation (read, write or remove) to every key returned by {@link #getKeys()} and is then committed.
+     * <p>
+     * An optimistic failure is tolerated (and the transaction rolled back) only for
+     * {@code OPTIMISTIC} + {@code SERIALIZABLE} transactions; any other failure is logged, rolled back
+     * and rethrown. After every iteration the cache must report no transaction for the calling thread.
+     *
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
+     * @throws Exception If check failed.
+     */
+    protected void checkCommit(GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation) throws Exception {
+        int gridIdx = RAND.nextInt(gridCount());
+
+        Ignite ignite = grid(gridIdx);
+
+        if (isTestDebug())
+            debug("Checking commit on grid: " + ignite.cluster().localNode().id());
+
+        for (int i = 0; i < iterations(); i++) {
+            GridCache<Integer, String> cache = cache(gridIdx);
+
+            IgniteTx tx = cache.txStart(concurrency, isolation, 0, 0);
+
+            try {
+                int prevKey = -1;
+
+                for (Integer key : getKeys()) {
+                    // Make sure we have the same locking order for all concurrent transactions.
+                    assert key >= prevKey : "key: " + key + ", prevKey: " + prevKey;
+
+                    // Track the last key, otherwise the ordering assertion above never compares real keys.
+                    prevKey = key;
+
+                    if (isTestDebug()) {
+                        GridCacheAffinityFunction aff = cache.configuration().getAffinity();
+
+                        int part = aff.partition(key);
+
+                        debug("Key affinity [key=" + key + ", partition=" + part + ", affinity=" +
+                            U.toShortString(cache.affinity().mapPartitionToPrimaryAndBackups(part)) + ']');
+                    }
+
+                    String val = Integer.toString(key);
+
+                    switch (getOp()) {
+                        case READ: {
+                            if (isTestDebug())
+                                debug("Reading key [key=" + key + ", i=" + i + ']');
+
+                            val = cache.get(key);
+
+                            if (isTestDebug())
+                                debug("Read value for key [key=" + key + ", val=" + val + ']');
+
+                            break;
+                        }
+
+                        case WRITE: {
+                            if (isTestDebug())
+                                debug("Writing key and value [key=" + key + ", val=" + val + ", i=" + i + ']');
+
+                            cache.put(key, val);
+
+                            break;
+                        }
+
+                        case REMOVE: {
+                            if (isTestDebug())
+                                debug("Removing key [key=" + key + ", i=" + i  + ']');
+
+                            cache.remove(key);
+
+                            break;
+                        }
+
+                        default: { assert false; }
+                    }
+                }
+
+                tx.commit();
+
+                if (isTestDebug())
+                    debug("Committed transaction [i=" + i + ", tx=" + tx + ']');
+            }
+            catch (GridCacheTxOptimisticException e) {
+                // Optimistic failures are legal only for OPTIMISTIC SERIALIZABLE transactions.
+                if (concurrency != OPTIMISTIC || isolation != SERIALIZABLE) {
+                    error("Received invalid optimistic failure.", e);
+
+                    throw e;
+                }
+
+                if (isTestDebug())
+                    info("Optimistic transaction failure (will rollback) [i=" + i + ", msg=" + e.getMessage() +
+                        ", tx=" + tx.xid() + ']');
+
+                try {
+                    tx.rollback();
+                }
+                catch (IgniteCheckedException ex) {
+                    error("Failed to rollback optimistic failure: " + tx, ex);
+
+                    throw ex;
+                }
+            }
+            catch (Exception e) {
+                error("Transaction failed (will rollback): " + tx, e);
+
+                tx.rollback();
+
+                throw e;
+            }
+            catch (Error e) {
+                error("Error when executing transaction (will rollback): " + tx, e);
+
+                tx.rollback();
+
+                throw e;
+            }
+            finally {
+                IgniteTx t = cache.tx();
+
+                assert t == null : "Thread should not have transaction upon completion ['t==tx'=" + (t == tx) +
+                    ", t=" + t + (t != tx ? ", tx=" + tx : ", tx=''") + ']';
+            }
+        }
+
+        if (printMemoryStats()) {
+            if (cntr.getAndIncrement() % 100 == 0)
+                // Print transaction memory stats.
+                ((GridKernal)grid(gridIdx)).internalCache().context().tm().printMemoryStats();
+        }
+    }
+
+    /**
+     * Runs the rollback check with a fresh, empty map of observed values.
+     *
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
+     * @throws Exception If check failed.
+     */
+    protected void checkRollback(GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation)
+        throws Exception {
+        checkRollback(new ConcurrentHashMap<Integer, String>(), concurrency, isolation);
+    }
+
+    /**
+     * Runs {@link #iterations()} transactions on a randomly chosen grid. Each transaction applies a random
+     * operation (read, write or remove) to every key returned by {@link #getKeys()}, passes the value
+     * returned by the operation to {@code checkMap(...)}, and is then rolled back.
+     * <p>
+     * Any exception causes a rollback and is rethrown. After every iteration the cache must report
+     * no transaction for the calling thread.
+     *
+     * @param map Map to check.
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
+     * @throws Exception If check failed.
+     */
+    protected void checkRollback(ConcurrentMap<Integer, String> map, GridCacheTxConcurrency concurrency,
+        GridCacheTxIsolation isolation) throws Exception {
+        int gridIdx = RAND.nextInt(gridCount());
+
+        Ignite ignite = grid(gridIdx);
+
+        if (isTestDebug())
+            debug("Checking rollback on grid: " + ignite.cluster().localNode().id());
+
+        for (int i = 0; i < iterations(); i++) {
+            GridCache<Integer, String> cache = cache(gridIdx);
+
+            IgniteTx tx = cache.txStart(concurrency, isolation, 0, 0);
+
+            try {
+                for (Integer key : getKeys()) {
+                    if (isTestDebug()) {
+                        GridCacheAffinityFunction aff = cache.configuration().getAffinity();
+
+                        int part = aff.partition(key);
+
+                        debug("Key affinity [key=" + key + ", partition=" + part + ", affinity=" +
+                            U.toShortString(cache.affinity().mapPartitionToPrimaryAndBackups(part)) + ']');
+                    }
+
+                    String val = Integer.toString(key);
+
+                    switch (getOp()) {
+                        case READ: {
+                            debug("Reading key: " + key);
+
+                            checkMap(map, key, cache.get(key));
+
+                            break;
+                        }
+
+                        case WRITE: {
+                            debug("Writing key and value [key=" + key + ", val=" + val + ']');
+
+                            checkMap(map, key, cache.put(key, val));
+
+                            break;
+                        }
+
+                        case REMOVE: {
+                            debug("Removing key: " + key);
+
+                            checkMap(map, key, cache.remove(key));
+
+                            break;
+                        }
+
+                        default: { assert false; }
+                    }
+                }
+
+                tx.rollback();
+
+                debug("Rolled back transaction: " + tx);
+            }
+            catch (GridCacheTxOptimisticException e) {
+                tx.rollback();
+
+                log.warning("Rolled back transaction due to optimistic exception [tx=" + tx + ", e=" + e + ']');
+
+                throw e;
+            }
+            catch (Exception e) {
+                tx.rollback();
+
+                // Pass the exception itself so that its stack trace is logged, not only its toString().
+                error("Rolled back transaction due to exception [tx=" + tx + ", e=" + e + ']', e);
+
+                throw e;
+            }
+            finally {
+                IgniteTx t1 = cache.tx();
+
+                debug("t1=" + t1);
+
+                assert t1 == null : "Thread should not have transaction upon completion ['t==tx'=" + (t1 == tx) +
+                    ", t=" + t1 + ']';
+            }
+        }
+    }
+
+    /**
+     * Records the first non-null value observed for a key and asserts that every later non-null
+     * value observed for the same key equals it. {@code null} values are ignored.
+     *
+     * @param map Map to check against.
+     * @param key Key.
+     * @param val Value.
+     */
+    private void checkMap(ConcurrentMap<Integer, String> map, Integer key, String val) {
+        if (val == null)
+            return;
+
+        String prev = map.putIfAbsent(key, val);
+
+        assert prev == null || prev.equals(val);
+    }
+
+    /**
+     * Checks integrity of all caches after tests: for every key from {@code 1} to {@link #maxKeyValue()}
+     * verifies that no cache reports a transaction for the calling thread and that all grids hold
+     * the same value for the key; afterwards calls {@code removeAll()} on every cache.
+     * <p>
+     * Each step is attempted up to three times, sleeping 500 ms after a failed attempt.
+     *
+     * @throws Exception If check failed.
+     */
+    @SuppressWarnings({"ErrorNotRethrown"})
+    protected void finalChecks() throws Exception {
+        for (int i = 1; i <= maxKeyValue(); i++) {
+            for (int k = 0; k < 3; k++) {
+                try {
+                    GridCacheEntry<Integer, String> e1 = null;
+
+                    String v1 = null;
+
+                    for (int j = 0; j < gridCount(); j++) {
+                        GridCache<Integer, String> cache = cache(j);
+
+                        IgniteTx tx = cache.tx();
+
+                        assertNull("Transaction is not completed: " + tx, tx);
+
+                        if (j == 0) {
+                            e1 = cache.entry(i);
+
+                            v1 = e1.get();
+                        }
+                        else {
+                            GridCacheEntry<Integer, String> e2 = cache.entry(i);
+
+                            String v2 = e2.get();
+
+                            // Re-read both values once before failing.
+                            if (!F.eq(v2, v1)) {
+                                v1 = e1.get();
+                                v2 = e2.get();
+                            }
+
+                            assert F.eq(v2, v1) :
+                                "Invalid cached value [key=" + i + ", v1=" + v1 + ", v2=" + v2 + ", e1=" + e1 +
+                                    ", e2=" + e2 + ", grid=" + j + ']';
+                        }
+                    }
+
+                    break;
+                }
+                catch (AssertionError e) {
+                    if (k == 2)
+                        throw e;
+                    else
+                        // Wait for transactions to complete.
+                        Thread.sleep(500);
+                }
+            }
+        }
+
+        // The cleanup does not depend on a particular key, so it is performed once (with retries)
+        // instead of being repeated for every key value.
+        for (int k = 0; k < 3; k++) {
+            try {
+                for (int j = 0; j < gridCount(); j++) {
+                    GridCacheProjection<Integer, String> cache = cache(j);
+
+                    cache.removeAll();
+                }
+
+                break;
+            }
+            catch (AssertionError e) {
+                if (k == 2)
+                    throw e;
+                else
+                    // Wait for transactions to complete.
+                    Thread.sleep(500);
+            }
+        }
+    }
+
+    /**
+     * Cache operation.
+     */
+    protected enum OP {
+        /** Cache read. */
+        READ,
+
+        /** Cache write. */
+        WRITE,
+
+        /** Cache remove. */
+        REMOVE
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxConcurrentGetAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxConcurrentGetAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxConcurrentGetAbstractTest.java
new file mode 100644
index 0000000..9548cd0
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxConcurrentGetAbstractTest.java
@@ -0,0 +1,134 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
+import static org.apache.ignite.transactions.GridCacheTxIsolation.*;
+
+/**
+ * Checks multithreaded put/get cache operations on one node.
+ */
+public abstract class IgniteTxConcurrentGetAbstractTest extends GridCommonAbstractTest {
+    /** Debug flag. */
+    private static final boolean DEBUG = false;
+
+    /** */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int THREAD_NUM = 20;
+
+    /**
+     * Default constructor; passes {@code true} as the base class "start grid" flag.
+     */
+    protected IgniteTxConcurrentGetAbstractTest() {
+        super(true /* Start grid. */);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        // Discovery goes through the static IP finder of this test class.
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        igniteCfg.setDiscoverySpi(discoSpi);
+
+        return igniteCfg;
+    }
+
+    /**
+     * Casts the internal cache of the given grid to a near cache adapter.
+     * <p>
+     * NOTE(review): the cast is unchecked, and the value type is declared as {@code Integer} although
+     * {@code testPutGet()} stores {@code String} values - confirm the intended type parameters.
+     *
+     * @param g Grid.
+     * @return Near cache.
+     */
+    GridNearCacheAdapter<String, Integer> near(Ignite g) {
+        return (GridNearCacheAdapter<String, Integer>)((GridKernal)g).<String, Integer>internalCache();
+    }
+
+    /**
+     * Returns the DHT cache obtained from the near cache of the given grid ({@code near(g).dht()}).
+     *
+     * @param g Grid.
+     * @return DHT cache.
+     */
+    GridDhtCacheAdapter<String, Integer> dht(Ignite g) {
+        return near(g).dht();
+    }
+
+    /**
+     * Puts a value under a random key, reads it back inside a transaction from the test thread and
+     * then reads it concurrently from {@code THREAD_NUM} threads.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutGet() throws Exception {
+        // Random key.
+        final String key = UUID.randomUUID().toString();
+
+        final Ignite ignite = grid();
+
+        ignite.cache(null).put(key, "val");
+
+        GridCacheEntryEx<String,Integer> dhtEntry = dht(ignite).peekEx(key);
+
+        if (DEBUG)
+            info("DHT entry [hash=" + System.identityHashCode(dhtEntry) + ", entry=" + dhtEntry + ']');
+
+        assertNotNull(txGet(ignite, key));
+
+        info("Starting threads: " + THREAD_NUM);
+
+        Callable<String> getter = new Callable<String>() {
+            @Override public String call() throws Exception {
+                return txGet(ignite, key);
+            }
+        };
+
+        multithreaded(getter, THREAD_NUM, "getter-thread");
+    }
+
+    /**
+     * Reads the key inside a {@code PESSIMISTIC} / {@code REPEATABLE_READ} transaction, asserts that
+     * the value equals {@code "val"} and commits.
+     *
+     * @param ignite Grid.
+     * @param key Key.
+     * @return Value.
+     * @throws Exception If failed.
+     */
+    private String txGet(Ignite ignite, String key) throws Exception {
+        try (IgniteTx tx = ignite.cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            GridCacheEntryEx<String, Integer> dhtEntry = dht(ignite).peekEx(key);
+
+            if (DEBUG) {
+                String entryInfo = "DHT entry [hash=" + System.identityHashCode(dhtEntry) + ", xid=" + tx.xid() +
+                    ", entry=" + dhtEntry + ']';
+
+                info(entryInfo);
+            }
+
+            String res = ignite.<String, String>cache(null).get(key);
+
+            assertNotNull(res);
+            assertEquals("val", res);
+
+            tx.commit();
+
+            return res;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
new file mode 100644
index 0000000..7b39975
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
@@ -0,0 +1,631 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.indexing.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
+import org.gridgain.testframework.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ * Tests that transaction is invalidated in case of {@link GridCacheTxHeuristicException}.
+ */
+public abstract class IgniteTxExceptionAbstractSelfTest extends GridCacheAbstractSelfTest {
+    /** Index SPI throwing exception. */
+    private static TestIndexingSpi idxSpi = new TestIndexingSpi();
+
+    /** */
+    private static final int PRIMARY = 0;
+
+    /** */
+    private static final int BACKUP = 1;
+
+    /** */
+    private static final int NOT_PRIMARY_AND_BACKUP = 2;
+
+    /** */
+    private static Integer lastKey;
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // The same static SPI instance is set for every grid, so one forceFail(...) call affects all of them.
+        cfg.setIndexingSpi(idxSpi);
+
+        // Presumably required because the *Tx tests iterate over all isolation levels - confirm.
+        cfg.getTransactionsConfiguration().setTxSerializableEnabled(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        GridCacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+        // Enable query indexing (presumably so that updates reach the test indexing SPI - confirm)
+        // and run without a cache store.
+        ccfg.setQueryIndexEnabled(true);
+        ccfg.setStore(null);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        // Reset the static lastKey (presumably the cursor used when generating test keys - confirm).
+        lastKey = 0;
+    }
+
+    /**
+     * Switches forced indexing failures off after every test.
+     * <p>
+     * NOTE(review): {@code super.afterTest()} is not invoked here - confirm this is intentional.
+     *
+     * @throws Exception If failed.
+     */
+    @Override protected void afterTest() throws Exception {
+        idxSpi.forceFail(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutNear() throws Exception {
+        checkPut(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+
+        checkPut(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutPrimary() throws Exception {
+        checkPut(true, keyForNode(grid(0).localNode(), PRIMARY));
+
+        checkPut(false, keyForNode(grid(0).localNode(), PRIMARY));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutBackup() throws Exception {
+        checkPut(true, keyForNode(grid(0).localNode(), BACKUP));
+
+        checkPut(false, keyForNode(grid(0).localNode(), BACKUP));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAll() throws Exception {
+        checkPutAll(true, keyForNode(grid(0).localNode(), PRIMARY),
+            keyForNode(grid(0).localNode(), PRIMARY),
+            keyForNode(grid(0).localNode(), PRIMARY));
+
+        checkPutAll(false, keyForNode(grid(0).localNode(), PRIMARY),
+            keyForNode(grid(0).localNode(), PRIMARY),
+            keyForNode(grid(0).localNode(), PRIMARY));
+
+        if (gridCount() > 1) {
+            checkPutAll(true, keyForNode(grid(1).localNode(), PRIMARY),
+                keyForNode(grid(1).localNode(), PRIMARY),
+                keyForNode(grid(1).localNode(), PRIMARY));
+
+            checkPutAll(false, keyForNode(grid(1).localNode(), PRIMARY),
+                keyForNode(grid(1).localNode(), PRIMARY),
+                keyForNode(grid(1).localNode(), PRIMARY));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemoveNear() throws Exception {
+        checkRemove(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+
+        checkRemove(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemovePrimary() throws Exception {
+        checkRemove(false, keyForNode(grid(0).localNode(), PRIMARY));
+
+        checkRemove(true, keyForNode(grid(0).localNode(), PRIMARY));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemoveBackup() throws Exception {
+        checkRemove(false, keyForNode(grid(0).localNode(), BACKUP));
+
+        checkRemove(true, keyForNode(grid(0).localNode(), BACKUP));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransformNear() throws Exception {
+        checkTransform(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+
+        checkTransform(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransformPrimary() throws Exception {
+        checkTransform(false, keyForNode(grid(0).localNode(), PRIMARY));
+
+        checkTransform(true, keyForNode(grid(0).localNode(), PRIMARY));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransformBackup() throws Exception {
+        checkTransform(false, keyForNode(grid(0).localNode(), BACKUP));
+
+        checkTransform(true, keyForNode(grid(0).localNode(), BACKUP));
+    }
+
+    /**
+     * For every concurrency/isolation combination runs {@code checkPutTx} twice - with and without a
+     * preliminary put - each time on a key obtained via
+     * {@code keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)} (presumably a key for which
+     * grid 0 is neither primary nor backup - confirm against {@code keyForNode}).
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutNearTx() throws Exception {
+        for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) {
+            for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) {
+                checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+
+                checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+            }
+        }
+    }
+
+    /**
+     * For every concurrency/isolation combination runs {@code checkPutTx} twice - with and without a
+     * preliminary put - each time on a key obtained via {@code keyForNode(grid(0).localNode(), PRIMARY)}
+     * (presumably a key for which grid 0 is primary - confirm against {@code keyForNode}).
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutPrimaryTx() throws Exception {
+        for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) {
+            for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) {
+                checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), PRIMARY));
+
+                checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), PRIMARY));
+            }
+        }
+    }
+
+    /**
+     * For every concurrency/isolation combination runs {@code checkPutTx} twice - with and without a
+     * preliminary put - each time on a key obtained via {@code keyForNode(grid(0).localNode(), BACKUP)}
+     * (presumably a key for which grid 0 is a backup - confirm against {@code keyForNode}).
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutBackupTx() throws Exception {
+        for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) {
+            for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) {
+                checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), BACKUP));
+
+                checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), BACKUP));
+            }
+        }
+    }
+
+    /**
+     * For every concurrency/isolation combination runs {@code checkPutTx} with three keys per call,
+     * with and without a preliminary put. The keys are requested with {@code PRIMARY} for grid 0 and,
+     * when there is more than one grid, the same is repeated with keys requested for grid 1.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutMultipleKeysTx() throws Exception {
+        for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) {
+            for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) {
+                checkPutTx(true, concurrency, isolation,
+                    keyForNode(grid(0).localNode(), PRIMARY),
+                    keyForNode(grid(0).localNode(), PRIMARY),
+                    keyForNode(grid(0).localNode(), PRIMARY));
+
+                checkPutTx(false, concurrency, isolation,
+                    keyForNode(grid(0).localNode(), PRIMARY),
+                    keyForNode(grid(0).localNode(), PRIMARY),
+                    keyForNode(grid(0).localNode(), PRIMARY));
+
+                // Keys requested for grid 1 while the transaction is still started from grid 0 (see checkPutTx).
+                if (gridCount() > 1) {
+                    checkPutTx(true, concurrency, isolation,
+                        keyForNode(grid(1).localNode(), PRIMARY),
+                        keyForNode(grid(1).localNode(), PRIMARY),
+                        keyForNode(grid(1).localNode(), PRIMARY));
+
+                    checkPutTx(false, concurrency, isolation,
+                        keyForNode(grid(1).localNode(), PRIMARY),
+                        keyForNode(grid(1).localNode(), PRIMARY),
+                        keyForNode(grid(1).localNode(), PRIMARY));
+                }
+            }
+        }
+    }
+
+    /**
+     * Checks that a transactional put fails with {@link GridCacheTxHeuristicException} when the indexing
+     * SPI is forced to fail, and that afterwards every key passes {@code checkEmpty(...)}.
+     * The transactions are started on the cache of grid 0.
+     *
+     * @param putBefore If {@code true} then puts some value before executing failing operation.
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @param keys Keys.
+     * @throws Exception If failed.
+     */
+    private void checkPutTx(boolean putBefore, GridCacheTxConcurrency concurrency,
+        GridCacheTxIsolation isolation, final Integer... keys) throws Exception {
+        assertTrue(keys.length > 0);
+
+        info("Test transaction [concurrency=" + concurrency + ", isolation=" + isolation + ']');
+
+        GridCache<Integer, Integer> cache = grid(0).cache(null);
+
+        if (putBefore) {
+            // The preliminary put must succeed, so forced failures are switched off first.
+            idxSpi.forceFail(false);
+
+            info("Start transaction.");
+
+            try (IgniteTx tx = cache.txStart(concurrency, isolation)) {
+                for (Integer key : keys) {
+                    info("Put " + key);
+
+                    cache.put(key, 1);
+                }
+
+                info("Commit.");
+
+                tx.commit();
+            }
+        }
+
+        // Execute get from all nodes to create readers for near cache.
+        for (int i = 0; i < gridCount(); i++) {
+            for (Integer key : keys)
+                grid(i).cache(null).get(key);
+        }
+
+        // From here on the indexing SPI is forced to fail, so the commit below is expected to throw.
+        idxSpi.forceFail(true);
+
+        try {
+            info("Start transaction.");
+
+            try (IgniteTx tx = cache.txStart(concurrency, isolation)) {
+                for (Integer key : keys) {
+                    info("Put " + key);
+
+                    cache.put(key, 2);
+                }
+
+                info("Commit.");
+
+                tx.commit();
+            }
+
+            fail("Transaction should fail.");
+        }
+        catch (GridCacheTxHeuristicException e) {
+            log.info("Expected exception: " + e);
+        }
+
+        // checkEmpty() switches forced failures off again before inspecting the entries.
+        for (Integer key : keys)
+            checkEmpty(key);
+    }
+
+    /** Asserts that on every grid the entry for {@code key} is unlocked, has no value, and {@code get} returns null.
+     * @param key Key to check.
+     * @throws Exception If failed.
+     */
+    private void checkEmpty(final Integer key) throws Exception {
+        idxSpi.forceFail(false); // Disarm the failure so the checks below are not affected.
+
+        info("Check key: " + key);
+
+        for (int i = 0; i < gridCount(); i++) {
+            GridKernal grid = (GridKernal) grid(i);
+
+            GridCacheAdapter cache = grid.internalCache(null);
+
+            GridCacheMapEntry entry = cache.map().getEntry(key);
+
+            log.info("Entry: " + entry);
+
+            if (entry != null) { // A missing entry is acceptable; an existing one must be unlocked and empty.
+                assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.lockedByAny());
+                assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.hasValue());
+            }
+
+            if (cache.isNear()) { // For a near cache repeat the same checks on the underlying DHT map.
+                entry = ((GridNearCacheAdapter)cache).dht().map().getEntry(key);
+
+                log.info("Dht entry: " + entry);
+
+                if (entry != null) {
+                    assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.lockedByAny());
+                    assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.hasValue());
+                }
+            }
+        }
+
+        for (int i = 0; i < gridCount(); i++) // Finally verify through the public API of every grid.
+            assertEquals("Unexpected value for grid " + i, null, grid(i).cache(null).get(key));
+    }
+
+    /** Puts {@code key} from grid 0 with the indexing SPI armed to fail; expects a heuristic exception.
+     * @param putBefore If {@code true} then puts value {@code 1} (failures disabled) before the failing operation.
+     * @param key Key.
+     * @throws Exception If failed.
+     */
+    private void checkPut(boolean putBefore, final Integer key) throws Exception {
+        if (putBefore) {
+            idxSpi.forceFail(false); // The preparatory put must succeed.
+
+            info("Put key: " + key);
+
+            grid(0).cache(null).put(key, 1);
+        }
+
+        // Execute get from all nodes to create readers for near cache.
+        for (int i = 0; i < gridCount(); i++)
+            grid(i).cache(null).get(key);
+
+        idxSpi.forceFail(true); // Arm the failure for the operation under test.
+
+        info("Going to put: " + key);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                grid(0).cache(null).put(key, 2);
+
+                return null;
+            }
+        }, GridCacheTxHeuristicException.class, null);
+
+        checkEmpty(key); // The key must be left unlocked and without a value on every grid.
+    }
+
+    /** Transforms {@code key} from grid 0 with the indexing SPI armed to fail; expects a heuristic exception.
+     * @param putBefore If {@code true} then puts value {@code 1} (failures disabled) before the failing operation.
+     * @param key Key.
+     * @throws Exception If failed.
+     */
+    private void checkTransform(boolean putBefore, final Integer key) throws Exception {
+        if (putBefore) {
+            idxSpi.forceFail(false); // The preparatory put must succeed.
+
+            info("Put key: " + key);
+
+            grid(0).cache(null).put(key, 1);
+        }
+
+        // Execute get from all nodes to create readers for near cache.
+        for (int i = 0; i < gridCount(); i++)
+            grid(i).cache(null).get(key);
+
+        idxSpi.forceFail(true); // Arm the failure for the operation under test.
+
+        info("Going to transform: " + key);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                grid(0).cache(null).transform(key, new IgniteClosure<Object, Object>() {
+                    @Override public Object apply(Object o) {
+                        return 2; // The closure ignores the current value and always yields 2.
+                    }
+                });
+
+                return null;
+            }
+        }, GridCacheTxHeuristicException.class, null);
+
+        checkEmpty(key); // The key must be left unlocked and without a value on every grid.
+    }
+
+    /** Calls {@code putAll} for {@code keys} from grid 0 with the indexing SPI armed to fail; expects a heuristic exception.
+     * @param putBefore If {@code true} then stores value {@code 1} for all keys (failures disabled) beforehand.
+     * @param keys Keys; at least two are expected.
+     * @throws Exception If failed.
+     */
+    private void checkPutAll(boolean putBefore, Integer ... keys) throws Exception {
+        assert keys.length > 1; // Plain Java assert (checked only with -ea), unlike assertTrue in checkPutTx.
+
+        if (putBefore) {
+            idxSpi.forceFail(false); // The preparatory putAll must succeed.
+
+            Map<Integer, Integer> m = new HashMap<>();
+
+            for (Integer key : keys)
+                m.put(key, 1);
+
+            info("Put data: " + m);
+
+            grid(0).cache(null).putAll(m);
+        }
+
+        // Execute get from all nodes to create readers for near cache.
+        for (int i = 0; i < gridCount(); i++) {
+            for (Integer key : keys)
+                grid(i).cache(null).get(key);
+        }
+
+        idxSpi.forceFail(true); // Arm the failure for the operation under test.
+
+        final Map<Integer, Integer> m = new HashMap<>();
+
+        for (Integer key : keys)
+            m.put(key, 2);
+
+        info("Going to putAll: " + m);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                grid(0).cache(null).putAll(m);
+
+                return null;
+            }
+        }, GridCacheTxHeuristicException.class, null);
+
+        for (Integer key : m.keySet())
+            checkEmpty(key); // Every key of the failed putAll must be left unlocked and without a value.
+    }
+
+    /** Removes {@code key} from grid 0 with the indexing SPI armed to fail; expects a heuristic exception.
+     * @param putBefore If {@code true} then puts value {@code 1} (failures disabled) before the failing operation.
+     * @param key Key.
+     * @throws Exception If failed.
+     */
+    private void checkRemove(boolean putBefore, final Integer key) throws Exception {
+        if (putBefore) {
+            idxSpi.forceFail(false); // The preparatory put must succeed.
+
+            info("Put key: " + key);
+
+            grid(0).cache(null).put(key, 1);
+        }
+
+        // Execute get from all nodes to create readers for near cache.
+        for (int i = 0; i < gridCount(); i++)
+            grid(i).cache(null).get(key);
+
+        idxSpi.forceFail(true); // Arm the failure for the operation under test.
+
+        info("Going to remove: " + key);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                grid(0).cache(null).remove(key);
+
+                return null;
+            }
+        }, GridCacheTxHeuristicException.class, null);
+
+        checkEmpty(key); // The key must be left unlocked and without a value on every grid.
+    }
+
+    /**
+     * Generates key of a given type for given node, advancing {@code lastKey} to the returned key.
+     * Throws {@link IllegalStateException} if no suitable key is found below {@code lastKey + 10_000}.
+     * @param node Node.
+     * @param type Key type: one of {@code NOT_PRIMARY_AND_BACKUP}, {@code PRIMARY}, {@code BACKUP}.
+     * @return First key greater than {@code lastKey} for which the affinity of grid 0's cache matches {@code type}.
+     */
+    private Integer keyForNode(ClusterNode node, int type) {
+        GridCache<Integer, Integer> cache = grid(0).cache(null);
+
+        if (cache.configuration().getCacheMode() == LOCAL)
+            return ++lastKey; // No affinity lookup in LOCAL mode: the next key is returned as is.
+
+        if (cache.configuration().getCacheMode() == REPLICATED && type == NOT_PRIMARY_AND_BACKUP)
+            return ++lastKey; // No affinity lookup for this combination either: the next key is returned as is.
+
+        for (int key = lastKey + 1; key < (lastKey + 10_000); key++) {
+            switch (type) {
+                case NOT_PRIMARY_AND_BACKUP: {
+                    if (!cache.affinity().isPrimaryOrBackup(node, key)) {
+                        lastKey = key;
+
+                        return key;
+                    }
+
+                    break;
+                }
+
+                case PRIMARY: {
+                    if (cache.affinity().isPrimary(node, key)) {
+                        lastKey = key;
+
+                        return key;
+                    }
+
+                    break;
+                }
+
+                case BACKUP: {
+                    if (cache.affinity().isBackup(node, key)) {
+                        lastKey = key;
+
+                        return key;
+                    }
+
+                    break;
+                }
+
+                default:
+                    fail(); // Unknown key type is a test bug.
+            }
+        }
+
+        throw new IllegalStateException("Failed to find key.");
+    }
+
+    /**
+     * Indexing SPI that can fail on demand: once armed, the next {@code store} or {@code remove} throws exactly once.
+     */
+    private static class TestIndexingSpi extends IgniteSpiAdapter implements GridIndexingSpi {
+        /** Fail flag; cleared by the first {@code store}/{@code remove} call that observes it set. */
+        private volatile boolean fail;
+
+        /** Arms ({@code true}) or disarms ({@code false}) the one-shot failure.
+         * @param fail Fail flag.
+         */
+        public void forceFail(boolean fail) {
+            this.fail = fail;
+        }
+
+        /** {@inheritDoc} Queries are not supported by this test SPI. */
+        @Override public Iterator<?> query(@Nullable String spaceName, Collection<Object> params, @Nullable GridIndexingQueryFilter filters) throws IgniteSpiException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} Throws {@link IgniteSpiException} once if the fail flag is set; otherwise does nothing. */
+        @Override public void store(@Nullable String spaceName, Object key, Object val, long expirationTime)
+            throws IgniteSpiException {
+            if (fail) { // NOTE(review): check-and-reset of the volatile flag is not atomic across threads.
+                fail = false; // One-shot: later calls succeed until forceFail(true) is called again.
+
+                throw new IgniteSpiException("Test exception.");
+            }
+        }
+
+        /** {@inheritDoc} Throws {@link IgniteSpiException} once if the fail flag is set; otherwise does nothing. */
+        @Override public void remove(@Nullable String spaceName, Object k)
+            throws IgniteSpiException {
+            if (fail) { // NOTE(review): check-and-reset of the volatile flag is not atomic across threads.
+                fail = false; // One-shot: later calls succeed until forceFail(true) is called again.
+
+                throw new IgniteSpiException("Test exception.");
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteSpiException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUnswap(@Nullable String spaceName, Object key, Object val) throws IgniteSpiException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void spiStop() throws IgniteSpiException {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiNodeAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiNodeAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiNodeAbstractTest.java
new file mode 100644
index 0000000..4822742
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiNodeAbstractTest.java
@@ -0,0 +1,918 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.cache.affinity.*;
+import org.gridgain.grid.cache.query.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.gridgain.grid.cache.GridCacheMode.*;
+import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
+import static org.apache.ignite.transactions.GridCacheTxIsolation.*;
+
+
+/**
+ * Checks basic multi-node transactional operations.
+ */
+@SuppressWarnings({"PointlessBooleanExpression", "ConstantConditions", "PointlessArithmeticExpression"})
+public abstract class IgniteTxMultiNodeAbstractTest extends GridCommonAbstractTest {
+    /** Debug flag: when {@code true}, verbose per-step logging is enabled and every iteration is logged. */
+    private static final boolean DEBUG = false;
+
+    /** Number of grids started by each test. */
+    protected static final int GRID_CNT = 4;
+
+    /** IP finder shared by the discovery SPI of every grid configured by this test. */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of iterations performed by each retry loop. */
+    protected static final int RETRIES = 300;
+
+    /** Log frequency: every iteration if {@code RETRIES < 100} or {@code DEBUG}, otherwise every {@code RETRIES / 5}. */
+    private static final int LOG_FREQ = RETRIES < 100 || DEBUG ? 1 : RETRIES / 5;
+
+    /** Counter key. */
+    private static final String CNTR_KEY = "CNTR_KEY";
+
+    /** Removed counter key. */
+    private static final String RMVD_CNTR_KEY = "RMVD_CNTR_KEY";
+
+    /** Iteration counter used to throttle progress logging in {@code retries}; reset in {@code beforeTest}. */
+    protected static final AtomicInteger cntr = new AtomicInteger();
+
+    /** Removal counter: supplies item keys in {@code onRemoveItemSimple} and throttles logging in the remove loops. */
+    protected static final AtomicInteger cntrRmvd = new AtomicInteger();
+
+    /** Number of backups for partitioned tests (note: {@code beforeTest} resets it to 0). */
+    protected int backups = 2;
+
+     /** {@inheritDoc} Additionally installs a {@code TcpDiscoverySpi} that uses the shared static {@code ipFinder}. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(ipFinder); // Same finder instance for every grid configured here.
+
+        cfg.setDiscoverySpi(spi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} Resets {@code backups} to 0 and the iteration counter {@code cntr} to 0. */
+    @Override protected void beforeTest() throws Exception {
+        backups = 0; // Overrides the field initializer (2) before every test.
+
+        cntr.set(0); // NOTE(review): cntrRmvd is not reset here; the remove tests visible here reset it themselves.
+    }
+
+    /**
+     * Returns the ID of the first node in the affinity mapping of the partition that {@code key} belongs to.
+     * @param ignite Grid
+     * @param key Key.
+     * @return Primary node id (first element of {@code mapPartitionToPrimaryAndBackups}).
+     */
+    @SuppressWarnings("unchecked")
+    private static UUID primaryId(Ignite ignite, Object key) {
+        GridCacheAffinity aff = ignite.cache(null).cache().affinity();
+
+        Collection<ClusterNode> affNodes = aff.mapPartitionToPrimaryAndBackups(aff.partition(key));
+
+        ClusterNode first = F.first(affNodes);
+
+        assert first != null; // The mapping is expected to contain at least one node.
+
+        return first.id();
+    }
+
+    /** Looks up, via {@code peekEx}, the entry for {@code key} in the DHT cache behind the near cache of the given node.
+     * @param nodeId Node ID.
+     * @param key Key.
+     * @return DHT entry, possibly {@code null}.
+     */
+    @Nullable private static GridCacheEntryEx<Object, Integer> dhtEntry(UUID nodeId, Object key) {
+        Ignite g = G.ignite(nodeId);
+
+        GridDhtCacheAdapter<Object, Integer> dht =
+            ((GridKernal)g).<Object, Integer>internalCache().context().near().dht();
+
+        return dht.peekEx(key);
+    }
+
+    /** Looks up, via {@code peekEx}, the entry for {@code key} in the near cache of the given node.
+     * @param nodeId Node ID.
+     * @param key Key.
+     * @return Near entry, possibly {@code null}.
+     */
+    @Nullable private static GridCacheEntryEx<Object, Integer> nearEntry(UUID nodeId, Object key) {
+        Ignite g = G.ignite(nodeId);
+
+        GridNearCacheAdapter<Object, Integer> near = ((GridKernal)g).<Object, Integer>internalCache().context().near();
+
+        return near.peekEx(key);
+    }
+
+    /**
+     * In one PESSIMISTIC/REPEATABLE_READ transaction reads the counter and stores {@code counter + 1} under the item key.
+     * @param putCntr Put counter to cache: if {@code true}, the incremented value is also written back to the counter key.
+     * @param ignite Grid.
+     * @param itemKey Item key.
+     * @param retry Retry count (used for logging only).
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private void onItemNear(boolean putCntr, Ignite ignite, String itemKey, int retry) throws IgniteCheckedException {
+        GridCache<String, Integer> cache = ignite.cache(null);
+
+        UUID locId = ignite.cluster().localNode().id();
+        UUID itemPrimaryId = primaryId(ignite, itemKey);
+        UUID cntrPrimaryId = primaryId(ignite, CNTR_KEY);
+
+        boolean isCntrPrimary = cntrPrimaryId.equals(locId);
+
+        try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            if (DEBUG)
+                info("Before near get [retry=" + retry + ", xid=" + tx.xid() + ", node=" + ignite.name() +
+                    ", isCntrPrimary=" + isCntrPrimary + ", nearId=" + locId +
+                    ", nearEntry=" + nearEntry(locId, CNTR_KEY) +
+                    (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']');
+
+            Integer cntr = cache.get(CNTR_KEY);
+
+            int newVal = cntr + 1; // Auto-unboxing: throws NPE if the counter key has no value.
+
+            if (putCntr) {
+                if (DEBUG)
+                    info("Before near put counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
+                        ", cur=" + cntr + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, CNTR_KEY) +
+                        (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']');
+
+                cache.putx(CNTR_KEY, newVal);
+            }
+
+            if (DEBUG)
+                info("Before near put item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr + ", new=" + newVal +
+                    ", nearEntry=" + nearEntry(locId, itemKey) + ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');
+
+            cache.putx(itemKey, newVal);
+
+            if (DEBUG)
+                info("After near put item [retry=" + retry + ", key=" + itemKey + ", old=" + cntr + ", new=" + newVal +
+                    ", nearEntry=" + nearEntry(locId, itemKey) + ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');
+
+            tx.commit();
+        }
+    }
+
+    /**
+     * In one PESSIMISTIC/REPEATABLE_READ transaction reads the counter and stores {@code counter + 1} under the item key.
+     * @param putCntr Put counter to cache: if {@code true}, the incremented value is also written back to the counter key.
+     * @param ignite Grid.
+     * @param itemKey Item key.
+     * @param retry Retry count (used for logging only).
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private void onItemPrimary(boolean putCntr, Ignite ignite, String itemKey, int retry) throws IgniteCheckedException {
+        GridCache<String, Integer> cache = ignite.cache(null);
+
+        UUID locId = ignite.cluster().localNode().id();
+        UUID itemPrimaryId = primaryId(ignite, itemKey);
+        UUID cntrPrimaryId = primaryId(ignite, CNTR_KEY);
+
+        boolean isCntrPrimary = cntrPrimaryId.equals(locId);
+
+        try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            if (DEBUG)
+                info("Before item primary get [retry=" + retry + ", xid=" + tx.xid() + ", node=" + ignite.name() +
+                    ", isCntrPrimary=" + isCntrPrimary + ", nearId=" + locId +
+                    ", nearEntry=" + nearEntry(locId, CNTR_KEY) +
+                    (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']');
+
+            Integer cntr = cache.get(CNTR_KEY);
+
+            int newVal = cntr + 1; // Auto-unboxing: throws NPE if the counter key has no value.
+
+            if (putCntr) {
+                if (DEBUG)
+                    info("Before item primary put counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
+                        ", cur=" + cntr + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, CNTR_KEY) +
+                        (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']');
+
+                cache.putx(CNTR_KEY, newVal);
+            }
+
+            if (DEBUG)
+                info("Before item primary put item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
+                    ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) +
+                    ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');
+
+            cache.putx(itemKey, newVal); // The value the log above reports as "new"; same as onItemNear.
+
+            if (DEBUG)
+                info("After item primary put item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
+                    ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) +
+                    ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');
+
+            tx.commit();
+        }
+    }
+
+    /**
+     * In one PESSIMISTIC/REPEATABLE_READ transaction reads the removed-counter, then removes one item found by SQL query.
+     * @param putCntr Put counter to cache: if {@code true}, {@code counter - 1} is written back to the removed-counter key.
+     * @param ignite Grid.
+     * @param retry Retry count (used for logging only).
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private void onRemoveItemQueried(boolean putCntr, Ignite ignite, int retry) throws IgniteCheckedException {
+        GridCache<String, Integer> cache = ignite.cache(null);
+
+        UUID locId = ignite.cluster().localNode().id();
+        UUID cntrPrimaryId = primaryId(ignite, RMVD_CNTR_KEY);
+
+        boolean isCntrPrimary = cntrPrimaryId.equals(locId);
+
+        try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            if (DEBUG)
+                ignite.log().info("Before item lock [retry=" + retry + ", xid=" + tx.xid() + ", node=" + ignite.name() +
+                    ", isCntrPrimary=" + isCntrPrimary + ", nearId=" + locId +
+                    ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
+                    (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']');
+
+            Integer cntr = cache.get(RMVD_CNTR_KEY);
+
+            assert cntr != null : "Received null counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
+                ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
+                (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']';
+
+            int newVal = cntr - 1;
+
+            if (putCntr) {
+                if (DEBUG)
+                    ignite.log().info("Before item put counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
+                        ", cur=" + cntr + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
+                        (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']');
+
+                cache.putx(RMVD_CNTR_KEY, newVal);
+            }
+
+            while (true) { // Repeat the query until an item that still has a value is removed.
+                GridCacheQuery<Map.Entry<String, Integer>> qry =
+                    cache.queries().createSqlQuery(Integer.class, "_key != 'RMVD_CNTR_KEY' and _val >= 0");
+
+                if (DEBUG)
+                    ignite.log().info("Before executing query [retry=" + retry + ", locId=" + locId +
+                        ", txId=" + tx.xid() + ']');
+
+                Map.Entry<String, Integer> entry = qry.execute().next();
+
+                if (entry == null) {
+                    ignite.log().info("*** Queue is empty.");
+
+                    return; // Leaves the tx uncommitted; presumably rolled back on close (TODO confirm).
+                }
+
+                String itemKey = entry.getKey();
+
+                UUID itemPrimaryId = primaryId(ignite, itemKey);
+
+                // Lock the item key.
+                if (cache.get(itemKey) != null) {
+                    if (DEBUG)
+                        ignite.log().info("Before item remove [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
+                            ", nearEntry=" + nearEntry(locId, itemKey) +
+                            ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');
+
+                    assertTrue("Failed to remove key [locId=" + locId + // Runs even without -ea, unlike 'assert'.
+                        ", primaryId=" + itemPrimaryId + ", key=" + itemKey + ']', cache.removex(itemKey));
+
+                    if (DEBUG)
+                        info("After item remove item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
+                            ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) +
+                            ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');
+
+                    break;
+                }
+                else
+                    cache.removex(itemKey); // The query returned a key that get() no longer sees; remove and retry.
+            }
+
+            tx.commit();
+        }
+        catch (Error e) { // Assertion failures are logged through the grid logger before being rethrown.
+            ignite.log().error("Error in test.", e);
+
+            throw e;
+        }
+    }
+
+    /**
+     * In one PESSIMISTIC/REPEATABLE_READ transaction reads the removed-counter and removes the next sequential item key.
+     * @param putCntr Put counter to cache: if {@code true}, {@code counter - 1} is written back to the removed-counter key.
+     * @param ignite Grid.
+     * @param retry Retry count (used for logging only).
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private void onRemoveItemSimple(boolean putCntr, Ignite ignite, int retry) throws IgniteCheckedException {
+        GridCache<String, Integer> cache = ignite.cache(null);
+
+        UUID locId = ignite.cluster().localNode().id();
+        UUID cntrPrimaryId = primaryId(ignite, RMVD_CNTR_KEY);
+
+        boolean isCntrPrimary = cntrPrimaryId.equals(locId);
+
+        try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            if (DEBUG)
+                ignite.log().info("Before item lock [retry=" + retry + ", xid=" + tx.xid() + ", node=" + ignite.name() +
+                    ", isCntrPrimary=" + isCntrPrimary + ", nearId=" + locId +
+                    ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
+                    (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']');
+
+            Integer cntr = cache.get(RMVD_CNTR_KEY);
+
+            assert cntr != null : "Received null counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
+                ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
+                (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']';
+
+            String itemKey = Integer.toString(cntrRmvd.getAndIncrement()); // Each call takes the next numeric key.
+
+            Integer val = cache.get(itemKey);
+
+            assert val != null : "Received null val [retry=" + retry + ", cacheSize=" + cache.size() + ']';
+
+            UUID itemPrimaryId = primaryId(ignite, itemKey);
+
+            int newVal = cntr - 1;
+
+            if (putCntr) {
+                if (DEBUG)
+                    ignite.log().info("Before item put counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
+                        ", cur=" + cntr + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
+                        (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']');
+
+                cache.putx(RMVD_CNTR_KEY, newVal);
+            }
+
+            if (DEBUG)
+                ignite.log().info("Before item remove item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
+                    ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) +
+                    ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');
+
+            assertTrue(cache.removex(itemKey));
+
+            if (DEBUG)
+                info("After item remove item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
+                    ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) +
+                    ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');
+
+            tx.commit();
+        }
+        catch (Error e) { // Assertion failures are logged through the grid logger before being rethrown.
+            ignite.log().error("Error in test.", e);
+
+            throw e;
+        }
+    }
+
+    /**
+     * Runs {@code RETRIES} item transactions on the given grid, using a node-specific item key per iteration.
+     * @param putCntr Put counter to cache.
+     * @param ignite Grid.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void retries(Ignite ignite, boolean putCntr) throws IgniteCheckedException {
+        UUID nodeId = ignite.cluster().localNode().id();
+
+        for (int i = 0; i < RETRIES; i++) {
+            int cnt = cntr.getAndIncrement(); // Shared counter, used only to throttle the progress log below.
+
+            if (DEBUG)
+                ignite.log().info("***");
+            if (DEBUG || cnt % LOG_FREQ == 0)
+                ignite.log().info("*** Iteration #" + i + " ***");
+            if (DEBUG)
+                ignite.log().info("***");
+
+            String itemKey = nodeId + "-#" + i;
+
+            if (nodeId.equals(primaryId(ignite, itemKey))) // Dispatch on whether this node is primary for the item key.
+                onItemPrimary(putCntr, ignite, itemKey, i);
+            else
+                onItemNear(putCntr, ignite, itemKey, i);
+        }
+    }
+
+    /**
+     * Runs {@code RETRIES} query-based remove transactions on the given grid, printing tx memory stats every 50th pass.
+     * @param putCntr Put counter to cache.
+     * @param ignite Grid.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void removeRetriesQueried(Ignite ignite, boolean putCntr) throws IgniteCheckedException {
+        for (int i = 0; i < RETRIES; i++) {
+            if (DEBUG)
+                ignite.log().info("***");
+
+            if (DEBUG || cntrRmvd.getAndIncrement() % LOG_FREQ == 0) // NOTE(review): short-circuit skips the increment when DEBUG is true.
+                ignite.log().info("*** Iteration #" + i + " ***");
+
+            if (DEBUG)
+                ignite.log().info("***");
+
+            onRemoveItemQueried(putCntr, ignite, i);
+
+            if (i % 50 == 0)
+                ((GridKernal) ignite).internalCache().context().tm().printMemoryStats();
+        }
+    }
+
+    /**
+     * Runs {@code RETRIES} sequential-key remove transactions on the given grid.
+     * @param putCntr Put counter to cache.
+     * @param ignite Grid.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void removeRetriesSimple(Ignite ignite, boolean putCntr) throws IgniteCheckedException {
+        for (int i = 0; i < RETRIES; i++) {
+            if (DEBUG)
+                ignite.log().info("***");
+
+            if (cntrRmvd.get() % LOG_FREQ == 0 || DEBUG) // Read only: the counter is advanced inside onRemoveItemSimple.
+                ignite.log().info("*** Iteration #" + i + " ***");
+
+            if (DEBUG)
+                ignite.log().info("***");
+
+            onRemoveItemSimple(putCntr, ignite, i);
+        }
+    }
+
+    /**
+     * Starts {@code GRID_CNT} grids, initializes the counter to 0 and runs {@code PutOneEntryInTxJob} via grid 0.
+     * No value is asserted afterwards; all grids are stopped in {@code finally}.
+     * @throws Exception If failed.
+     */
+    public void testPutOneEntryInTx() throws Exception {
+//        resetLog4j(Level.INFO, true, GridCacheTxManager.class.getName());
+
+        startGrids(GRID_CNT);
+
+        try {
+            grid(0).cache(null).put(CNTR_KEY, 0);
+
+            grid(0).compute().call(new PutOneEntryInTxJob());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Starts {@code GRID_CNT} grids, initializes the counter to 0 and runs {@code PutTwoEntriesInTxJob} via grid 0.
+     * Afterwards the counter read from grid 0 must equal {@code GRID_CNT * RETRIES}.
+     * @throws Exception If failed.
+     */
+    public void testPutTwoEntriesInTx() throws Exception {
+//        resetLog4j(Level.INFO, true, GridCacheTxManager.class.getName());
+
+        startGrids(GRID_CNT);
+
+        try {
+            grid(0).cache(null).put(CNTR_KEY, 0);
+
+            grid(0).compute().call(new PutTwoEntriesInTxJob());
+
+            printCounter();
+
+            assertEquals(GRID_CNT * RETRIES, grid(0).cache(null).get(CNTR_KEY));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Starts {@code GRID_CNT} grids and runs {@code retries(grid, false)} concurrently, one thread per grid.
+     * No counter value is asserted afterwards; all grids are stopped in {@code finally}.
+     * @throws Exception If failed.
+     */
+    public void testPutOneEntryInTxMultiThreaded() throws Exception {
+//        resetLog4j(Level.INFO, true, GridCacheTxManager.class.getName());
+
+        startGrids(GRID_CNT);
+
+        Collection<Thread> threads = new LinkedList<>();
+
+        try {
+            // Initialize.
+            grid(0).cache(null).put(CNTR_KEY, 0);
+
+            for (int i = 0; i < GRID_CNT; i++) {
+                final int gridId = i;
+
+                threads.add(new Thread("thread-#" + i) {
+                    @Override public void run() {
+                        try {
+                            retries(grid(gridId), false);
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e); // NOTE(review): thrown on the worker thread; join() will not fail the test.
+                        }
+                    }
+                });
+            }
+
+            for (Thread th : threads)
+                th.start();
+
+            for (Thread th : threads)
+                th.join();
+
+            printCounter();
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Starts {@code GRID_CNT} grids and runs {@code retries(grid, true)} concurrently, one thread per grid.
+     * Afterwards the counter read from grid 0 must equal {@code GRID_CNT * RETRIES}.
+     * @throws Exception If failed.
+     */
+    public void testPutTwoEntryInTxMultiThreaded() throws Exception {
+//        resetLog4j(Level.INFO, true, GridCacheTxManager.class.getName());
+
+        startGrids(GRID_CNT);
+
+        Collection<Thread> threads = new LinkedList<>();
+
+        try {
+            grid(0).cache(null).put(CNTR_KEY, 0);
+
+            for (int i = 0; i < GRID_CNT; i++) {
+                final int gridId = i;
+
+                threads.add(new Thread() {
+                    @Override public void run() {
+                        try {
+                            retries(grid(gridId), true);
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e); // NOTE(review): thrown on the worker thread; join() will not fail the test.
+                        }
+                    }
+                });
+            }
+
+            for (Thread th : threads)
+                th.start();
+
+            for (Thread th : threads)
+                th.join();
+
+            printCounter();
+
+            assertEquals(GRID_CNT * RETRIES, grid(0).cache(null).get(CNTR_KEY));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Fills the cache with {@code GRID_CNT * RETRIES} entries, runs {@link RemoveInTxJobQueried} through the
+     * compute API of grid 0 and verifies that none of the stored keys is readable from any grid afterwards
+     * and that the value under {@code RMVD_CNTR_KEY} equals {@code -GRID_CNT * RETRIES}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRemoveInTxQueried() throws Exception {
+        //resetLog4j(Level.INFO, true, GridCacheTxManager.class.getPackage().getName());
+
+        try {
+            // Started inside try so that grids of a partially failed startup are stopped as well.
+            startGrids(GRID_CNT);
+
+            GridCache<String, Integer> cache = grid(0).cache(null);
+
+            // Removal counter starts at zero.
+            cache.put(RMVD_CNTR_KEY, 0);
+
+            for (int i = 0; i < GRID_CNT * RETRIES; i++)
+                cache.put(String.valueOf(i), i);
+
+            // Check that the first RETRIES entries are readable from every grid.
+            for (int i = 0; i < RETRIES; i++)
+                for (int j = 0; j < GRID_CNT; j++)
+                    assertEquals(i, grid(j).cache(null).get(String.valueOf(i)));
+
+            GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, " _val >= 0");
+
+            Collection<Map.Entry<String, Integer>> entries = qry.execute().get();
+
+            assertFalse(entries.isEmpty());
+
+            cntrRmvd.set(0);
+
+            grid(0).compute().call(new RemoveInTxJobQueried());
+
+            // Every stored key must be gone on every grid.
+            for (int i = 0; i < GRID_CNT * RETRIES; i++)
+                for (int ii = 0; ii < GRID_CNT; ii++)
+                    assertNull(grid(ii).cache(null).get(Integer.toString(i)));
+
+            assertEquals(-GRID_CNT * RETRIES, grid(0).cache(null).peek(RMVD_CNTR_KEY));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Fills the cache with {@code GRID_CNT * RETRIES} entries, runs {@link RemoveInTxJobSimple} through the
+     * compute API of grid 0 and verifies, both by direct reads and by re-running the SQL query, that no
+     * entries are left and that the value under {@code RMVD_CNTR_KEY} equals {@code -GRID_CNT * RETRIES}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRemoveInTxSimple() throws Exception {
+        try {
+            // Started inside try so that grids of a partially failed startup are stopped as well.
+            startGrids(GRID_CNT);
+
+            GridCache<String, Integer> cache = grid(0).cache(null);
+
+            // Removal counter starts at zero.
+            cache.put(RMVD_CNTR_KEY, 0);
+
+            for (int i = 0; i < GRID_CNT * RETRIES; i++)
+                cache.put(Integer.toString(i), i);
+
+            // Check that the first RETRIES entries are readable from every grid.
+            for (int i = 0; i < RETRIES; i++)
+                for (int j = 0; j < GRID_CNT; j++)
+                    assertEquals(i, grid(j).cache(null).get(Integer.toString(i)));
+
+            GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, " _val >= 0");
+
+            Collection<Map.Entry<String, Integer>> entries = qry.execute().get();
+
+            assertFalse(entries.isEmpty());
+
+            cntrRmvd.set(0);
+
+            grid(0).compute().call(new RemoveInTxJobSimple());
+
+            // Check using cache.
+            for (int i = 0; i < GRID_CNT * RETRIES; i++)
+                for (int ii = 0; ii < GRID_CNT; ii++)
+                    assertNull(grid(ii).cache(null).get(Integer.toString(i)));
+
+            // Check using query.
+            entries = qry.execute().get();
+
+            assertTrue(entries.isEmpty());
+
+            assertEquals(-GRID_CNT * RETRIES, grid(0).cache(null).peek(RMVD_CNTR_KEY));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Stores a counter and {@code GRID_CNT * RETRIES} values (keys {@code "1"} to {@code "GRID_CNT * RETRIES"}),
+     * validates the SQL query result, then runs {@code removeRetriesQueried(grid, true)} concurrently with one
+     * thread per grid and verifies that every stored key is gone from every grid and that the value under
+     * {@code RMVD_CNTR_KEY} equals {@code -GRID_CNT * RETRIES}.
+     * <p>
+     * A failure raised inside a worker thread is recorded and rethrown from the test thread, so that it
+     * fails the test instead of merely terminating the worker.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRemoveInTxQueriedMultiThreaded() throws Exception {
+        //resetLog4j(Level.INFO, true, GridCacheTxManager.class.getPackage().getName());
+
+        backups = 1;
+
+        try {
+            startGrids(GRID_CNT);
+
+            GridCache<String, Integer> cache = grid(0).cache(null);
+
+            // Store counter.
+            cache.put(RMVD_CNTR_KEY, 0);
+
+            // Store values; note that keys start from 1 here.
+            for (int i = 1; i <= GRID_CNT * RETRIES; i++)
+                cache.put(String.valueOf(i), i);
+
+            for (int j = 0; j < GRID_CNT; j++)
+                assertEquals(0, grid(j).cache(null).get(RMVD_CNTR_KEY));
+
+            for (int i = 1; i <= RETRIES; i++)
+                for (int j = 0; j < GRID_CNT; j++)
+                    assertEquals(i, grid(j).cache(null).get(String.valueOf(i)));
+
+            GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, "_val >= 0");
+
+            // Load all results.
+            qry.keepAll(true);
+            qry.includeBackups(false);
+
+            // NOTE: for replicated cache includeBackups(false) is not enough since
+            // all nodes are considered primary, so we have to deduplicate result set.
+            if (cache.configuration().getCacheMode() == REPLICATED)
+                qry.enableDedup(true);
+
+            List<Map.Entry<String, Integer>> entries =
+                new ArrayList<>(qry.execute().get());
+
+            Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
+                @Override public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
+                    return o1.getValue().compareTo(o2.getValue());
+                }
+            });
+
+            info("Queried entries: " + entries);
+
+            // Sorted values are expected to be contiguous starting from 0
+            // (presumably the counter entry with value 0 is part of the result set).
+            int val = 0;
+
+            for (Map.Entry<String, Integer> e : entries) {
+                assertEquals(val, e.getValue().intValue());
+
+                val++;
+            }
+
+            assertFalse(entries.isEmpty());
+
+            cntrRmvd.set(0);
+
+            Collection<Thread> threads = new LinkedList<>();
+
+            // First failure observed in any worker thread, if any.
+            final AtomicReference<Throwable> err = new AtomicReference<>();
+
+            for (int i = 0; i < GRID_CNT; i++) {
+                final int gridId = i;
+
+                threads.add(new Thread() {
+                    @Override public void run() {
+                        try {
+                            removeRetriesQueried(grid(gridId), true);
+                        }
+                        catch (Throwable e) {
+                            // Throwing from run() would only kill this thread; remember the failure instead.
+                            err.compareAndSet(null, e);
+                        }
+                    }
+                });
+            }
+
+            for (Thread th : threads)
+                th.start();
+
+            for (Thread th : threads)
+                th.join();
+
+            Throwable workerErr = err.get();
+
+            if (workerErr != null)
+                throw new AssertionError("Worker thread failed.", workerErr);
+
+            // Verify the same key range that was stored above ("1".."GRID_CNT * RETRIES").
+            for (int i = 1; i <= GRID_CNT * RETRIES; i++)
+                for (int ii = 0; ii < GRID_CNT; ii++)
+                    assertNull("Got invalid value from cache [gridIdx=" + ii + ", key=" + i + ']',
+                        grid(ii).cache(null).get(Integer.toString(i)));
+
+            assertEquals(-GRID_CNT * RETRIES, grid(0).cache(null).peek(RMVD_CNTR_KEY));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    private void printCounter() throws IgniteCheckedException {
+        info("***");
+        info("*** Peeked counter: " + grid(0).cache(null).peek(CNTR_KEY));
+        info("*** Got counter: " + grid(0).cache(null).get(CNTR_KEY));
+        info("***");
+    }
+
+    /**
+     * Test job putting data to queue.
+     */
+    protected class PutTwoEntriesInTxJob implements IgniteCallable<Integer> {
+        /** */
+        @GridToStringExclude
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public Integer call() throws IgniteCheckedException {
+            assertNotNull(ignite);
+
+            ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]");
+
+            retries(ignite, true);
+
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(PutTwoEntriesInTxJob.class, this);
+        }
+    }
+
+    /**
+     * Test job putting data to cache.
+     */
+    protected class PutOneEntryInTxJob implements IgniteCallable<Integer> {
+        /** */
+        @GridToStringExclude
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public Integer call() throws IgniteCheckedException {
+            assertNotNull(ignite);
+
+            ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]");
+
+            retries(ignite, false);
+
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(PutOneEntryInTxJob.class, this);
+        }
+    }
+
+    /**
+     * Test job removing data from cache using query.
+     */
+    protected class RemoveInTxJobQueried implements IgniteCallable<Integer> {
+        /** */
+        @GridToStringExclude
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public Integer call() throws IgniteCheckedException {
+            assertNotNull(ignite);
+
+            ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]");
+
+            removeRetriesQueried(ignite, true);
+
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoveInTxJobQueried.class, this);
+        }
+    }
+
+    /**
+     * Test job removing data from cache.
+     */
+    protected class RemoveInTxJobSimple implements IgniteCallable<Integer> {
+        /** */
+        @GridToStringExclude
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public Integer call() throws IgniteCheckedException {
+            assertNotNull(ignite);
+
+            ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]");
+
+            removeRetriesSimple(ignite, true);
+
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoveInTxJobSimple.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiThreadedAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
new file mode 100644
index 0000000..efb7a26
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
@@ -0,0 +1,275 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.testframework.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
+import static org.apache.ignite.transactions.GridCacheTxIsolation.*;
+
+/**
+ * Multithreaded transaction tests: runs the commit and rollback checks of {@link IgniteTxAbstractTest}
+ * concurrently from {@link #threadCount()} threads for every combination of transaction concurrency
+ * and isolation.
+ */
+@SuppressWarnings({"BusyWait"})
+public abstract class IgniteTxMultiThreadedAbstractTest extends IgniteTxAbstractTest {
+    /**
+     * @return Thread count.
+     */
+    protected abstract int threadCount();
+
+    /**
+     * Runs {@code checkCommit(concurrency, isolation)} concurrently in {@link #threadCount()} threads.
+     *
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
+     * @throws Exception If check failed.
+     */
+    protected void checkCommitMultithreaded(final GridCacheTxConcurrency concurrency,
+        final GridCacheTxIsolation isolation) throws Exception {
+        GridTestUtils.runMultiThreaded(new Callable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                Thread t = Thread.currentThread();
+
+                // Make the thread ID visible in the thread name.
+                t.setName(t.getName() + "-id-" + t.getId());
+
+                info("Starting commit thread: " + Thread.currentThread().getName());
+
+                try {
+                    checkCommit(concurrency, isolation);
+
+                    return null;
+                }
+                finally {
+                    info("Finished commit thread: " + Thread.currentThread().getName());
+                }
+            }
+        }, threadCount(), concurrency + "-" + isolation);
+    }
+
+    /**
+     * Runs {@code checkRollback(map, concurrency, isolation)} concurrently in {@link #threadCount()} threads,
+     * all of them sharing one concurrent map.
+     *
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
+     * @throws Exception If check failed.
+     */
+    protected void checkRollbackMultithreaded(final GridCacheTxConcurrency concurrency,
+        final GridCacheTxIsolation isolation) throws Exception {
+        final ConcurrentMap<Integer, String> map = new ConcurrentHashMap<>();
+
+        GridTestUtils.runMultiThreaded(new Callable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                Thread t = Thread.currentThread();
+
+                // Make the thread ID visible in the thread name.
+                t.setName(t.getName() + "-id-" + t.getId());
+
+                info("Starting rollback thread: " + Thread.currentThread().getName());
+
+                try {
+                    checkRollback(map, concurrency, isolation);
+
+                    return null;
+                }
+                finally {
+                    info("Finished rollback thread: " + Thread.currentThread().getName());
+                }
+            }
+        }, threadCount(), concurrency + "-" + isolation);
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testPessimisticReadCommittedCommitMultithreaded() throws Exception {
+        checkCommitMultithreaded(PESSIMISTIC, READ_COMMITTED);
+
+        finalChecks();
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testPessimisticRepeatableReadCommitMultithreaded() throws Exception {
+        checkCommitMultithreaded(PESSIMISTIC, REPEATABLE_READ);
+
+        finalChecks();
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testPessimisticSerializableCommitMultithreaded() throws Exception {
+        checkCommitMultithreaded(PESSIMISTIC, SERIALIZABLE);
+
+        finalChecks();
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testOptimisticReadCommittedCommitMultithreaded() throws Exception {
+        checkCommitMultithreaded(OPTIMISTIC, READ_COMMITTED);
+
+        finalChecks();
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testOptimisticRepeatableReadCommitMultithreaded() throws Exception {
+        checkCommitMultithreaded(OPTIMISTIC, REPEATABLE_READ);
+
+        finalChecks();
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testOptimisticSerializableCommitMultithreaded() throws Exception {
+        checkCommitMultithreaded(OPTIMISTIC, SERIALIZABLE);
+
+        finalChecks();
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testPessimisticReadCommittedRollbackMultithreaded() throws Exception {
+        checkRollbackMultithreaded(PESSIMISTIC, READ_COMMITTED);
+
+        finalChecks();
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testPessimisticRepeatableReadRollbackMultithreaded() throws Exception {
+        checkRollbackMultithreaded(PESSIMISTIC, REPEATABLE_READ);
+
+        finalChecks();
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testPessimisticSerializableRollbackMultithreaded() throws Exception {
+        checkRollbackMultithreaded(PESSIMISTIC, SERIALIZABLE);
+
+        finalChecks();
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testOptimisticReadCommittedRollbackMultithreaded() throws Exception {
+        checkRollbackMultithreaded(OPTIMISTIC, READ_COMMITTED);
+
+        finalChecks();
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testOptimisticRepeatableReadRollbackMultithreaded() throws Exception {
+        checkRollbackMultithreaded(OPTIMISTIC, REPEATABLE_READ);
+
+        finalChecks();
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testOptimisticSerializableRollbackMultithreaded() throws Exception {
+        checkRollbackMultithreaded(OPTIMISTIC, SERIALIZABLE);
+
+        finalChecks();
+    }
+
+    /**
+     * Increments a single cache value from several threads inside {@code OPTIMISTIC}/{@code SERIALIZABLE}
+     * transactions, retrying on {@link GridCacheTxOptimisticException}, and checks that no two threads
+     * committed the same incremented value.
+     *
+     * @throws Exception If failed.
+     */
+    // TODO: GG-8063, enabled when fixed.
+    public void _testOptimisticSerializableConsistency() throws Exception {
+        final GridCache<Integer, Long> cache = grid(0).cache(null);
+
+        final int THREADS = 2;
+
+        final int ITERATIONS = 100;
+
+        final int key = 0;
+
+        cache.put(key, 0L);
+
+        List<IgniteFuture<Collection<Long>>> futs = new ArrayList<>(THREADS);
+
+        for (int i = 0; i < THREADS; i++) {
+            futs.add(GridTestUtils.runAsync(new Callable<Collection<Long>>() {
+                @Override public Collection<Long> call() throws Exception {
+                    Collection<Long> res = new ArrayList<>();
+
+                    for (int iter = 0; iter < ITERATIONS; iter++) {
+                        // Retry the increment until the transaction commits.
+                        while (true) {
+                            try (IgniteTx tx = cache.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                                long val = cache.get(key);
+
+                                cache.put(key, val + 1);
+
+                                tx.commit();
+
+                                assertTrue(res.add(val + 1));
+
+                                break;
+                            }
+                            catch (GridCacheTxOptimisticException e) {
+                                log.info("Got error, will retry: " + e);
+                            }
+                        }
+                    }
+
+                    return res;
+                }
+            }));
+        }
+
+        List<Collection<Long>> cols = new ArrayList<>(THREADS);
+
+        for (IgniteFuture<Collection<Long>> fut : futs) {
+            Collection<Long> col = fut.get();
+
+            assertEquals(ITERATIONS, col.size());
+
+            cols.add(col);
+        }
+
+        // Values committed by the threads processed so far.
+        Set<Long> seen = new HashSet<>();
+
+        Set<Long> duplicates = new HashSet<>();
+
+        for (Collection<Long> col : cols) {
+            // Collapse repeats within one thread first: only values shared between different threads count.
+            for (Long val : new HashSet<>(col)) {
+                if (!seen.add(val))
+                    duplicates.add(val);
+            }
+        }
+
+        assertTrue("Found duplicated values: " + duplicates, duplicates.isEmpty());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxReentryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxReentryAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxReentryAbstractSelfTest.java
new file mode 100644
index 0000000..8decc56
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxReentryAbstractSelfTest.java
@@ -0,0 +1,169 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.managers.communication.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.grid.util.direct.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.concurrent.atomic.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
+import static org.apache.ignite.transactions.GridCacheTxIsolation.*;
+
+/**
+ * Tests reentry in pessimistic repeatable read tx: a {@code get} followed by a {@code remove} of the same
+ * key within one transaction, with the lock requests sent by grid 0 counted by the communication SPI.
+ */
+public abstract class IgniteTxReentryAbstractSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** @return Cache mode. */
+    protected abstract GridCacheMode cacheMode();
+
+    /** @return Near enabled. */
+    protected abstract boolean nearEnabled();
+
+    /** @return Grid count. */
+    protected abstract int gridCount();
+
+    /** @return Test key. */
+    protected abstract int testKey();
+
+    /** @return Expected number of near lock requests. */
+    protected abstract int expectedNearLockRequests();
+
+    /** @return Expected number of DHT lock requests. */
+    protected abstract int expectedDhtLockRequests();
+
+    /** @return Expected number of distributed lock requests. */
+    protected abstract int expectedDistributedLockRequests();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        // Counting SPI lets the test inspect which lock requests were sent.
+        cfg.setCommunicationSpi(new CountingCommunicationSpi());
+        cfg.setDiscoverySpi(discoSpi);
+
+        GridCacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(cacheMode());
+        cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setDistributionMode(nearEnabled() ? NEAR_PARTITIONED : PARTITIONED_ONLY);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /**
+     * Reads and then removes the test key inside a single pessimistic repeatable read transaction and
+     * compares the lock request counters recorded on grid 0 with the expected values.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLockReentry() throws Exception {
+        try {
+            // Started inside try so that grids of a partially failed startup are stopped as well.
+            startGrids(gridCount());
+
+            GridCache<Object, Object> cache = grid(0).cache(null);
+
+            // Obtain test key.
+            int key = testKey();
+
+            try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                // One near lock request.
+                cache.get(key);
+
+                // No more requests.
+                cache.remove(key);
+
+                tx.commit();
+            }
+
+            CountingCommunicationSpi commSpi = (CountingCommunicationSpi)grid(0).configuration().getCommunicationSpi();
+
+            assertEquals(expectedNearLockRequests(), commSpi.nearLocks());
+            assertEquals(expectedDhtLockRequests(), commSpi.dhtLocks());
+            assertEquals(expectedDistributedLockRequests(), commSpi.distributedLocks());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /** Communication SPI that counts lock requests passing through {@code sendMessage}. */
+    protected static class CountingCommunicationSpi extends TcpCommunicationSpi {
+        /** Distributed lock requests. */
+        private final AtomicInteger distLocks = new AtomicInteger();
+
+        /** Near lock requests. */
+        private final AtomicInteger nearLocks = new AtomicInteger();
+
+        /** Dht lock requests. */
+        private final AtomicInteger dhtLocks = new AtomicInteger();
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg)
+            throws IgniteSpiException {
+            // Only IO messages wrap a payload that can be inspected; anything else is sent uncounted.
+            if (msg instanceof GridIoMessage)
+                countMsg((GridIoMessage)msg);
+
+            super.sendMessage(node, msg);
+        }
+
+        /**
+         * Inspects the payload of the given message and increments the matching counters: every
+         * {@link GridDistributedLockRequest} is counted as a distributed lock request and, in addition,
+         * as a near or DHT lock request when it is an instance of the corresponding subtype.
+         *
+         * @param msg Message to check.
+         */
+        private void countMsg(GridIoMessage msg) {
+            Object origMsg = msg.message();
+
+            if (origMsg instanceof GridDistributedLockRequest) {
+                distLocks.incrementAndGet();
+
+                if (origMsg instanceof GridNearLockRequest)
+                    nearLocks.incrementAndGet();
+                else if (origMsg instanceof GridDhtLockRequest)
+                    dhtLocks.incrementAndGet();
+            }
+        }
+
+        /** @return Number of recorded distributed lock requests. */
+        public int distributedLocks() {
+            return distLocks.get();
+        }
+
+        /** @return Number of recorded near lock requests. */
+        public int nearLocks() {
+            return nearLocks.get();
+        }
+
+        /** @return Number of recorded DHT lock requests. */
+        public int dhtLocks() {
+            return dhtLocks.get();
+        }
+    }
+}


Mime
View raw message