ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [13/46] ignite git commit: 1093
Date Fri, 02 Oct 2015 10:58:27 GMT
1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7e91796
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7e91796
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7e91796

Branch: refs/heads/ignite-1093-2
Commit: b7e9179604833dad2b358917413d092989f2bd55
Parents: a8b323d
Author: Anton Vinogradov <av@apache.org>
Authored: Wed Sep 23 13:00:42 2015 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Wed Sep 23 13:00:42 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 42 ++++++++++++-----
 .../dht/preloader/GridDhtPartitionSupplier.java | 49 +++++++++++---------
 2 files changed, 57 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b7e91796/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index bbb6a21..345e3bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -64,11 +64,11 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
@@ -391,7 +391,7 @@ public class GridDhtPartitionDemander {
 
             //Check remote node rebalancing API version.
             if (new Integer(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION)))
{
-                fut.appendPartitions(node.id(), d.partitions());
+                fut.appendPartitions(node.id(), d.partitions(), d.updateSequence());
 
                 int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
 
@@ -418,10 +418,13 @@ public class GridDhtPartitionDemander {
 
                         try {
                             if (!topologyChanged(fut)) {
-                                cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(cnt),
initD, cctx.ioPolicy(), d.timeout());
+                                cctx.io().sendOrderedMessage(node,
+                                    GridCachePartitionExchangeManager.rebalanceTopic(cnt),
initD, cctx.ioPolicy(), d.timeout());
 
                                 if (log.isDebugEnabled())
-                                    log.debug("Requested rebalancing [from node=" + node.id()
+ ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt))
+ ")]");
+                                    log.debug("Requested rebalancing [from node=" + node.id()
+ ", listener index=" +
+                                        cnt + ", partitions count=" + sParts.get(cnt).size()
+
+                                        " (" + partitionsList(sParts.get(cnt)) + ")]");
 
                             }
                             else {
@@ -441,7 +444,7 @@ public class GridDhtPartitionDemander {
             else {
                 DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut);
 
-                fut.appendPartitions(node.id(), d.partitions());
+                fut.appendPartitions(node.id(), d.partitions(), d.updateSequence());
 
                 dw.run(node, d);
             }
@@ -513,7 +516,7 @@ public class GridDhtPartitionDemander {
 
         assert node != null;
 
-        if (!fut.topologyVersion().equals(topVer) || topologyChanged(fut))
+        if (!fut.topologyVersion().equals(topVer) || topologyChanged(fut) || !fut.isActual(id,
supply.updateSequence()))
             return;
 
         if (log.isDebugEnabled())
@@ -744,7 +747,7 @@ public class GridDhtPartitionDemander {
     /**
      *
      */
-    public static class SyncFuture extends GridFutureAdapter<Object> {
+    public static class SyncFuture extends GridFutureAdapter<Boolean> {
         /** */
         private static final long serialVersionUID = 1L;
 
@@ -757,8 +760,8 @@ public class GridDhtPartitionDemander {
         /** */
         private final IgniteLogger log;
 
-        /** Remaining. */
-        private final Map<UUID, IgniteBiTuple<Long, Collection<Integer>>>
remaining = new HashMap<>();
+        /** Remaining. T3: startTime, partitions, updateSequence */
+        private final Map<UUID, T3<Long, Collection<Integer>, Long>> remaining
= new HashMap<>();
 
         /** Missed. */
         private final Map<UUID, Collection<Integer>> missed = new HashMap<>();
@@ -819,6 +822,17 @@ public class GridDhtPartitionDemander {
         }
 
         /**
+         * @param nodeId Node id.
+         * @param updateSeq Update sequence.
+         * @return true in case future created for specified updateSeq, false in other case.
+         */
+        private boolean isActual(UUID nodeId, long updateSeq) {
+            T3<Long, Collection<Integer>, Long> t = remaining.get(nodeId);
+
+            return t != null ? t.get3().equals(updateSeq) : false;
+        }
+
+        /**
          * @return Is dummy (created at demander creation).
          */
         private boolean isDummy() {
@@ -829,11 +843,11 @@ public class GridDhtPartitionDemander {
          * @param nodeId Node id.
          * @param parts Parts.
          */
-        private void appendPartitions(UUID nodeId, Collection<Integer> parts) {
+        private void appendPartitions(UUID nodeId, Collection<Integer> parts, long
updateSeq) {
             lock.lock();
 
             try {
-                remaining.put(nodeId, new IgniteBiTuple<>(U.currentTimeMillis(), parts));
+                remaining.put(nodeId, new T3<>(U.currentTimeMillis(), parts, updateSeq));
             }
             finally {
                 lock.unlock();
@@ -1014,12 +1028,16 @@ public class GridDhtPartitionDemander {
                         U.log(log, ("Reassigning partitions that were missed: " + m));
 
                         cctx.shared().exchange().forceDummyExchange(true, exchFut);
+
+                        onDone(false); //Finished but has missed partitions and forced dummy
exchange
+
+                        return;
                     }
 
                     cctx.shared().exchange().scheduleResendPartitions();
                 }
 
-                onDone();
+                onDone(true);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7e91796/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index d33dc5a..a4bd134 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -131,7 +131,12 @@ class GridDhtPartitionSupplier {
     private static void clearContexts(
         ConcurrentHashMap8<T4, SupplyContext> map, IgniteLogger log, GridCacheContext<?,
?> cctx) {
         for (Map.Entry<T4, SupplyContext> entry : map.entrySet()) {
-            clearContext(map, entry.getKey(), entry.getValue(), log, cctx);
+            T4 t = entry.getKey();
+
+            SupplyContext sc = entry.getValue();
+
+            if (t.get3() != null && !t.get3().equals(cctx.affinity().affinityTopologyVersion())
&& sc != null)
+                clearContext(map, t, sc, log);
         }
     }
 
@@ -139,34 +144,31 @@ class GridDhtPartitionSupplier {
      * Clear context.
      *
      * @param map Context map.
+     * @param t id.
+     * @param sc Supply context.
      * @param log Logger.
+     * @return true in case context was removed.
      */
     private static boolean clearContext(
-        ConcurrentHashMap8<T4, SupplyContext> map,
-        T4 t,
-        SupplyContext sc,
-        IgniteLogger log,
-        GridCacheContext<?, ?> cctx) {
-
-        if (!t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc !=
null) {
-            Iterator it = sc.entryIt;
-
-            if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed())
{
-                try {
-                    synchronized (map) {
-                        if (!((GridCloseableIterator)it).isClosed())
-                            ((GridCloseableIterator)it).close();
-                    }
-                }
-                catch (IgniteCheckedException e) {
-                    log.error("Iterator close failed.", e);
+        final ConcurrentHashMap8<T4, SupplyContext> map,
+        final T4 t,
+        final SupplyContext sc,
+        final IgniteLogger log) {
+        final Iterator it = sc.entryIt;
+
+        if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed())
{
+            try {
+                synchronized (it) {
+                    if (!((GridCloseableIterator)it).isClosed())
+                        ((GridCloseableIterator)it).close();
                 }
             }
-
-            return map.remove(t, sc);
+            catch (IgniteCheckedException e) {
+                log.error("Iterator close failed.", e);
+            }
         }
 
-        return false;
+        return map.remove(t, sc);
     }
 
     /**
@@ -528,6 +530,9 @@ class GridDhtPartitionSupplier {
             if (log.isDebugEnabled())
                 log.debug("Replying to partition demand [node=" + n.id() + ", demand=" +
d + ", supply=" + s + ']');
 
+            if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
+                return true;
+
             cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
 
             // Throttle preloading.


Mime
View raw message