ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [12/17] ignite git commit: Merge branch 'ignite-1.8.4-p1' into ignite-1.9.2
Date Tue, 11 Apr 2017 12:35:45 GMT
Merge branch 'ignite-1.8.4-p1' into ignite-1.9.2

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
#	modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b78aebed
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b78aebed
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b78aebed

Branch: refs/heads/ignite-4932
Commit: b78aebedeac80de37910e002c2b579bfd0de7f2d
Parents: 9d8de41 8273e67
Author: dkarachentsev <dkarachentsev@gridgain.com>
Authored: Mon Apr 3 14:52:58 2017 +0300
Committer: dkarachentsev <dkarachentsev@gridgain.com>
Committed: Mon Apr 3 14:52:58 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java | 310 ++++++++
 .../discovery/GridDiscoveryManager.java         | 710 ++++++-------------
 .../eventstorage/DiscoveryEventListener.java    |  33 +
 .../eventstorage/GridEventStorageManager.java   | 162 ++++-
 .../affinity/GridAffinityAssignmentCache.java   |   7 +-
 .../cache/CacheAffinitySharedManager.java       |  35 +-
 .../cache/GridCacheAffinityManager.java         |   2 +-
 .../GridCachePartitionExchangeManager.java      |  64 +-
 .../dht/GridClientPartitionTopology.java        |  20 +-
 .../dht/GridDhtAssignmentFetchFuture.java       |   7 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  44 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   4 +-
 .../GridDhtPartitionsExchangeFuture.java        |  33 +-
 .../service/GridServiceProcessor.java           |  21 +-
 .../GridDiscoveryManagerAliveCacheSelfTest.java |  61 +-
 .../GridDiscoveryManagerAttributesSelfTest.java |  10 +-
 .../discovery/GridDiscoveryManagerSelfTest.java | 214 ------
 .../IgniteTopologyPrintFormatSelfTest.java      |   8 +-
 .../testsuites/IgniteKernalSelfTestSuite.java   |   3 -
 19 files changed, 851 insertions(+), 897 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b78aebed/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index f94d51b,80549dc..62fed16
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@@ -112,8 -108,8 +108,9 @@@ import org.apache.ignite.spi.discovery.
  import org.apache.ignite.spi.discovery.DiscoverySpiListener;
  import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
  import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
 +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
  import org.apache.ignite.thread.IgniteThread;
+ import org.jetbrains.annotations.NotNull;
  import org.jetbrains.annotations.Nullable;
  import org.jsr166.ConcurrentHashMap8;
  
