ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [4/6] ignite git commit: Moved logic related to caches discovery data handling to ClusterCachesInfo. Start of statically configured caches in the same way as dynamic ones: from GridDhtPartitionsExchangeFuture.
Date Thu, 18 May 2017 08:34:16 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 66e780f..3c65326 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -18,10 +18,9 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
@@ -33,29 +32,22 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage
{
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Discovery custom message ID. */
+    private IgniteUuid id = IgniteUuid.randomUuid();
+
     /** Change requests. */
     @GridToStringInclude
     private Collection<DynamicCacheChangeRequest> reqs;
 
-    /** Client nodes map. Used in discovery data exchange. */
-    @GridToStringInclude
-    private Map<String, Map<UUID, Boolean>> clientNodes;
-
-    /** Custom message ID. */
-    private IgniteUuid id = IgniteUuid.randomUuid();
-
-    /** */
-    private boolean clientReconnect;
-
-    /** */
-    private boolean startCaches;
+    /** Cache updates to be executed on exchange. */
+    private transient ExchangeActions exchangeActions;
 
     /**
      * @param reqs Requests.
      */
-    public DynamicCacheChangeBatch(
-        Collection<DynamicCacheChangeRequest> reqs
-    ) {
+    public DynamicCacheChangeBatch(Collection<DynamicCacheChangeRequest> reqs) {
+        assert !F.isEmpty(reqs) : reqs;
+
         this.reqs = reqs;
     }
 
@@ -64,34 +56,6 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage
{
         return id;
     }
 
-    /**
-     * @param id Message ID.
-     */
-    public void id(IgniteUuid id) {
-        this.id = id;
-    }
-
-    /**
-     * @return Collection of change requests.
-     */
-    public Collection<DynamicCacheChangeRequest> requests() {
-        return reqs;
-    }
-
-    /**
-     * @return Client nodes map.
-     */
-    public Map<String, Map<UUID, Boolean>> clientNodes() {
-        return clientNodes;
-    }
-
-    /**
-     * @param clientNodes Client nodes map.
-     */
-    public void clientNodes(Map<String, Map<UUID, Boolean>> clientNodes) {
-        this.clientNodes = clientNodes;
-    }
-
     /** {@inheritDoc} */
     @Nullable @Override public DiscoveryCustomMessage ackMessage() {
         return null;
@@ -103,45 +67,33 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage
{
     }
 
     /**
-     * @param clientReconnect {@code True} if this is discovery data sent on client reconnect.
-     */
-    public void clientReconnect(boolean clientReconnect) {
-        this.clientReconnect = clientReconnect;
-    }
-
-    /**
-     * @return {@code True} if this is discovery data sent on client reconnect.
+     * @return Collection of change requests.
      */
-    public boolean clientReconnect() {
-        return clientReconnect;
+    public Collection<DynamicCacheChangeRequest> requests() {
+        return reqs;
     }
 
     /**
-     * @return {@code True} if required to start all caches on client node.
+     * @return {@code True} if request should trigger partition exchange.
      */
-    public boolean startCaches() {
-        return startCaches;
+    public boolean exchangeNeeded() {
+        return exchangeActions != null;
     }
 
     /**
-     * @param startCaches {@code True} if required to start all caches on client node.
+     * @return Cache updates to be executed on exchange.
      */
-    public void startCaches(boolean startCaches) {
-        this.startCaches = startCaches;
+    ExchangeActions exchangeActions() {
+        return exchangeActions;
     }
 
     /**
-     * @return {@code True} if request should trigger partition exchange.
+     * @param exchangeActions Cache updates to be executed on exchange.
      */
-    public boolean exchangeNeeded() {
-        if (reqs != null) {
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (req.exchangeNeeded())
-                    return true;
-            }
-        }
+    void exchangeActions(ExchangeActions exchangeActions) {
+        assert exchangeActions != null && !exchangeActions.empty() : exchangeActions;
 
-        return false;
+        this.exchangeActions = exchangeActions;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 9d2563d..f8c2c7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.QuerySchema;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -88,78 +88,114 @@ public class DynamicCacheChangeRequest implements Serializable {
     /** Dynamic schema. */
     private QuerySchema schema;
 
-    /** */
-    private transient boolean exchangeNeeded;
-
-    /** */
-    private transient AffinityTopologyVersion cacheFutTopVer;
-
     /**
-     * Constructor creates cache stop request.
-     *
+     * @param reqId Unique request ID.
      * @param cacheName Cache stop name.
      * @param initiatingNodeId Initiating node ID.
      */
     public DynamicCacheChangeRequest(UUID reqId, String cacheName, UUID initiatingNodeId)
{
+        assert reqId != null;
+        assert cacheName != null;
+        assert initiatingNodeId != null;
+
         this.reqId = reqId;
         this.cacheName = cacheName;
         this.initiatingNodeId = initiatingNodeId;
     }
 
     /**
-     * @return Request ID.
+     * @param reqId Unique request ID.
+     * @param state New cluster state.
+     * @param initiatingNodeId Initiating node ID.
      */
-    public UUID requestId() {
-        return reqId;
+    public DynamicCacheChangeRequest(UUID reqId, ClusterState state, UUID initiatingNodeId)
{
+        assert reqId != null;
+        assert state != null;
+        assert initiatingNodeId != null;
+
+        this.reqId = reqId;
+        this.state = state;
+        this.initiatingNodeId = initiatingNodeId;
     }
 
     /**
-     * @return {@code True} if request should trigger partition exchange.
+     * @param ctx Context.
+     * @param cacheName Cache name.
+     * @return Request to reset lost partitions.
      */
-    public boolean exchangeNeeded() {
-        return exchangeNeeded;
+    static DynamicCacheChangeRequest resetLostPartitions(GridKernalContext ctx, String cacheName)
{
+        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(),
cacheName, ctx.localNodeId());
+
+        req.markResetLostPartitions();
+
+        return req;
     }
 
     /**
-     * @return State.
+     * @param ctx Context.
+     * @param cfg0 Template configuration.
+     * @return Request to add template.
      */
-    public ClusterState state() {
-        return state;
+    static DynamicCacheChangeRequest addTemplateRequest(GridKernalContext ctx, CacheConfiguration<?,
?> cfg0) {
+        CacheConfiguration<?, ?> cfg = new CacheConfiguration<>(cfg0);
+
+        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(),
cfg.getName(), ctx.localNodeId());
+
+        req.template(true);
+        req.startCacheConfiguration(cfg);
+        req.schema(new QuerySchema(cfg.getQueryEntities()));
+        req.deploymentId(IgniteUuid.randomUuid());
+
+        return req;
     }
 
     /**
-     * @param state State.
+     * @param ctx Context.
+     * @param cacheName Cache name.
+     * @return Request to close client cache.
      */
-    public void state(ClusterState state) {
-        this.state = state;
+    static DynamicCacheChangeRequest closeRequest(GridKernalContext ctx, String cacheName)
{
+        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(),
cacheName, ctx.localNodeId());
+
+        req.close(true);
+
+        return req;
     }
 
     /**
-     * @return {@code True} if global caches state is changes.
+     * @param ctx Context.
+     * @param cacheName Cache name.
+     * @param destroy Destroy flag.
+     * @return Cache stop request.
      */
-    public boolean globalStateChange() {
-        return state != null;
+    static DynamicCacheChangeRequest stopRequest(GridKernalContext ctx, String cacheName,
boolean destroy) {
+        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(),
cacheName, ctx.localNodeId());
+
+        req.stop(true);
+        req.destroy(destroy);
+
+        return req;
     }
 
     /**
-     * @param cacheFutTopVer Ready topology version when dynamic cache future should be completed.
+     * @return Request ID.
      */
-    public void cacheFutureTopologyVersion(AffinityTopologyVersion cacheFutTopVer) {
-        this.cacheFutTopVer = cacheFutTopVer;
+    public UUID requestId() {
+        return reqId;
     }
 
     /**
-     * @return Ready topology version when dynamic cache future should be completed.
+     * @return State.
      */
-    @Nullable public AffinityTopologyVersion cacheFutureTopologyVersion() {
-        return cacheFutTopVer;
+    public ClusterState state() {
+        return state;
     }
 
     /**
-     * @param exchangeNeeded {@code True} if request should trigger partition exchange.
+     * @return {@code True} if global caches state is changes.
      */
-    public void exchangeNeeded(boolean exchangeNeeded) {
-        this.exchangeNeeded = exchangeNeeded;
+    public boolean globalStateChange() {
+        return state != null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 09b4c3a..40d3706 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -17,10 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -46,21 +45,12 @@ public class DynamicCacheDescriptor {
     @GridToStringExclude
     private CacheConfiguration cacheCfg;
 
-    /** Locally configured flag. */
-    private boolean locCfg;
-
     /** Statically configured flag. */
-    private boolean staticCfg;
-
-    /** Started flag. */
-    private boolean started;
+    private final boolean staticCfg;
 
     /** Cache type. */
     private CacheType cacheType;
 
-    /** */
-    private volatile Map<UUID, CacheConfiguration> rmtCfgs;
-
     /** Template configuration flag. */
     private boolean template;
 
@@ -71,19 +61,10 @@ public class DynamicCacheDescriptor {
     private boolean updatesAllowed = true;
 
     /** */
-    private AffinityTopologyVersion startTopVer;
-
-    /** */
-    private boolean rcvdOnDiscovery;
-
-    /** */
     private Integer cacheId;
 
     /** */
-    private UUID rcvdFrom;
-
-    /** */
-    private AffinityTopologyVersion rcvdFromVer;
+    private final UUID rcvdFrom;
 
     /** Mutex. */
     private final Object mux = new Object();
@@ -92,7 +73,16 @@ public class DynamicCacheDescriptor {
     private volatile CacheObjectContext objCtx;
 
     /** */
-    private transient AffinityTopologyVersion clientCacheStartVer;
+    private boolean rcvdOnDiscovery;
+
+    /** */
+    private AffinityTopologyVersion startTopVer;
+
+    /** */
+    private AffinityTopologyVersion rcvdFromVer;
+
+    /** */
+    private AffinityTopologyVersion clientCacheStartVer;
 
     /** Mutex to control schema. */
     private final Object schemaMux = new Object();
@@ -105,21 +95,34 @@ public class DynamicCacheDescriptor {
      * @param cacheCfg Cache configuration.
      * @param cacheType Cache type.
      * @param template {@code True} if this is template configuration.
+     * @param rcvdFrom ID of node provided cache configuration
+     * @param staticCfg {@code True} if cache statically configured.
      * @param deploymentId Deployment ID.
+     * @param schema Query schema.
      */
     @SuppressWarnings("unchecked")
     public DynamicCacheDescriptor(GridKernalContext ctx,
         CacheConfiguration cacheCfg,
         CacheType cacheType,
         boolean template,
+        UUID rcvdFrom,
+        boolean staticCfg,
         IgniteUuid deploymentId,
         QuerySchema schema) {
         assert cacheCfg != null;
         assert schema != null;
 
+        if (cacheCfg.getCacheMode() == CacheMode.REPLICATED && cacheCfg.getNearConfiguration()
!= null) {
+            cacheCfg = new CacheConfiguration(cacheCfg);
+
+            cacheCfg.setNearConfiguration(null);
+        }
+
         this.cacheCfg = cacheCfg;
         this.cacheType = cacheType;
         this.template = template;
+        this.rcvdFrom = rcvdFrom;
+        this.staticCfg = staticCfg;
         this.deploymentId = deploymentId;
 
         pluginMgr = new CachePluginManager(ctx, cacheCfg);
@@ -139,20 +142,6 @@ public class DynamicCacheDescriptor {
     }
 
     /**
-     * @return Start topology version.
-     */
-    @Nullable public AffinityTopologyVersion startTopologyVersion() {
-        return startTopVer;
-    }
-
-    /**
-     * @param startTopVer Start topology version.
-     */
-    public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
-        this.startTopVer = startTopVer;
-    }
-
-    /**
      * @return {@code True} if this is template configuration.
      */
     public boolean template() {
@@ -174,27 +163,6 @@ public class DynamicCacheDescriptor {
     }
 
     /**
-     * @param deploymentId Deployment ID.
-     */
-    public void deploymentId(IgniteUuid deploymentId) {
-        this.deploymentId = deploymentId;
-    }
-
-    /**
-     * @return Locally configured flag.
-     */
-    public boolean locallyConfigured() {
-        return locCfg;
-    }
-
-    /**
-     * @param locCfg Locally configured flag.
-     */
-    public void locallyConfigured(boolean locCfg) {
-        this.locCfg = locCfg;
-    }
-
-    /**
      * @return {@code True} if statically configured.
      */
     public boolean staticallyConfigured() {
@@ -202,30 +170,12 @@ public class DynamicCacheDescriptor {
     }
 
     /**
-     * @param staticCfg {@code True} if statically configured.
+     * @return Cache name.
      */
-    public void staticallyConfigured(boolean staticCfg) {
-        this.staticCfg = staticCfg;
-    }
+    public String cacheName() {
+        assert cacheCfg != null : this;
 
-    /**
-     * @return {@code True} if started flag was flipped by this call.
-     */
-    public boolean onStart() {
-        if (!started) {
-            started = true;
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * @return Started flag.
-     */
-    public boolean started() {
-        return started;
+        return cacheCfg.getName();
     }
 
     /**
@@ -239,6 +189,7 @@ public class DynamicCacheDescriptor {
      * Creates and caches cache object context if needed.
      *
      * @param proc Object processor.
+     * @return Cache object context.
      */
     public CacheObjectContext cacheObjectContext(IgniteCacheObjectProcessor proc) throws
IgniteCheckedException {
         if (objCtx == null) {
@@ -259,36 +210,6 @@ public class DynamicCacheDescriptor {
     }
 
     /**
-     * @param nodeId Remote node ID.
-     * @return Configuration.
-     */
-    public CacheConfiguration remoteConfiguration(UUID nodeId) {
-        Map<UUID, CacheConfiguration> cfgs = rmtCfgs;
-
-        return cfgs == null ? null : cfgs.get(nodeId);
-    }
-
-    /**
-     * @param nodeId Remote node ID.
-     * @param cfg Remote node configuration.
-     */
-    public void addRemoteConfiguration(UUID nodeId, CacheConfiguration cfg) {
-        Map<UUID, CacheConfiguration> cfgs = rmtCfgs;
-
-        if (cfgs == null)
-            rmtCfgs = cfgs = new HashMap<>();
-
-        cfgs.put(nodeId, cfg);
-    }
-
-    /**
-     *
-     */
-    public void clearRemoteConfigurations() {
-        rmtCfgs = null;
-    }
-
-    /**
      * @return Updates allowed flag.
      */
     public boolean updatesAllowed() {
@@ -305,43 +226,51 @@ public class DynamicCacheDescriptor {
     /**
      * @return {@code True} if received in discovery data.
      */
-    public boolean receivedOnDiscovery() {
+    boolean receivedOnDiscovery() {
         return rcvdOnDiscovery;
     }
 
     /**
      * @param rcvdOnDiscovery {@code True} if received in discovery data.
      */
-    public void receivedOnDiscovery(boolean rcvdOnDiscovery) {
+    void receivedOnDiscovery(boolean rcvdOnDiscovery) {
         this.rcvdOnDiscovery = rcvdOnDiscovery;
     }
 
     /**
-     * @param nodeId ID of node provided cache configuration in discovery data.
+     * @return ID of node provided cache configuration in discovery data.
      */
-    public void receivedFrom(UUID nodeId) {
-        rcvdFrom = nodeId;
+    @Nullable public UUID receivedFrom() {
+        return rcvdFrom;
     }
 
     /**
      * @return Topology version when node provided cache configuration was started.
      */
-    @Nullable public AffinityTopologyVersion receivedFromStartVersion() {
+    @Nullable AffinityTopologyVersion receivedFromStartVersion() {
         return rcvdFromVer;
     }
 
     /**
      * @param rcvdFromVer Topology version when node provided cache configuration was started.
      */
-    public void receivedFromStartVersion(AffinityTopologyVersion rcvdFromVer) {
+    void receivedFromStartVersion(AffinityTopologyVersion rcvdFromVer) {
         this.rcvdFromVer = rcvdFromVer;
     }
 
+
     /**
-     * @return ID of node provided cache configuration in discovery data.
+     * @return Start topology version.
      */
-    @Nullable public UUID receivedFrom() {
-        return rcvdFrom;
+    @Nullable public AffinityTopologyVersion startTopologyVersion() {
+        return startTopVer;
+    }
+
+    /**
+     * @param startTopVer Start topology version.
+     */
+    public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
+        this.startTopVer = startTopVer;
     }
 
     /**
@@ -354,7 +283,7 @@ public class DynamicCacheDescriptor {
     /**
      * @param clientCacheStartVer Version when client cache on local node was started.
      */
-    public void clientCacheStartVersion(AffinityTopologyVersion clientCacheStartVer) {
+    void clientCacheStartVersion(AffinityTopologyVersion clientCacheStartVer) {
         this.clientCacheStartVer = clientCacheStartVer;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
new file mode 100644
index 0000000..eac1120
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cache change requests to execute when receive {@link DynamicCacheChangeBatch} event.
+ */
+public class ExchangeActions {
+    /** */
+    private Map<String, ActionData> cachesToStart;
+
+    /** */
+    private Map<String, ActionData> clientCachesToStart;
+
+    /** */
+    private Map<String, ActionData> cachesToStop;
+
+    /** */
+    private Map<String, ActionData> cachesToClose;
+
+    /** */
+    private Map<String, ActionData> cachesToResetLostParts;
+
+    /** */
+    private ClusterState newState;
+
+    /**
+     * @return {@code True} if server nodes should not participate in exchange.
+     */
+    boolean clientOnlyExchange() {
+        return F.isEmpty(cachesToStart) &&
+            F.isEmpty(cachesToStop) &&
+            F.isEmpty(cachesToResetLostParts);
+    }
+
+    /**
+     * @param nodeId Local node ID.
+     * @return Close cache requests.
+     */
+    List<DynamicCacheChangeRequest> closeRequests(UUID nodeId) {
+        List<DynamicCacheChangeRequest> res = null;
+
+        if (cachesToClose != null) {
+            for (ActionData req : cachesToClose.values()) {
+                if (nodeId.equals(req.req.initiatingNodeId())) {
+                    if (res == null)
+                        res = new ArrayList<>(cachesToClose.size());
+
+                    res.add(req.req);
+                }
+            }
+        }
+
+        return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList();
+    }
+
+    /**
+     * @return New caches start requests.
+     */
+    Collection<ActionData> cacheStartRequests() {
+        return cachesToStart != null ? cachesToStart.values() : Collections.<ActionData>emptyList();
+    }
+
+    /**
+     * @return Start cache requests.
+     */
+    Collection<ActionData> newAndClientCachesStartRequests() {
+        if (cachesToStart != null || clientCachesToStart != null) {
+            List<ActionData> res = new ArrayList<>();
+
+            if (cachesToStart != null)
+                res.addAll(cachesToStart.values());
+
+            if (clientCachesToStart != null)
+                res.addAll(clientCachesToStart.values());
+
+            return res;
+        }
+
+        return Collections.emptyList();
+    }
+
+    /**
+     * @return Stop cache requests.
+     */
+    Collection<ActionData> cacheStopRequests() {
+        return cachesToStop != null ? cachesToStop.values() : Collections.<ActionData>emptyList();
+    }
+
+    /**
+     * @param ctx Context.
+     */
+    public void completeRequestFutures(GridCacheSharedContext ctx) {
+        completeRequestFutures(cachesToStart, ctx);
+        completeRequestFutures(cachesToStop, ctx);
+        completeRequestFutures(cachesToClose, ctx);
+        completeRequestFutures(clientCachesToStart, ctx);
+        completeRequestFutures(cachesToResetLostParts, ctx);
+    }
+
+    /**
+     * @param map Actions map.
+     * @param ctx Context.
+     */
+    private void completeRequestFutures(Map<String, ActionData> map, GridCacheSharedContext
ctx) {
+        if (map != null) {
+            for (ActionData req : map.values())
+                ctx.cache().completeCacheStartFuture(req.req, null);
+        }
+    }
+
+    /**
+     * @return {@code True} if have cache stop requests.
+     */
+    public boolean hasStop() {
+        return !F.isEmpty(cachesToStop);
+    }
+
+    /**
+     * @return Caches to reset lost partitions for.
+     */
+    public Set<String> cachesToResetLostPartitions() {
+        Set<String> caches = null;
+        
+        if (cachesToResetLostParts != null)
+            caches = new HashSet<>(cachesToResetLostParts.keySet());
+
+        return caches != null ? caches : Collections.<String>emptySet();
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @return {@code True} if cache stop was requested.
+     */
+    public boolean cacheStopped(int cacheId) {
+        if (cachesToStop != null) {
+            for (ActionData cache : cachesToStop.values()) {
+                if (cache.desc.cacheId() == cacheId)
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @return {@code True} if cache start was requested.
+     */
+    public boolean cacheStarted(int cacheId) {
+        if (cachesToStart != null) {
+            for (ActionData cache : cachesToStart.values()) {
+                if (cache.desc.cacheId() == cacheId)
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * @param nodeId Local node ID.
+     * @return {@code True} if client cache was started.
+     */
+    public boolean clientCacheStarted(UUID nodeId) {
+        if (clientCachesToStart != null) {
+            for (ActionData cache : clientCachesToStart.values()) {
+                if (nodeId.equals(cache.req.initiatingNodeId()))
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * @param state New cluster state.
+     */
+    void newClusterState(ClusterState state) {
+        assert state != null;
+
+        newState = state;
+    }
+
+    /**
+     * @return New cluster state if state change was requested.
+     */
+    @Nullable public ClusterState newClusterState() {
+        return newState;
+    }
+
+    /**
+     * @param map Actions map.
+     * @param req Request.
+     * @param desc Cache descriptor.
+     * @return Actions map.
+     */
+    private Map<String, ActionData> add(Map<String, ActionData> map,
+        DynamicCacheChangeRequest req,
+        DynamicCacheDescriptor desc) {
+        assert req != null;
+        assert desc != null;
+
+        if (map == null)
+            map = new HashMap<>();
+
+        ActionData old = map.put(req.cacheName(), new ActionData(req, desc));
+
+        assert old == null : old;
+
+        return map;
+    }
+
+    /**
+     * @param req Request.
+     * @param desc Cache descriptor.
+     */
+    void addCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        assert req.start() : req;
+
+        cachesToStart = add(cachesToStart, req, desc);
+    }
+
+    /**
+     * @param req Request.
+     * @param desc Cache descriptor.
+     */
+    void addClientCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc)
{
+        assert req.start() : req;
+
+        clientCachesToStart = add(clientCachesToStart, req, desc);
+    }
+
+    /**
+     * @param req Request.
+     * @param desc Cache descriptor.
+     */
+    void addCacheToStop(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        assert req.stop() : req;
+
+        cachesToStop = add(cachesToStop, req, desc);
+    }
+
+    /**
+     * @param req Request.
+     * @param desc Cache descriptor.
+     */
+    void addCacheToClose(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        assert req.close() : req;
+
+        cachesToClose = add(cachesToClose, req, desc);
+    }
+
+    /**
+     * @param req Request.
+     * @param desc Cache descriptor.
+     */
+    void addCacheToResetLostPartitions(DynamicCacheChangeRequest req, DynamicCacheDescriptor
desc) {
+        assert req.resetLostPartitions() : req;
+
+        cachesToResetLostParts = add(cachesToResetLostParts, req, desc);
+    }
+
+    /**
+     * @return {@code True} if there are no cache change actions.
+     */
+    public boolean empty() {
+        return F.isEmpty(cachesToStart) &&
+            F.isEmpty(clientCachesToStart) &&
+            F.isEmpty(cachesToStop) &&
+            F.isEmpty(cachesToClose) &&
+            F.isEmpty(cachesToResetLostParts);
+    }
+
+    /**
+     *
+     */
+    static class ActionData {
+        /** */
+        private DynamicCacheChangeRequest req;
+
+        /** */
+        private DynamicCacheDescriptor desc;
+
+        /**
+         * @param req Request.
+         * @param desc Cache descriptor.
+         */
+        ActionData(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+            assert req != null;
+            assert desc != null;
+
+            this.req = req;
+            this.desc = desc;
+        }
+
+        /**
+         * @return Request.
+         */
+        public DynamicCacheChangeRequest request() {
+            return req;
+        }
+
+        /**
+         * @return Cache descriptor.
+         */
+        public DynamicCacheDescriptor descriptor() {
+            return desc;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index a0489fc..aa503b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -234,8 +234,11 @@ public class GridCacheContext<K, V> implements Externalizable {
     /** */
     private CountDownLatch startLatch = new CountDownLatch(1);
 
-    /** Start topology version. */
-    private AffinityTopologyVersion startTopVer;
+    /** Topology version when cache was started on local node. */
+    private AffinityTopologyVersion locStartTopVer;
+
+    /** */
+    private UUID rcvdFrom;
 
     /** Dynamic cache deployment ID. */
     private IgniteUuid dynamicDeploymentId;
@@ -289,6 +292,8 @@ public class GridCacheContext<K, V> implements Externalizable {
         GridCacheSharedContext sharedCtx,
         CacheConfiguration cacheCfg,
         CacheType cacheType,
+        AffinityTopologyVersion locStartTopVer,
+        UUID rcvdFrom,
         boolean affNode,
         boolean updatesAllowed,
         MemoryPolicy memPlc,
@@ -316,6 +321,7 @@ public class GridCacheContext<K, V> implements Externalizable {
         assert ctx != null;
         assert sharedCtx != null;
         assert cacheCfg != null;
+        assert locStartTopVer != null : cacheCfg.getName();
 
         assert evtMgr != null;
         assert storeMgr != null;
@@ -333,6 +339,8 @@ public class GridCacheContext<K, V> implements Externalizable {
         this.sharedCtx = sharedCtx;
         this.cacheCfg = cacheCfg;
         this.cacheType = cacheType;
+        this.locStartTopVer = locStartTopVer;
+        this.rcvdFrom = rcvdFrom;
         this.affNode = affNode;
         this.updatesAllowed = updatesAllowed;
         this.depEnabled = ctx.deploy().enabled() && !cacheObjects().isBinaryEnabled(cacheCfg);
@@ -452,17 +460,19 @@ public class GridCacheContext<K, V> implements Externalizable
{
     }
 
     /**
-     * @return Start topology version.
+     * @return Node ID cache was received from.
      */
-    public AffinityTopologyVersion startTopologyVersion() {
-        return startTopVer;
+    public UUID receivedFrom() {
+        return rcvdFrom;
     }
 
     /**
-     * @param startTopVer Start topology version.
+     * @return Topology version when cache was started on local node.
      */
-    public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
-        this.startTopVer = startTopVer;
+    public AffinityTopologyVersion startTopologyVersion() {
+        assert locStartTopVer != null : name();
+
+        return locStartTopVer;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4775ea1..04c647f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -235,34 +235,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (customMsg instanceof DynamicCacheChangeBatch) {
                         DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
 
-                        Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
-
-                        // Validate requests to check if event should trigger partition exchange.
-                        for (final DynamicCacheChangeRequest req : batch.requests()) {
-                            if (req.exchangeNeeded())
-                                valid.add(req);
-                            else {
-                                IgniteInternalFuture<?> fut = null;
-
-                                if (req.cacheFutureTopologyVersion() != null)
-                                    fut = affinityReadyFuture(req.cacheFutureTopologyVersion());
-
-                                if (fut == null || fut.isDone())
-                                    cctx.cache().completeStartFuture(req);
-                                else {
-                                    fut.listen(new CI1<IgniteInternalFuture<?>>()
{
-                                        @Override public void apply(IgniteInternalFuture<?>
fut) {
-                                            cctx.cache().completeStartFuture(req);
-                                        }
-                                    });
-                                }
-                            }
-                        }
+                        ExchangeActions exchActions = batch.exchangeActions();
 
-                        if (!F.isEmpty(valid) && !(valid.size() == 1 && valid.iterator().next().globalStateChange()))
{
+                        if (exchActions != null) {
                             exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
 
-                            exchFut = exchangeFuture(exchId, evt, cache, valid, null);
+                            exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
                         }
                     }
                     else if (customMsg instanceof CacheAffinityChangeMessage) {
@@ -385,10 +363,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         assert startTime > 0;
 
         // Generate dummy discovery event for local node joining.
-        T2<DiscoveryEvent, DiscoCache> localJoin = cctx.discovery().localJoin();
+        T2<DiscoveryEvent, DiscoCache> locJoin = cctx.discovery().localJoin();
 
-        DiscoveryEvent discoEvt = localJoin.get1();
-        DiscoCache discoCache = localJoin.get2();
+        DiscoveryEvent discoEvt = locJoin.get1();
+        DiscoCache discoCache = locJoin.get2();
 
         GridDhtPartitionExchangeId exchId = initialExchangeId();
 
@@ -488,8 +466,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 }
             }
 
+            AffinityTopologyVersion nodeStartVer = new AffinityTopologyVersion(discoEvt.topologyVersion(),
0);
+
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (cacheCtx.startTopologyVersion() == null)
+                if (nodeStartVer.equals(cacheCtx.startTopologyVersion()))
                     cacheCtx.preloader().onInitialExchangeComplete(null);
             }
 
@@ -917,7 +897,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (exchId != null) {
                         AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
 
-                        ready = startTopVer == null || startTopVer.compareTo(exchId.topologyVersion())
<= 0;
+                        ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0;
                     }
                     else
                         ready = cacheCtx.started();
@@ -1123,25 +1103,25 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
      * @param exchId Exchange ID.
      * @param discoEvt Discovery event.
      * @param cache Discovery data cache.
-     * @param reqs Cache change requests.
+     * @param exchActions Cache change actions.
      * @param affChangeMsg Affinity change message.
      * @return Exchange future.
      */
     private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
         @Nullable DiscoveryEvent discoEvt,
         @Nullable DiscoCache cache,
-        @Nullable Collection<DynamicCacheChangeRequest> reqs,
+        @Nullable ExchangeActions exchActions,
         @Nullable CacheAffinityChangeMessage affChangeMsg) {
         GridDhtPartitionsExchangeFuture fut;
 
         GridDhtPartitionsExchangeFuture old = exchFuts.addx(
-            fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs, affChangeMsg));
+            fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, exchActions,
affChangeMsg));
 
         if (old != null) {
             fut = old;
 
-            if (reqs != null)
-                fut.cacheChangeRequests(reqs);
+            if (exchActions != null)
+                fut.exchangeActions(exchActions);
 
             if (affChangeMsg != null)
                 fut.affinityChangeMessage(affChangeMsg);
@@ -1320,9 +1300,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                     GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
 
-                    if (cacheCtx != null && cacheCtx.startTopologyVersion() != null
&&
-                        entry.getValue() != null &&
-                        entry.getValue().topologyVersion() != null && // Backward
compatibility.
+                    if (cacheCtx != null &&
                         cacheCtx.startTopologyVersion().compareTo(entry.getValue().topologyVersion())
> 0)
                         continue;
 


Mime
View raw message