ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-4154
Date Wed, 16 Nov 2016 12:18:33 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4154-opt2 6e1028bde -> 160cb202a


ignite-4154


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/160cb202
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/160cb202
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/160cb202

Branch: refs/heads/ignite-4154-opt2
Commit: 160cb202ad2002087629ba3773bed4b8f66d3498
Parents: 6e1028b
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Nov 16 15:18:03 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Nov 16 15:18:03 2016 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |   2 +-
 .../affinity/AffinityCalculateCache.java        |  87 +++++++++
 .../affinity/AffinityConfiguration.java         |  59 ++++++
 .../affinity/GridAffinityAssignmentCache.java   |  52 ++++--
 .../cache/CacheAffinitySharedManager.java       | 182 ++++++++++++++++---
 .../cache/GridCacheAffinityManager.java         |   7 +-
 6 files changed, 341 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/160cb202/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index f65bf52..354ea10 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -2402,7 +2402,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K,
V> {
     /**
      *  Filter that accepts all nodes.
      */
-    public static class IgniteAllNodesPredicate  implements IgnitePredicate<ClusterNode>
{
+    public static class IgniteAllNodesPredicate implements IgnitePredicate<ClusterNode>
{
         /** */
         private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/160cb202/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java
new file mode 100644
index 0000000..80ee238
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class AffinityCalculateCache {
+    /** */
+    private final Map<Object, List<List<ClusterNode>>> assignCache = new
HashMap<>();
+
+    /** */
+    private final AffinityTopologyVersion topVer;
+
+    /** */
+    private final DiscoveryEvent discoEvt;
+
+    /** */
+    private Map<Integer, List<List<ClusterNode>>> grpAssign;
+
+    public AffinityCalculateCache(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt)
{
+        this.topVer = topVer;
+        this.discoEvt = discoEvt;
+    }
+
+    public List<List<ClusterNode>> assignPartitions(AffinityFunction aff,
+        int backups,
+        List<ClusterNode> nodes,
+        List<List<ClusterNode>> prevAssignment,
+        @Nullable Integer affGrp,
+        Object affKey) {
+        if (affGrp != null && grpAssign != null) {
+            List<List<ClusterNode>> calcAssign = grpAssign.get(affGrp);
+
+            if (calcAssign != null)
+                return calcAssign;
+        }
+
+        AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(nodes,
+            prevAssignment,
+            discoEvt,
+            topVer,
+            backups);
+
+        List<List<ClusterNode>> assign = aff.assignPartitions(ctx);
+
+        List<List<ClusterNode>> assign0 = assignCache.get(affKey);
+
+        if (assign0 != null && assign0.equals(assign))
+            assign = assign0;
+        else
+            assignCache.put(affKey, assign);
+
+        if (affGrp != null) {
+            if (grpAssign == null)
+                grpAssign = new HashMap<>();
+
+            grpAssign.put(affGrp, assign);
+        }
+
+        return assign;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/160cb202/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityConfiguration.java
new file mode 100644
index 0000000..c9ebbbc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityConfiguration.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgnitePredicate;
+
+/**
+ *
+ */
+public class AffinityConfiguration {
+    /** */
+    private final AffinityFunction aff;
+
+    /** */
+    private final IgnitePredicate<ClusterNode> nodeFilter;
+
+    /** */
+    private final int backups;
+
+    /**
+     * @param aff
+     * @param nodeFilter
+     * @param backups
+     */
+    public AffinityConfiguration(AffinityFunction aff, IgnitePredicate<ClusterNode>
nodeFilter, int backups) {
+        this.aff = aff;
+        this.nodeFilter = nodeFilter;
+        this.backups = backups;
+    }
+
+    public AffinityFunction affinityFunction() {
+        return aff;
+    }
+
+    public IgnitePredicate<ClusterNode> nodeFilter() {
+        return nodeFilter;
+    }
+
+    public int backups() {
+        return backups;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/160cb202/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a388c7a..3b62858 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -34,6 +34,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.AffinityCentralizedFunction;
 import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.GridKernalContext;
@@ -110,35 +111,35 @@ public class GridAffinityAssignmentCache {
     /** */
     private final Object similarAffKey;
 
+    /** */
+    private final Integer affGrp;
+
     /**
      * Constructs affinity cached calculations.
      *
      * @param ctx Kernal context.
      * @param cacheName Cache name.
-     * @param aff Affinity function.
-     * @param nodeFilter Node filter.
-     * @param backups Number of backups.
+     * @param affCfg Affinity configuration.
      * @param locCache Local cache flag.
      */
     @SuppressWarnings("unchecked")
     public GridAffinityAssignmentCache(GridKernalContext ctx,
         String cacheName,
-        AffinityFunction aff,
-        IgnitePredicate<ClusterNode> nodeFilter,
-        int backups,
+        AffinityConfiguration affCfg,
         boolean locCache)
     {
         assert ctx != null;
-        assert aff != null;
-        assert nodeFilter != null;
 
         this.ctx = ctx;
-        this.aff = aff;
-        this.nodeFilter = nodeFilter;
         this.cacheName = cacheName;
-        this.backups = backups;
+        this.aff = affCfg.affinityFunction();
+        this.nodeFilter = affCfg.nodeFilter();
+        this.backups = affCfg.backups();
         this.locCache = locCache;
 
+        assert aff != null;
+        assert nodeFilter != null;
+
         cacheId = CU.cacheId(cacheName);
 
         log = ctx.log(GridAffinityAssignmentCache.class);
@@ -148,8 +149,9 @@ public class GridAffinityAssignmentCache {
         head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE));
 
         similarAffKey = ctx.affinity().similaryAffinityKey(aff, nodeFilter, backups, partsCnt);
-
         assert similarAffKey != null;
+
+        affGrp = locCache ? null : ctx.cache().context().affinity().equalAffinityGroup(cacheId,
affCfg);
     }
 
     /**
@@ -255,7 +257,9 @@ public class GridAffinityAssignmentCache {
      * @return Affinity assignments.
      */
     @SuppressWarnings("IfMayBeConditional")
-    public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer,
DiscoveryEvent discoEvt) {
+    public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer,
+        DiscoveryEvent discoEvt,
+        AffinityCalculateCache cache) {
         if (log.isDebugEnabled())
             log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId()
+
                 ", discoEvt=" + discoEvt + ']');
@@ -273,20 +277,28 @@ public class GridAffinityAssignmentCache {
         else
             sorted = Collections.singletonList(ctx.discovery().localNode());
 
-        List<List<ClusterNode>> assignment;
+        List<List<ClusterNode>> assignment = null;
 
         if (prevAssignment != null && discoEvt != null) {
             boolean affNode = CU.affinityNode(discoEvt.eventNode(), nodeFilter);
 
             if (!affNode)
                 assignment = prevAssignment;
-            else
-                assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted,
prevAssignment,
-                    discoEvt, topVer, backups));
         }
-        else
-            assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted,
prevAssignment, discoEvt,
-                topVer, backups));
+
+        if (assignment == null) {
+            if (cache != null)
+                assignment = cache.assignPartitions(aff, backups, sorted, prevAssignment,
affGrp, similarAffKey);
+            else {
+                AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(sorted,
+                    prevAssignment,
+                    discoEvt,
+                    topVer,
+                    backups);
+
+                assignment = aff.assignPartitions(ctx);
+            }
+        }
 
         assert assignment != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/160cb202/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index b50479d..bae3e9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -30,12 +30,18 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityNodeAddressHashResolver;
+import org.apache.ignite.cache.affinity.AffinityNodeIdHashResolver;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityCalculateCache;
+import org.apache.ignite.internal.processors.affinity.AffinityConfiguration;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
@@ -97,6 +103,109 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private final ConcurrentMap<T2<Integer, AffinityTopologyVersion>, GridDhtAssignmentFetchFuture>
         pendingAssignmentFetchFuts = new ConcurrentHashMap8<>();
 
+    /** */
+    private final Map<Integer, EqualAffinityCacheGroup> eqAffCacheGroups = new HashMap<>();
+
+    /**
+     *
+     */
+    private class EqualAffinityCacheGroup {
+        /** */
+        private final Map<Integer, AffinityConfiguration> caches = new HashMap<>();
+
+        private EqualAffinityCacheGroup(Integer cacheId, AffinityConfiguration cfg) {
+            caches.put(cacheId, cfg);
+        }
+
+        void add(Integer cacheId, AffinityConfiguration cfg) {
+            caches.put(cacheId, cfg);
+        }
+
+        /**
+         * @param cfg Affinity configuration.
+         * @return {@code True} if cache configurations have exactly the same affinity configuration.
+         */
+        boolean equalAffinity(AffinityConfiguration cfg) {
+            assert !caches.isEmpty();
+
+            AffinityConfiguration cfg0 = F.firstValue(caches);
+
+            assert cfg0 != null;
+
+            if (cfg0.backups() != cfg.backups())
+                return false;
+
+            if (!cfg0.nodeFilter().equals(cfg.nodeFilter()))
+                return false;
+
+            if (cfg0.affinityFunction().getClass() != cfg.affinityFunction().getClass())
+                return false;
+
+            if (cfg0.affinityFunction() == cfg.affinityFunction())
+                return true;
+
+            if (cfg0.affinityFunction() instanceof RendezvousAffinityFunction) {
+                RendezvousAffinityFunction f1 = (RendezvousAffinityFunction)cfg0.affinityFunction();
+                RendezvousAffinityFunction f2 = (RendezvousAffinityFunction)cfg.affinityFunction();
+
+                if (f1.getHashIdResolver() != f2.getHashIdResolver()) {
+                    if (f1.getHashIdResolver() == null || f2.getHashIdResolver() == null)
+                        return false;
+
+                    boolean eqRslvr = (f1.getHashIdResolver().getClass() == f2.getHashIdResolver().getClass())
&&
+                        (f1.getHashIdResolver().getClass() == AffinityNodeAddressHashResolver.class
||
+                            f1.getHashIdResolver().getClass() == AffinityNodeIdHashResolver.class);
+
+                    if (!eqRslvr)
+                        return false;
+                }
+
+                return f1.partitions() == f2.partitions() &&
+                    f1.isExcludeNeighbors() == f2.isExcludeNeighbors() &&
+                    f1.getBackupFilter() == f2.getBackupFilter() &&
+                    f1.getAffinityBackupFilter() == f2.getAffinityBackupFilter();
+            }
+            else if (cfg0.affinityFunction() instanceof FairAffinityFunction) {
+                FairAffinityFunction f1 = (FairAffinityFunction)cfg0.affinityFunction();
+                FairAffinityFunction f2 = (FairAffinityFunction)cfg.affinityFunction();
+
+                return f1.partitions() == f2.partitions() &&
+                    f1.isExcludeNeighbors() == f2.isExcludeNeighbors() &&
+                    f1.getBackupFilter() == f2.getBackupFilter() &&
+                    f1.getAffinityBackupFilter() == f2.getAffinityBackupFilter();
+            }
+            else
+                return false;
+        }
+    }
+
+    @Nullable public Integer equalAffinityGroup(Integer cacheId, AffinityConfiguration cfg)
{
+        if (!(cfg.affinityFunction().getClass() == RendezvousAffinityFunction.class ||
+            cfg.affinityFunction().getClass() == FairAffinityFunction.class))
+            return null;
+
+        synchronized (eqAffCacheGroups) {
+            for (Map.Entry<Integer, EqualAffinityCacheGroup> e : eqAffCacheGroups.entrySet())
{
+                EqualAffinityCacheGroup grp = e.getValue();
+
+                if (grp.caches.containsKey(cacheId))
+                    return e.getKey();
+
+                if (e.getValue().equalAffinity(cfg)) {
+                    e.getValue().add(cacheId, cfg);
+
+                    return e.getKey();
+                }
+            }
+
+            Integer grp = eqAffCacheGroups.size();
+
+            eqAffCacheGroups.put(grp, new EqualAffinityCacheGroup(cacheId, cfg));
+
+            return grp;
+        }
+    }
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -372,6 +481,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         Set<Integer> stoppedCaches = null;
 
+        AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(),
fut.discoveryEvent());
+
         for (DynamicCacheChangeRequest req : reqs) {
             if (!(req.clientStartOnly() || req.close()))
                 clientOnly = false;
@@ -394,7 +505,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                             req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId());
 
                         if (clientCacheStarted)
-                            initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
+                            initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign,
affCache);
                         else if (!req.clientStartOnly()) {
                             assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion());
 
@@ -403,7 +514,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                             assert aff.lastVersion().equals(AffinityTopologyVersion.NONE)
: aff.lastVersion();
 
                             List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
-                                fut.discoveryEvent());
+                                fut.discoveryEvent(),
+                                affCache);
 
                             aff.initialize(fut.topologyVersion(), assignment);
                         }
