ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1607 WIP
Date Wed, 21 Oct 2015 09:01:18 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1607 de2bdc4e9 -> ca8d0364e


ignite-1607 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca8d0364
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca8d0364
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca8d0364

Branch: refs/heads/ignite-1607
Commit: ca8d0364ea186416533e63d73888ee8fdaf6d835
Parents: de2bdc4
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Oct 21 12:01:13 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Oct 21 12:01:13 2015 +0300

----------------------------------------------------------------------
 .../distributed/near/GridNearCacheEntry.java    | 27 +++----
 .../distributed/near/GridNearGetFuture.java     | 77 ++++++++++----------
 .../transactions/IgniteTxLocalAdapter.java      |  9 +--
 .../cache/CacheNearReaderUpdateTest.java        |  2 +-
 ...niteCacheClientNodeChangingTopologyTest.java |  3 +
 5 files changed, 57 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ca8d0364/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 7bfd979..0ad236c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -402,8 +402,6 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
         throws IgniteCheckedException, GridCacheEntryRemovedException {
         assert dhtVer != null;
 
-        boolean valid = valid(tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion());
-
         GridCacheVersion enqueueVer = null;
 
         try {
@@ -418,28 +416,25 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
                 CacheObject old = this.val;
                 boolean hasVal = hasValueUnlocked();
 
-                if (isNew() || !valid) {
+                if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) < 0) {
                     primaryNode(primaryNodeId, topVer);
 
-                    // Change entry only if dht version has changed.
-                    if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) < 0) {
-                        update(val, expireTime, ttl, ver);
+                    update(val, expireTime, ttl, ver);
 
-                        if (cctx.deferredDelete() && !isInternal()) {
-                            boolean deleted = val == null;
+                    if (cctx.deferredDelete() && !isInternal()) {
+                        boolean deleted = val == null;
 
-                            if (deleted != deletedUnlocked()) {
-                                deletedUnlocked(deleted);
+                        if (deleted != deletedUnlocked()) {
+                            deletedUnlocked(deleted);
 
-                                if (deleted)
-                                    enqueueVer = ver;
-                            }
+                            if (deleted)
+                                enqueueVer = ver;
                         }
+                    }
 
-                        this.dhtVer = dhtVer;
+                    this.dhtVer = dhtVer;
 
-                        ret = true;
-                    }
+                    ret = true;
                 }
 
                 if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ))

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca8d0364/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 7384a89..8dc273c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -268,16 +268,17 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
 
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings = U.newHashMap(affNodes.size());
 
-        Map<KeyCacheObject, GridNearCacheEntry> savedVers = null;
+        Map<KeyCacheObject, GridNearCacheEntry> savedEntries = null;
 
         // Assign keys to primary nodes.
         for (KeyCacheObject key : keys)
-            savedVers = map(key, mappings, topVer, mapped, savedVers);
+            savedEntries = map(key, mappings, topVer, mapped, savedEntries);
 
         if (isDone())
             return;
 
-        final Map<KeyCacheObject, GridNearCacheEntry> saved = savedVers;
+        final Map<KeyCacheObject, GridNearCacheEntry> saved = savedEntries != null
? savedEntries :
+            Collections.<KeyCacheObject, GridNearCacheEntry>emptyMap();
 
         final int keysSize = keys.size();
 
@@ -335,9 +336,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
 
                             return Collections.emptyMap();
                         }
-                        finally {
-                            releaseEvictions(mappedKeys.keySet(), saved, topVer);
-                        }
                     }
                 }));
             }
@@ -385,7 +383,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
      * @param key Key to map.
      * @param topVer Topology version
      * @param mapped Previously mapped.
-     * @param savedVers Saved versions.
+     * @param saved Reserved near cache entries.
      * @return Map.
      */
     private Map<KeyCacheObject, GridNearCacheEntry> map(
@@ -393,16 +391,16 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings,
         AffinityTopologyVersion topVer,
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
-        Map<KeyCacheObject, GridNearCacheEntry> savedVers
+        Map<KeyCacheObject, GridNearCacheEntry> saved
     ) {
         final GridNearCacheAdapter near = cache();
 
         // Allow to get cached value from the local node.
         boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(),
key, topVer);
 
