ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5595 Client event exchange optimization
Date Tue, 27 Jun 2017 12:48:07 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5595 [created] 7d8e645f6


ignite-5595 Client event exchange optimization


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d8e645f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d8e645f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d8e645f

Branch: refs/heads/ignite-5595
Commit: 7d8e645f65ffb4df7363f62c02803c2d1415e3b6
Parents: d2b4759
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Jun 27 15:47:55 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Jun 27 15:47:55 2017 +0300

----------------------------------------------------------------------
 .../dht/GridDhtPartitionTopologyImpl.java       | 147 ++++++++++---------
 .../GridDhtPartitionsExchangeFuture.java        |  24 +--
 2 files changed, 94 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d8e645f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index f3170fa..bb6aab3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -62,6 +62,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
@@ -208,6 +209,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @throws IgniteCheckedException If failed.
      */
     private boolean waitForRent() throws IgniteCheckedException {
+        if (!grp.affinityNode())
+            return false;
+
         final long longOpDumpTimeout =
             IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, 60_000);
 
@@ -379,91 +383,93 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param updateSeq Update sequence.
      */
     private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
-        ClusterNode loc = ctx.localNode();
+        List<List<ClusterNode>> aff = grp.affinity().assignments(exchFut.topologyVersion());
 
-        ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
+        if (grp.affinityNode()) {
+            ClusterNode loc = ctx.localNode();
 
-        GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
+            ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
 
-        assert topVer.equals(exchFut.topologyVersion()) :
-            "Invalid topology [topVer=" + topVer +
-                ", grp=" + grp.cacheOrGroupName() +
-                ", futVer=" + exchFut.topologyVersion() +
-                ", fut=" + exchFut + ']';
-        assert grp.affinity().lastVersion().equals(exchFut.topologyVersion()) :
-            "Invalid affinity [topVer=" + grp.affinity().lastVersion() +
-                ", grp=" + grp.cacheOrGroupName() +
-                ", futVer=" + exchFut.topologyVersion() +
-                ", fut=" + exchFut + ']';
+            GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
 
-        List<List<ClusterNode>> aff = grp.affinity().assignments(exchFut.topologyVersion());
+            assert topVer.equals(exchFut.topologyVersion()) :
+                "Invalid topology [topVer=" + topVer +
+                    ", grp=" + grp.cacheOrGroupName() +
+                    ", futVer=" + exchFut.topologyVersion() +
+                    ", fut=" + exchFut + ']';
+            assert grp.affinity().lastVersion().equals(exchFut.topologyVersion()) :
+                "Invalid affinity [topVer=" + grp.affinity().lastVersion() +
+                    ", grp=" + grp.cacheOrGroupName() +
+                    ", futVer=" + exchFut.topologyVersion() +
+                    ", fut=" + exchFut + ']';
 
-        int num = grp.affinity().partitions();
+            int num = grp.affinity().partitions();
 
-        if (grp.rebalanceEnabled()) {
-            boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
+            if (grp.rebalanceEnabled()) {
+                boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
 
-            boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined());
+                boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined());
 
-            if (first) {
-                assert exchId.isJoined() || added;
+                if (first) {
+                    assert exchId.isJoined() || added;
 
-                for (int p = 0; p < num; p++) {
-                    if (localNode(p, aff)) {
-                        GridDhtLocalPartition locPart = createPartition(p);
+                    for (int p = 0; p < num; p++) {
+                        if (localNode(p, aff)) {
+                            GridDhtLocalPartition locPart = createPartition(p);
 
-                        boolean owned = locPart.own();
+                            boolean owned = locPart.own();
 
-                        assert owned : "Failed to own partition for oldest node [grp=" + grp.cacheOrGroupName() +
-                            ", part=" + locPart + ']';
+                            assert owned : "Failed to own partition for oldest node [grp=" + grp.cacheOrGroupName() +
+                                ", part=" + locPart + ']';
 
-                        if (log.isDebugEnabled())
-                            log.debug("Owned partition for oldest node: " + locPart);
+                            if (log.isDebugEnabled())
+                                log.debug("Owned partition for oldest node: " + locPart);
 
-                        updateSeq = updateLocal(p, locPart.state(), updateSeq);
+                            updateSeq = updateLocal(p, locPart.state(), updateSeq);
+                        }
                     }
                 }
+                else
+                    createPartitions(aff, updateSeq);
             }
-            else
-                createPartitions(aff, updateSeq);
-        }
-        else {
-            // If preloader is disabled, then we simply clear out
-            // the partitions this node is not responsible for.
-            for (int p = 0; p < num; p++) {
-                GridDhtLocalPartition locPart = localPartition(p, topVer, false, false);
+            else {
+                // If preloader is disabled, then we simply clear out
+                // the partitions this node is not responsible for.
+                for (int p = 0; p < num; p++) {
+                    GridDhtLocalPartition locPart = localPartition(p, topVer, false, false);
 
-                boolean belongs = localNode(p, aff);
+                    boolean belongs = localNode(p, aff);
 
-                if (locPart != null) {
-                    if (!belongs) {
-                        GridDhtPartitionState state = locPart.state();
+                    if (locPart != null) {
+                        if (!belongs) {
+                            GridDhtPartitionState state = locPart.state();
 
-                        if (state.active()) {
-                            locPart.rent(false);
+                            if (state.active()) {
+                                locPart.rent(false);
 
-                            updateSeq = updateLocal(p, locPart.state(), updateSeq);
+                                updateSeq = updateLocal(p, locPart.state(), updateSeq);
 
-                            if (log.isDebugEnabled())
-                                log.debug("Evicting partition with rebalancing disabled " +
-                                    "(it does not belong to affinity): " + locPart);
+                                if (log.isDebugEnabled())
+                                    log.debug("Evicting partition with rebalancing disabled " +
+                                        "(it does not belong to affinity): " + locPart);
+                            }
                         }
+                        else
+                            locPart.own();
                     }
-                    else
-                        locPart.own();
-                }
-                else if (belongs) {
-                    locPart = createPartition(p);
+                    else if (belongs) {
+                        locPart = createPartition(p);
 
-                    locPart.own();
+                        locPart.own();
 
-                    updateLocal(p, locPart.state(), updateSeq);
+                        updateLocal(p, locPart.state(), updateSeq);
+                    }
                 }
             }
-        }
 
-        if (node2part != null && node2part.valid())
-            checkEvictions(updateSeq, aff);
+            if (node2part != null && node2part.valid())
+                checkEvictions(updateSeq, aff);
+        }
 
         updateRebalanceVersion(aff);
     }
@@ -473,6 +479,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param updateSeq Update sequence.
      */
     private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) {
+        if (!grp.affinityNode())
+            return;
+
         int num = grp.affinity().partitions();
 
         for (int p = 0; p < num; p++) {
@@ -527,15 +536,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             }
 
             try {
-                GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
-
                 if (stopping)
                     return;
 
+                GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
+
                 assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
                     topVer + ", exchId=" + exchId + ']';
 
-                if (exchId.isLeft())
+                if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent())
                     removeNode(exchId.nodeId());
 
                 ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
@@ -547,8 +556,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                 cntrMap.clear();
 
+                boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
+
                 // If this is the oldest node.
-                if (oldest != null && (loc.equals(oldest) || exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()))) {
+                if (oldest != null && (loc.equals(oldest) || grpStarted)) {
                     if (node2part == null) {
                         node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 
@@ -572,12 +583,16 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     }
                 }
 
-                if (affReady)
-                    initPartitions0(exchFut, updateSeq);
-                else {
-                    List<List<ClusterNode>> aff = grp.affinity().idealAssignment();
+                if (grpStarted ||
+                    exchFut.discoveryEvent().type() == EVT_DISCOVERY_CUSTOM_EVT ||
+                    exchFut.serverNodeDiscoveryEvent()) {
+                    if (affReady)
+                        initPartitions0(exchFut, updateSeq);
+                    else {
+                        List<List<ClusterNode>> aff = grp.affinity().idealAssignment();
 
-                    createPartitions(aff, updateSeq);
+                        createPartitions(aff, updateSeq);
+                    }
                 }
 
                 consistencyCheck();

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d8e645f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index c8138f7..1f13e56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1150,7 +1150,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /**
      * @return {@code True} if exchange triggered by server node join or fail.
      */
-    private boolean serverNodeDiscoveryEvent() {
+    public boolean serverNodeDiscoveryEvent() {
         assert discoEvt != null;
 
         return discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT && !CU.clientNode(discoEvt.eventNode());
@@ -1173,16 +1173,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
        }
 
         if (err == null && realExchange) {
-            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                if (grp.isLocal())
-                    continue;
+            if (centralizedAff) {
+                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                    if (grp.isLocal())
+                        continue;
 
-                try {
-                    if (centralizedAff)
+                    try {
                         grp.topology().initPartitions(this);
-                }
-                catch (IgniteInterruptedCheckedException e) {
-                    U.error(log, "Failed to initialize partitions.", e);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        U.error(log, "Failed to initialize partitions.", e);
+                    }
                 }
             }
 
@@ -1199,9 +1200,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
-            if (discoEvt.type() == EVT_NODE_LEFT ||
+            if (serverNodeDiscoveryEvent() &&
+                (discoEvt.type() == EVT_NODE_LEFT ||
                 discoEvt.type() == EVT_NODE_FAILED ||
-                discoEvt.type() == EVT_NODE_JOINED)
+                discoEvt.type() == EVT_NODE_JOINED))
                 detectLostPartitions();
 
             Map<Integer, CacheValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());


Mime
View raw message