@@ -753,7 +865,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             assert old == null : old;
 
-            List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(),
fut.discoveryEvent());
+            List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(),
+                fut.discoveryEvent(),
+                null);
 
             cache.affinity().initialize(fut.topologyVersion(), newAff);
         }
@@ -785,13 +899,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         if (crd && lateAffAssign) {
+            final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(),
+                fut.discoveryEvent());
+
             forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
                 @Override public void applyx(DynamicCacheDescriptor desc) throws IgniteCheckedException
{
                     CacheHolder cache = cache(fut, desc);
 
                     if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE))
{
                         List<List<ClusterNode>> assignment =
-                            cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent());
+                            cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(),
affCache);
 
                         cache.affinity().initialize(fut.topologyVersion(), assignment);
                     }
@@ -799,10 +916,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             });
         }
         else {
+            final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(),
+                fut.discoveryEvent());
+
             forAllCaches(false, new IgniteInClosureX<GridAffinityAssignmentCache>()
{
                 @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException
{
                     if (aff.lastVersion().equals(AffinityTopologyVersion.NONE))
-                        initAffinity(aff, fut, false);
+                        initAffinity(aff, fut, false, affCache);
                 }
             });
         }
@@ -814,10 +934,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param fetch Force fetch flag.
      * @throws IgniteCheckedException If failed.
      */
