ignite-commits mailing list archives

From sboi...@apache.org
Subject [1/4] ignite git commit: Release notes updated
Date Tue, 15 Sep 2015 16:59:57 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 a5fc6f35b -> fdfa62f0f


Release notes updated


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e52367bc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e52367bc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e52367bc

Branch: refs/heads/ignite-1093-2
Commit: e52367bc0a57028d86f356101cfa46cd70e35e12
Parents: a5fc6f3
Author: Anton Vinogradov <av@apache.org>
Authored: Tue Sep 15 13:16:34 2015 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Tue Sep 15 13:16:34 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 166 +++++++++----------
 1 file changed, 79 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
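The patch is a refactoring of GridDhtPartitionDemander.SyncFuture: the ad-hoc onCancel()/onPartitionDone()/onMissedPartition() methods become cancel()/partitionDone()/partitionMissed(), and the separate "started" timestamp map is folded into "remaining", which now maps a node id to an IgniteBiTuple of the rebalance start time and the partitions still pending from that node, guarded by the future's ReentrantLock instead of ConcurrentHashMap8. Below is a minimal standalone sketch of that bookkeeping pattern; the class and method names are hypothetical stand-ins, not the Ignite API, and java.util.AbstractMap.SimpleEntry plays the role of IgniteBiTuple.

    import java.util.AbstractMap;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class RebalanceBookkeepingSketch {
        /** Node id -> (rebalance start time in ms, partitions still to receive from that node). */
        private final Map<UUID, Map.Entry<Long, Collection<Integer>>> remaining = new HashMap<>();

        /** Plain map plus explicit lock, as in the patch, instead of a concurrent map. */
        private final Lock lock = new ReentrantLock();

        /** Registers the partitions demanded from a node and records the start time. */
        void appendPartitions(UUID nodeId, Collection<Integer> parts) {
            lock.lock();

            try {
                remaining.put(nodeId, new AbstractMap.SimpleEntry<>(System.currentTimeMillis(), parts));
            }
            finally {
                lock.unlock();
            }
        }

        /** Marks one partition done; returns elapsed ms if the node just finished, -1 otherwise. */
        long partitionDone(UUID nodeId, int p) {
            lock.lock();

            try {
                Map.Entry<Long, Collection<Integer>> e = remaining.get(nodeId);

                if (e == null)
                    return -1;

                e.getValue().remove(Integer.valueOf(p));

                if (e.getValue().isEmpty()) {
                    remaining.remove(nodeId);

                    return System.currentTimeMillis() - e.getKey();
                }

                return -1;
            }
            finally {
                lock.unlock();
            }
        }
    }

In the actual patch the elapsed time is read back with remaining.get(nodeId).get1() when logging "Completed rebalancing ..." or "Cancelled rebalancing ...", which is why the entry is removed only after the log line is built.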


http://git-wip-us.apache.org/repos/asf/ignite/blob/e52367bc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 2e54294..31e2e5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -44,7 +45,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -68,11 +68,11 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
@@ -141,7 +141,7 @@ public class GridDhtPartitionDemander {
      *
      */
     void stop() {
-        syncFut.onCancel();
+        syncFut.cancel();
 
         lastExchangeFut = null;
 
@@ -222,7 +222,7 @@ public class GridDhtPartitionDemander {
             if (!topologyChanged(fut.assigns.topologyVersion()))
                 wFut.get();
             else {
-                fut.onCancel();
+                fut.cancel();
 
                 return;
             }
@@ -231,13 +231,13 @@ public class GridDhtPartitionDemander {
             if (log.isDebugEnabled()) {
                 log.debug("Failed to wait for " + name + " cache rebalancing future (grid
is stopping): " +
                     "[cacheName=" + cctx.name() + ']');
-                fut.onCancel();
+                fut.cancel();
 
                 return;
             }
         }
         catch (IgniteCheckedException e) {
-            fut.onCancel();
+            fut.cancel();
 
             throw new Error("Ordered rebalancing future should never fail: " + e.getMessage(),
e);
         }
@@ -264,7 +264,7 @@ public class GridDhtPartitionDemander {
 
             if (fut.isInited()) {
                 if (!fut.isDone())
-                    fut.onCancel();
+                    fut.cancel();
 
                 fut = new SyncFuture(assigns, cctx, log, false);
 
@@ -284,7 +284,7 @@ public class GridDhtPartitionDemander {
             }
 
             if (topologyChanged(topVer)) {
-                fut.onCancel();
+                fut.cancel();
 
                 return;
             }
@@ -315,7 +315,7 @@ public class GridDhtPartitionDemander {
                                 if (!topologyChanged(topVer))
                                     oFut.get();
                                 else {
-                                    curFut.onCancel();
+                                    curFut.cancel();
 
                                     return;
                                 }
@@ -325,13 +325,13 @@ public class GridDhtPartitionDemander {
                             if (log.isDebugEnabled()) {
                                 log.debug("Failed to wait for ordered rebalance future (grid
is stopping): " +
                                     "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder
+ ']');
-                                curFut.onCancel();
+                                curFut.cancel();
 
                                 return;
                             }
                         }
                         catch (IgniteCheckedException e) {
-                            curFut.onCancel();
+                            curFut.cancel();
 
                             throw new Error("Ordered rebalance future should never fail:
" + e.getMessage(), e);
                         }
@@ -379,7 +379,7 @@ public class GridDhtPartitionDemander {
 
         for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
             if (topologyChanged(topVer)) {
-                fut.onCancel();
+                fut.cancel();
 
                 return;
             }
@@ -393,10 +393,6 @@ public class GridDhtPartitionDemander {
 
             final CacheConfiguration cfg = cctx.config();
 
-            final long start = U.currentTimeMillis();
-
-            fut.logStart(node.id(), start);
-
             U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode()
+
                 ", from node=" + node.id() + ", partitions count=" + d.partitions().size()
+ ", topology=" + d.topologyVersion() + "]");
 
@@ -406,7 +402,7 @@ public class GridDhtPartitionDemander {
 
                 remainings.addAll(d.partitions());
 
-                fut.append(node.id(), remainings);
+                fut.appendPartitions(node.id(), remainings);
 
                 int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
 
@@ -434,10 +430,10 @@ public class GridDhtPartitionDemander {
                             if (!topologyChanged(topVer))
                                 cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
                             else
-                                fut.onCancel();
+                                fut.cancel();
                         }
                         catch (IgniteCheckedException ex) {
-                            fut.onCancel();
+                            fut.cancel();
 
                             U.error(log, "Failed to send partition demand message to node",
ex);
                         }
@@ -450,7 +446,7 @@ public class GridDhtPartitionDemander {
             else {
                 DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut);
 
-                fut.append(node.id(), d.partitions());
+                fut.appendPartitions(node.id(), d.partitions());
 
                 dw.run(node, d);
             }
@@ -522,7 +518,7 @@ public class GridDhtPartitionDemander {
             return;
 
         if (topologyChanged(topVer)) {
-            fut.onCancel();
+            fut.cancel();
 
             return;
         }
@@ -539,7 +535,7 @@ public class GridDhtPartitionDemander {
             if (log.isDebugEnabled())
                 log.debug("Class got undeployed during preloading: " + supply.classError());
 
-            fut.onCancel(id);
+            fut.cancel(id);
 
             return;
         }
@@ -550,7 +546,7 @@ public class GridDhtPartitionDemander {
             // Preload.
             for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
                 if (topologyChanged(topVer)) {
-                    fut.onCancel();
+                    fut.cancel();
 
                     return;
                 }
@@ -597,7 +593,7 @@ public class GridDhtPartitionDemander {
                             if (last) {
                                 top.own(part);
 
-                                fut.onPartitionDone(id, p);
+                                fut.partitionDone(id, p);
 
                                 if (log.isDebugEnabled())
                                     log.debug("Finished rebalancing partition: " + part);
@@ -609,14 +605,14 @@ public class GridDhtPartitionDemander {
                         }
                     }
                     else {
-                        fut.onPartitionDone(id, p);
+                        fut.partitionDone(id, p);
 
                         if (log.isDebugEnabled())
                             log.debug("Skipping rebalancing partition (state is not MOVING):
" + part);
                     }
                 }
                 else {
-                    fut.onPartitionDone(id, p);
+                    fut.partitionDone(id, p);
 
                     if (log.isDebugEnabled())
                         log.debug("Skipping rebalancing partition (it does not belong on
current node): " + p);
@@ -626,10 +622,10 @@ public class GridDhtPartitionDemander {
             // Only request partitions based on latest topology version.
             for (Integer miss : supply.missed())
                 if (cctx.affinity().localNode(miss, topVer))
-                    fut.onMissedPartition(id, miss);
+                    fut.partitionMissed(id, miss);
 
             for (Integer miss : supply.missed())
-                fut.onPartitionDone(id, miss);
+                fut.partitionDone(id, miss);
 
             if (!fut.isDone()) {
                 GridDhtPartitionDemandMessage d = fut.getDemandMessage(node);
@@ -647,7 +643,7 @@ public class GridDhtPartitionDemander {
                             nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
                     }
                     else
-                        fut.onCancel();
+                        fut.cancel();
                 }
             }
         }
@@ -655,13 +651,13 @@ public class GridDhtPartitionDemander {
             if (log.isDebugEnabled())
                 log.debug("Node left during rebalancing [node=" + node.id() +
                     ", msg=" + e.getMessage() + ']');
-            fut.onCancel();
+            fut.cancel();
         }
         catch (IgniteCheckedException ex) {
             U.error(log, "Failed to receive partitions from node (rebalancing will not "
+
                 "fully finish) [node=" + node.id() + ", msg=" + supply + ']', ex);
 
-            fut.onCancel(node.id());
+            fut.cancel(node.id());
         }
     }
 
@@ -767,6 +763,9 @@ public class GridDhtPartitionDemander {
         /** */
         private static final long serialVersionUID = 1L;
 
+        /** Should EVT_CACHE_REBALANCE_STOPPED event be sent or not. */
+        private final boolean sendStopEvnt;
+
         /** */
         private final GridCacheContext<?, ?> cctx;
 
@@ -774,13 +773,10 @@ public class GridDhtPartitionDemander {
         private final IgniteLogger log;
 
         /** Remaining. */
-        private final ConcurrentHashMap8<UUID, Collection<Integer>> remaining = new ConcurrentHashMap8<>();
+        private final Map<UUID, IgniteBiTuple<Long, Collection<Integer>>> remaining = new HashMap<>();
 
         /** Missed. */
-        private final ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>();
-
-        /** Started time. */
-        private final ConcurrentHashMap8<UUID, Long> started = new ConcurrentHashMap8<>();
+        private final Map<UUID, Collection<Integer>> missed = new HashMap<>();
 
         /** Lock. */
         private final Lock lock = new ReentrantLock();
@@ -791,12 +787,12 @@ public class GridDhtPartitionDemander {
         /** Completed. */
         private volatile boolean completed = true;
 
-        private final boolean sendStopEvnt;
-
         /**
          * @param assigns Assigns.
          */
-        SyncFuture(GridDhtPreloaderAssignments assigns, GridCacheContext<?, ?> cctx, IgniteLogger log,
+        SyncFuture(GridDhtPreloaderAssignments assigns,
+            GridCacheContext<?, ?> cctx,
+            IgniteLogger log,
             boolean sentStopEvnt) {
             this.assigns = assigns;
             this.cctx = cctx;
@@ -814,13 +810,13 @@ public class GridDhtPartitionDemander {
         /**
          * @param assigns Assigns.
          */
-        void init(GridDhtPreloaderAssignments assigns) {
+        private void init(GridDhtPreloaderAssignments assigns) {
             this.assigns = assigns;
 
             cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen(
                 new CI1<IgniteInternalFuture<Long>>() {
                     @Override public void apply(IgniteInternalFuture<Long> future) {
-                        SyncFuture.this.onCancel();
+                        SyncFuture.this.cancel();
                     }
                 });
         }
@@ -828,7 +824,7 @@ public class GridDhtPartitionDemander {
         /**
          * @return Initialised or not.
          */
-        boolean isInited() {
+        private boolean isInited() {
             return assigns != null;
         }
 
@@ -836,24 +832,21 @@ public class GridDhtPartitionDemander {
          * @param nodeId Node id.
          * @param parts Parts.
          */
-        void append(UUID nodeId, Collection<Integer> parts) {
-            remaining.put(nodeId, parts);
-
-            missed.put(nodeId, new GridConcurrentHashSet<Integer>());
-        }
+        private void appendPartitions(UUID nodeId, Collection<Integer> parts) {
+            lock.lock();
 
-        /**
-         * @param nodeId Node id.
-         * @param time Time.
-         */
-        void logStart(UUID nodeId, long time) {
-            started.put(nodeId, time);
+            try {
+                remaining.put(nodeId, new IgniteBiTuple<>(System.currentTimeMillis(), parts));
+            }
+            finally {
+                lock.unlock();
+            }
         }
 
         /**
          * @param node Node.
          */
-        GridDhtPartitionDemandMessage getDemandMessage(ClusterNode node) {
+        private GridDhtPartitionDemandMessage getDemandMessage(ClusterNode node) {
             if (isDone())
                 return null;
 
@@ -861,47 +854,51 @@ public class GridDhtPartitionDemander {
         }
 
         /**
+         * Cancels this future.
          *
+         * @return {@code true}.
          */
-        void onCancel() {
+        @Override public boolean cancel() {
             lock.lock();
+
             try {
                 if (isDone())
-                    return;
+                    return true;
 
                 remaining.clear();
 
                 completed = false;
 
-                U.log(log, (!completed ? "Cancelled" : "Completed") + " rebalancing from all nodes [cache=" + cctx.name()
-                    + ", topology=" + topologyVersion() +
-                    ", time=" +
-                    (started.isEmpty() ? 0 : (U.currentTimeMillis() - Collections.min(started.values()))) + " ms]");
+                U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
+                    + ", topology=" + topologyVersion());
 
                 checkIsDone();
             }
             finally {
                 lock.unlock();
             }
+
+            return true;
         }
 
         /**
          * @param nodeId Node id.
          */
-        void onCancel(UUID nodeId) {
+        private void cancel(UUID nodeId) {
             lock.lock();
+
             try {
                 if (isDone())
                     return;
 
+                U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
+                    ", from node=" + nodeId + ", topology=" + topologyVersion() +
+                    ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) +
" ms]"));
+
                 remaining.remove(nodeId);
 
                 completed = false;
 
-                U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
-                    ", from node=" + nodeId + ", topology=" + topologyVersion() +
-                    ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + " ms]"));
-
                 checkIsDone();
             }
             finally {
@@ -911,18 +908,12 @@ public class GridDhtPartitionDemander {
         }
 
         /**
-         * @return Is completed.
-         */
-        boolean isCompleted() {
-            return completed;
-        }
-
-        /**
          * @param nodeId Node id.
          * @param p P.
          */
-        void onMissedPartition(UUID nodeId, int p) {
+        private void partitionMissed(UUID nodeId, int p) {
             lock.lock();
+
             try {
                 if (isDone())
                     return;
@@ -941,8 +932,9 @@ public class GridDhtPartitionDemander {
          * @param nodeId Node id.
          * @param p P.
          */
-        void onPartitionDone(UUID nodeId, int p) {
+        private void partitionDone(UUID nodeId, int p) {
             lock.lock();
+
             try {
                 if (isDone())
                     return;
@@ -951,17 +943,17 @@ public class GridDhtPartitionDemander {
                     preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
                         assigns.exchangeFuture().discoveryEvent());
 
-                Collection<Integer> parts = remaining.get(nodeId);
+                Collection<Integer> parts = remaining.get(nodeId).get2();
 
                 if (parts != null) {
                     parts.remove(p);
 
                     if (parts.isEmpty()) {
-                        remaining.remove(nodeId);
-
                         U.log(log, ("Completed rebalancing [cache=" + cctx.name() +
                             ", from node=" + nodeId + ", topology=" + topologyVersion() +
-                            ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + "
ms]"));
+                            ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1())
+ " ms]"));
+
+                        remaining.remove(nodeId);
                     }
                 }
 
@@ -994,7 +986,7 @@ public class GridDhtPartitionDemander {
         /**
          *
          */
-        public void checkIsDone() {
+        private void checkIsDone() {
             if (remaining.isEmpty()) {
                 if (log.isDebugEnabled())
                     log.debug("Completed sync future.");
@@ -1301,7 +1293,7 @@ public class GridDhtPartitionDemander {
                                         // then we take ownership.
                                         if (last) {
                                             remaining.remove(p);
-                                            fut.onPartitionDone(node.id(), p);
+                                            fut.partitionDone(node.id(), p);
 
                                             top.own(part);
 
@@ -1320,7 +1312,7 @@ public class GridDhtPartitionDemander {
                                 }
                                 else {
                                     remaining.remove(p);
-                                    fut.onPartitionDone(node.id(), p);
+                                    fut.partitionDone(node.id(), p);
 
                                     if (log.isDebugEnabled())
                                         log.debug("Skipping rebalancing partition (state
is not MOVING): " + part);
@@ -1328,7 +1320,7 @@ public class GridDhtPartitionDemander {
                             }
                             else {
                                 remaining.remove(p);
-                                fut.onPartitionDone(node.id(), p);
+                                fut.partitionDone(node.id(), p);
 
                                 if (log.isDebugEnabled())
                                     log.debug("Skipping rebalancing partition (it does not
belong on current node): " + p);
@@ -1342,7 +1334,7 @@ public class GridDhtPartitionDemander {
                             if (cctx.affinity().localNode(miss, topVer))
                                 missed.add(miss);
 
-                            fut.onMissedPartition(node.id(), miss);
+                            fut.partitionMissed(node.id(), miss);
                         }
 
                         if (remaining.isEmpty())
@@ -1379,7 +1371,7 @@ public class GridDhtPartitionDemander {
                 Collection<Integer> missed = new HashSet<>();
 
                 if (topologyChanged(topVer)) {
-                    fut.onCancel();
+                    fut.cancel();
 
                     return;
                 }
@@ -1400,16 +1392,16 @@ public class GridDhtPartitionDemander {
                         log.debug("Node left during rebalancing (will retry) [node=" + node.id()
+
                             ", msg=" + e.getMessage() + ']');
 
-                    fut.onCancel();
+                    fut.cancel();
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to receive partitions from node (rebalancing will
not " +
                         "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
 
-                    fut.onCancel(node.id());
+                    fut.cancel(node.id());
                 }
                 catch (InterruptedException e) {
-                    fut.onCancel();
+                    fut.cancel();
                 }
             }
             finally {


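The other notable change is that SyncFuture's onCancel() becomes an @Override of cancel() returning boolean, so cancellation on topology change goes through the future's standard cancel() entry point and always reports true. Below is a rough sketch of that shape, using an assumed stand-in for the future-adapter base class rather than the real Ignite one, and simplifying the completion path.

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    // Hypothetical stand-in for a future-adapter base class; the real SyncFuture
    // builds on Ignite's internal future machinery, which is not reproduced here.
    class FutureAdapterSketch {
        private volatile boolean done;

        boolean isDone() {
            return done;
        }

        /** Completes the future (simplified: the result value is ignored). */
        void onDone(boolean res) {
            done = true;
        }
    }

    class SyncFutureSketch extends FutureAdapterSketch {
        private final Lock lock = new ReentrantLock();

        /** True if rebalancing finished normally, false once it has been cancelled. */
        private volatile boolean completed = true;

        /**
         * Cancels this future.
         *
         * @return {@code true}.
         */
        public boolean cancel() {
            lock.lock();

            try {
                if (isDone())
                    return true;   // Already finished: nothing to cancel, still report success.

                completed = false; // Remember that rebalancing was cancelled, not completed.

                onDone(completed); // Release any waiters on the future.
            }
            finally {
                lock.unlock();
            }

            return true;
        }
    }

Callers such as stop() and the topology-future listener then simply invoke cancel() instead of the old onCancel(), and the separate isCompleted() accessor is removed.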