ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1607 WIP
Date Thu, 22 Oct 2015 13:03:24 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1607 2322eb3f1 -> 9f77d2723


ignite-1607 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9f77d272
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9f77d272
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9f77d272

Branch: refs/heads/ignite-1607
Commit: 9f77d2723ae5d097f920b8e12be43bf6e83f4115
Parents: 2322eb3
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Oct 22 16:03:02 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Oct 22 16:03:02 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 19 ++--
 .../processors/cache/GridCacheEntryEx.java      |  4 +-
 .../processors/cache/GridCacheMapEntry.java     | 36 ++++----
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  7 --
 .../distributed/dht/GridDhtTxPrepareFuture.java | 38 ++------
 .../distributed/near/GridNearCacheEntry.java    | 21 -----
 ...arOptimisticSerializableTxPrepareFuture.java | 53 ++---------
 .../near/GridNearTransactionalCache.java        |  2 +-
 .../cache/distributed/near/GridNearTxLocal.java |  2 +-
 .../transactions/IgniteTxLocalAdapter.java      | 59 +++++++------
 .../datastreamer/DataStreamProcessor.java       |  3 +-
 .../datastreamer/DataStreamerImpl.java          | 37 ++++++--
 .../datastreamer/DataStreamerRequest.java       | 34 +++++++-
 .../datastreamer/DataStreamerUpdateJob.java     | 33 ++++---
 .../IgniteTxOptimisticCheckedException.java     | 27 ------
 .../CacheSerializableTransactionsTest.java      | 92 ++++++++++++++++++++
 .../processors/cache/GridCacheTestEntryEx.java  |  8 +-
 17 files changed, 261 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 400d76e..74951b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1680,9 +1680,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                         ctx.closures().callLocalSafe(ctx.projectSafe(new GPC<Map<K1, V1>>() {
                             @Override public Map<K1, V1> call() throws Exception {
                                 ctx.store().loadAll(null/*tx*/, loadKeys.keySet(), new CI2<KeyCacheObject, Object>() {
-                                    /** New version for all new entries. */
-                                    private GridCacheVersion nextVer;
-
                                     @Override public void apply(KeyCacheObject key, Object val) {
                                         GridCacheVersion ver = loadKeys.get(key);
 
@@ -1694,10 +1691,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                             return;
                                         }
 
-                                        // Initialize next version.
-                                        if (nextVer == null)
-                                            nextVer = ctx.versions().next();
-
                                         loaded.add(key);
 
                                         CacheObject cacheVal = ctx.toCacheObject(val);
@@ -1706,11 +1699,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                             GridCacheEntryEx entry = entryEx(key);
 
                                             try {
-                                                boolean set = entry.versionedValue(cacheVal, ver, nextVer);
+                                                GridCacheVersion verSet = entry.versionedValue(cacheVal, ver, null);
+
+                                                boolean set = verSet != null;
 
                                                 if (log.isDebugEnabled())
-                                                    log.debug("Set value loaded from store into entry [set=" + set +
-                                                        ", curVer=" + ver + ", newVer=" + nextVer + ", " +
+                                                    log.debug("Set value loaded from store into entry [" +
+                                                        "set=" + set +
+                                                        ", curVer=" + ver +
+                                                        ", newVer=" + verSet + ", " +
                                                         "entry=" + entry + ']');
 
                                                 // Don't put key-value pair into result map if value is null.
@@ -1718,7 +1715,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                                     if (needVer) {
                                                         assert keepCacheObjects;
 
-                                                        map.put((K1)key, (V1)new T2<>(cacheVal, set ? nextVer : ver));
+                                                        map.put((K1)key, (V1)new T2<>(cacheVal, set ? verSet : ver));
                                                     }
                                                     else {
                                                         ctx.addResult(map,

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 0b5a859..3b636b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -696,11 +696,11 @@ public interface GridCacheEntryEx {
      * @param val New value.
      * @param curVer Version to match or {@code null} if match is not required.
      * @param newVer Version to set.
-     * @return {@code True} if versioned matched.
+     * @return Non null version if value was set.
      * @throws IgniteCheckedException If index could not be updated.
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
-    public boolean versionedValue(CacheObject val,
+    public GridCacheVersion versionedValue(CacheObject val,
         @Nullable GridCacheVersion curVer,
         @Nullable GridCacheVersion newVer)
         throws IgniteCheckedException, GridCacheEntryRemovedException;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 5608b4c..621ed99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -3208,11 +3208,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized boolean versionedValue(CacheObject val,
+    @Override public synchronized GridCacheVersion versionedValue(CacheObject val,
         GridCacheVersion curVer,
         GridCacheVersion newVer)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
-        assert newVer != null;
 
         checkObsolete();
 
@@ -3220,33 +3219,34 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             GridCacheMvcc mvcc = mvccExtras();
 
             if (mvcc != null && !mvcc.isEmpty())
-                return false;
+                return null;
 
-            if (val != this.val) {
-                CacheObject old = rawGetOrUnmarshalUnlocked(false);
+            if (newVer == null)
+                newVer = cctx.versions().next();
 
-                long ttl = ttlExtras();
+            CacheObject old = rawGetOrUnmarshalUnlocked(false);
 
-                long expTime = CU.toExpireTime(ttl);
+            long ttl = ttlExtras();
 
-                // Detach value before index update.
-                val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
+            long expTime = CU.toExpireTime(ttl);
 
-                if (val != null) {
-                    updateIndex(val, expTime, newVer, old);
+            // Detach value before index update.
+            val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
 
-                    if (deletedUnlocked())
-                        deletedUnlocked(false);
-                }
+            if (val != null) {
+                updateIndex(val, expTime, newVer, old);
 
-                // Version does not change for load ops.
-                update(val, expTime, ttl, newVer);
+                if (deletedUnlocked())
+                    deletedUnlocked(false);
             }
 
-            return true;
+            // Version does not change for load ops.
+            update(val, expTime, ttl, newVer);
+
+            return newVer;
         }
 
-        return false;
+        return null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index c4fda29..8c7d985 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -189,13 +189,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     }
 
     /**
-     * @return {@code True} if originating node has a near cache that participates in this transaction.
-     */
-    public boolean nearOnOriginatingNode() {
-        return nearOnOriginatingNode;
-    }
-
-    /**
      * @return {@code True} if explicit lock transaction.
      */
     public boolean explicitLock() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 7d281d5..6f8ac2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -689,14 +689,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
             tx.implicitSingleResult(ret);
         }
-        else if (prepErr instanceof IgniteTxOptimisticCheckedException) {
-            IgniteTxOptimisticCheckedException err = (IgniteTxOptimisticCheckedException)prepErr;
-
-            if (err.values() != null) {
-                for (Map.Entry<IgniteTxKey, CacheVersionedValue> e : err.values().entrySet())
-                    res.addOwnedValue(e.getKey(), e.getValue());
-            }
-        }
 
         res.filterFailedKeys(filterFailedKeys);
 
@@ -943,7 +935,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                     entry.cached().unswap();
 
                     if (!entry.cached().checkSerializableReadVersion(serReadVer))
-                        return versionCheckError(entry, null);
+                        return versionCheckError(entry);
                 }
             }
         }
@@ -991,16 +983,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
     /**
      * @param entry Entry.
-     * @param vals Values.
      * @return Optimistic version check error.
      */
-    private IgniteTxOptimisticCheckedException versionCheckError(IgniteTxEntry entry,
-        Map<IgniteTxKey, CacheVersionedValue> vals) {
+    private IgniteTxOptimisticCheckedException versionCheckError(IgniteTxEntry entry) {
         GridCacheContext cctx = entry.context();
 
         return new IgniteTxOptimisticCheckedException("Failed to prepare transaction, " +
             "read/write conflict [key=" + entry.key().value(cctx.cacheObjectContext(), false) +
-            ", cache=" + cctx.name() + ']', vals);
+            ", cache=" + cctx.name() + ']');
     }
 
     /**
@@ -1012,26 +1002,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 IgniteCheckedException err0 = null;
 
                 try {
-                    if (tx.nearOnOriginatingNode()) {
-                        Map<IgniteTxKey, CacheVersionedValue> vals;
-
-                        vals = checkReadConflict(writes, null);
-                        vals = checkReadConflict(reads, vals);
-
-                        if (vals != null) {
-                            IgniteTxEntry entry = tx.entry(F.firstKey(vals));
+                    err0 = checkReadConflict(writes);
 
-                            assert entry != null;
-
-                            err0 = versionCheckError(entry, vals);
-                        }
-                    }
-                    else {
-                        err0 = checkReadConflict(writes);
-
-                        if (err0 == null)
-                            err0 = checkReadConflict(reads);
-                    }
+                    if (err0 == null)
+                        err0 = checkReadConflict(reads);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to check entry version: " + e, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 0ad236c..d558cc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -353,27 +353,6 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
     }
 
     /**
-     * @param primaryNodeId Primary node ID.
-     * @param topVer Topology version.
-     * @param val Value.
-     * @param dhtVer DHT version received from remote node.
-     * @throws GridCacheEntryRemovedException If entry was removed.
-     */
-    synchronized public void loadedValue(UUID primaryNodeId,
-        AffinityTopologyVersion topVer,
-        CacheObject val,
-        GridCacheVersion dhtVer)
-        throws GridCacheEntryRemovedException {
-        checkObsolete();
-
-        if (recordDhtVersion(dhtVer)) {
-            primaryNode(primaryNodeId, topVer);
-
-            value(val);
-        }
-    }
-
-    /**
      * @param tx Transaction.
      * @param primaryNodeId Primary node ID.
      * @param val New value.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 48e44e6..eb88149 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -187,9 +187,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     /**
      * @param m Failed mapping.
      * @param e Error.
-     * @param res Response.
      */
-    private void onError(@Nullable GridDistributedTxMapping m, Throwable e, GridNearTxPrepareResponse res) {
+    private void onError(@Nullable GridDistributedTxMapping m, Throwable e) {
         if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) {
             if (tx.onePhaseCommit()) {
                 tx.markForBackupCheck();
@@ -201,44 +200,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
         }
 
         if (e instanceof IgniteTxOptimisticCheckedException) {
-            if (res != null) {
-                assert m != null;
-
-                Map<IgniteTxKey, CacheVersionedValue> ownVals = res.ownedValues();
-
-                if (ownVals != null) {
-                    for (Map.Entry<IgniteTxKey, CacheVersionedValue> val : ownVals.entrySet()) {
-                        IgniteTxEntry txEntry = tx.entry(val.getKey());
-
-                        assert txEntry != null : val.getKey();
-
-                        GridCacheContext cctx = txEntry.context();
-
-                        if (cctx.isNear()) {
-                            GridNearCacheEntry entry = (GridNearCacheEntry)txEntry.cached();
-
-                            assert entry != null;
-
-                            while (true) {
-                                try {
-                                    CacheVersionedValue verVal = val.getValue();
-
-                                    entry.loadedValue(m.node().id(),
-                                        tx.topologyVersion(),
-                                        verVal.value(),
-                                        verVal.version());
-
-                                    break;
-                                }
-                                catch (GridCacheEntryRemovedException rmvErr) {
-                                    txEntry.cached(entry.context().cache().entryEx(entry.key()));
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-
             if (m != null)
                 tx.removeMapping(m.node().id());
         }
@@ -324,14 +285,14 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                 if (tx.setRollbackOnly()) {
                     if (tx.timedOut())
                         onError(null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
-                            "was rolled back: " + this), null);
+                            "was rolled back: " + this));
                     else
                         onError(null, new IgniteCheckedException("Invalid transaction state for prepare " +
-                            "[state=" + tx.state() + ", tx=" + this + ']'), null);
+                            "[state=" + tx.state() + ", tx=" + this + ']'));
                 }
                 else
                     onError(null, new IgniteTxRollbackCheckedException("Invalid transaction state for " +
-                        "prepare [state=" + tx.state() + ", tx=" + this + ']'), null);
+                        "prepare [state=" + tx.state() + ", tx=" + this + ']'));
 
                 return;
             }
@@ -745,7 +706,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
          */
         void onResult(Throwable e) {
             if (rcvRes.compareAndSet(false, true)) {
-                onError(m, e, null);
+                onError(m, e);
 
                 if (log.isDebugEnabled())
                     log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
@@ -769,7 +730,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                 if (log.isDebugEnabled())
                     log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this);
 
-                onError(null, e, null);
+                onError(null, e);
 
                 onDone(e);
             }
@@ -786,7 +747,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             if (rcvRes.compareAndSet(false, true)) {
                 if (res.error() != null) {
                     // Fail the whole compound future.
-                    onError(m, res.error(), res);
+                    onError(m, res.error());
 
                     onDone(res.error());
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 00b78b3..13743d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -186,7 +186,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
         GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
             keys,
             readThrough,
-            false,
+            /*force primary*/needVer,
             tx,
             CU.subjectId(tx, ctx.shared()),
             tx.resolveTaskName(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index e856b24..67fd81e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -380,7 +380,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
             return cacheCtx.colocated().loadAsync(
                 keys,
                 readThrough,
-                /*force primary*/false,
+                /*force primary*/needVer,
                 topologyVersion(),
                 CU.subjectId(this, cctx),
                 resolveTaskName(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 6dee304..4d3ad72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -486,31 +486,28 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 final Map<KeyCacheObject, GridCacheVersion> misses0 = misses;
 
                 cacheCtx.store().loadAll(this, misses.keySet(), new CI2<KeyCacheObject, Object>() {
-                    private GridCacheVersion nextVer;
-
                     @Override public void apply(KeyCacheObject key, Object val) {
                         GridCacheVersion ver = misses0.remove(key);
 
                         assert ver != null : key;
 
                         if (val != null) {
-                            if (nextVer == null)
-                                nextVer = cacheCtx.versions().next();
-
                             CacheObject cacheVal = cacheCtx.toCacheObject(val);
 
                             while (true) {
                                 GridCacheEntryEx entry = cacheCtx.cache().entryEx(key);
 
                                 try {
-                                    boolean set = entry.versionedValue(cacheVal, ver, nextVer);
+                                    GridCacheVersion setVer = entry.versionedValue(cacheVal, ver, null);
+
+                                    boolean set = setVer != null;
 
                                     if (set)
-                                        ver = nextVer;
+                                        ver = setVer;
 
                                     if (log.isDebugEnabled())
                                         log.debug("Set value loaded from store into entry [set=" + set +
-                                            ", curVer=" + ver + ", newVer=" + nextVer + ", " +
+                                            ", curVer=" + ver + ", newVer=" + setVer + ", " +
                                             "entry=" + entry + ']');
 
                                     break;
@@ -1316,6 +1313,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
+     * @param entry Entry.
+     * @return {@code True} if local node is current primary for given entry.
+     */
+    private boolean primaryLocal(GridCacheEntryEx entry) {
+        return entry.context().affinity().primary(cctx.localNode(), entry.partition(), AffinityTopologyVersion.NONE);
+    }
+
+    /**
      * Checks if there is a cached or swapped value for
      * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean)} method.
      *
@@ -1452,15 +1457,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                 optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
 
                             if (needReadVer) {
-                                T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(this,
-                                    /*swap*/true,
-                                    /*unmarshal*/true,
-                                    /*metrics*/true,
-                                    /*event*/true,
-                                    CU.subjectId(this, cctx),
-                                    null,
-                                    resolveTaskName(),
-                                    accessPlc);
+                                T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ?
+                                    entry.innerGetVersioned(this,
+                                        /*swap*/true,
+                                        /*unmarshal*/true,
+                                        /*metrics*/true,
+                                        /*event*/true,
+                                        CU.subjectId(this, cctx),
+                                        null,
+                                        resolveTaskName(),
+                                        accessPlc) : null;
 
                                 if (res != null) {
                                     val = res.get1();
@@ -2129,15 +2135,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                             if (optimistic() && !implicit()) {
                                 try {
                                     if (needReadVer) {
-                                        T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(this,
-                                            /*swap*/false,
-                                            /*unmarshal*/retval,
-                                            /*metrics*/retval,
-                                            /*events*/retval,
-                                            CU.subjectId(this, cctx),
-                                            entryProcessor,
-                                            resolveTaskName(),
-                                            null);
+                                        T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ?
+                                            entry.innerGetVersioned(this,
+                                                /*swap*/false,
+                                                /*unmarshal*/retval,
+                                                /*metrics*/retval,
+                                                /*events*/retval,
+                                                CU.subjectId(this, cctx),
+                                                entryProcessor,
+                                                resolveTaskName(),
+                                                null) : null;
 
                                         if (res != null) {
                                             old = res.get1();

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 5150d83..20a013b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -289,7 +289,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
                 col,
                 req.ignoreDeploymentOwnership(),
                 req.skipStore(),
-                updater);
+                updater,
+                req.version());
 
             Exception err = null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index ab2a6e8..bf9dc78 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -213,6 +213,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     /** */
     private int maxRemapCnt = DFLT_MAX_REMAP_CNT;
 
+    /** */
+    private GridCacheVersion ver;
+
     /** Whether a warning at {@link DataStreamerImpl#allowOverwrite()} printed */
     private static boolean isWarningPrinted;
 
@@ -1242,8 +1245,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             IgniteInternalFuture<Object> fut;
 
             if (isLocNode) {
-                fut = ctx.closure().callLocalSafe(
-                    new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, rcvr), false);
+                DataStreamerUpdateJob job = new DataStreamerUpdateJob(ctx,
+                    log,
+                    cacheName,
+                    entries,
+                    false,
+                    skipStore,
+                    rcvr,
+                    ver);
+
+                fut = ctx.closure().callLocalSafe(job, false);
 
                 locFuts.add(fut);
 
@@ -1277,6 +1288,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                         assert rcvr != null;
 
                         updaterBytes = ctx.config().getMarshaller().marshal(rcvr);
+
+                        if (rcvr == ISOLATED_UPDATER)
+                            ver = ctx.cache().context().versions().next();
                     }
 
                     if (topicBytes == null)
@@ -1337,7 +1351,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     dep != null ? dep.participants() : null,
                     dep != null ? dep.classLoaderId() : null,
                     dep == null,
-                    topVer);
+                    topVer,
+                    ver);
 
                 try {
                     ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL);
@@ -1537,7 +1552,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     /**
      * Isolated receiver which only loads entry initial value.
      */
-    private static class IsolatedUpdater implements StreamReceiver<KeyCacheObject, CacheObject>,
+    static class IsolatedUpdater implements StreamReceiver<KeyCacheObject, CacheObject>,
         DataStreamerCacheUpdaters.InternalUpdater {
         /** */
         private static final long serialVersionUID = 0L;
@@ -1545,6 +1560,17 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         /** {@inheritDoc} */
         @Override public void receive(IgniteCache<KeyCacheObject, CacheObject> cache,
             Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) {
+            receive(cache, entries, null);
+        }
+
+        /**
+         * @param cache Cache.
+         * @param entries Entries.
+         * @param ver Entries version.
+         */
+        void receive(IgniteCache<KeyCacheObject, CacheObject> cache,
+            Collection<Map.Entry<KeyCacheObject, CacheObject>> entries,
+            GridCacheVersion ver) {
             IgniteCacheProxy<KeyCacheObject, CacheObject> proxy = (IgniteCacheProxy<KeyCacheObject, CacheObject>)cache;
 
             GridCacheAdapter<KeyCacheObject, CacheObject> internalCache = proxy.context().cache();
@@ -1556,7 +1582,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
             AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
 
-            GridCacheVersion ver = cctx.versions().next(topVer);
+            if (ver == null)
+                ver = cctx.versions().next(topVer);
 
             long ttl = CU.TTL_ETERNAL;
             long expiryTime = CU.EXPIRE_TIME_ETERNAL;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
index c1a1528..59810ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
@@ -25,6 +25,7 @@ import org.apache.ignite.configuration.DeploymentMode;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
@@ -87,6 +88,9 @@ public class DataStreamerRequest implements Message {
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
+    /** */
+    private GridCacheVersion ver;
+
     /**
      * {@code Externalizable} support.
      */
@@ -109,6 +113,7 @@ public class DataStreamerRequest implements Message {
      * @param clsLdrId Class loader ID.
      * @param forceLocDep Force local deployment.
      * @param topVer Topology version.
+     * @param ver Entries version for {@link DataStreamerImpl.IsolatedUpdater}.
      */
     public DataStreamerRequest(long reqId,
         byte[] resTopicBytes,
@@ -123,7 +128,8 @@ public class DataStreamerRequest implements Message {
         Map<UUID, IgniteUuid> ldrParticipants,
         IgniteUuid clsLdrId,
         boolean forceLocDep,
-        @NotNull AffinityTopologyVersion topVer) {
+        @NotNull AffinityTopologyVersion topVer,
+        @Nullable GridCacheVersion ver) {
         assert topVer != null;
 
         this.reqId = reqId;
@@ -140,6 +146,14 @@ public class DataStreamerRequest implements Message {
         this.clsLdrId = clsLdrId;
         this.forceLocDep = forceLocDep;
         this.topVer = topVer;
+        this.ver = ver;
+    }
+
+    /**
+     * @return Version.
+     */
+    @Nullable public GridCacheVersion version() {
+        return ver;
     }
 
     /**
@@ -341,6 +355,12 @@ public class DataStreamerRequest implements Message {
 
                 writer.incrementState();
 
+            case 14:
+                if (!writer.writeMessage("ver", ver))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -470,6 +490,14 @@ public class DataStreamerRequest implements Message {
 
                 reader.incrementState();
 
+            case 14:
+                ver = reader.readMessage("ver");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(DataStreamerRequest.class);
@@ -482,6 +510,6 @@ public class DataStreamerRequest implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 14;
+        return 15;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
index 42084a3..12eee88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.lang.GridPlainCallable;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -56,6 +57,9 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> {
     /** */
     private final StreamReceiver rcvr;
 
+    /** */
+    private final GridCacheVersion ver;
+
     /**
      * @param ctx Context.
      * @param log Log.
@@ -64,6 +68,7 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> {
      * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
      * @param skipStore Skip store flag.
      * @param rcvr Updater.
+     * @param ver Entries version for {@link DataStreamerImpl.IsolatedUpdater}.
      */
     DataStreamerUpdateJob(
         GridKernalContext ctx,
@@ -72,7 +77,8 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> {
         Collection<DataStreamerEntry> col,
         boolean ignoreDepOwnership,
         boolean skipStore,
-        StreamReceiver<?, ?> rcvr) {
+        StreamReceiver<?, ?> rcvr,
+        @Nullable GridCacheVersion ver) {
         this.ctx = ctx;
         this.log = log;
 
@@ -84,6 +90,7 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> {
         this.ignoreDepOwnership = ignoreDepOwnership;
         this.skipStore = skipStore;
         this.rcvr = rcvr;
+        this.ver = ver;
     }
 
     /** {@inheritDoc} */
@@ -119,17 +126,21 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> {
                     checkSecurityPermission(SecurityPermission.CACHE_REMOVE);
             }
 
-            if (unwrapEntries()) {
-                Collection<Map.Entry> col0 = F.viewReadOnly(col, new C1<DataStreamerEntry, Map.Entry>() {
-                    @Override public Map.Entry apply(DataStreamerEntry e) {
-                        return e.toEntry(cctx);
-                    }
-                });
-
-                rcvr.receive(cache, col0);
+            if (rcvr instanceof DataStreamerImpl.IsolatedUpdater)
+                ((DataStreamerImpl.IsolatedUpdater)rcvr).receive(cache, (Collection)col, ver);
+            else {
+                if (unwrapEntries()) {
+                    Collection<Map.Entry> col0 = F.viewReadOnly(col, new C1<DataStreamerEntry, Map.Entry>() {
+                        @Override public Map.Entry apply(DataStreamerEntry e) {
+                            return e.toEntry(cctx);
+                        }
+                    });
+
+                    rcvr.receive(cache, col0);
+                }
+                else
+                    rcvr.receive(cache, col);
             }
-            else
-                rcvr.receive(cache, col);
 
             return null;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxOptimisticCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxOptimisticCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxOptimisticCheckedException.java
index 3324779..b2b0e2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxOptimisticCheckedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxOptimisticCheckedException.java
@@ -17,11 +17,7 @@
 
 package org.apache.ignite.internal.transactions;
 
-import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 
 /**
  * Exception thrown whenever grid transactions fail optimistically.
@@ -30,10 +26,6 @@ public class IgniteTxOptimisticCheckedException extends IgniteCheckedException {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** */
-    @GridToStringExclude
-    private transient Map<IgniteTxKey, CacheVersionedValue> vals;
-
     /**
      * Creates new optimistic exception with given error message.
      *
@@ -44,25 +36,6 @@ public class IgniteTxOptimisticCheckedException extends IgniteCheckedException {
     }
 
     /**
-     * Creates new optimistic exception with given error message.
-     *
-     * @param msg Error message.
-     * @param vals Current values.
-     */
-    public IgniteTxOptimisticCheckedException(String msg, Map<IgniteTxKey, CacheVersionedValue> vals) {
-        super(msg);
-
-        this.vals = vals;
-    }
-
-    /**
-     * @return Current values.
-     */
-    public Map<IgniteTxKey, CacheVersionedValue> values() {
-        return vals;
-    }
-
-    /**
      * Creates new optimistic exception with given error message and optional nested exception.
      *
      * @param msg Error message.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 510f3e7..70ddfa0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -42,6 +42,7 @@ import javax.cache.processor.EntryProcessorResult;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.cache.CacheMode;
@@ -149,6 +150,97 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testTxStreamerLoad() throws Exception {
+        txStreamerLoad(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxStreamerLoadAllowOverwrite() throws Exception {
+        txStreamerLoad(true);
+    }
+
+    /**
+     * @param allowOverwrite Streamer flag.
+     * @throws Exception If failed.
+     */
+    private void txStreamerLoad(boolean allowOverwrite) throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            if (ccfg.getCacheStoreFactory() == null)
+                continue;
+
+            logCacheInfo(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+                List<Integer> keys = testKeys(cache);
+
+                for (Integer key : keys)
+                    txStreamerLoad(ignite0, key, cache.getName(), allowOverwrite);
+
+                txStreamerLoad(ignite(SRVS), 10_000, cache.getName(), allowOverwrite);
+            }
+            finally {
+                destroyCache(ignite0, ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @param ignite Node.
+     * @param key Key.
+     * @param cacheName Cache name.
+     * @param allowOverwrite Streamer flag.
+     * @throws Exception If failed.
+     */
+    private void txStreamerLoad(Ignite ignite,
+        Integer key,
+        String cacheName,
+        boolean allowOverwrite) throws Exception {
+        IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
+
+        log.info("Test key: " + key);
+
+        Integer loadVal = -1;
+
+        IgniteTransactions txs = ignite.transactions();
+
+        try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(cache.getName())) {
+            streamer.allowOverwrite(allowOverwrite);
+
+            streamer.addData(key, loadVal);
+        }
+
+        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+            Integer val = cache.get(key);
+
+            assertEquals(loadVal, val);
+
+            tx.commit();
+        }
+
+        checkValue(key, loadVal, cache.getName());
+
+        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+            Integer val = cache.get(key);
+
+            assertEquals(loadVal, val);
+
+            cache.put(key, 0);
+
+            tx.commit();
+        }
+
+        checkValue(key, 0, cache.getName());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testTxLoadFromStore() throws Exception {
         Ignite ignite0 = ignite(0);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index e074583..84cc572 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -670,8 +670,12 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
     }
 
     /** @inheritDoc */
-    @Override public boolean versionedValue(CacheObject val, GridCacheVersion curVer, GridCacheVersion newVer) {
-        assert false; return false;
+    @Override public GridCacheVersion versionedValue(CacheObject val,
+        GridCacheVersion curVer,
+        GridCacheVersion newVer) {
+        assert false;
+
+        return null;
     }
 
     /** @inheritDoc */


Mime
View raw message