ignite-commits mailing list archives

From sboi...@apache.org
Subject ignite git commit: ignite-4296
Date Fri, 02 Dec 2016 08:46:40 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4296 eb57fe16d -> 2d523f841


ignite-4296


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2d523f84
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2d523f84
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2d523f84

Branch: refs/heads/ignite-4296
Commit: 2d523f84136aba8d2b671b1eace389032d6eef25
Parents: eb57fe1
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Dec 2 11:46:35 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Dec 2 11:46:35 2016 +0300

----------------------------------------------------------------------
 .../dht/GridDhtPartitionTopologyImpl.java       | 118 +++++++++----------
 1 file changed, 57 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2d523f84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 15c41e8..1b4dcc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -340,7 +340,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
-            initPartitions0(currentCoordinator(), exchFut, updateSeq);
+            initPartitions0(exchFut, updateSeq);
 
             consistencyCheck();
         }
@@ -350,23 +350,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /**
-     * @return Coordinator node ID.
-     */
-    private UUID currentCoordinator() {
-        ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
-
-        return oldest != null ? oldest.id() : null;
-    }
-
-    /**
-     * @param oldest Oldest server node.
      * @param exchFut Exchange future.
      * @param updateSeq Update sequence.
      */
-    private void initPartitions0(UUID oldest, GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
-        UUID locId = cctx.localNodeId();
+    private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
+        ClusterNode loc = cctx.localNode();
 
-        assert oldest != null || cctx.kernalContext().clientNode();
+        ClusterNode oldest = currentCoordinator();
 
         GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
 
@@ -388,7 +378,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         if (cctx.rebalanceEnabled()) {
             boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion());
 
-            boolean first = (locId.equals(oldest) && locId.equals(exchId.nodeId()) && exchId.isJoined()) || added;
+            boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
 
             if (first) {
                 assert exchId.isJoined() || added;
@@ -405,12 +395,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         if (log.isDebugEnabled())
                             log.debug("Owned partition for oldest node: " + locPart);
 
-                        updateSeq = updateLocal(oldest, p, locPart.state(), updateSeq);
+                        updateSeq = updateLocal(p, locPart.state(), updateSeq);
                     }
                 }
             }
             else
-                createPartitions(oldest, aff, updateSeq);
+                createPartitions(aff, updateSeq);
         }
         else {
             // If preloader is disabled, then we simply clear out
@@ -427,7 +417,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         if (state.active()) {
                             locPart.rent(false);
 
-                            updateSeq = updateLocal(oldest, p, locPart.state(), updateSeq);
+                            updateSeq = updateLocal(p, locPart.state(), updateSeq);
 
                             if (log.isDebugEnabled())
                                 log.debug("Evicting partition with rebalancing disabled "
+
@@ -441,17 +431,16 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         }
 
         if (node2part != null && node2part.valid())
-            checkEvictions(oldest, updateSeq, aff);
+            checkEvictions(updateSeq, aff);
 
         updateRebalanceVersion(aff);
     }
 
     /**
-     * @param oldest Oldest server node.
      * @param aff Affinity assignments.
      * @param updateSeq Update sequence.
      */
-    private void createPartitions(UUID oldest, List<List<ClusterNode>> aff, long updateSeq) {
+    private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) {
         int num = cctx.affinity().partitions();
 
         for (int p = 0; p < num; p++) {
@@ -461,7 +450,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     // will be created in MOVING state.
                     GridDhtLocalPartition locPart = createPartition(p);
 
-                    updateSeq = updateLocal(oldest, p, locPart.state(), updateSeq);
+                    updateSeq = updateLocal(p, locPart.state(), updateSeq);
                 }
             }
             // If this node's map is empty, we pre-create local partitions,
@@ -492,10 +481,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (exchId.isLeft())
                 removeNode(exchId.nodeId());
 
-            // In case if node joins, get topology at the time of joining node.
-            ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
-
-            assert oldest != null || cctx.kernalContext().clientNode();
+            ClusterNode oldest = currentCoordinator();
 
             if (log.isDebugEnabled())
                 log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap="
+ fullMapString() + ']');
@@ -530,11 +516,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             }
 
             if (affReady)
-                initPartitions0(oldest != null ? oldest.id() : null, exchFut, updateSeq);
+                initPartitions0(exchFut, updateSeq);
             else {
                 List<List<ClusterNode>> aff = cctx.affinity().idealAssignment();
 
-                createPartitions(oldest != null ? oldest.id() : null, aff, updateSeq);
+                createPartitions(aff, updateSeq);
             }
 
             consistencyCheck();
@@ -579,8 +565,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
-            UUID oldest = currentCoordinator();
-
             for (int p = 0; p < num; p++) {
                 GridDhtLocalPartition locPart = localPartition(p, topVer, false, false);
 
@@ -607,7 +591,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                 assert owned : "Failed to own partition [cacheName" + cctx.name()
+ ", locPart=" +
                                     locPart + ']';
 
-                                updateSeq = updateLocal(oldest, p, locPart.state(), updateSeq);
+                                updateSeq = updateLocal(p, locPart.state(), updateSeq);
 
                                 changed = true;
 
@@ -627,7 +611,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                     locPart + ", owners = " + owners + ']');
                         }
                         else
-                            updateSeq = updateLocal(oldest, p, locPart.state(), updateSeq);
+                            updateSeq = updateLocal(p, locPart.state(), updateSeq);
                     }
                 }
                 else {
@@ -637,7 +621,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         if (state == MOVING) {
                             locPart.rent(false);
 
-                            updateSeq = updateLocal(oldest, p, locPart.state(), updateSeq);
+                            updateSeq = updateLocal(p, locPart.state(), updateSeq);
 
                             changed = true;
 
@@ -1046,24 +1030,23 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 return false;
             }
 
+            long updateSeq = this.updateSeq.incrementAndGet();
+
             if (exchId != null)
                 lastExchangeId = exchId;
 
             if (node2part != null) {
-                for (Map.Entry<UUID, GridDhtPartitionMap2> e : node2part.entrySet()) {
-                    GridDhtPartitionMap2 newPart = partMap.get(e.getKey());
-
-                    if (newPart == null)
-                        continue;
+                for (GridDhtPartitionMap2 part : node2part.values()) {
+                    GridDhtPartitionMap2 newPart = partMap.get(part.nodeId());
 
-                    GridDhtPartitionMap2 part = e.getValue();
-
-                    assert part.nodeId().equals(e.getKey());
-
-                    if (newPart.updateSequence() < part.updateSequence() || (
+                    // If for some nodes current partition has a newer map,
+                    // then we keep the newer value.
+                    if (newPart != null &&
+                        (newPart.updateSequence() < part.updateSequence() || (
                             cctx.startTopologyVersion() != null &&
                                 newPart.topologyVersion() != null && // Backward compatibility.
-                                cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0)) {
+                                cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
+                        ) {
                         if (log.isDebugEnabled())
                             log.debug("Overriding partition map in full update map [exchId="
+ exchId + ", curPart=" +
                                 mapString(part) + ", newPart=" + mapString(newPart) + ']');
@@ -1104,8 +1087,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             part2node = p2n;
 
-            final long updateSeq = this.updateSeq.incrementAndGet();
-
             boolean changed = false;
 
             AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
@@ -1113,7 +1094,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
                 List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
 
-                changed = checkEvictions(currentCoordinator(), updateSeq, aff);
+                changed = checkEvictions(updateSeq, aff);
 
                 updateRebalanceVersion(aff);
             }
@@ -1257,7 +1238,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
             List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
 
-            changed = checkEvictions(currentCoordinator(), updateSeq, aff);
+            changed = checkEvictions(updateSeq, aff);
 
             updateRebalanceVersion(aff);
         }
@@ -1282,12 +1263,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /**
-     * @param oldest Oldest server node.
      * @param updateSeq Update sequence.
      * @param aff Affinity assignments.
      * @return Checks if any of the local partitions need to be evicted.
      */
-    private boolean checkEvictions(final UUID oldest, long updateSeq, List<List<ClusterNode>> aff) {
+    private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) {
         boolean changed = false;
 
         UUID locId = cctx.nodeId();
@@ -1310,7 +1290,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     if (nodeIds.containsAll(F.nodeIds(affNodes))) {
                         part.rent(false);
 
-                        updateSeq = updateLocal(oldest, part.id(), part.state(), updateSeq);
+                        updateSeq = updateLocal(part.id(), part.state(), updateSeq);
 
                         changed = true;
 
@@ -1335,7 +1315,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                 if (locId.equals(n.id())) {
                                     part.rent(false);
 
-                                    updateSeq = updateLocal(oldest, part.id(), part.state(), updateSeq);
+                                    updateSeq = updateLocal(part.id(), part.state(), updateSeq);
 
                                     changed = true;
 
@@ -1356,30 +1336,46 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /**
+     * @return Current coordinator node.
+     */
+    @Nullable private ClusterNode currentCoordinator() {
+        ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+
+        assert oldest != null || cctx.kernalContext().clientNode();
+
+        return oldest;
+    }
+
+    /**
      * Updates value for single partition.
      *
-     * @param oldest Oldest server node.
      * @param p Partition.
      * @param state State.
      * @param updateSeq Update sequence.
      * @return Update sequence.
      */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    private long updateLocal(final UUID oldest, int p, GridDhtPartitionState state, long updateSeq) {
+    private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) {
+        ClusterNode oldest = currentCoordinator();
+
         assert oldest != null || cctx.kernalContext().clientNode();
 
         // If this node became the oldest node.
-        if (cctx.localNodeId().equals(oldest)) {
+        if (cctx.localNode().equals(oldest)) {
             long seq = node2part.updateSequence();
 
             if (seq != updateSeq) {
                 if (seq > updateSeq) {
-                    if (this.updateSeq.get() < seq) {
+                    long seq0 = this.updateSeq.get();
+
+                    if (seq0 < seq) {
                         // Update global counter if necessary.
-                        boolean b = this.updateSeq.compareAndSet(this.updateSeq.get(), seq + 1);
+                        boolean b = this.updateSeq.compareAndSet(seq0, seq + 1);
 
-                        assert b : "Invalid update sequence [updateSeq=" + updateSeq + ", seq=" + seq +
-                            ", curUpdateSeq=" + this.updateSeq.get() + ", node2part=" + node2part.toFullString() + ']';
+                        assert b : "Invalid update sequence [updateSeq=" + updateSeq +
+                            ", seq=" + seq +
+                            ", curUpdateSeq=" + this.updateSeq.get() +
+                            ", node2part=" + node2part.toFullString() + ']';
 
                         updateSeq = seq + 1;
                     }
@@ -1466,7 +1462,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         try {
             if (part.own()) {
-                updateLocal(currentCoordinator(), part.id(), part.state(), updateSeq.incrementAndGet());
+                updateLocal(part.id(), part.state(), updateSeq.incrementAndGet());
 
                 consistencyCheck();
 
@@ -1494,7 +1490,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get();
 
-            updateLocal(currentCoordinator(), part.id(), part.state(), seq);
+            updateLocal(part.id(), part.state(), seq);
 
             consistencyCheck();
         }
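
----------------------------------------------------------------------

The change removes the coordinator UUID parameter that was threaded through
initPartitions0(), createPartitions(), checkEvictions() and updateLocal(), and
instead has each method resolve the coordinator through a single
currentCoordinator() helper that returns a ClusterNode and owns the
null/client-node assertion. The following is a minimal standalone sketch of
that pattern, not Ignite code: Node and CoordinatorSketch are simplified
stand-ins used only for illustration.

import java.util.UUID;

/**
 * Sketch of the refactoring pattern: resolve shared context (the coordinator)
 * in one helper instead of passing it through every method signature.
 */
final class CoordinatorSketch {
    /** Simplified stand-in for ClusterNode. */
    static final class Node {
        private final UUID id;

        Node(UUID id) {
            this.id = id;
        }

        UUID id() {
            return id;
        }

        @Override public boolean equals(Object o) {
            return o instanceof Node && id.equals(((Node)o).id);
        }

        @Override public int hashCode() {
            return id.hashCode();
        }
    }

    private final Node locNode;
    private final boolean clientNode;
    private final Node oldestSrvNode; // Stand-in for a discovery lookup.

    CoordinatorSketch(Node locNode, boolean clientNode, Node oldestSrvNode) {
        this.locNode = locNode;
        this.clientNode = clientNode;
        this.oldestSrvNode = oldestSrvNode;
    }

    /** Single resolution point for the coordinator, as currentCoordinator() is in the patch. */
    private Node currentCoordinator() {
        Node oldest = oldestSrvNode;

        // Only a client node may legitimately see no alive server node.
        assert oldest != null || clientNode;

        return oldest;
    }

    /** After the refactoring the caller no longer supplies the coordinator id. */
    long updateLocal(int p, long updateSeq) {
        Node oldest = currentCoordinator();

        // Coordinator-only bookkeeping runs when the local node is the oldest node.
        if (locNode.equals(oldest)) {
            // ... reconcile the partition map update sequence here ...
        }

        return updateSeq;
    }
}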