-        GridCacheEntryEx entry = allowLocRead ? near.peekEx(key) : null;
-
         while (true) {
+            GridNearCacheEntry entry = allowLocRead ? (GridNearCacheEntry)near.peekEx(key)
: null;
+
             try {
                 CacheObject v = null;
                 GridCacheVersion ver = null;
@@ -506,7 +504,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                                 onDone(new ClusterTopologyServerNotFoundException("Failed
to map keys for cache " +
                                     "(all partition nodes left the grid)."));
 
-                                return savedVers;
+                                return saved;
                             }
 
                             if (!affNode.isLocal() && cctx.cache().configuration().isStatisticsEnabled()
&& !skipVals)
@@ -557,7 +555,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                             onDone(new ClusterTopologyServerNotFoundException("Failed to
map keys for cache " +
                                 "(all partition nodes left the grid)."));
 
-                            return savedVers;
+                            return saved;
                         }
                     }
 
@@ -569,20 +567,22 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                                 MAX_REMAP_CNT + " attempts (key got remapped to the same
node) " +
                                 "[key=" + key + ", node=" + U.toShortString(affNode) + ",
mappings=" + mapped + ']'));
 
-                            return savedVers;
+                            return saved;
                         }
                     }
 
-                    GridNearCacheEntry nearEntry = near.entryExx(key, topVer);
+                    if (!cctx.affinity().localNode(key, topVer)) {
+                        GridNearCacheEntry nearEntry = entry != null ? entry : near.entryExx(key,
topVer);
 
-                    nearEntry.reserveEviction();
+                        nearEntry.reserveEviction();
 
-                    entry = null;
+                        entry = null;
 
-                    if (savedVers == null)
-                        savedVers = U.newHashMap(3);
+                        if (saved == null)
+                            saved = U.newHashMap(3);
 
-                    savedVers.put(key, nearEntry);
+                        saved.put(key, nearEntry);
+                    }
 
                     // Don't add reader if transaction acquires lock anyway to avoid deadlock.
                     boolean addRdr = tx == null || tx.optimistic();
@@ -606,7 +606,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                 break;
             }
             catch (GridCacheEntryRemovedException ignored) {
-                entry = allowLocRead ? near.peekEx(key) : null;
+                // Retry.
             }
             finally {
                 if (entry != null && tx == null)
@@ -614,7 +614,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
             }
         }
 
-        return savedVers;
+        return saved;
     }
 
     /**
@@ -657,7 +657,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
      * @param nodeId Node id.
      * @param keys Keys.
      * @param infos Entry infos.
-     * @param savedVers Saved versions.
+     * @param savedEntries Saved entries.
      * @param topVer Topology version
      * @return Result map.
      */
@@ -665,7 +665,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
         UUID nodeId,
         Collection<KeyCacheObject> keys,
         Collection<GridCacheEntryInfo> infos,
