ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1607 WIP
Date Mon, 19 Oct 2015 10:35:55 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1607 b9051da90 -> 838e0ef87


ignite-1607 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/838e0ef8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/838e0ef8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/838e0ef8

Branch: refs/heads/ignite-1607
Commit: 838e0ef874fab078140abf647f66b14258e22d55
Parents: b9051da
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Oct 19 13:07:23 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Oct 19 13:30:13 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryEx.java      |   9 +-
 .../processors/cache/GridCacheMapEntry.java     |  20 +-
 .../GridDistributedTxRemoteAdapter.java         |  59 +++--
 .../distributed/near/GridNearCacheEntry.java    |  11 +-
 .../cache/transactions/IgniteTxEntry.java       |   1 +
 .../cache/transactions/IgniteTxHandler.java     |   2 +-
 .../transactions/IgniteTxLocalAdapter.java      |  17 +-
 .../CacheSerializableTransactionsTest.java      | 247 +++++++++++++++++++
 .../processors/cache/GridCacheTestEntryEx.java  |  16 +-
 9 files changed, 345 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/838e0ef8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index d00929a..53edde7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -34,7 +34,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEnt
 import org.apache.ignite.internal.processors.dr.GridDrType;
 import org.apache.ignite.internal.util.lang.GridTuple3;
 import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -360,6 +359,7 @@ public interface GridCacheEntryEx {
      * @param explicitVer Explicit version (if any).
      * @param subjId Subject ID initiated this update.
      * @param taskName Task name.
+     * @param dhtVer Dht version for near cache entry.
      * @return Tuple containing success flag and old value. If success is {@code false},
      *      then value is {@code null}.
      * @throws IgniteCheckedException If storing value failed.
@@ -381,7 +381,8 @@ public interface GridCacheEntryEx {
         long drExpireTime,
         @Nullable GridCacheVersion explicitVer,
         @Nullable UUID subjId,
-        String taskName
+        String taskName,
+        @Nullable GridCacheVersion dhtVer
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**
@@ -398,6 +399,7 @@ public interface GridCacheEntryEx {
      * @param explicitVer Explicit version (if any).
      * @param subjId Subject ID initiated this update.
      * @param taskName Task name.
+     * @param dhtVer Dht version for near cache entry.
      * @return Tuple containing success flag and old value. If success is {@code false},
      *      then value is {@code null}.
      * @throws IgniteCheckedException If remove failed.
@@ -416,7 +418,8 @@ public interface GridCacheEntryEx {
         GridDrType drType,
         @Nullable GridCacheVersion explicitVer,
         @Nullable UUID subjId,
-        String taskName
+        String taskName,
+        @Nullable GridCacheVersion dhtVer
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/838e0ef8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 9a6eb06..9271923 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1058,7 +1058,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
         long drExpireTime,
         @Nullable GridCacheVersion explicitVer,
         @Nullable UUID subjId,
-        String taskName
+        String taskName,
+        @Nullable GridCacheVersion dhtVer
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         CacheObject old;
 
@@ -1146,6 +1147,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
 
             update(val, expireTime, ttl, newVer);
 
+            if (isNear()) {
+                assert dhtVer != null;
+
+                ((GridNearCacheEntry)this).recordDhtVersion(dhtVer);
+            }
+
             drReplicate(drType, val, newVer);
 
             recordNodeId(affNodeId, topVer);
@@ -1212,8 +1219,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
         GridDrType drType,
         @Nullable GridCacheVersion explicitVer,
         @Nullable UUID subjId,
-        String taskName
-    ) throws IgniteCheckedException, GridCacheEntryRemovedException {
+        String taskName,
+        @Nullable GridCacheVersion dhtVer
+        ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         assert cctx.transactional();
 
         CacheObject old;
@@ -1274,6 +1282,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
 
             update(null, 0, 0, newVer);
 
+            if (isNear()) {
+                assert dhtVer != null;
+
+                ((GridNearCacheEntry)this).recordDhtVersion(dhtVer);
+            }
+
             if (cctx.offheapTiered() && hadValPtr) {
                 boolean rmv = cctx.swap().removeOffheap(key);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/838e0ef8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index c6dd014..bbcab6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -569,22 +569,42 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                         // Nullify explicit version so that innerSet/innerRemove will work as usual.
                                         explicitVer = null;
 
+                                    GridCacheVersion dhtVer = cached.isNear() ? writeVersion() : null;
+
                                     if (op == CREATE || op == UPDATE) {
                                         // Invalidate only for near nodes (backups cannot be invalidated).
                                         if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear()))
-                                            cached.innerRemove(this, eventNodeId(), nodeId,
false, false, true, true,
-                                                topVer, null, replicate ? DR_BACKUP : DR_NONE,
+                                            cached.innerRemove(this,
+                                                eventNodeId(),
+                                                nodeId,
+                                                false,
+                                                false,
+                                                true,
+                                                true,
+                                                topVer,
+                                                null,
+                                                replicate ? DR_BACKUP : DR_NONE,
                                                 near() ? null : explicitVer, CU.subjectId(this,
cctx),
-                                                resolveTaskName());
+                                                resolveTaskName(),
+                                                dhtVer);
                                         else {
-                                            cached.innerSet(this, eventNodeId(), nodeId,
val, false, false,
-                                                txEntry.ttl(), true, true, topVer, null,
-                                                replicate ? DR_BACKUP : DR_NONE, txEntry.conflictExpireTime(),
-                                                near() ? null : explicitVer, CU.subjectId(this,
cctx),
-                                                resolveTaskName());
-
-                                            if (cached.isNear())
-                                                ((GridNearCacheEntry)cached).recordDhtVersion(cached.version());
+                                            cached.innerSet(this,
+                                                eventNodeId(),
+                                                nodeId,
+                                                val,
+                                                false,
+                                                false,
+                                                txEntry.ttl(),
+                                                true,
+                                                true,
+                                                topVer,
+                                                null,
+                                                replicate ? DR_BACKUP : DR_NONE,
+                                                txEntry.conflictExpireTime(),
+                                                near() ? null : explicitVer,
+                                                CU.subjectId(this, cctx),
+                                                resolveTaskName(),
+                                                dhtVer);
 
                                             // Keep near entry up to date.
                                             if (nearCached != null) {
@@ -600,9 +620,20 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                         }
                                     }
                                     else if (op == DELETE) {
-                                        cached.innerRemove(this, eventNodeId(), nodeId, false,
false, true, true,
-                                            topVer, null, replicate ? DR_BACKUP : DR_NONE,
-                                            near() ? null : explicitVer, CU.subjectId(this,
cctx), resolveTaskName());
+                                        cached.innerRemove(this,
+                                            eventNodeId(),
+                                            nodeId,
+                                            false,
+                                            false,
+                                            true,
+                                            true,
+                                            topVer,
+                                            null,
+                                            replicate ? DR_BACKUP : DR_NONE,
+                                            near() ? null : explicitVer,
+                                            CU.subjectId(this, cctx),
+                                            resolveTaskName(),
+                                            dhtVer);
 
                                         // Keep near entry up to date.
                                         if (nearCached != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/838e0ef8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 5c3eccd..036ee6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -317,13 +317,11 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
     }
 
     /**
-     * This method should be called only when committing optimistic transactions.
-     *
      * @param dhtVer DHT version to record.
      */
-    public synchronized void recordDhtVersion(GridCacheVersion dhtVer) {
-        // Version manager must be updated separately, when adding DHT version
-        // to transaction entries.
+    public void recordDhtVersion(GridCacheVersion dhtVer) {
+        assert Thread.holdsLock(this);
+
         this.dhtVer = dhtVer;
     }
 
@@ -380,6 +378,9 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
             synchronized (this) {
                 checkObsolete();
 
+                if (this.dhtVer != null && this.dhtVer.compareTo(dhtVer) >= 0)
+                    return false;
+
                 if (cctx.cache().configuration().isStatisticsEnabled())
                     cctx.cache().metrics0().onRead(false);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/838e0ef8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 04393a5..a72a392 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -843,6 +843,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
      * @param serReadVer Read version for serializable transaction.
      */
     public void serializableReadVersion(GridCacheVersion serReadVer) {
+        assert this.serReadVer == null;
         assert serReadVer != null;
 
         this.serReadVer = serReadVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/838e0ef8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index baaf4aa..74dd650 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -416,7 +416,7 @@ public class IgniteTxHandler {
 
             if (tx.isRollbackOnly()) {
                 try {
-                    if (tx.state() != TransactionState.ROLLED_BACK)
+                    if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK)
                         tx.rollback();
                 }
                 catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/838e0ef8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 7668bca..5a7e5c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -58,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -911,13 +910,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                     IgniteBiTuple<GridCacheOperation, CacheObject>
res = applyTransformClosures(txEntry,
                                         true);
 
+                                    GridCacheVersion dhtVer = null;
+
                                     // For near local transactions we must record DHT version
                                     // in order to keep near entries on backup nodes until
                                     // backup remote transaction completes.
                                     if (cacheCtx.isNear()) {
                                         if (txEntry.op() == CREATE || txEntry.op() == UPDATE
||
                                             txEntry.op() == DELETE || txEntry.op() == TRANSFORM)
-                                            ((GridNearCacheEntry)cached).recordDhtVersion(txEntry.dhtVersion());
+                                            dhtVer = txEntry.dhtVersion();
 
                                         if ((txEntry.op() == CREATE || txEntry.op() == UPDATE)
&&
                                             txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE)
{
@@ -1015,7 +1016,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                             txEntry.conflictExpireTime(),
                                             cached.isNear() ? null : explicitVer,
                                             CU.subjectId(this, cctx),
-                                            resolveTaskName());
+                                            resolveTaskName(),
+                                            dhtVer);
 
                                         if (nearCached != null && updRes.success())
                                             nearCached.innerSet(
@@ -1034,7 +1036,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                                 txEntry.conflictExpireTime(),
                                                 null,
                                                 CU.subjectId(this, cctx),
-                                                resolveTaskName());
+                                                resolveTaskName(),
+                                                dhtVer);
                                     }
                                     else if (op == DELETE) {
                                         GridCacheUpdateTxResult updRes = cached.innerRemove(
@@ -1050,7 +1053,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                             cached.detached()  ? DR_NONE : drType,
                                             cached.isNear() ? null : explicitVer,
                                             CU.subjectId(this, cctx),
-                                            resolveTaskName());
+                                            resolveTaskName(),
+                                            dhtVer);
 
                                         if (nearCached != null && updRes.success())
                                             nearCached.innerRemove(
@@ -1066,7 +1070,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                                 DR_NONE,
                                                 null,
                                                 CU.subjectId(this, cctx),
-                                                resolveTaskName());
+                                                resolveTaskName(),
+                                                dhtVer);
                                     }
                                     else if (op == RELOAD) {
                                         cached.innerReload();

http://git-wip-us.apache.org/repos/asf/ignite/blob/838e0ef8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 67134f0..7ef2b34 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -1844,6 +1844,29 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                     }
 
                     checkValue(key, null, cache.getName());
+
+                    try {
+                        cache.put(key, 1);
+
+                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            Object val = cache.get(key);
+
+                            assertEquals(1, val);
+
+                            boolean res = cache.remove(key);
+
+                            assertTrue(res);
+
+                            updateKey(cache, key, 2);
+
+                            tx.commit();
+                        }
+
+                        fail();
+                    }
+                    catch (TransactionOptimisticException e) {
+                        log.info("Expected exception: " + e);
+                    }
                 }
             }
             finally {
@@ -2795,6 +2818,230 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
     /**
      * @throws Exception If failed.
      */
+    public void testIncrementTx1() throws Exception {
+        incrementTx(false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIncrementTx2() throws Exception {
+        incrementTx(false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIncrementTxNearCache1() throws Exception {
+        incrementTx(true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIncrementTxNearCache2() throws Exception {
+        incrementTx(true, true);
+    }
+
+    /**
+     * @param nearCache If {@code true} near cache is enabled.
+     * @param store If {@code true} cache store is enabled.
+     * @throws Exception If failed.
+     */
+    private void incrementTx(boolean nearCache, boolean store) throws Exception {
+        final Ignite ignite0 = ignite(0);
+
+        CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, store, false);
+
+        final List<Ignite> clients = clients();
+
+        final String cacheName = ignite0.createCache(ccfg).getName();
+
+        try {
+            final List<IgniteCache<Integer, Integer>> caches = new ArrayList<>();
+
+            for (Ignite client : clients) {
+                if (nearCache)
+                    caches.add(client.createNearCache(cacheName, new NearCacheConfiguration<Integer,
Integer>()));
+                else
+                    caches.add(client.<Integer, Integer>cache(cacheName));
+            }
+
+            for (int i = 0; i < 30; i++) {
+                final AtomicInteger cntr = new AtomicInteger();
+
+                final Integer key = i;
+
+                final AtomicInteger threadIdx = new AtomicInteger();
+
+                final int THREADS = 10;
+
+                final CyclicBarrier barrier = new CyclicBarrier(THREADS);
+
+                GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        int idx = threadIdx.getAndIncrement() % caches.size();
+
+                        IgniteCache<Integer, Integer> cache = caches.get(idx);
+
+                        Ignite ignite = cache.unwrap(Ignite.class);
+
+                        IgniteTransactions txs = ignite.transactions();
+
+                        log.info("Started update thread: " + ignite.name());
+
+                        barrier.await();
+
+                        for (int i = 0; i < 1000; i++) {
+                            try {
+                                try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                                    Integer val = cache.get(key);
+
+                                    cache.put(key, val == null ? 1 : val + 1);
+
+                                    tx.commit();
+                                }
+
+                                cntr.incrementAndGet();
+                            }
+                            catch (TransactionOptimisticException ignore) {
+                                // Retry.
+                            }
+                        }
+
+                        return null;
+                    }
+                }, THREADS, "update-thread").get();
+
+                log.info("Iteration [iter=" + i + ", val=" + cntr.get() + ']');
+
+                assertTrue(cntr.get() > 0);
+
+                checkValue(key, cntr.get(), cacheName);
+            }
+        }
+        finally {
+            ignite0.destroyCache(cacheName);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetRemoveTx() throws Exception {
+        getRemoveTx(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetRemoveTxNearCache() throws Exception {
+        getRemoveTx(true);
+    }
+
+    /**
+     * @param nearCache If {@code true} near cache is enabled.
+     * @throws Exception If failed.
+     */
+    private void getRemoveTx(boolean nearCache) throws Exception {
+        final Ignite ignite0 = ignite(0);
+
+        CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false);
+
+        final List<Ignite> clients = clients();
+
+        final String cacheName = ignite0.createCache(ccfg).getName();
+
+        try {
+            final List<IgniteCache<Integer, Integer>> caches = new ArrayList<>();
+
+            for (Ignite client : clients) {
+                if (nearCache)
+                    caches.add(client.createNearCache(cacheName, new NearCacheConfiguration<Integer,
Integer>()));
+                else
+                    caches.add(client.<Integer, Integer>cache(cacheName));
+            }
+
+            for (int i = 0; i < 100; i++) {
+                final AtomicInteger cntr = new AtomicInteger();
+
+                final Integer key = i;
+
+                final AtomicInteger threadIdx = new AtomicInteger();
+
+                final int THREADS = 10;
+
+                final CyclicBarrier barrier = new CyclicBarrier(THREADS);
+
+                GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        int idx = threadIdx.getAndIncrement() % caches.size();
+
+                        IgniteCache<Integer, Integer> cache = caches.get(idx);
+
+                        Ignite ignite = cache.unwrap(Ignite.class);
+
+                        IgniteTransactions txs = ignite.transactions();
+
+                        log.info("Started update thread: " + ignite.name());
+
+                        barrier.await();
+
+                        for (int i = 0; i < 100; i++) {
+                            try {
+                                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                                boolean rmv = rnd.nextInt(3) == 0;
+
+                                Integer val;
+
+                                try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                                    val = cache.get(key);
+
+                                    if (rmv)
+                                        cache.remove(key);
+                                    else
+                                        cache.put(key, val == null ? 1 : val + 1);
+
+                                    tx.commit();
+
+                                    if (rmv) {
+                                        if (val != null) {
+                                            for (int j = 0; j < val; j++)
+                                                cntr.decrementAndGet();
+                                        }
+                                    }
+                                    else
+                                        cntr.incrementAndGet();
+                                }
+                            }
+                            catch (TransactionOptimisticException ignore) {
+                                // Retry.
+                            }
+                        }
+
+                        return null;
+                    }
+                }, THREADS, "update-thread").get();
+
+                int val = cntr.get();
+
+                log.info("Iteration [iter=" + i + ", val=" + val + ']');
+
+                if (val == 0)
+                    checkValue(key, null, cacheName);
+                else
+                    checkValue(key, val, cacheName);
+            }
+        }
+        finally {
+            ignite0.destroyCache(cacheName);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testAccountTx1() throws Exception {
         accountTx(false, false, false, TestMemoryMode.HEAP);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/838e0ef8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index dd3c79a..e074583 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -473,9 +473,14 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements
Gr
         boolean evt,
         boolean metrics,
         AffinityTopologyVersion topVer,
-        CacheEntryPredicate[] filter, GridDrType drType,
-        long drExpireTime, @Nullable GridCacheVersion drVer, UUID subjId, String taskName)
throws IgniteCheckedException,
-        GridCacheEntryRemovedException {
+        CacheEntryPredicate[] filter,
+        GridDrType drType,
+        long drExpireTime,
+        @Nullable GridCacheVersion drVer,
+        UUID subjId,
+        String taskName,
+        @Nullable GridCacheVersion dhtVer)
+        throws IgniteCheckedException, GridCacheEntryRemovedException {
         return new GridCacheUpdateTxResult(true, rawPut(val, ttl));
     }
 
@@ -545,8 +550,9 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements
Gr
         GridDrType drType,
         @Nullable GridCacheVersion drVer,
         UUID subjId,
-        String taskName
-    ) throws IgniteCheckedException, GridCacheEntryRemovedException {
+        String taskName,
+        @Nullable GridCacheVersion dhtVer
+        ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         obsoleteVer = ver;
 
         CacheObject old = val;


Mime
View raw message