ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [11/21] ignite git commit: ignite-1607 WIP
Date Tue, 13 Oct 2015 07:05:44 GMT
ignite-1607 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1f3a5fd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1f3a5fd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1f3a5fd

Branch: refs/heads/ignite-1607
Commit: c1f3a5fdabc7af58066cc6f5a7a6826f2721ddd7
Parents: caed865
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Oct 8 16:30:53 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Oct 9 13:33:27 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 199 ++-------
 .../distributed/dht/GridDhtCacheAdapter.java    |   6 +-
 .../cache/distributed/dht/GridDhtGetFuture.java |  40 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |   2 +-
 ...arOptimisticSerializableTxPrepareFuture.java |  47 ++-
 .../cache/distributed/near/GridNearTxLocal.java |  14 -
 .../CacheSerializableTransactionsTest.java      | 418 ++++++++++++++++++-
 .../GridCacheConcurrentTxMultiNodeTest.java     |   3 -
 .../IgniteTxMultiThreadedAbstractTest.java      |   2 +-
 9 files changed, 509 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c1f3a5fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index ae987b7..bb15204 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1409,144 +1409,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @param taskName Task name.
      * @return Future.
      */
-    public IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> reloadAllAsync0(
+    public final IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> reloadAllAsync0(
         Collection<KeyCacheObject> keys,
         boolean ret,
         boolean skipVals,
         @Nullable UUID subjId,
-        String taskName)
-    {
-        final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
-
-        if (!F.isEmpty(keys)) {
-            final Map<KeyCacheObject, GridCacheVersion> keyVers = new HashMap();
-
-            for (KeyCacheObject key : keys) {
-                if (key == null)
-                    continue;
-
-                // Skip primary or backup entries for near cache.
-                if (ctx.isNear() && ctx.affinity().localNode(key, topVer))
-                    continue;
-
-                while (true) {
-                    try {
-                        GridCacheEntryEx entry = entryExSafe(key, topVer);
-
-                        if (entry == null)
-                            break;
-
-                        GridCacheVersion ver = entry.version();
-
-                        keyVers.put(key, ver);
-
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignore) {
-                        if (log.isDebugEnabled())
-                            log.debug("Got removed entry for reload (will retry): " + key);
-                    }
-                    catch (GridDhtInvalidPartitionException ignore) {
-                        if (log.isDebugEnabled())
-                            log.debug("Got invalid partition for key (will skip): " + key);
+        String taskName) {
+        assert false;
 
-                        break;
-                    }
-                }
-            }
-
-            final Map<KeyCacheObject, CacheObject> map =
-                ret ? U.<KeyCacheObject, CacheObject>newHashMap(keys.size()) : null;
-
-            final Collection<KeyCacheObject> absentKeys = F.view(keyVers.keySet());
-
-            final Collection<KeyCacheObject> loadedKeys = new GridConcurrentHashSet<>();
-
-            IgniteInternalFuture<Object> readFut = readThroughAllAsync(absentKeys, true, skipVals, null,
-                subjId, taskName, new CI2<KeyCacheObject, Object>() {
-                    /** Version for all loaded entries. */
-                    private GridCacheVersion nextVer = ctx.versions().next();
-
-                    /** {@inheritDoc} */
-                    @Override public void apply(KeyCacheObject key, Object val) {
-                        loadedKeys.add(key);
-
-                        GridCacheEntryEx entry = peekEx(key);
-
-                        if (entry != null) {
-                            try {
-                                GridCacheVersion curVer = keyVers.get(key);
-
-                                if (curVer != null) {
-                                    boolean wasNew = entry.isNewLocked();
-
-                                    entry.unswap();
-
-                                    CacheObject cacheVal = ctx.toCacheObject(val);
-
-                                    boolean set = entry.versionedValue(cacheVal, curVer, nextVer);
-
-                                    ctx.evicts().touch(entry, topVer);
-
-                                    if (map != null) {
-                                        if (set || wasNew)
-                                            map.put(key, cacheVal);
-                                        else {
-                                            CacheObject v = entry.peek(true, false, false, null);
-
-                                            if (v != null)
-                                                map.put(key, v);
-                                        }
-                                    }
-
-                                    if (log.isDebugEnabled()) {
-                                        log.debug("Set value loaded from store into entry [set=" + set + ", " +
-                                            "curVer=" +
-                                            curVer + ", newVer=" + nextVer + ", entry=" + entry + ']');
-                                    }
-                                }
-                                else {
-                                    if (log.isDebugEnabled()) {
-                                        log.debug("Current version was not found (either entry was removed or " +
-                                            "validation was not passed: " + entry);
-                                    }
-                                }
-                            }
-                            catch (GridCacheEntryRemovedException ignore) {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Got removed entry for reload (will not store reloaded entry) " +
-                                        "[entry=" + entry + ']');
-                                }
-                            }
-                            catch (IgniteCheckedException e) {
-                                throw new IgniteException(e);
-                            }
-                        }
-                    }
-                });
-
-            return readFut.chain(new CX1<IgniteInternalFuture<Object>, Map<KeyCacheObject, CacheObject>>() {
-                @Override public Map<KeyCacheObject, CacheObject> applyx(IgniteInternalFuture<Object> e)
-                    throws IgniteCheckedException  {
-                    // Touch all not loaded keys.
-                    for (KeyCacheObject key : absentKeys) {
-                        if (!loadedKeys.contains(key)) {
-                            GridCacheEntryEx entry = peekEx(key);
-
-                            if (entry != null)
-                                ctx.evicts().touch(entry, topVer);
-                        }
-                    }
-
-                    // Make sure there were no exceptions.
-                    e.get();
-
-                    return map;
-                }
-            });
-        }
-
-        return new GridFinishedFuture<>(Collections.<KeyCacheObject, CacheObject>emptyMap());
+        return new GridFinishedFuture<>();
     }
 
     /**
@@ -1763,7 +1634,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             expiry,
             skipVals,
             false,
-            canRemap);
+            canRemap,
+            false);
     }
 
     /**
@@ -1778,7 +1650,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @param keepCacheObjects Keep cache objects
      * @return Future.
      */