-        Map<KeyCacheObject, GridNearCacheEntry> savedVers,
+        Map<KeyCacheObject, GridNearCacheEntry> savedEntries,
         AffinityTopologyVersion topVer
     ) {
         boolean empty = F.isEmpty(keys);
@@ -683,9 +683,10 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
 
                     // Entries available locally in DHT should not be loaded into near cache
for reading.
                     if (!cctx.affinity().localNode(info.key(), cctx.affinity().affinityTopologyVersion()))
{
-                        GridNearCacheEntry entry = savedVers.get(info.key());
+                        GridNearCacheEntry entry = savedEntries.get(info.key());
 
-                        assert entry != null : info.key();
+                        if (entry == null)
+                            entry = cache().entryExx(info.key(), topVer);
 
                         // Load entry into cache.
                         entry.loadedValue(tx,
@@ -743,12 +744,12 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
         for (KeyCacheObject key : keys) {
             GridNearCacheEntry entry = saved.get(key);
 
-            assert entry != null : key;
+            if (entry != null) {
+                entry.releaseEviction();
 
-            entry.releaseEviction();
-
-            if (tx == null)
-                cctx.evicts().touch(entry, topVer);
+                if (tx == null)
+                    cctx.evicts().touch(entry, topVer);
+            }
         }
     }
 
@@ -791,7 +792,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
         private LinkedHashMap<KeyCacheObject, Boolean> keys;
 
         /** Saved entry versions. */
-        private Map<KeyCacheObject, GridNearCacheEntry> savedVers;
+        private Map<KeyCacheObject, GridNearCacheEntry> savedEntries;
 
         /** Topology version on which this future was mapped. */
         private AffinityTopologyVersion topVer;
@@ -802,18 +803,18 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
         /**
          * @param node Node.
          * @param keys Keys.
-         * @param savedVers Saved entry versions.
+         * @param savedEntries Saved entries.
          * @param topVer Topology version.
          */
         MiniFuture(
             ClusterNode node,
             LinkedHashMap<KeyCacheObject, Boolean> keys,
-            Map<KeyCacheObject, GridNearCacheEntry> savedVers,
+            Map<KeyCacheObject, GridNearCacheEntry> savedEntries,
             AffinityTopologyVersion topVer
         ) {
             this.node = node;
             this.keys = keys;
-            this.savedVers = savedVers;
+            this.savedEntries = savedEntries;
             this.topVer = topVer;
         }
 
@@ -852,7 +853,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
         /** {@inheritDoc} */
         @Override public boolean onDone(@Nullable Map<K, V> res, @Nullable Throwable
err) {
             if (super.onDone(res, err)) {
-                releaseEvictions(keys.keySet(), savedVers, topVer);
+                releaseEvictions(keys.keySet(), savedEntries, topVer);
 
                 return true;
             }
@@ -954,7 +955,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                     }), F.t(node, keys), topVer);
 
                     // It is critical to call onDone after adding futures to compound list.
-                    onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers,
topVer));
+                    onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries,
topVer));
 
                     return;
                 }
@@ -974,12 +975,12 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                         }), F.t(node, keys), new AffinityTopologyVersion(readyTopVer));
 
                         // It is critical to call onDone after adding futures to compound
list.
-                        onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers,
topVer));
+                        onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries,
topVer));
                     }
                 });
             }
             else
-                onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers, topVer));
+                onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries,
topVer));
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca8d0364/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index fd807c9..d495103 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -999,6 +999,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                         txEntry.conflictVersion(explicitVer);
                                     }
 
+                                    if (dhtVer == null)
+                                        dhtVer = explicitVer != null ? explicitVer : writeVersion();
+
                                     if (op == CREATE || op == UPDATE) {
                                         GridCacheUpdateTxResult updRes = cached.innerSet(
                                             this,
@@ -1020,9 +1023,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                             dhtVer);
 
                                         if (nearCached != null && updRes.success())
{
-                                            if (dhtVer == null)
-                                                dhtVer = explicitVer != null ? explicitVer
: writeVersion();
-
                                             nearCached.innerSet(
                                                 null,
                                                 eventNodeId(),
@@ -1061,9 +1061,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                             dhtVer);
 
                                         if (nearCached != null && updRes.success())
{
-                                            if (dhtVer == null)
-                                                dhtVer = explicitVer != null ? explicitVer
: writeVersion();
-
                                             nearCached.innerRemove(
                                                 null,
                                                 eventNodeId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca8d0364/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java
index aed2db8..c2f9fab 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java
@@ -115,7 +115,7 @@ public class CacheNearReaderUpdateTest extends GridCommonAbstractTest
{
 
     /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
-        return 5 * 60_000;
+        return 10 * 60_000;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca8d0364/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index f0d7582..cb83798 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -805,6 +805,9 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
 
         TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi();
 
+        for (int i = 0; i < 100; i++)
+            primaryCache(i, null).put(i, -1);
+
         final Map<Integer, Integer> map = new HashMap<>();
 
         for (int i = 0; i < 100; i++)


Mime
View raw message