ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-4154
Date Mon, 14 Nov 2016 10:44:39 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4154-opt2 c05e49bb7 -> 283b23237


ignite-4154


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/283b2323
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/283b2323
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/283b2323

Branch: refs/heads/ignite-4154-opt2
Commit: 283b2323767ec4ab3f2c6a84c6dac405b01922bb
Parents: c05e49b
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Nov 14 13:44:32 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Nov 14 13:44:32 2016 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  8 +++
 .../dht/GridDhtPartitionTopologyImpl.java       | 59 +++++++++++---------
 .../GridDhtPartitionsExchangeFuture.java        | 18 +++++-
 3 files changed, 57 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/283b2323/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 27c2bac..b50479d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -933,6 +933,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      */
     private void fetchAffinityOnJoin(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException
{
+        long start = System.currentTimeMillis();
+
         AffinityTopologyVersion topVer = fut.topologyVersion();
 
         List<GridDhtAssignmentFetchFuture> fetchFuts = new ArrayList<>();
@@ -967,6 +969,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut);
         }
+
+        log.info("Affinity fetch time [topVer=" + topVer + ", time=" + (System.currentTimeMillis() - start) + ']');
     }
 
     /**
@@ -1055,12 +1059,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private void initCachesAffinity(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException
{
         assert !lateAffAssign;
 
+        long start = System.currentTimeMillis();
+
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (cacheCtx.isLocal())
                 continue;
 
             initAffinity(cacheCtx.affinity().affinityCache(), fut, false);
         }
+
+        log.info("Affinity init time [topVer=" + fut.topologyVersion() + ", time=" + (System.currentTimeMillis() - start) + ']');
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/283b2323/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index f3751ac..f50116d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -350,7 +350,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
-            initPartitions0(exchFut, updateSeq);
+            ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+            initPartitions0(oldest, exchFut, updateSeq);
 
             consistencyCheck();
         }
@@ -363,11 +365,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
      * @param exchFut Exchange future.
      * @param updateSeq Update sequence.
      */
-    private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq)
{
+    private void initPartitions0(ClusterNode oldest, GridDhtPartitionsExchangeFuture exchFut,
long updateSeq) {
         ClusterNode loc = cctx.localNode();
 
-        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
-
         assert oldest != null || cctx.kernalContext().clientNode();
 
         GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
@@ -407,12 +407,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
                         if (log.isDebugEnabled())
                             log.debug("Owned partition for oldest node: " + locPart);
 
-                        updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                        updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
                     }
                 }
             }
             else
-                createPartitions(aff, updateSeq);
+                createPartitions(oldest, aff, updateSeq);
         }
         else {
             // If preloader is disabled, then we simply clear out
@@ -429,7 +429,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
                         if (state.active()) {
                             locPart.rent(false);
 
-                            updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                            updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
 
                             if (log.isDebugEnabled())
                                 log.debug("Evicting partition with rebalancing disabled " +
@@ -443,7 +443,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
         }
 
         if (node2part != null && node2part.valid())
-            checkEvictions(updateSeq, aff);
+            checkEvictions(oldest, updateSeq, aff);
 
         updateRebalanceVersion(aff);
     }
@@ -452,7 +452,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
      * @param aff Affinity assignments.
      * @param updateSeq Update sequence.
      */
-    private void createPartitions(List<List<ClusterNode>> aff, long updateSeq)
{
+    private void createPartitions(ClusterNode oldest, List<List<ClusterNode>>
aff, long updateSeq) {
         ClusterNode loc = cctx.localNode();
 
         int num = cctx.affinity().partitions();
@@ -464,7 +464,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
                     // will be created in MOVING state.
                     GridDhtLocalPartition locPart = createPartition(p);
 
-                    updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                    updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
                 }
             }
             // If this node's map is empty, we pre-create local partitions,
@@ -533,11 +533,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
             }
 
             if (affReady)
-                initPartitions0(exchFut, updateSeq);
+                initPartitions0(oldest, exchFut, updateSeq);
             else {
                 List<List<ClusterNode>> aff = cctx.affinity().idealAssignment();
 
-                createPartitions(aff, updateSeq);
+                createPartitions(oldest, aff, updateSeq);
             }
 
             consistencyCheck();
@@ -584,6 +584,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
+            ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
             for (int p = 0; p < num; p++) {
                 GridDhtLocalPartition locPart = localPartition(p, topVer, false, false);
 
@@ -610,7 +612,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
                                 assert owned : "Failed to own partition [cacheName" + cctx.name() + ", locPart=" +
                                     locPart + ']';
 
-                                updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                                updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
 
                                 changed = true;
 
@@ -630,7 +632,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
                                     locPart + ", owners = " + owners + ']');
                         }
                         else
