ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject ignite git commit: IGNITE-8482 Skip 2-phase partition release wait in case of activation or dynamic caches start - Fixes #4078.
Date Wed, 06 Jun 2018 12:51:25 GMT
Repository: ignite
Updated Branches:
  refs/heads/master ae7357ba2 -> 7c565d2eb


IGNITE-8482 Skip 2-phase partition release wait in case of activation or dynamic caches start - Fixes #4078.

Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7c565d2e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7c565d2e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7c565d2e

Branch: refs/heads/master
Commit: 7c565d2ebb899e41e91df6451f09c03496071397
Parents: ae7357b
Author: Pavel Kovalenko <jokserfn@gmail.com>
Authored: Wed Jun 6 15:24:39 2018 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Wed Jun 6 15:35:16 2018 +0300

----------------------------------------------------------------------
 .../GridDhtPartitionsExchangeFuture.java        | 32 +++++++++++++++-----
 .../preloader/latch/ExchangeLatchManager.java   | 18 ++++++-----
 2 files changed, 34 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7c565d2e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index c391403..2973194 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -524,6 +524,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @return {@code True} if there are caches to start.
+     */
+    public boolean hasCachesToStart() {
+        return exchActions != null && !exchActions.cacheStartRequests().isEmpty();
+    }
+
+    /**
      * @return First event discovery event.
      *
      */
@@ -1156,11 +1163,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         // To correctly rebalance when persistence is enabled, it is necessary to reserve history within exchange.
         partHistReserved = cctx.database().reserveHistoryForExchange();
 
-        // On first phase we wait for finishing all local tx updates, atomic updates and lock releases.
-        waitPartitionRelease(1);
+        boolean distributed = true;
+
+        // Do not perform distributed partition release in case of cluster activation or caches start.
+        if (activateCluster() || hasCachesToStart())
+            distributed = false;
+
+        // On first phase we wait for finishing all local tx updates, atomic updates and lock releases on all nodes.
+        waitPartitionRelease(distributed);
 
         // Second phase is needed to wait for finishing all tx updates from primary to backup nodes remaining after first phase.
-        waitPartitionRelease(2);
+        waitPartitionRelease(false);
 
         boolean topChanged = firstDiscoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
 
@@ -1265,15 +1278,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * For the exact list of the objects being awaited for see
      * {@link GridCacheSharedContext#partitionReleaseFuture(AffinityTopologyVersion)} javadoc.
      *
-     * @param phase Phase of partition release.
+     * @param distributed If {@code true} then node should wait for partition release completion on all other nodes.
      *
      * @throws IgniteCheckedException If failed.
      */
-    private void waitPartitionRelease(int phase) throws IgniteCheckedException {
+    private void waitPartitionRelease(boolean distributed) throws IgniteCheckedException {
         Latch releaseLatch = null;
 
-        // Wait for other nodes only on first phase.
-        if (phase == 1)
+        if (distributed)
             releaseLatch = cctx.exchange().latch().getOrCreate("exchange", initialVersion());
 
         IgniteInternalFuture<?> partReleaseFut = cctx.partitionReleaseFuture(initialVersion());
@@ -2621,7 +2633,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
-            validatePartitionsState();
+            // Don't validate partitions state in case of caches start.
+            boolean skipValidation = hasCachesToStart();
+
+            if (!skipValidation)
+                validatePartitionsState();
 
             if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
                 assert firstDiscoEvt instanceof DiscoveryCustomEvent;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c565d2e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
index d44d165..ab5819d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
@@ -473,11 +473,12 @@ public class ExchangeLatchManager {
                             io.sendToGridTopic(node, GridTopic.TOPIC_EXCHANGE, new LatchAckMessage(id, topVer, true), GridIoPolicy.SYSTEM_POOL);
 
                             if (log.isDebugEnabled())
-                                log.debug("Final ack is ackSent [latch=" + latchId() + ", to=" + node.id() + "]");
+                                log.debug("Final ack has sent [latch=" + latchId() + ", to=" + node.id() + "]");
                         }
-                    } catch (IgniteCheckedException e) {
+                    }
+                    catch (IgniteCheckedException e) {
                         if (log.isDebugEnabled())
-                            log.debug("Unable to send final ack [latch=" + latchId() + ", to=" + node.id() + "]");
+                            log.debug("Failed to send final ack [latch=" + latchId() + ", to=" + node.id() + "]: " + e.getMessage());
                     }
                 }
             });
@@ -521,7 +522,7 @@ public class ExchangeLatchManager {
             int remaining = permits.decrementAndGet();
 
             if (log.isDebugEnabled())
-                log.debug("Count down + [latch=" + latchId() + ", remaining=" + remaining + "]");
+                log.debug("Count down [latch=" + latchId() + ", remaining=" + remaining + "]");
 
             if (remaining == 0)
                 complete();
@@ -584,7 +585,7 @@ public class ExchangeLatchManager {
          */
         private void newCoordinator(ClusterNode coordinator) {
             if (log.isDebugEnabled())
-                log.debug("Coordinator is changed [latch=" + latchId() + ", crd=" + coordinator.id() + "]");
+                log.debug("Coordinator is changed [latch=" + latchId() + ", newCrd=" + coordinator.id() + "]");
 
             synchronized (this) {
                 this.coordinator = coordinator;
@@ -606,11 +607,12 @@ public class ExchangeLatchManager {
                 io.sendToGridTopic(coordinator, GridTopic.TOPIC_EXCHANGE, new LatchAckMessage(id, topVer, false), GridIoPolicy.SYSTEM_POOL);
 
                 if (log.isDebugEnabled())
-                    log.debug("Ack is ackSent + [latch=" + latchId() + ", to=" + coordinator.id() + "]");
-            } catch (IgniteCheckedException e) {
+                    log.debug("Ack has sent [latch=" + latchId() + ", to=" + coordinator.id() + "]");
+            }
+            catch (IgniteCheckedException e) {
                 // Coordinator is unreachable. On coodinator node left discovery event ack will be resent.
                 if (log.isDebugEnabled())
-                    log.debug("Unable to send ack [latch=" + latchId() + ", to=" + coordinator.id() + "]: " + e.getMessage());
+                    log.debug("Failed to send ack [latch=" + latchId() + ", to=" + coordinator.id() + "]: " + e.getMessage());
             }
         }
 


Mime
View raw message