ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: Merge branch 'ignite-4154' into ignite-4154-opt2
Date Fri, 11 Nov 2016 10:16:20 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4154-opt2 c48957940 -> 0fb0578aa


Merge branch 'ignite-4154' into ignite-4154-opt2


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0fb0578a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0fb0578a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0fb0578a

Branch: refs/heads/ignite-4154-opt2
Commit: 0fb0578aaeb074ed9ba1fda1abbc5adc47bc0c99
Parents: c489579
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Nov 11 13:07:51 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Nov 11 13:07:51 2016 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         | 30 ++++++--------
 .../GridCachePartitionExchangeManager.java      | 11 +++++
 .../GridDhtPartitionsExchangeFuture.java        | 43 ++++++++++++--------
 3 files changed, 49 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0fb0578a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 8b945d4..640b7cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1621,14 +1621,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
     }
 
     /**
-     * Gets cache remote nodes for cache with given name.
-     *
-     * @param cacheName Cache name.
      * @param topVer Topology version.
-     * @return Collection of cache nodes.
+     * @param node Node.
+     * @return DHT caches started on given node.
      */
-    public Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion
topVer) {
-        return resolveDiscoCache(cacheName, topVer).remoteCacheNodes(cacheName, topVer.topologyVersion());
+    public Collection<String> nodeCaches(AffinityTopologyVersion topVer, ClusterNode
node) {
+        return resolveDiscoCache(null, topVer).nodeCaches(node);
     }
 
     /**
@@ -2743,7 +2741,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
         }
 
         Collection<String> nodeCaches(ClusterNode node) {
-            return null;
+            List<String> res = new ArrayList<>();
+
+            for (Map.Entry<String, Collection<ClusterNode>> e : affCacheNodes.entrySet())
{
+                if (F.contains(e.getValue(), node))
+                    res.add(e.getKey());
+            }
+
+            return res;
         }
 
         /**
@@ -2824,17 +2829,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
         }
 
         /**
-         * Gets all remote nodes that have cache with given name.
-         *
-         * @param cacheName Cache name.
-         * @param topVer Topology version.
-         * @return Collection of nodes.
-         */
-        Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, final
long topVer) {
-            return filter(topVer, rmtCacheNodes.get(cacheName));
-        }
-
-        /**
          * Gets all remote nodes that have at least one cache configured.
          *
          * @param topVer Topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0fb0578a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index faafa84..e587668 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -102,6 +102,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.IgniteSystemProperties.getLong;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -119,6 +120,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** Exchange history size. */
     private static final int EXCHANGE_HISTORY_SIZE = 1000;
 
+    /** */
+    private final boolean skipFirstExchangeMsg = getBoolean("SKIP_FIRST_EXCHANGE_MSG", true);
+
     /** Atomic reference for pending timeout object. */
     private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>();
 
@@ -300,10 +304,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         }
     };
 
+    public boolean skipFirstExchangeMessage() {
+        return skipFirstExchangeMsg;
+    }
+
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
         super.start0();
 
+        if (skipFirstExchangeMsg)
+            cctx.kernalContext().addNodeAttribute("SKIP_FIRST_EXCHANGE_MSG", true);
+
         exchWorker = new ExchangeWorker();
 
         cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT,
EVT_NODE_FAILED,

http://git-wip-us.apache.org/repos/asf/ignite/blob/0fb0578a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f9fc097..7c22f4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -81,6 +81,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
 
 /**
  * Future for exchanging partition maps.
@@ -92,9 +93,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD,
10);
 
     /** */
-    private static final boolean SKIP_FIRST_EXCHANGE_MSG = false;
-
-    /** */
     private static final long serialVersionUID = 0L;
 
     /** Dummy flag. */
@@ -742,31 +740,40 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         }
 
         if (crd.isLocal()) {
-            if (SKIP_FIRST_EXCHANGE_MSG && discoEvt.type() == EVT_NODE_JOINED &&
!discoEvt.eventNode().isLocal()) {
-                ClusterNode node = discoEvt.eventNode();
+            ClusterNode node = discoEvt.eventNode();
 
+            if (discoEvt.type() == EVT_NODE_JOINED && !discoEvt.eventNode().isLocal()
&& Boolean.TRUE.equals(node.attribute("SKIP_FIRST_EXCHANGE_MSG"))) {
                 assert !CU.clientNode(node) : discoEvt;
                 assert srvNodes.contains(node);
 
                 boolean rmv = remaining.remove(node.id());
                 assert rmv;
 
-                List<String> caches = new ArrayList<>();
+                Collection<String> caches = cctx.discovery().nodeCaches(topologyVersion(),
node);
+
+                if (!caches.isEmpty()) {
+                    GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage(exchangeId(),
+                        false,
+                        null,
+                        false);
 
-                GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage(exchangeId(),
-                    false,
-                    null,
-                    false);
+                    for (String cache : caches) {
+                        Map<Integer, GridDhtPartitionState> m = new HashMap<>();
 
-                for (String cache : caches) {
-                    Map<Integer, GridDhtPartitionState> m = new HashMap<>();
+                        GridDhtPartitionMap2 partMap = new GridDhtPartitionMap2(node.id(),
1, topologyVersion(), m, true);
 
-                    GridDhtPartitionMap2 partMap = new GridDhtPartitionMap2(node.id(), 0,
topologyVersion(), m, true);
+                        GridAffinityAssignmentCache assign = cctx.affinity().cacheAssignment(CU.cacheId(cache));
 
-                    msg.addLocalPartitionMap(CU.cacheId(cache), partMap, null);
-                }
+                        for (Integer part : assign.primaryPartitions(node.id(), topologyVersion()))
+                            partMap.put(part, MOVING);
+                        for (Integer part : assign.backupPartitions(node.id(), topologyVersion()))
+                            partMap.put(part, MOVING);
 
-                updatePartitionSingleMap(msg);
+                        msg.addLocalPartitionMap(CU.cacheId(cache), partMap, null);
+                    }
+
+                    updatePartitionSingleMap(msg);
+                }
             }
 
             if (remaining.isEmpty())
@@ -775,11 +782,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         else {
             boolean skipSnd = false;
 
-            if (SKIP_FIRST_EXCHANGE_MSG && discoEvt.type() == EVT_NODE_JOINED &&
discoEvt.eventNode().isLocal())
+            if (cctx.exchange().skipFirstExchangeMessage() && discoEvt.type() ==
EVT_NODE_JOINED && discoEvt.eventNode().isLocal())
                 skipSnd = true;
 
             if (!skipSnd)
                 sendPartitions(crd);
+            else
+                log.info("Skip first exchange message [topVer=" + topologyVersion() + ']');
         }
 
         initDone();


Mime
View raw message