-                            updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                            updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
                     }
                 }
                 else {
@@ -640,7 +642,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
                         if (state == MOVING) {
                             locPart.rent(false);
 
-                            updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                            updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
 
                             changed = true;
 
@@ -1129,7 +1131,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
                 List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
 
-                changed = checkEvictions(updateSeq, aff);
+                ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+                changed = checkEvictions(oldest, updateSeq, aff);
 
                 updateRebalanceVersion(aff);
             }
@@ -1254,7 +1258,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
                 List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
 
-                changed |= checkEvictions(updateSeq, aff);
+                ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+                changed |= checkEvictions(oldest, updateSeq, aff);
 
                 updateRebalanceVersion(aff);
             }
@@ -1276,7 +1282,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
      * @param aff Affinity assignments.
      * @return Checks if any of the local partitions need to be evicted.
      */
-    private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff)
{
+    private boolean checkEvictions(ClusterNode oldest, long updateSeq, List<List<ClusterNode>>
aff) {
         boolean changed = false;
 
         UUID locId = cctx.nodeId();
@@ -1299,7 +1305,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
                     if (nodeIds.containsAll(F.nodeIds(affNodes))) {
                         part.rent(false);
 
-                        updateLocal(part.id(), locId, part.state(), updateSeq);
+                        updateLocal(oldest, part.id(), locId, part.state(), updateSeq);
 
                         changed = true;
 
@@ -1324,7 +1330,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
                                 if (locId.equals(n.id())) {
                                     part.rent(false);
 
-                                    updateLocal(part.id(), locId, part.state(), updateSeq);
+                                    updateLocal(oldest, part.id(), locId, part.state(), updateSeq);
 
                                     changed = true;
 
@@ -1353,12 +1359,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
      * @param updateSeq Update sequence.
      */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq)
{
+    private void updateLocal(ClusterNode oldest, int p, UUID nodeId, GridDhtPartitionState
state, long updateSeq) {
         assert nodeId.equals(cctx.nodeId());
 
-        // In case if node joins, get topology at the time of joining node.
-        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
-
         assert oldest != null || cctx.kernalContext().clientNode();
 
         // If this node became the oldest node.
@@ -1453,7 +1456,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
 
         try {
             if (part.own()) {
-                updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet());
+                ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+                updateLocal(oldest, part.id(), loc.id(), part.state(), updateSeq.incrementAndGet());
 
                 consistencyCheck();
 
@@ -1481,7 +1486,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
 
             long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get();
 
-            updateLocal(part.id(), cctx.localNodeId(), part.state(), seq);
+            ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+            updateLocal(oldest, part.id(), cctx.localNodeId(), part.state(), seq);
 
             consistencyCheck();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/283b2323/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 9af7a7b..468e16e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -432,6 +432,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         assert !dummy && !forcePreload : this;
 
         try {
+            long initStart = System.currentTimeMillis();
+
             log.info("Start exchange init [topVer=" + topologyVersion() + ']');
 
             srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topologyVersion()));
@@ -465,14 +467,22 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                         cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
                 }
 
+                long affStart = System.currentTimeMillis();
+
                 if (CU.clientNode(discoEvt.eventNode()))
                     exchange = onClientNodeEvent(crdNode);
                 else
                     exchange = onServerNodeEvent(crdNode);
+
+                log.info("Affinity call time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - affStart) + ']');
             }
 
+            long topUpdateStart = System.currentTimeMillis();
+
             updateTopologies(crdNode);
 
+            log.info("Top update time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - topUpdateStart) + ']');
+
             switch (exchange) {
                 case ALL: {
                     distributedExchange();
@@ -501,7 +511,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             }
 
-            log.info("Finish exchange init [topVer=" + topologyVersion() + ']');
+            log.info("Finish exchange init [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - initStart) + ']');
         }
         catch (IgniteInterruptedCheckedException e) {
             onDone(e);
@@ -725,6 +735,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg
!= null;
 
+        long beforeExchStart = System.currentTimeMillis();
+
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (cacheCtx.isLocal() || stopping(cacheCtx.cacheId()))
                 continue;
@@ -739,12 +751,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             cacheCtx.topology().beforeExchange(this, !centralizedAff);
         }
 
+        log.info("Before exchange time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - beforeExchStart) + ']');
+
         if (crd.isLocal()) {
             ClusterNode node = discoEvt.eventNode();
 
             Object attr = node.attribute("SKIP_FIRST_EXCHANGE_MSG");
 
-            boolean skipFirstExchange = Boolean.TRUE.equals(attr) || "true".equals(attr);
+            boolean skipFirstExchange = Boolean.TRUE.equals(attr) || ((attr instanceof String)
&& "true".equalsIgnoreCase((String)attr));
 
             if (discoEvt.type() == EVT_NODE_JOINED && !node.isLocal() &&
skipFirstExchange) {
                 assert !CU.clientNode(node) : discoEvt;


Mime
View raw message