-    private void initAffinity(GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture
fut, boolean fetch)
+    private void initAffinity(GridAffinityAssignmentCache aff,
+        GridDhtPartitionsExchangeFuture fut,
+        boolean fetch,
+        AffinityCalculateCache affCache)
         throws IgniteCheckedException {
         if (!fetch && canCalculateAffinity(aff, fut)) {
-            List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
fut.discoveryEvent());
+            List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
fut.discoveryEvent(), affCache);
 
             aff.initialize(fut.topologyVersion(), assignment);
         }
@@ -872,13 +995,18 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         if (lateAffAssign) {
             if (locJoin) {
                 if (crd) {
+                    final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(),
+                        fut.discoveryEvent());
+
                     forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>()
{
                         @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws
IgniteCheckedException {
                             AffinityTopologyVersion topVer = fut.topologyVersion();
 
                             CacheHolder cache = cache(fut, cacheDesc);
 
-                            List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer,
fut.discoveryEvent());
+                            List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer,
+                                fut.discoveryEvent(),
+                                affCache);
 
                             cache.affinity().initialize(topVer, newAff);
                         }
@@ -939,6 +1067,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         List<GridDhtAssignmentFetchFuture> fetchFuts = new ArrayList<>();
 
+        AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(),
fut.discoveryEvent());
+
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (cacheCtx.isLocal())
                 continue;