-    public <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final Collection<KeyCacheObject> keys,
+    public final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final Collection<KeyCacheObject> keys,
         final boolean readThrough,
         boolean checkTx,
         @Nullable final UUID subjId,
@@ -1787,7 +1659,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         @Nullable IgniteCacheExpiryPolicy expiry,
         final boolean skipVals,
         final boolean keepCacheObjects,
-        boolean canRemap
+        boolean canRemap,
+        final boolean needVer
         ) {
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap());
@@ -1822,20 +1695,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                         GridCacheEntryEx entry = entryEx(key);
 
                         try {
-                            CacheObject val = entry.innerGet(null,
+                            T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(null,
                                 ctx.isSwapOrOffheapEnabled(),
-                                /*don't read-through*/false,
-                                /*fail-fast*/true,
                                 /*unmarshal*/true,
                                 /*update-metrics*/!skipVals,
                                 /*event*/!skipVals,
-                                /*temporary*/false,
                                 subjId,
                                 null,
                                 taskName,
                                 expiry);
 
-                            if (val == null) {
+                            if (res == null) {
                                 GridCacheVersion ver = entry.version();
 
                                 if (misses == null)
@@ -1844,7 +1714,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                 misses.put(key, ver);
                             }
                             else {
-                                ctx.addResult(map, key, val, skipVals, keepCacheObjects, deserializePortable, true);
+                                if (needVer) {
+                                    assert keepCacheObjects;
+
+                                    map.put((K1)key, (V1)new T2<>(res.get1(), res.get2()));
+                                }
+                                else {
+                                    ctx.addResult(map,
+                                        key,
+                                        res.get1(),
+                                        skipVals,
+                                        keepCacheObjects,
+                                        deserializePortable,
+                                        true);
+                                }
 
                                 if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))
                                     ctx.evicts().touch(entry, topVer);
@@ -1860,15 +1743,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                             if (log.isDebugEnabled())
                                 log.debug("Got removed entry in getAllAsync(..) method (will retry): " + key);
                         }
-                        catch (GridCacheFilterFailedException ignore) {
-                            if (log.isDebugEnabled())
-                                log.debug("Filter validation failed for entry: " + entry);
-
-                            if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))
-                                ctx.evicts().touch(entry, topVer);
-
-                            break; // While loop.
-                        }
                     }
                 }
 
