ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject ignite git commit: IGNITE-5232 GridDhtPartitionDemander.requestPartitions invokes sendMessages sequentially, which leads to a significant increase in node start time on large clusters with SSL
Date Wed, 31 May 2017 10:50:40 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 8476a1958 -> 4c460b78f


IGNITE-5232 GridDhtPartitionDemander.requestPartitions invokes sendMessages sequentially,
which leads to a significant increase in node start time on large clusters with SSL


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4c460b78
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4c460b78
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4c460b78

Branch: refs/heads/master
Commit: 4c460b78f7b0febc37940c8d65f91cb449fa4d54
Parents: 8476a19
Author: Anton Vinogradov <av@apache.org>
Authored: Thu May 25 16:27:46 2017 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Wed May 31 13:50:29 2017 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 103 ++++++++++---------
 1 file changed, 52 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4c460b78/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 838ccc8..cdbae1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -328,41 +328,21 @@ public class GridDhtPartitionDemander {
 
             return new Runnable() {
                 @Override public void run() {
-                    try {
-                        if (next != null)
-                            fut.listen(new CI1<IgniteInternalFuture<Boolean>>()
{
-                                @Override public void apply(IgniteInternalFuture<Boolean>
f) {
-                                    try {
-                                        if (f.get()) // Not cancelled.
-                                            next.run(); // Starts next cache rebalancing
(according to the order).
-                                    }
-                                    catch (IgniteCheckedException e) {
-                                        if (log.isDebugEnabled())
-                                            log.debug(e.getMessage());
-                                    }
+                    if (next != null)
+                        fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                            @Override public void apply(IgniteInternalFuture<Boolean>
f) {
+                                try {
+                                    if (f.get()) // Not cancelled.
+                                        next.run(); // Starts next cache rebalancing (according
to the order).
                                 }
-                            });
-
-                        requestPartitions(fut, assigns);
-                    }
-                    catch (IgniteCheckedException e) {
-                        ClusterTopologyCheckedException cause = e.getCause(ClusterTopologyCheckedException.class);
-
-                        if (cause != null)
-                            log.warning("Failed to send initial demand request to node. "
+ e.getMessage());
-                        else
-                            log.error("Failed to send initial demand request to node.", e);
-
-                        fut.cancel();
-                    }
-                    catch (Throwable th) {
-                        log.error("Runtime error caught during initial demand request sending.",
th);
-
-                        fut.cancel();
+                                catch (IgniteCheckedException e) {
+                                    if (log.isDebugEnabled())
+                                        log.debug(e.getMessage());
+                                }
+                            }
+                        });
 
-                        if (th instanceof Error)
-                            throw th;
-                    }
+                    requestPartitions(fut, assigns);
                 }
             };
         }
@@ -399,10 +379,7 @@ public class GridDhtPartitionDemander {
      * @param assigns Assignments.
      * @throws IgniteCheckedException If failed.
      */
-    private void requestPartitions(
-        RebalanceFuture fut,
-        GridDhtPreloaderAssignments assigns
-    ) throws IgniteCheckedException {
+    private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssignments
assigns) {
         assert fut != null;
 
         if (topologyChanged(fut)) {
@@ -411,7 +388,7 @@ public class GridDhtPartitionDemander {
             return;
         }
 
-        synchronized (fut) {
+        synchronized (fut) { // Synchronized to prevent consistency issues in case of parallel
cancellation.
             if (fut.isDone())
                 return;
 
@@ -443,7 +420,7 @@ public class GridDhtPartitionDemander {
 
             int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
 
-            List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
+            final List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
 
             for (int cnt = 0; cnt < lsnrCnt; cnt++)
                 sParts.add(new HashSet<Integer>());
@@ -458,26 +435,50 @@ public class GridDhtPartitionDemander {
             for (cnt = 0; cnt < lsnrCnt; cnt++) {
                 if (!sParts.get(cnt).isEmpty()) {
                     // Create copy.
-                    GridDhtPartitionDemandMessage initD = createDemandMessage(d, sParts.get(cnt));
+                    final GridDhtPartitionDemandMessage initD = createDemandMessage(d, sParts.get(cnt));
 
                     initD.topic(rebalanceTopics.get(cnt));
                     initD.updateSequence(fut.updateSeq);
                     initD.timeout(cctx.config().getRebalanceTimeout());
 
-                    synchronized (fut) {
-                        if (fut.isDone())
-                            return;// Future can be already cancelled at this moment and
all failovers happened.
+                    final int finalCnt = cnt;
 
-                        // New requests will not be covered by failovers.
-                        cctx.io().sendOrderedMessage(node,
-                            rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout());
-                    }
+                    cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                        @Override public void run() {
+                            try {
+                                if (!fut.isDone()) {
+                                    cctx.io().sendOrderedMessage(node,
+                                        rebalanceTopics.get(finalCnt), initD, cctx.ioPolicy(),
initD.timeout());
+
+                                    // Cleanup required in case partitions demanded in parallel
with cancellation.
+                                    synchronized (fut) {
+                                        if (fut.isDone())
+                                            fut.cleanupRemoteContexts(node.id());
+                                    }
+
+                                    if (log.isDebugEnabled())
+                                        log.debug("Requested rebalancing [from node=" + node.id()
+ ", listener index=" +
+                                            finalCnt + ", partitions count=" + sParts.get(finalCnt).size()
+
+                                            " (" + partitionsList(sParts.get(finalCnt)) +
")]");
+                                }
+                            }
+                            catch (IgniteCheckedException e) {
+                                ClusterTopologyCheckedException cause = e.getCause(ClusterTopologyCheckedException.class);
 
+                                if (cause != null)
+                                    log.warning("Failed to send initial demand request to
node. " + e.getMessage());
+                                else
+                                    log.error("Failed to send initial demand request to node.",
e);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Requested rebalancing [from node=" + node.id() + ", listener
index=" +
-                            cnt + ", partitions count=" + sParts.get(cnt).size() +
-                            " (" + partitionsList(sParts.get(cnt)) + ")]");
+                                fut.cancel();
+                            }
+                            catch (Throwable th) {
+                                log.error("Runtime error caught during initial demand request
sending.", th);
+
+                                fut.cancel();
+                            }
+                        }
+                    }, /*system pool*/true);
                 }
             }
         }


Mime
View raw message