@@ -946,8 +1076,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             DynamicCacheDescriptor cacheDesc = registeredCaches.get(cacheCtx.cacheId());
 
             if (cctx.localNodeId().equals(cacheDesc.receivedFrom())) {
-                List<List<ClusterNode>> assignment =
-                    cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(),
fut.discoveryEvent());
+                List<List<ClusterNode>> assignment = cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(),
+                    fut.discoveryEvent(),
+                    affCache);
 
                 cacheCtx.affinity().affinityCache().initialize(fut.topologyVersion(), assignment);
             }
@@ -990,7 +1121,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         GridDhtAffinityAssignmentResponse res = fetchFut.get();
 
         if (res == null) {
-            List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent());
+            List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent(),
null);
 
             affCache.initialize(topVer, aff);
         }
@@ -1002,7 +1133,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             else {
                 assert !affCache.centralizedAffinityFunction() || !lateAffAssign;
 
-                affCache.calculate(topVer, fut.discoveryEvent());
+                affCache.calculate(topVer, fut.discoveryEvent(), null);
             }
 
             List<List<ClusterNode>> aff = res.affinityAssignment(cctx.discovery());
@@ -1028,11 +1159,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         boolean centralizedAff;
 
         if (lateAffAssign) {
+            AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(),
+                fut.discoveryEvent());
+
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                 if (cacheCtx.isLocal())
                     continue;
 