@@ -1918,13 +1792,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
                                                 // Don't put key-value pair into result map if value is null.
                                                 if (val != null) {
-                                                    ctx.addResult(map,
-                                                        key,
-                                                        cacheVal,
-                                                        skipVals,
-                                                        keepCacheObjects,
-                                                        deserializePortable,
-                                                        false);
+                                                    if (needVer) {
+                                                        assert keepCacheObjects;
+
+                                                        map.put((K1)key, (V1)new T2<>(cacheVal, set ? nextVer : ver));
+                                                    }
+                                                    else {
+                                                        ctx.addResult(map,
+                                                            key,
+                                                            cacheVal,
+                                                            skipVals,
+                                                            keepCacheObjects,
+                                                            deserializePortable,
+                                                            false);
+                                                    }
                                                 }
 
                                                 if (tx0 == null || (!tx0.implicit() &&
@@ -2017,6 +1898,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             }
         }
         else {
+            assert !needVer;
+
             return asyncOp(tx, new AsyncOp<Map<K1, V1>>(keys) {
                 @Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx) {
                     return tx.getAllAsync(ctx, keys, null, deserializePortable, skipVals, false, !readThrough);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1f3a5fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 3ce9ee9..25e480c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.CI3;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -605,7 +606,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param skipVals Skip values flag.
      * @return Get future.
      */
-    IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> getDhtAllAsync(
+    IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> getDhtAllAsync(
         Collection<KeyCacheObject> keys,
         boolean readThrough,
         @Nullable UUID subjId,
@@ -623,7 +624,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             expiry,
             skipVals,
             /*keep cache objects*/true,
-            canRemap);
+            canRemap,
+            /*need version*/true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1f3a5fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index a67b1de..e8cafb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -17,11 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -45,6 +44,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.typedef.C2;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
@@ -147,6 +147,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         assert reader != null;
         assert !F.isEmpty(keys);
 
+        assert !reload;
+
         this.reader = reader;
         this.cctx = cctx;
         this.msgId = msgId;
@@ -291,8 +293,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
             return new GridFinishedFuture<Collection<GridCacheEntryInfo>>(
                 Collections.<GridCacheEntryInfo>emptyList());
 
-        final Collection<GridCacheEntryInfo> infos = new LinkedList<>();
-
         String taskName0 = cctx.kernalContext().job().currentTaskName();
 
         if (taskName0 == null)
@@ -335,8 +335,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
                         txFut.add(f);
                     }
 
-                    infos.add(info);
-
                     break;
                 }
                 catch (IgniteCheckedException err) {
@@ -355,7 +353,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         if (txFut != null)
             txFut.markInitialized();
 
-        IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> fut;
+        IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut;
 
         if (txFut == null || txFut.isDone()) {
             if (reload && cctx.readThrough() && cctx.store().configured()) {
@@ -393,8 +391,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
             // transactions to complete.
             fut = new GridEmbeddedFuture<>(
                 txFut,
-                new C2<Boolean, Exception, IgniteInternalFuture<Map<KeyCacheObject, CacheObject>>>() {
-                    @Override public IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> apply(Boolean b, Exception e) {
+                new C2<Boolean, Exception, IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>>() {
+                    @Override public IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> apply(Boolean b, Exception e) {
                         if (e != null)
                             throw new GridClosureException(e);
 
@@ -432,23 +430,29 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         }
 
         return new GridEmbeddedFuture<>(
-            new C2<Map<KeyCacheObject, CacheObject>, Exception, Collection<GridCacheEntryInfo>>() {
-                @Override public Collection<GridCacheEntryInfo> apply(Map<KeyCacheObject, CacheObject> map, Exception e) {
+            new C2<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>, Exception, Collection<GridCacheEntryInfo>>() {
+                @Override public Collection<GridCacheEntryInfo> apply(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map, Exception e) {
                     if (e != null) {
                         onDone(e);
 
                         return Collections.emptyList();
                     }
                     else {
-                        for (Iterator<GridCacheEntryInfo> it = infos.iterator(); it.hasNext();) {
-                            GridCacheEntryInfo info = it.next();
+                        Collection<GridCacheEntryInfo> infos = new ArrayList<>(map.size());
 
-                            Object v = map.get(info.key());
+                        for (Map.Entry<KeyCacheObject, T2<CacheObject, GridCacheVersion>> entry : map.entrySet()) {
+                            T2<CacheObject, GridCacheVersion> val = entry.getValue();
 
-                            if (v == null)
-                                it.remove();
-                            else
-                                info.value(skipVals ? null : (CacheObject)v);
+                            assert val != null;
+
+                            GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+                            info.cacheId(cctx.cacheId());
+                            info.key(entry.getKey());
+                            info.value(skipVals ? null : val.get1());
+                            info.version(val.get2());
+
+                            infos.add(info);
                         }
 
                         return infos;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1f3a5fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 2071275..44f34aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -627,7 +627,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
                         if (finish(false) || state() == UNKNOWN)
                             fut.finish();
                         else
-                            fut.onError(new IgniteCheckedException("Failed to commit transaction: " +
+                            fut.onError(new IgniteCheckedException("Failed to rollback transaction: " +
                                 CU.txString(GridDhtTxLocal.this)));
 
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1f3a5fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index e48601d..04c4851 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -139,7 +140,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
 
                     e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
 
-                    f.onResult(e);
+                    f.onNodeLeft(e);
 
                     found = true;
                 }
@@ -165,7 +166,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
         }
 
         if (e instanceof IgniteTxOptimisticCheckedException && nodeId != null)
-            tx.onOptimisticException(nodeId);
+            tx.removeMapping(nodeId);
 
         err.compareAndSet(null, e);
     }
@@ -519,14 +520,27 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
 
         Collection<MiniFuture> futs = (Collection)futures();
 
-        for (MiniFuture fut : futs) {
-            if (remap && fut.rcvRes.get())
+        Iterator<MiniFuture> it = futs.iterator();
+
+        while (it.hasNext()) {
+            MiniFuture fut = it.next();
+
+            if (skipFuture(remap, fut))
                 continue;
 
             IgniteCheckedException err = prepare(fut);
 
             if (err != null) {
-                onDone(err);
+                while (it.hasNext()) {
+                    fut = it.next();
+
+                    if (skipFuture(remap, fut))
+                        continue;
+
+                    tx.removeMapping(fut.mapping().node().id());
+
+                    fut.onResult(err);
+                }
 
                 break;
             }
@@ -536,10 +550,19 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
     }
 
     /**
+     * @param remap Remap flag.
+     * @param fut Future.
+     * @return {@code True} if skip future during remap.
+     */
+    private boolean skipFuture(boolean remap, MiniFuture fut) {
+        return remap && fut.rcvRes.get();
+    }
+
+    /**
      * @param fut Mini future.
-     * @return {@code False} if should stop mapping.
+     * @return Prepare error if any.
      */
-    private IgniteCheckedException prepare(final MiniFuture fut) {
+    @Nullable private IgniteCheckedException prepare(final MiniFuture fut) {
         GridDistributedTxMapping m = fut.mapping();
 
         final ClusterNode n = m.node();
@@ -575,7 +598,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
                 tx.userPrepare();
             }
             catch (IgniteCheckedException e) {
-                onError(m.node().id(), e);
+                fut.onResult(e);
 
                 return e;
             }
@@ -605,7 +628,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
             catch (ClusterTopologyCheckedException e) {
                 e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
 
-                fut.onResult(e);
+                fut.onNodeLeft(e);
 
                 return e;
             }
@@ -806,7 +829,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
          */
         void onResult(Throwable e) {
             if (rcvRes.compareAndSet(false, true)) {
-                err.compareAndSet(null, e);
+                onError(m.node().id(), e);
 
                 if (log.isDebugEnabled())
                     log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
@@ -822,7 +845,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
         /**
          * @param e Node failure.
          */
-        void onResult(ClusterTopologyCheckedException e) {
+        void onNodeLeft(ClusterTopologyCheckedException e) {
             if (isDone())
                 return;
 
@@ -856,7 +879,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
                         assert cctx.kernalContext().clientNode();
                         assert m.clientFirst();
 
-                        tx.onClientRemap(m.node().id());
+                        tx.removeMapping(m.node().id());
 
                         ClientRemapFuture remapFut = new ClientRemapFuture();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1f3a5fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 9207bd0..5b2d50c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -597,20 +597,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     }
 
     /**
-     * @param nodeId Primary node id.
-     */
-    void onOptimisticException(UUID nodeId) {
-        mappings.remove(nodeId);
-    }
-
-    /**
-     * @param nodeId Primary node id.
-     */
-    void onClientRemap(UUID nodeId) {
-        mappings.remove(nodeId);
-    }
-
-    /**
      * @param nodeId Node ID to mark with explicit lock.
      * @return {@code True} if mapping was found.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1f3a5fd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 9f74b9c..accecb6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -46,6 +46,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -77,7 +78,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
     /** */
-    private static final boolean FAST = true;
+    private static final boolean FAST = false;
 
     /** */
     private static Map<Integer, Integer> storeMap = new ConcurrentHashMap8<>();
@@ -95,6 +96,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        cfg.setPeerClassLoadingEnabled(false);
+
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
 
         cfg.setClientMode(client);
@@ -266,8 +269,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
 
                         txAsync(cache, OPTIMISTIC, SERIALIZABLE,
                             new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
-                                @Override
-                                public Void apply(IgniteCache<Integer, Integer> cache) {
+                                @Override public Void apply(IgniteCache<Integer, Integer> cache) {
                                     cache.get(key);
 
                                     return null;
@@ -1305,6 +1307,373 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testRollbackNearCache1() throws Exception {
+        rollbackNearCacheWrite(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRollbackNearCache2() throws Exception {
+        rollbackNearCacheWrite(false);
+    }
+
+    /**
+     * @param near If {@code true} locks entry using the same near cache.
+     * @throws Exception If failed.
+     */
+    private void rollbackNearCacheWrite(boolean near) throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        IgniteCache<Integer, Integer> cache0 =
+            ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
+
+        final String cacheName = cache0.getName();
+
+        try {
+            Ignite ignite = ignite(SRVS);
+
+            IgniteCache<Integer, Integer> cache = ignite.createNearCache(cacheName,
+                new NearCacheConfiguration<Integer, Integer>());
+
+            IgniteTransactions txs = ignite.transactions();
+
+            Integer key1 = primaryKey(ignite(0).cache(cacheName));
+            Integer key2 = primaryKey(ignite(1).cache(cacheName));
+            Integer key3 = primaryKey(ignite(2).cache(cacheName));
+
+            CountDownLatch latch = new CountDownLatch(1);
+
+            IgniteInternalFuture<?> fut = null;
+
+            try {
+                try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                    cache.put(key1, key1);
+                    cache.put(key2, key2);
+                    cache.put(key3, key3);
+
+                    fut = lockKey(latch, near ? cache : cache0, key2);
+
+                    tx.commit();
+                }
+
+                fail();
+            }
+            catch (TransactionOptimisticException e) {
+                log.info("Expected exception: " + e);
+            }
+
+            latch.countDown();
+
+            assert fut != null;
+
+            fut.get();
+
+            checkValue(key1, null, cacheName);
+            checkValue(key2, 1, cacheName);
+            checkValue(key3, null, cacheName);
+
+            try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                cache.put(key1, key1);
+                cache.put(key2, key2);
+                cache.put(key3, key3);
+
+                tx.commit();
+            }
+
+            checkValue(key1, key1, cacheName);
+            checkValue(key2, key2, cacheName);
+            checkValue(key3, key3, cacheName);
+        }
+        finally {
+            ignite0.destroyCache(cacheName);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRollbackNearCache3() throws Exception {
+        rollbackNearCacheRead(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRollbackNearCache4() throws Exception {
+        rollbackNearCacheRead(false);
+    }
+
+    /**
+     * @param near If {@code true} updates entry using the same near cache.
+     * @throws Exception If failed.
+     */
+    private void rollbackNearCacheRead(boolean near) throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        IgniteCache<Integer, Integer> cache0 =
+            ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
+
+        final String cacheName = cache0.getName();
+
+        try {
+            Ignite ignite = ignite(SRVS);
+
+            IgniteCache<Integer, Integer> cache = ignite.createNearCache(cacheName,
+                new NearCacheConfiguration<Integer, Integer>());
+
+            IgniteTransactions txs = ignite.transactions();
+
+            Integer key1 = primaryKey(ignite(0).cache(cacheName));
+            Integer key2 = primaryKey(ignite(1).cache(cacheName));
+            Integer key3 = primaryKey(ignite(2).cache(cacheName));
+
+            cache0.put(key1, -1);
+            cache0.put(key2, -1);
+            cache0.put(key3, -1);
+
+            try {
+                try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                    cache.get(key1);
+                    cache.get(key2);
+                    cache.get(key3);
+
+                    updateKey(near ? cache : cache0, key2, -2);
+
+                    tx.commit();
+                }
+
+                fail();
+            }
+            catch (TransactionOptimisticException e) {
+                log.info("Expected exception: " + e);
+            }
+
+            checkValue(key1, -1, cacheName);
+            checkValue(key2, -2, cacheName);
+            checkValue(key3, -1, cacheName);
+
+            try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                cache.put(key1, key1);
+                cache.put(key2, key2);
+                cache.put(key3, key3);
+
+                tx.commit();
+            }
+
+            checkValue(key1, key1, cacheName);
+            checkValue(key2, key2, cacheName);
+            checkValue(key3, key3, cacheName);
+        }
+        finally {
+            ignite0.destroyCache(cacheName);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountTx1() throws Exception {
+        accountTx(false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountTxNearCache() throws Exception {
+        accountTx(false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountTx2() throws Exception {
+        accountTx(true, false);
+    }
+
+    /**
+     * @param getAll If {@code true} uses getAll/putAll in transaction.
+     * @param nearCache If {@code true} near cache is enabled.
+     * @throws Exception If failed.
+     */
+    private void accountTx(final boolean getAll, final boolean nearCache) throws Exception {
+        final Ignite ignite0 = ignite(0);
+
+        final String cacheName =
+            ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName();
+
+        try {
+            final List<Ignite> clients = clients();
+
+            final int ACCOUNTS = 100;
+            final int VAL_PER_ACCOUNT = 10_000;
+
+            IgniteCache<Integer, Account> srvCache = ignite0.cache(cacheName);
+
+            for (int i = 0; i < ACCOUNTS; i++)
+                srvCache.put(i, new Account(VAL_PER_ACCOUNT));
+
+            final AtomicInteger idx = new AtomicInteger();
+
+            final int THREADS = 20;
+
+            final long stopTime = System.currentTimeMillis() + 10_000;
+
+            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    int nodeIdx = idx.getAndIncrement() % clients.size();
+
+                    Ignite node = clients.get(nodeIdx);
+
+                    log.info("Tx thread: " + node.name());
+
+                    final IgniteTransactions txs = node.transactions();
+
+                    final IgniteCache<Integer, Account> cache =
+                        nearCache ? node.createNearCache(cacheName, new NearCacheConfiguration<Integer, Account>()) :
+                            node.<Integer, Account>cache(cacheName);
+
+                    assertNotNull(cache);
+
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    while (U.currentTimeMillis() < stopTime) {
+                        int id1 = rnd.nextInt(ACCOUNTS);
+
+                        int id2 = rnd.nextInt(ACCOUNTS);
+
+                        while (id2 == id1)
+                            id2 = rnd.nextInt(ACCOUNTS);
+
+                        try {
+                            while (true) {
+                                try {
+                                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                                        if (getAll) {
+                                            Map<Integer, Account> map = cache.getAll(F.asSet(id1, id2));
+
+                                            Account a1 = cache.get(id1);
+                                            Account a2 = cache.get(id2);
+
+                                            assertNotNull(a1);
+                                            assertNotNull(a2);
+
+                                            if (a1.value() > 0) {
+                                                a1 = new Account(a1.value() - 1);
+                                                a2 = new Account(a2.value() + 1);
+                                            }
+
+                                            map.put(id1, a1);
+                                            map.put(id2, a2);
+
+                                            cache.putAll(map);
+                                        }
+                                        else {
+                                            Account a1 = cache.get(id1);
+                                            Account a2 = cache.get(id2);
+
+                                            assertNotNull(a1);
+                                            assertNotNull(a2);
+
+                                            if (a1.value() > 0) {
+                                                a1 = new Account(a1.value() - 1);
+                                                a2 = new Account(a2.value() + 1);
+                                            }
+
+                                            cache.put(id1, a1);
+                                            cache.put(id2, a2);
+                                        }
+
+                                        tx.commit();
+                                    }
+
+                                    break;
+                                }
+                                catch (TransactionOptimisticException ignore) {
+                                    if (System.currentTimeMillis() > stopTime)
+                                        break;
+
+                                    // Retry.
+                                }
+                            }
+                        }
+                        catch (Throwable e) {
+                            log.error("Unexpected error: " + e, e);
+
+                            throw e;
+                        }
+                    }
+
+                    return null;
+                }
+            }, THREADS, "tx-thread");
+
+            fut.get(30_000);
+
+            int sum = 0;
+
+            for (int i = 0; i < ACCOUNTS; i++) {
+                Account a = srvCache.get(i);
+
+                assertNotNull(a);
+                assertTrue(a.value() >= 0);
+
+                log.info("Account: " + a.value());
+
+                sum += a.value();
+            }
+
+            assertEquals(ACCOUNTS * VAL_PER_ACCOUNT, sum);
+
+            for (int node = 0; node < SRVS + CLIENTS; node++) {
+                log.info("Verify node: " + node);
+
+                Ignite ignite = ignite(node);
+
+                IgniteCache<Integer, Account> cache = ignite.cache(cacheName);
+
+                sum = 0;
+
+                try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+                    Map<Integer, Account> map = new HashMap<>();
+
+                    for (int i = 0; i < ACCOUNTS; i++) {
+                        Account a = cache.get(i);
+
+                        assertNotNull(a);
+
+                        map.put(i, a);
+
+                        sum += a.value();
+                    }
+
+                    Account a1 = map.get(0);
+                    Account a2 = map.get(1);
+
+                    if (a1.value() > 0) {
+                        a1 = new Account(a1.value() - 1);
+                        a2 = new Account(a2.value() + 1);
+
+                        map.put(0, a1);
+                        map.put(1, a2);
+                    }
+
+                    cache.putAll(map);
+
+                    tx.commit();
+                }
+
+                assertEquals(ACCOUNTS * VAL_PER_ACCOUNT, sum);
+            }
+        }
+        finally {
+            ignite0.destroyCache(cacheName);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testConcurrentUpdateNoDeadlock() throws Exception {
         concurrentUpdateNoDeadlock(Collections.singletonList(ignite(0)), 10, false);
     }
@@ -1319,14 +1688,14 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testConcurrentUpdateNoDeadlockClients() throws Exception {
+    public void testConcurrentUpdateNoDeadlockFromClients() throws Exception {
         concurrentUpdateNoDeadlock(clients(), 20, false);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testConcurrentUpdateNoDeadlockClientsNodeRestart() throws Exception {
+    public void testConcurrentUpdateNoDeadlockFromClientsNodeRestart() throws Exception {
         concurrentUpdateNoDeadlock(clients(), 20, true);
     }
 
@@ -1356,12 +1725,15 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
     private void concurrentUpdateNoDeadlock(final List<Ignite> updateNodes,
         int threads,
         final boolean restart) throws Exception {
+        if (FAST)
+            return;
+
         assert updateNodes.size() > 0;
 
-        final Ignite ignite0 = ignite(0);
+        final Ignite srv = ignite(1);
 
         final String cacheName =
-            ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName();
+            srv.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName();
 
         try {
             final int KEYS = 100;
@@ -1389,12 +1761,10 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
                     });
                 }
 
-                int ITERS = FAST ? 1 : 10;
-
-                for (int i = 0; i < ITERS; i++) {
+                for (int i = 0; i < 10; i++) {
                     log.info("Iteration: " + i);
 
-                    final long stopTime = U.currentTimeMillis() + (FAST ? 1000 : 10_000);
+                    final long stopTime = U.currentTimeMillis() + 10_000;
 
                     final AtomicInteger idx = new AtomicInteger();
 
@@ -1454,7 +1824,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
 
                     updateFut.get(60, SECONDS);
 
-                    IgniteCache<Integer, Integer> cache = ignite(1).cache(cacheName);
+                    IgniteCache<Integer, Integer> cache = srv.cache(cacheName);
 
                     for (int key = 0; key < KEYS; key++) {
                         Integer val = cache.get(key);
@@ -1474,7 +1844,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
             }
         }
         finally {
-            destroyCache(ignite(1), cacheName);
+            destroyCache(srv, cacheName);
         }
     }
 
@@ -1735,4 +2105,26 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
             return val;
         }
     }
+
+    /**
+     *
+     */
+    static class Account {
+        /** */
+        private final int val;
+
+        /**
+         * @param val Value.
+         */
+        public Account(int val) {
+            this.val = val;
+        }
+
+        /**
+         * @return Value.
+         */
+        public int value() {
+            return val;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1f3a5fd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java
index 67bc08c..1ef77f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java
@@ -162,9 +162,6 @@ public class GridCacheConcurrentTxMultiNodeTest extends GridCommonAbstractTest {
 
         c.setPeerClassLoadingEnabled(false);
 
-        // Enable tracing.
-//        Logger.getLogger("org.apache.ignite.kernal.processors.cache.GridCacheDgcManager.trace").setLevel(Level.DEBUG);
-
         return c;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1f3a5fd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
index 7a1a0b9..191feb6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
@@ -225,7 +225,7 @@ public abstract class IgniteTxMultiThreadedAbstractTest extends IgniteTxAbstract
 
         final int ITERATIONS = 100;
 
-        for (int key0 = 0; key0 < 20; key0++) {
+        for (int key0 = 100_000; key0 < 100_000 + 20; key0++) {
             final int key = key0;
 
             cache.put(key, 0L);


Mime
View raw message