ignite-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [ignite] Mmuzaf commented on a change in pull request #6688: IGNITE-3195 Rebalancing: IgniteConfiguration.rebalanceThreadPoolSize is wrongly treated
Date Mon, 22 Jul 2019 09:55:42 GMT
Mmuzaf commented on a change in pull request #6688: IGNITE-3195 Rebalancing: IgniteConfiguration.rebalanceThreadPoolSize
is wrongly treated
URL: https://github.com/apache/ignite/pull/6688#discussion_r305762930
 
 

 ##########
 File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 ##########
 @@ -459,79 +449,59 @@ private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssign
 
                 U.log(log, "Prepared rebalancing [grp=" + grp.cacheOrGroupName()
                     + ", mode=" + cfg.getRebalanceMode() + ", supplier=" + node.id() + ",
partitionsCount=" + parts.size()
-                    + ", topVer=" + fut.topologyVersion() + ", localParallelism=" + locStripes
-                    + ", rmtParallelism=" + rmtStripes + ", parallelism=" + rmtTotalStripes
+ "]");
+                    + ", topVer=" + fut.topologyVersion() + "]");
             }
 
-            final List<IgniteDhtDemandedPartitionsMap> stripePartitions = new ArrayList<>(stripes);
-            for (int i = 0; i < stripes; i++)
-                stripePartitions.add(new IgniteDhtDemandedPartitionsMap());
+            if (!parts.isEmpty()) {
+                // Create copy of demand message with new striped partitions map.
+                final GridDhtPartitionDemandMessage demandMsg = d.withNewPartitionsMap(parts);
 
-            // Reserve one stripe for historical partitions.
-            if (parts.hasHistorical()) {
-                stripePartitions.set(stripes - 1, new IgniteDhtDemandedPartitionsMap(parts.historicalMap(),
null));
-
-                if (stripes > 1)
-                    stripes--;
-            }
+                final int topicId = 0;
 
-            // Distribute full partitions across other stripes.
-            Iterator<Integer> it = parts.fullSet().iterator();
-            for (int i = 0; it.hasNext(); i++)
-                stripePartitions.get(i % stripes).addFull(it.next());
+                demandMsg.topic(rebalanceTopic);
+                demandMsg.rebalanceId(fut.rebalanceId);
+                demandMsg.timeout(grp.preloader().timeout());
 
-            for (int stripe = 0; stripe < rmtTotalStripes; stripe++) {
-                if (!stripePartitions.get(stripe).isEmpty()) {
-                    // Create copy of demand message with new striped partitions map.
-                    final GridDhtPartitionDemandMessage demandMsg = d.withNewPartitionsMap(stripePartitions.get(stripe));
-
-                    demandMsg.topic(rebalanceTopics.get(stripe));
-                    demandMsg.rebalanceId(fut.rebalanceId);
-                    demandMsg.timeout(grp.preloader().timeout());
-
-                    final int topicId = stripe;
-
-                    IgniteInternalFuture<?> clearAllFuture = clearFullPartitions(fut,
demandMsg.partitions().fullSet());
-
-                    // Start rebalancing after clearing full partitions is finished.
-                    clearAllFuture.listen(f -> ctx.kernalContext().closure().runLocalSafe(()
-> {
-                        if (fut.isDone())
-                            return;
-
-                        try {
-                            ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId),
-                                demandMsg.convertIfNeeded(node.version()), grp.ioPolicy(),
demandMsg.timeout());
+                IgniteInternalFuture<?> clearAllFuture = clearFullPartitions(fut, demandMsg.partitions().fullSet());
 
-                            // Cleanup required in case partitions demanded in parallel with
cancellation.
-                            synchronized (fut) {
-                                if (fut.isDone())
-                                    fut.cleanupRemoteContexts(node.id());
-                            }
+                // Start rebalancing after clearing full partitions is finished.
+                clearAllFuture.listen(f -> ctx.kernalContext().closure().runLocalSafe(()
-> {
+                    if (fut.isDone())
+                        return;
 
-                            if (log.isInfoEnabled())
-                                log.info("Started rebalance routine [" + grp.cacheOrGroupName()
+
-                                    ", topVer=" + fut.topologyVersion() +
-                                    ", supplier=" + node.id() + ", topic=" + topicId +
-                                    ", fullPartitions=" + S.compact(stripePartitions.get(topicId).fullSet())
+
-                                    ", histPartitions=" + S.compact(stripePartitions.get(topicId).historicalSet())
+ "]");
+                    try {
+                        if (log.isInfoEnabled())
+                            log.info("Starting rebalance routine [" + grp.cacheOrGroupName()
+
+                                ", topVer=" + fut.topologyVersion() +
+                                ", supplier=" + node.id() + ", topic=" + topicId +
+                                ", fullPartitions=" + S.compact(parts.fullSet()) +
+                                ", histPartitions=" + S.compact(parts.historicalSet()) +
"]");
+
+                        ctx.io().sendOrderedMessage(node, rebalanceTopic,
+                            demandMsg.convertIfNeeded(node.version()), grp.ioPolicy(), demandMsg.timeout());
 
 Review comment:
   I think the newly created `GridIoPolicy.REBALANCE_POOL` should be used here, not the group
one. WDYT?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message