-                cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent());
+                cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(),
affCache);
             }
 
             centralizedAff = true;
@@ -1061,11 +1195,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         long start = System.currentTimeMillis();
 
+        AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(),
fut.discoveryEvent());
+
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (cacheCtx.isLocal())
                 continue;
 
-            initAffinity(cacheCtx.affinity().affinityCache(), fut, false);
+            initAffinity(cacheCtx.affinity().affinityCache(), fut, false, affCache);
         }
 
         log.info("Affinity init time [topVer=" + fut.topologyVersion() + ", time=" + (System.currentTimeMillis()
- start) + ']');
@@ -1080,13 +1216,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         throws IgniteCheckedException {
         final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new
ArrayList<>();
 
+        final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(),
fut.discoveryEvent());
+
         forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
             @Override public void applyx(DynamicCacheDescriptor desc) throws IgniteCheckedException
{
                 CacheHolder cache = caches.get(desc.cacheId());
 
                 if (cache != null) {
                     if (cache.client())
-                        cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent());
+                        cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(),
affCache);
 
                     return;
                 }
@@ -1137,7 +1275,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                             throws IgniteCheckedException {
                             fetchAffinity(prev, aff, (GridDhtAssignmentFetchFuture)fetchFut);
 
-                            aff.calculate(fut.topologyVersion(), fut.discoveryEvent());
+                            aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), affCache);
 
                             affFut.onDone(fut.topologyVersion());
                         }
@@ -1218,7 +1356,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         throws IgniteCheckedException {
         AffinityTopologyVersion topVer = fut.topologyVersion();
 
-        final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+        final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(),
fut.discoveryEvent());
 
         if (!crd) {
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
@@ -1261,7 +1399,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         GridAffinityAssignmentCache aff,
         WaitRebalanceInfo rebalanceInfo,
         boolean latePrimary,
-        Map<Object, List<List<ClusterNode>>> affCache)
+        AffinityCalculateCache affCache)
         throws IgniteCheckedException
     {
         assert lateAffAssign;
@@ -1277,7 +1415,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         assert aff.idealAssignment() != null : "Previous assignment is not available.";
 
-        List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent());
+        List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent(),
affCache);
         List<List<ClusterNode>> newAssignment = null;
 
         if (latePrimary) {
@@ -1308,7 +1446,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         if (newAssignment == null)
             newAssignment = idealAssignment;
 
-        aff.initialize(fut.topologyVersion(), cachedAssignment(aff, newAssignment, affCache));
+        aff.initialize(fut.topologyVersion(), newAssignment);
     }
 
     /**
@@ -1732,9 +1870,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             GridAffinityAssignmentCache aff = new GridAffinityAssignmentCache(cctx.kernalContext(),
                 ccfg.getName(),
-                affFunc,
-                ccfg.getNodeFilter(),
-                ccfg.getBackups(),
+                new AffinityConfiguration(affFunc, ccfg.getNodeFilter(), ccfg.getBackups()),
                 ccfg.getCacheMode() == LOCAL);
 
             return new CacheHolder2(ccfg.getRebalanceMode() != NONE, cctx, aff, initAff);

http://git-wip-us.apache.org/repos/asf/ignite/blob/160cb202/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index c6e7ee6..6b47f82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -26,6 +26,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.AffinityConfiguration;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.util.GridLeanSet;
@@ -67,9 +68,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
 
         aff = new GridAffinityAssignmentCache(cctx.kernalContext(),
             cctx.namex(),
-            affFunction,
-            cctx.config().getNodeFilter(),
-            cctx.config().getBackups(),
+            new AffinityConfiguration(affFunction, cctx.config().getNodeFilter(), cctx.config().getBackups()),
             cctx.isLocal());
     }
 
@@ -77,7 +76,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
     @Override protected void onKernalStart0() throws IgniteCheckedException {
         if (cctx.isLocal())
             // No discovery event needed for local affinity.
-            aff.calculate(LOC_CACHE_TOP_VER, null);
+            aff.calculate(LOC_CACHE_TOP_VER, null, null);
     }
 
     /** {@inheritDoc} */


Mime
View raw message