@@@ -1892,29 -1901,114 +1902,137 @@@ public class GridDiscoveryManager exten
      }
  
      /**
 +     * @return {@code True} if local node client and discovery SPI supports reconnect.
 +     */
 +    public boolean reconnectSupported() {
 +        DiscoverySpi spi = getSpi();
 +
 +        return ctx.clientNode() && (spi instanceof TcpDiscoverySpi) &&
 +            !(((TcpDiscoverySpi) spi).isClientReconnectDisabled());
 +    }
 +
 +    /**
 +     * Leave cluster and try to join again.
 +     *
 +     * @throws IgniteSpiException If failed.
 +     */
 +    public void reconnect() {
 +        assert reconnectSupported();
 +
 +        DiscoverySpi discoverySpi = getSpi();
 +
 +        ((TcpDiscoverySpi)discoverySpi).reconnect();
 +    }
 +
 +    /**
+      * @param loc Local node.
+      * @param topSnapshot Topology snapshot.
+      * @return Newly created discovery cache.
+      */
+     @NotNull private DiscoCache createDiscoCache(ClusterNode loc, Collection<ClusterNode>
topSnapshot) {
+         HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
+         HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size());
+ 
+         ArrayList<ClusterNode> daemonNodes = new ArrayList<>(topSnapshot.size());
+         ArrayList<ClusterNode> srvNodes = new ArrayList<>(topSnapshot.size());
+         ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size());
+         ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size());
+ 
+         for (ClusterNode node : topSnapshot) {
+             if (alive(node))
+                 alives.add(node.id());
+ 
+             if (node.isDaemon())
+                 daemonNodes.add(node);
+             else {
+                 allNodes.add(node);
+ 
+                 if (!node.isLocal())
+                     rmtNodes.add(node);
+ 
+                 if (!CU.clientNode(node))
+                     srvNodes.add(node);
+             }
+ 
+             nodeMap.put(node.id(), node);
+         }
+ 
+         assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local
node" +
+             " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']';
+ 
+         Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
+         Map<Integer, List<ClusterNode>> affCacheNodes = U.newHashMap(allNodes.size());
+ 
+         Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+         Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+         Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+ 
+         Set<Integer> nearEnabledCaches = new HashSet<>();
+ 
+         for (ClusterNode node : allNodes) {
+             assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node="
+ node + ']';
+             assert !node.isDaemon();
+ 
+             for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet())
{
+                 String cacheName = entry.getKey();
+                 CachePredicate filter = entry.getValue();
+ 
+                 if (filter.cacheNode(node)) {
+                     allNodesWithCaches.add(node);
+ 
+                     if(!CU.clientNode(node))
+                         srvNodesWithCaches.add(node);
+ 
+                     if (!node.isLocal())
+                         rmtNodesWithCaches.add(node);
+ 
+                     addToMap(allCacheNodes, cacheName, node);
+ 
+                     if (filter.dataNode(node))
+                         addToMap(affCacheNodes, cacheName, node);
+ 
+                     if (filter.nearNode(node))
+                         nearEnabledCaches.add(CU.cacheId(cacheName));
+                 }
+             }
+         }
+ 
+         return new DiscoCache(
+             loc,
+             Collections.unmodifiableList(rmtNodes),
+             Collections.unmodifiableList(allNodes),
+             Collections.unmodifiableList(srvNodes),
+             Collections.unmodifiableList(daemonNodes),
+             U.sealList(srvNodesWithCaches),
+             U.sealList(allNodesWithCaches),
+             U.sealList(rmtNodesWithCaches),
+             Collections.unmodifiableMap(allCacheNodes),
+             Collections.unmodifiableMap(affCacheNodes),
+             Collections.unmodifiableMap(nodeMap),
+             Collections.unmodifiableSet(nearEnabledCaches),
+             alives);
+     }
+ 
+     /**
+      * Adds node to map.
+      *
+      * @param cacheMap Map to add to.
+      * @param cacheName Cache name.
+      * @param rich Node to add
+      */
+     private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName,
ClusterNode rich) {
+         List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName));
+ 
+         if (cacheNodes == null) {
+             cacheNodes = new ArrayList<>();
+ 
+             cacheMap.put(CU.cacheId(cacheName), cacheNodes);
+         }
+ 
+         cacheNodes.add(rich);
+     }
+ 
+     /**
       * Updates topology version if current version is smaller than updated.
       *
       * @param updated Updated topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b78aebed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/b78aebed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/b78aebed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/b78aebed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/b78aebed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/b78aebed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/b78aebed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/b78aebed/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 6ced5e6,99146aa..adfbc11
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@@ -315,10 -313,8 +315,10 @@@ public class GridServiceProcessor exten
  
          busyLock.block();
  
 +        U.shutdownNow(GridServiceProcessor.class, depExe, log);
 +
          if (!ctx.clientNode())
-             ctx.event().removeLocalEventListener(topLsnr);
+             ctx.event().removeDiscoveryEventListener(topLsnr);
  
          Collection<ServiceContextImpl> ctxs = new ArrayList<>();
  
@@@ -1576,19 -1586,16 +1576,22 @@@
                          if (!((CacheAffinityChangeMessage)msg).exchangeNeeded())
                              return;
                      }
 +                    else if (msg instanceof DynamicCacheChangeBatch) {
 +                        if (!((DynamicCacheChangeBatch)msg).exchangeNeeded())
 +                            return;
 +                    }
 +                    else
 +                        return;
                  }
                  else
-                     topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(),
0);
+                     topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0);
  
 -                depExe.execute(new BusyRunnable() {
 +                depExe.execute(new DepRunnable() {
                      @Override public void run0() {
-                         ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
+                         // In case the cache instance isn't tracked by DiscoveryManager
anymore.
+                         discoCache.updateAlives(ctx.discovery());
+ 
+                         ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
  
                          if (oldest != null && oldest.isLocal()) {
                              final Collection<GridServiceDeployment> retries = new
ConcurrentLinkedQueue<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b78aebed/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------


Mime
View raw message