ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [35/50] [abbrv] ignite git commit: Exclude neighbors flag for affinity functions. This closes #80
Date Wed, 28 Oct 2015 12:47:35 GMT
Exclude neighbors flag for affinity functions. This closes #80


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a180027
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a180027
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a180027

Branch: refs/heads/ignite-1537
Commit: 5a180027b174c1b76ab71a789633fe5f80bc9180
Parents: a4d625d
Author: agura <agura@gridgain.com>
Authored: Tue Oct 27 15:57:59 2015 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Tue Oct 27 16:01:23 2015 +0300

----------------------------------------------------------------------
 .../ClientAbstractMultiThreadedSelfTest.java    |   3 +-
 .../affinity/fair/FairAffinityFunction.java     | 497 ++++++++++++++-----
 .../rendezvous/RendezvousAffinityFunction.java  | 140 ++----
 .../processors/cache/GridCacheProcessor.java    |  13 -
 .../processors/cache/GridCacheUtils.java        |  50 ++
 .../AbstractAffinityFunctionSelfTest.java       | 293 +++++++++++
 .../affinity/AffinityClientNodeSelfTest.java    | 194 ++++++++
 ...ityFunctionBackupFilterAbstractSelfTest.java | 138 +++++
 ...unctionExcludeNeighborsAbstractSelfTest.java | 182 +++++++
 .../affinity/IgniteClientNodeAffinityTest.java  | 194 --------
 .../fair/FairAffinityDynamicCacheSelfTest.java  |  97 ++++
 ...airAffinityFunctionBackupFilterSelfTest.java |  35 ++
 ...ffinityFunctionExcludeNeighborsSelfTest.java |  31 ++
 .../fair/FairAffinityFunctionNodesSelfTest.java | 245 +++++++++
 .../fair/FairAffinityFunctionSelfTest.java      |  31 ++
 .../GridFairAffinityFunctionNodesSelfTest.java  | 245 ---------
 .../fair/GridFairAffinityFunctionSelfTest.java  | 270 ----------
 .../IgniteFairAffinityDynamicCacheSelfTest.java |  97 ----
 ...ousAffinityFunctionBackupFilterSelfTest.java |  35 ++
 ...ffinityFunctionExcludeNeighborsSelfTest.java |  32 ++
 .../RendezvousAffinityFunctionSelfTest.java     |  50 ++
 .../cache/CrossCacheTxRandomOperationsTest.java |   2 +-
 .../GridCacheAbstractLocalStoreSelfTest.java    |   5 +
 ...idCacheConfigurationConsistencySelfTest.java |  17 -
 ...dCachePartitionedAffinityFilterSelfTest.java | 143 ------
 .../dht/GridCacheDhtPreloadPutGetSelfTest.java  |   3 +
 ...unctionExcludeNeighborsAbstractSelfTest.java | 184 -------
 ...ffinityFunctionExcludeNeighborsSelfTest.java |  32 --
 ...xcludeNeighborsMultiNodeFullApiSelfTest.java |  36 ++
 ...tedFairAffinityMultiNodeFullApiSelfTest.java |  35 ++
 ...xcludeNeighborsMultiNodeFullApiSelfTest.java |  36 ++
 ...dezvousAffinityMultiNodeFullApiSelfTest.java |  36 ++
 .../IgniteCacheFullApiSelfTestSuite.java        |   8 +
 .../ignite/testsuites/IgniteCacheTestSuite.java |  16 +-
 .../testsuites/IgniteCacheTestSuite2.java       |  12 +-
 .../cache/IgniteCacheAbstractQuerySelfTest.java |   4 +
 36 files changed, 2021 insertions(+), 1420 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5a180027/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java
index 9dd4d83..9f6bf2b 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java
@@ -254,8 +254,7 @@ public abstract class ClientAbstractMultiThreadedSelfTest extends GridCommonAbst
         final ConcurrentLinkedQueue<String> execQueue = new ConcurrentLinkedQueue<>();
 
         IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
-            @Override
-            public void run() {
+            @Override public void run() {
                 long processed;
 
                 while ((processed = cnt.getAndIncrement()) < taskExecutionCount()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a180027/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
index cc04875..b42b683 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.RandomAccess;
 import java.util.UUID;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.AffinityCentralizedFunction;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cache.affinity.AffinityFunctionContext;
@@ -38,15 +39,38 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Fair affinity function which tries to ensure that all nodes get equal number of partitions with
  * minimum amount of reassignments between existing nodes.
+ * This function supports the following configuration:
+ * <ul>
+ * <li>
+ *      {@code partitions} - Number of partitions to spread across nodes.
+ * </li>
+ * <li>
+ *      {@code excludeNeighbors} - If set to {@code true}, will exclude same-host-neighbors
+ *      from being backups of each other. This flag can be ignored when the topology does not have enough nodes
+ *      to assign backups.
+ *      Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ * </li>
+ * <li>
+ *      {@code backupFilter} - Optional filter for backup nodes. If provided, then only
+ *      nodes that pass this filter will be selected as backup nodes. If not provided, then
+ *      primary and backup nodes will be selected out of all nodes available for this cache.
+ * </li>
+ * </ul>
  * <p>
- * Cache affinity can be configured for individual caches via
- * {@link CacheConfiguration#setAffinity(AffinityFunction)} method.
+ * Cache affinity is configured per cache via {@link CacheConfiguration#setAffinity(AffinityFunction)}.
  */
 @AffinityCentralizedFunction
 public class FairAffinityFunction implements AffinityFunction {
@@ -62,21 +86,165 @@ public class FairAffinityFunction implements AffinityFunction {
     /** Descending comparator. */
     private static final Comparator<PartitionSet> DESC_CMP = Collections.reverseOrder(ASC_CMP);
 
-    /** */
-    private final int parts;
+    /** Number of partitions. */
+    private int parts;
+
+    /** Exclude neighbors flag. */
+    private boolean exclNeighbors;
+
+    /** Exclude neighbors warning. */
+    private transient boolean exclNeighborsWarn;
+
+    /** Logger instance. */
+    @LoggerResource
+    private transient IgniteLogger log;
+
+    /** Optional backup filter. First node is primary, second node is a node being tested. */
+    private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter;
 
     /**
-     * Creates fair affinity with default partition count.
+     * Empty constructor with all defaults.
      */
     public FairAffinityFunction() {
-        this(DFLT_PART_CNT);
+        this(false);
+    }
+
+    /**
+     * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other
+     * and the default number of partitions.
+     * <p>
+     * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+     *
+     * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
+     *      of each other.
+     */
+    public FairAffinityFunction(boolean exclNeighbors) {
+        this(exclNeighbors, DFLT_PART_CNT);
     }
 
     /**
      * @param parts Number of partitions.
      */
     public FairAffinityFunction(int parts) {
+        this(false, parts);
+    }
+
+    /**
+     * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other,
+     * and specified number of partitions.
+     * <p>
+     * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+     *
+     * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
+     *      of each other.
+     * @param parts Total number of partitions.
+     */
+    public FairAffinityFunction(boolean exclNeighbors, int parts) {
+        this(exclNeighbors, parts, null);
+    }
+
+    /**
+     * Initializes affinity with specified number of partitions and optional backup filter.
+     * <p>
+     * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+     *
+     * @param parts Total number of partitions.
+     * @param backupFilter Optional backup filter for nodes. If provided, backups will be selected
+     *      from all nodes that pass this filter. First argument for this filter is primary node, and second
+     *      argument is node being tested.
+     */
+    public FairAffinityFunction(int parts, @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+        this(false, parts, backupFilter);
+    }
+
+    /**
+     * Private constructor.
+     *
+     * @param exclNeighbors Exclude neighbors flag.
+     * @param parts Partitions count.
+     * @param backupFilter Backup filter.
+     */
+    private FairAffinityFunction(boolean exclNeighbors, int parts,
+        IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+        A.ensure(parts > 0, "parts > 0");
+
+        this.exclNeighbors = exclNeighbors;
         this.parts = parts;
+        this.backupFilter = backupFilter;
+    }
+
+    /**
+     * Gets total number of key partitions. To ensure that all partitions are
+     * equally distributed across all nodes, please make sure that this
+     * number is significantly larger than a number of nodes. Also, partition
+     * size should be relatively small. Try to avoid having partitions with more
+     * than quarter million keys.
+     * <p>
+     * Note that for fully replicated caches this method should always
+     * return {@code 1}.
+     *
+     * @return Total partition count.
+     */
+    public int getPartitions() {
+        return parts;
+    }
+
+    /**
+     * Sets total number of partitions.
+     *
+     * @param parts Total number of partitions.
+     */
+    public void setPartitions(int parts) {
+        this.parts = parts;
+    }
+
+
+    /**
+     * Gets optional backup filter. If not {@code null}, backups will be selected
+     * from all nodes that pass this filter. First node passed to this filter is primary node,
+     * and second node is a node being tested.
+     * <p>
+     * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+     *
+     * @return Optional backup filter.
+     */
+    @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() {
+        return backupFilter;
+    }
+
+    /**
+     * Sets optional backup filter. If provided, then backups will be selected from all
+     * nodes that pass this filter. First node being passed to this filter is primary node,
+     * and second node is a node being tested.
+     * <p>
+     * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+     *
+     * @param backupFilter Optional backup filter.
+     */
+    public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+        this.backupFilter = backupFilter;
+    }
+
+    /**
+     * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
+     * <p>
+     * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+     *
+     * @return {@code True} if nodes residing on the same host may not act as backups of each other.
+     */
+    public boolean isExcludeNeighbors() {
+        return exclNeighbors;
+    }
+
+    /**
+     * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
+     * <p>
+     * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+     *
+     * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other.
+     */
+    public void setExcludeNeighbors(boolean exclNeighbors) {
+        this.exclNeighbors = exclNeighbors;
     }
 
     /** {@inheritDoc} */
@@ -89,14 +257,20 @@ public class FairAffinityFunction implements AffinityFunction {
             return Collections.nCopies(parts, Collections.singletonList(primary));
         }
 
-        List<List<ClusterNode>> assignment = createCopy(ctx);
+        Map<UUID, Collection<ClusterNode>> neighborhoodMap = exclNeighbors
+            ? GridCacheUtils.neighbors(ctx.currentTopologySnapshot())
+            : null;
+
+        List<List<ClusterNode>> assignment = createCopy(ctx, neighborhoodMap);
+
+        int backups = ctx.backups();
 
-        int tiers = Math.min(ctx.backups() + 1, topSnapshot.size());
+        int tiers = backups == Integer.MAX_VALUE ? topSnapshot.size() : Math.min(backups + 1, topSnapshot.size());
 
         // Per tier pending partitions.
         Map<Integer, Queue<Integer>> pendingParts = new HashMap<>();
 
-        FullAssignmentMap fullMap = new FullAssignmentMap(tiers, assignment, topSnapshot);
+        FullAssignmentMap fullMap = new FullAssignmentMap(tiers, assignment, topSnapshot, neighborhoodMap);
 
         for (int tier = 0; tier < tiers; tier++) {
             // Check if this is a new tier and add pending partitions.
@@ -104,23 +278,32 @@ public class FairAffinityFunction implements AffinityFunction {
 
             for (int part = 0; part < parts; part++) {
                 if (fullMap.assignments.get(part).size() < tier + 1) {
-                    if (pending == null) {
-                        pending = new LinkedList<>();
-
-                        pendingParts.put(tier, pending);
-                    }
+                    if (pending == null)
+                        pendingParts.put(tier, pending = new LinkedList<>());
 
                     if (!pending.contains(part))
                         pending.add(part);
-
                 }
             }
 
             // Assign pending partitions, if any.
-            assignPending(tier, pendingParts, fullMap, topSnapshot);
+            assignPending(tier, pendingParts, fullMap, topSnapshot, false);
 
             // Balance assignments.
-            balance(tier, pendingParts, fullMap, topSnapshot);
+            boolean balanced = balance(tier, pendingParts, fullMap, topSnapshot, false);
+
+            if (!balanced && exclNeighbors) {
+                assignPending(tier, pendingParts, fullMap, topSnapshot, true);
+
+                balance(tier, pendingParts, fullMap, topSnapshot, true);
+
+                if (!exclNeighborsWarn) {
+                    LT.warn(log, null, "Affinity function excludeNeighbors property is ignored " +
+                        "because topology has no enough nodes to assign backups.");
+
+                    exclNeighborsWarn = true;
+                }
+            }
         }
 
         return fullMap.assignments;
@@ -153,9 +336,14 @@ public class FairAffinityFunction implements AffinityFunction {
      * @param pendingMap Pending partitions per tier.
      * @param fullMap Full assignment map to modify.
      * @param topSnapshot Topology snapshot.
+     * @param allowNeighbors {@code True} if neighbor nodes may be assigned to the same partition.
      */
-    private void assignPending(int tier, Map<Integer, Queue<Integer>> pendingMap, FullAssignmentMap fullMap,
-        List<ClusterNode> topSnapshot) {
+    private void assignPending(int tier,
+        Map<Integer, Queue<Integer>> pendingMap,
+        FullAssignmentMap fullMap,
+        List<ClusterNode> topSnapshot,
+        boolean allowNeighbors)
+    {
         Queue<Integer> pending = pendingMap.get(tier);
 
         if (F.isEmpty(pending))
@@ -168,19 +356,18 @@ public class FairAffinityFunction implements AffinityFunction {
         PrioritizedPartitionMap underloadedNodes = filterNodes(tierMapping, idealPartCnt, false);
 
         // First iterate over underloaded nodes.
-        assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, false);
+        assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, false, allowNeighbors);
 
         if (!pending.isEmpty() && !underloadedNodes.isEmpty()) {
             // Same, forcing updates.
-            assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, true);
+            assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, true, allowNeighbors);
         }
 
         if (!pending.isEmpty())
-            assignPendingToNodes(tier, pendingMap, fullMap, topSnapshot);
-
-        assert pending.isEmpty();
+            assignPendingToNodes(tier, pendingMap, fullMap, topSnapshot, allowNeighbors);
 
-        pendingMap.remove(tier);
+        if (pending.isEmpty())
+            pendingMap.remove(tier);
     }
 
     /**
@@ -192,6 +379,7 @@ public class FairAffinityFunction implements AffinityFunction {
      * @param underloadedNodes Underloaded nodes.
      * @param topSnapshot Topology snapshot.
      * @param force {@code True} if partitions should be moved.
+     * @param allowNeighbors {@code True} if neighbor nodes may be assigned to the same partition.
      */
     private void assignPendingToUnderloaded(
         int tier,
@@ -199,7 +387,8 @@ public class FairAffinityFunction implements AffinityFunction {
         FullAssignmentMap fullMap,
         PrioritizedPartitionMap underloadedNodes,
         Collection<ClusterNode> topSnapshot,
-        boolean force) {
+        boolean force,
+        boolean allowNeighbors) {
         Iterator<Integer> it = pendingMap.get(tier).iterator();
 
         int ideal = parts / topSnapshot.size();
@@ -212,7 +401,7 @@ public class FairAffinityFunction implements AffinityFunction {
 
                 assert node != null;
 
-                if (fullMap.assign(part, tier, node, force, pendingMap)) {
+                if (fullMap.assign(part, tier, node, pendingMap, force, allowNeighbors)) {
                     // We could add partition to partition map without forcing, remove partition from pending.
                     it.remove();
 
@@ -237,9 +426,10 @@ public class FairAffinityFunction implements AffinityFunction {
      * @param pendingMap Pending partitions per tier.
      * @param fullMap Full assignment map to modify.
      * @param topSnapshot Topology snapshot.
+     * @param allowNeighbors {@code True} if neighbor nodes may be assigned to the same partition.
      */
     private void assignPendingToNodes(int tier, Map<Integer, Queue<Integer>> pendingMap,
-        FullAssignmentMap fullMap, List<ClusterNode> topSnapshot) {
+        FullAssignmentMap fullMap, List<ClusterNode> topSnapshot, boolean allowNeighbors) {
         Iterator<Integer> it = pendingMap.get(tier).iterator();
 
         int idx = 0;
@@ -254,7 +444,7 @@ public class FairAffinityFunction implements AffinityFunction {
             do {
                 ClusterNode node = topSnapshot.get(i);
 
-                if (fullMap.assign(part, tier, node, false, pendingMap)) {
+                if (fullMap.assign(part, tier, node, pendingMap, false, allowNeighbors)) {
                     it.remove();
 
                     assigned = true;
@@ -270,7 +460,7 @@ public class FairAffinityFunction implements AffinityFunction {
                 do {
                     ClusterNode node = topSnapshot.get(i);
 
-                    if (fullMap.assign(part, tier, node, true, pendingMap)) {
+                    if (fullMap.assign(part, tier, node, pendingMap, true, allowNeighbors)) {
                         it.remove();
 
                         assigned = true;
@@ -283,7 +473,7 @@ public class FairAffinityFunction implements AffinityFunction {
                 } while (i != idx);
             }
 
-            if (!assigned)
+            if (!assigned && (!exclNeighbors || exclNeighbors && allowNeighbors))
                 throw new IllegalStateException("Failed to find assignable node for partition.");
         }
     }
@@ -295,9 +485,10 @@ public class FairAffinityFunction implements AffinityFunction {
      * @param pendingParts Pending partitions per tier.
      * @param fullMap Full assignment map to modify.
      * @param topSnapshot Topology snapshot.
+     * @param allowNeighbors {@code True} if neighbor nodes may be assigned to the same partition.
      */
-    private void balance(int tier, Map<Integer, Queue<Integer>> pendingParts, FullAssignmentMap fullMap,
-        Collection<ClusterNode> topSnapshot) {
+    private boolean balance(int tier, Map<Integer, Queue<Integer>> pendingParts, FullAssignmentMap fullMap,
+        Collection<ClusterNode> topSnapshot, boolean allowNeighbors) {
         int idealPartCnt = parts / topSnapshot.size();
 
         Map<UUID, PartitionSet> mapping = fullMap.tierMapping(tier);
@@ -313,7 +504,7 @@ public class FairAffinityFunction implements AffinityFunction {
                     boolean assigned = false;
 
                     for (PartitionSet underloaded : underloadedNodes.assignments()) {
-                        if (fullMap.assign(part, tier, underloaded.node(), false, pendingParts)) {
+                        if (fullMap.assign(part, tier, underloaded.node(), pendingParts, false, allowNeighbors)) {
                             // Size of partition sets has changed.
                             if (overloaded.size() <= idealPartCnt)
                                 overloadedNodes.remove(overloaded.nodeId());
@@ -335,7 +526,7 @@ public class FairAffinityFunction implements AffinityFunction {
 
                     if (!assigned) {
                         for (PartitionSet underloaded : underloadedNodes.assignments()) {
-                            if (fullMap.assign(part, tier, underloaded.node(), true, pendingParts)) {
+                            if (fullMap.assign(part, tier, underloaded.node(), pendingParts, true, allowNeighbors)) {
                                 // Size of partition sets has changed.
                                 if (overloaded.size() <= idealPartCnt)
                                     overloadedNodes.remove(overloaded.nodeId());
@@ -366,6 +557,8 @@ public class FairAffinityFunction implements AffinityFunction {
                 break;
         }
         while (true);
+
+        return underloadedNodes.isEmpty();
     }
 
     /**
@@ -393,9 +586,12 @@ public class FairAffinityFunction implements AffinityFunction {
      * Creates copy of previous partition assignment.
      *
      * @param ctx Affinity function context.
+     * @param neighborhoodMap Neighbor nodes keyed by node ID, or {@code null} if neighbors are not excluded.
      * @return Assignment copy and per node partition map.
      */
-    private List<List<ClusterNode>> createCopy(AffinityFunctionContext ctx) {
+    private List<List<ClusterNode>> createCopy(AffinityFunctionContext ctx,
+        Map<UUID, Collection<ClusterNode>> neighborhoodMap)
+    {
         DiscoveryEvent discoEvt = ctx.discoveryEvent();
 
         UUID leftNodeId = (discoEvt == null || discoEvt.type() == EventType.EVT_NODE_JOINED)
@@ -411,26 +607,42 @@ public class FairAffinityFunction implements AffinityFunction {
 
             if (partNodes == null)
                 partNodesCp = new ArrayList<>();
-            else {
-                if (leftNodeId == null) {
-                    partNodesCp = new ArrayList<>(partNodes.size() + 1); // Node joined.
+            else
+                partNodesCp = copyAssigments(neighborhoodMap, partNodes, leftNodeId);
 
-                    partNodesCp.addAll(partNodes);
-                }
-                else {
-                    partNodesCp = new ArrayList<>(partNodes.size());
+            cp.add(partNodesCp);
+        }
+
+        return cp;
+    }
+
+    /**
+     * @param neighborhoodMap Neighbors nodes grouped by target node.
+     * @param partNodes Partition nodes.
+     * @param leftNodeId Left node id.
+     */
+    private List<ClusterNode> copyAssigments(Map<UUID, Collection<ClusterNode>> neighborhoodMap,
+        List<ClusterNode> partNodes, UUID leftNodeId) {
+        final List<ClusterNode> partNodesCp = new ArrayList<>(partNodes.size());
+
+        for (ClusterNode node : partNodes) {
+            if (node.id().equals(leftNodeId))
+                continue;
+
+            boolean containsNeighbor = false;
 
-                    for (ClusterNode affNode : partNodes) {
-                        if (!affNode.id().equals(leftNodeId))
-                            partNodesCp.add(affNode);
+            if (neighborhoodMap != null)
+                containsNeighbor = F.exist(neighborhoodMap.get(node.id()), new IgnitePredicate<ClusterNode>() {
+                    @Override public boolean apply(ClusterNode node) {
+                        return partNodesCp.contains(node);
                     }
-                }
-            }
+                });
 
-            cp.add(partNodesCp);
+            if (!containsNeighbor)
+                partNodesCp.add(node);
         }
 
-        return cp;
+        return partNodesCp;
     }
 
     /**
@@ -512,59 +724,11 @@ public class FairAffinityFunction implements AffinityFunction {
     }
 
     /**
-     * Constructs assignment map for specified tier.
-     *
-     * @param tier Tier number, -1 for all tiers altogether.
-     * @param assignment Assignment to construct map from.
-     * @param topSnapshot Topology snapshot.
-     * @return Assignment map.
-     */
-    private static Map<UUID, PartitionSet> assignments(int tier, List<List<ClusterNode>> assignment,
-        Collection<ClusterNode> topSnapshot) {
-        Map<UUID, PartitionSet> tmp = new LinkedHashMap<>();
-
-        for (int part = 0; part < assignment.size(); part++) {
-            List<ClusterNode> nodes = assignment.get(part);
-
-            assert nodes instanceof RandomAccess;
-
-            if (nodes.size() <= tier)
-                continue;
-
-            int start = tier < 0 ? 0 : tier;
-            int end = tier < 0 ? nodes.size() : tier + 1;
-
-            for (int i = start; i < end; i++) {
-                ClusterNode n = nodes.get(i);
-
-                PartitionSet set = tmp.get(n.id());
-
-                if (set == null) {
-                    set = new PartitionSet(n);
-
-                    tmp.put(n.id(), set);
-                }
-
-                set.add(part);
-            }
-        }
-
-        if (tmp.size() < topSnapshot.size()) {
-            for (ClusterNode node : topSnapshot) {
-                if (!tmp.containsKey(node.id()))
-                    tmp.put(node.id(), new PartitionSet(node));
-            }
-        }
-
-        return tmp;
-    }
-
-    /**
      * Full assignment map. Auxiliary data structure which maintains resulting assignment and temporary
      * maps consistent.
      */
     @SuppressWarnings("unchecked")
-    private static class FullAssignmentMap {
+    private class FullAssignmentMap {
         /** Per-tier assignment maps. */
         private Map<UUID, PartitionSet>[] tierMaps;
 
@@ -574,20 +738,28 @@ public class FairAffinityFunction implements AffinityFunction {
         /** Resulting assignment. */
         private List<List<ClusterNode>> assignments;
 
+        /** Neighborhood map. */
+        private final Map<UUID, Collection<ClusterNode>> neighborhoodMap;
+
         /**
          * @param tiers Number of tiers.
          * @param assignments Assignments to modify.
          * @param topSnapshot Topology snapshot.
+         * @param neighborhoodMap Neighbors nodes grouped by target node.
          */
-        private FullAssignmentMap(int tiers, List<List<ClusterNode>> assignments, Collection<ClusterNode> topSnapshot) {
+        private FullAssignmentMap(int tiers,
+            List<List<ClusterNode>> assignments,
+            Collection<ClusterNode> topSnapshot,
+            Map<UUID, Collection<ClusterNode>> neighborhoodMap)
+        {
             this.assignments = assignments;
-
-            tierMaps = new Map[tiers];
+            this.neighborhoodMap = neighborhoodMap;
+            this.tierMaps = new Map[tiers];
 
             for (int tier = 0; tier < tiers; tier++)
-                tierMaps[tier] = assignments(tier, assignments, topSnapshot);
+                tierMaps[tier] = assignments(tier, topSnapshot);
 
-            fullMap = assignments(-1, assignments, topSnapshot);
+            fullMap = assignments(-1, topSnapshot);
         }
 
         /**
@@ -599,14 +771,20 @@ public class FairAffinityFunction implements AffinityFunction {
          * @param part Partition to assign.
          * @param tier Tier number to assign.
          * @param node Node to move partition to.
-         * @param force Force flag.
          * @param pendingParts per tier pending partitions map.
+         * @param force Force flag.
+         * @param allowNeighbors {@code True} if neighbor nodes may be assigned to the same partition.
          * @return {@code True} if assignment succeeded.
          */
-        boolean assign(int part, int tier, ClusterNode node, boolean force, Map<Integer, Queue<Integer>> pendingParts) {
+        boolean assign(int part,
+            int tier,
+            ClusterNode node,
+            Map<Integer, Queue<Integer>> pendingParts, boolean force,
+            boolean allowNeighbors)
+        {
             UUID nodeId = node.id();
 
-            if (!fullMap.get(nodeId).contains(part)) {
+            if (isAssignable(part, tier, node, allowNeighbors)) {
                 tierMaps[tier].get(nodeId).add(part);
 
                 fullMap.get(nodeId).add(part);
@@ -656,11 +834,8 @@ public class FairAffinityFunction implements AffinityFunction {
 
                         Queue<Integer> pending = pendingParts.get(t);
 
-                        if (pending == null) {
-                            pending = new LinkedList<>();
-
-                            pendingParts.put(t, pending);
-                        }
+                        if (pending == null)
+                            pendingParts.put(t, pending = new LinkedList<>());
 
                         pending.add(part);
 
@@ -668,7 +843,7 @@ public class FairAffinityFunction implements AffinityFunction {
                     }
                 }
 
-                throw new IllegalStateException("Unable to assign partition to node while force is true.");
+                return false;
             }
 
             // !force.
@@ -684,6 +859,102 @@ public class FairAffinityFunction implements AffinityFunction {
         public Map<UUID, PartitionSet> tierMapping(int tier) {
             return tierMaps[tier];
         }
+
+        /**
+         * @param part Partition.
+         * @param tier Tier.
+         * @param node Node.
+         * @param allowNeighbors Allow neighbors.
+         */
+        private boolean isAssignable(int part, int tier, final ClusterNode node, boolean allowNeighbors) {
+            if (containsPartition(part, node))
+                return false;
+
+            if (exclNeighbors)
+                return allowNeighbors || !neighborsContainPartition(node, part);
+            else if (backupFilter == null)
+                return true;
+            else {
+                if (tier == 0) {
+                    List<ClusterNode> assigment = assignments.get(part);
+
+                    assert assigment.size() > 0;
+
+                    List<ClusterNode> backups = assigment.subList(1, assigment.size());
+
+                    return !F.exist(backups, new IgnitePredicate<ClusterNode>() {
+                        @Override public boolean apply(ClusterNode n) {
+                            return !backupFilter.apply(node, n);
+                        }
+                    });
+                }
+                else
+                    return (backupFilter.apply(assignments.get(part).get(0), node));
+            }
+        }
+
+        /**
+         * @param part Partition.
+         * @param node Node.
+         */
+        private boolean containsPartition(int part, ClusterNode node) {
+            return fullMap.get(node.id()).contains(part);
+        }
+
+        /**
+         * @param node Node.
+         * @param part Partition.
+         */
+        private boolean neighborsContainPartition(ClusterNode node, final int part) {
+            return F.exist(neighborhoodMap.get(node.id()), new IgnitePredicate<ClusterNode>() {
+                @Override public boolean apply(ClusterNode n) {
+                    return fullMap.get(n.id()).contains(part);
+                }
+            });
+        }
+
+        /**
+         * Constructs assignments map for specified tier.
+         *
+         * @param tier Tier number, -1 for all tiers altogether.
+         * @param topSnapshot Topology snapshot.
+         * @return Assignment map.
+         */
+        private Map<UUID, PartitionSet> assignments(int tier, Collection<ClusterNode> topSnapshot) {
+            Map<UUID, PartitionSet> tmp = new LinkedHashMap<>();
+
+            for (int part = 0; part < assignments.size(); part++) {
+                List<ClusterNode> nodes = assignments.get(part);
+
+                assert nodes instanceof RandomAccess;
+
+                if (nodes.size() <= tier)
+                    continue;
+
+                int start = tier < 0 ? 0 : tier;
+                int end = tier < 0 ? nodes.size() : tier + 1;
+
+                for (int i = start; i < end; i++) {
+                    ClusterNode n = nodes.get(i);
+
+                    PartitionSet set = tmp.get(n.id());
+
+                    if (set == null)
+                        tmp.put(n.id(), set = new PartitionSet(n));
+
+                    set.add(part);
+                }
+            }
+
+            if (tmp.size() < topSnapshot.size()) {
+                for (ClusterNode node : topSnapshot) {
+                    if (!tmp.containsKey(node.id()))
+                        tmp.put(node.id(), new PartitionSet(node));
+                }
+            }
+
+            return tmp;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a180027/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index fd07eb9..61a21d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -29,26 +29,28 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cache.affinity.AffinityFunctionContext;
 import org.apache.ignite.cache.affinity.AffinityNodeHashResolver;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -60,8 +62,9 @@ import org.jetbrains.annotations.Nullable;
  * </li>
  * <li>
  *      {@code excludeNeighbors} - If set to {@code true}, will exclude same-host-neighbors
- *      from being backups of each other. Note that {@code backupFilter} is ignored if
- *      {@code excludeNeighbors} is set to {@code true}.
+ *      from being backups of each other. This flag can be ignored when the topology does not have enough nodes
+ *      to assign backups.
+ *      Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
  * </li>
  * <li>
  *      {@code backupFilter} - Optional filter for back up nodes. If provided, then only
@@ -70,7 +73,7 @@ import org.jetbrains.annotations.Nullable;
  * </li>
  * </ul>
  * <p>
- * Cache affinity can be configured for individual caches via {@link org.apache.ignite.configuration.CacheConfiguration#getAffinity()} method.
+ * Cache affinity can be configured for individual caches via {@link CacheConfiguration#getAffinity()} method.
  */
 public class RendezvousAffinityFunction implements AffinityFunction, Externalizable {
     /** */
@@ -80,8 +83,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
     public static final int DFLT_PARTITION_COUNT = 1024;
 
     /** Comparator. */
-    private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR =
-        new HashComparator();
+    private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = new HashComparator();
 
     /** Thread local message digest. */
     private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() {
@@ -92,8 +94,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
             catch (NoSuchAlgorithmException e) {
                 assert false : "Should have failed in constructor";
 
-                throw new IgniteException("Failed to obtain message digest (digest was available in constructor)",
-                    e);
+                throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", e);
             }
         }
     };
@@ -104,6 +105,9 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
     /** Exclude neighbors flag. */
     private boolean exclNeighbors;
 
+    /** Whether the "excludeNeighbors is ignored" warning has already been logged. */
+    private transient boolean exclNeighborsWarn;
+
     /** Optional backup filter. First node is primary, second node is a node being tested. */
     private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter;
 
@@ -114,6 +118,10 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
     @IgniteInstanceResource
     private Ignite ignite;
 
+    /** Logger instance. */
+    @LoggerResource
+    private transient IgniteLogger log;
+
     /**
      * Empty constructor with all defaults.
      */
@@ -125,7 +133,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
      * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other
      * and specified number of backups.
      * <p>
-     * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set.
+     * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
      *
      * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
      *      of each other.
@@ -138,7 +146,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
      * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other,
      * and specified number of backups and partitions.
      * <p>
-     * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set.
+     * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
      *
      * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
      *      of each other.
@@ -151,14 +159,14 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
     /**
      * Initializes optional counts for replicas and backups.
      * <p>
-     * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set.
+     * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
      *
      * @param parts Total number of partitions.
      * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected
      *      from all nodes that pass this filter. First argument for this filter is primary node, and second
      *      argument is node being tested.
      * <p>
-     * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set.
+     * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
      */
     public RendezvousAffinityFunction(int parts, @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
         this(false, parts, backupFilter);
@@ -173,7 +181,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
      */
     private RendezvousAffinityFunction(boolean exclNeighbors, int parts,
         IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
-        A.ensure(parts != 0, "parts != 0");
+        A.ensure(parts > 0, "parts > 0");
 
         this.exclNeighbors = exclNeighbors;
         this.parts = parts;
@@ -253,7 +261,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
      * from all nodes that pass this filter. First node passed to this filter is primary node,
      * and second node is a node being tested.
      * <p>
-     * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set.
+     * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
      *
      * @return Optional backup filter.
      */
@@ -266,7 +274,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
      * nodes that pass this filter. First node being passed to this filter is primary node,
      * and second node is a node being tested.
      * <p>
-     * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set.
+     * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
      *
      * @param backupFilter Optional backup filter.
      */
@@ -277,7 +285,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
     /**
      * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
      * <p>
-     * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set.
+     * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
      *
      * @return {@code True} if nodes residing on the same host may not act as backups of each other.
      */
@@ -288,7 +296,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
     /**
      * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
      * <p>
-     * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set.
+     * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
      *
      * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other.
      */
@@ -355,20 +363,9 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
 
         Collections.sort(lst, COMPARATOR);
 
-        int primaryAndBackups;
-
-        List<ClusterNode> res;
-
-        if (backups == Integer.MAX_VALUE) {
-            primaryAndBackups = Integer.MAX_VALUE;
-
-            res = new ArrayList<>();
-        }
-        else {
-            primaryAndBackups = backups + 1;
+        int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size());
 
-            res = new ArrayList<>(primaryAndBackups);
-        }
+        List<ClusterNode> res = new ArrayList<>(primaryAndBackups);
 
         ClusterNode primary = lst.get(0).get2();
 
@@ -376,39 +373,38 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
 
         // Select backups.
         if (backups > 0) {
-            for (int i = 1; i < lst.size(); i++) {
+            for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
                 IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
 
                 ClusterNode node = next.get2();
 
                 if (exclNeighbors) {
-                    Collection<ClusterNode> allNeighbors = allNeighbors(neighborhoodCache, res);
+                    Collection<ClusterNode> allNeighbors = GridCacheUtils.neighborsForNodes(neighborhoodCache, res);
 
                     if (!allNeighbors.contains(node))
                         res.add(node);
                 }
-                else {
-                    if (!res.contains(node) && (backupFilter == null || backupFilter.apply(primary, node)))
-                        res.add(next.get2());
-                }
-
-                if (res.size() == primaryAndBackups)
-                    break;
+                else if (backupFilter == null || backupFilter.apply(primary, node))
+                    res.add(next.get2());
             }
         }
 
         if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) {
-            // Need to iterate one more time in case if there are no nodes which pass exclude backups criteria.
-            for (int i = 1; i < lst.size(); i++) {
+            // Iterate again without the exclude-neighbors check if it left too few nodes to fill the backups.
+            for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
                 IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
 
                 ClusterNode node = next.get2();
 
                 if (!res.contains(node))
                     res.add(next.get2());
+            }
+
+            if (!exclNeighborsWarn) {
+                LT.warn(log, null, "Affinity function excludeNeighbors property is ignored " +
+                    "because topology has no enough nodes to assign backups.");
 
-                if (res.size() == primaryAndBackups)
-                    break;
+                exclNeighborsWarn = true;
             }
         }
 
@@ -437,7 +433,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
         List<List<ClusterNode>> assignments = new ArrayList<>(parts);
 
         Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
-            neighbors(affCtx.currentTopologySnapshot()) : null;
+            GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null;
 
         for (int i = 0; i < parts; i++) {
             List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(),
@@ -463,6 +459,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         parts = in.readInt();
         exclNeighbors = in.readBoolean();
@@ -471,57 +468,6 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
     }
 
     /**
-     * Builds neighborhood map for all nodes in snapshot.
-     *
-     * @param topSnapshot Topology snapshot.
-     * @return Neighbors map.
-     */
-    private Map<UUID, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) {
-        Map<String, Collection<ClusterNode>> macMap = new HashMap<>(topSnapshot.size(), 1.0f);
-
-        // Group by mac addresses.
-        for (ClusterNode node : topSnapshot) {
-            String macs = node.attribute(IgniteNodeAttributes.ATTR_MACS);
-
-            Collection<ClusterNode> nodes = macMap.get(macs);
-
-            if (nodes == null) {
-                nodes = new HashSet<>();
-
-                macMap.put(macs, nodes);
-            }
-
-            nodes.add(node);
-        }
-
-        Map<UUID, Collection<ClusterNode>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f);
-
-        for (Collection<ClusterNode> group : macMap.values()) {
-            for (ClusterNode node : group)
-                neighbors.put(node.id(), group);
-        }
-
-        return neighbors;
-    }
-
-    /**
-     * @param neighborhoodCache Neighborhood cache.
-     * @param nodes Nodes.
-     * @return All neighbors for given nodes.
-     */
-    private Collection<ClusterNode> allNeighbors(Map<UUID, Collection<ClusterNode>> neighborhoodCache,
-        Iterable<ClusterNode> nodes) {
-        Collection<ClusterNode> res = new HashSet<>();
-
-        for (ClusterNode node : nodes) {
-            if (!res.contains(node))
-                res.addAll(neighborhoodCache.get(node.id()));
-        }
-
-        return res;
-    }
-
-    /**
      *
      */
     private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a180027/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 722e570..578ad6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -53,7 +53,6 @@ import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cache.affinity.AffinityFunctionContext;
 import org.apache.ignite.cache.affinity.AffinityNodeAddressHashResolver;
-import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreSessionListener;
@@ -367,18 +366,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         CacheType cacheType,
         @Nullable CacheStore cfgStore) throws IgniteCheckedException {
         if (cc.getCacheMode() == REPLICATED) {
-            if (cc.getAffinity() instanceof FairAffinityFunction)
-                throw new IgniteCheckedException("REPLICATED cache can not be started with FairAffinityFunction" +
-                    " [cacheName=" + U.maskName(cc.getName()) + ']');
-
-            if (cc.getAffinity() instanceof RendezvousAffinityFunction) {
-                RendezvousAffinityFunction aff = (RendezvousAffinityFunction)cc.getAffinity();
-
-                if (aff.isExcludeNeighbors())
-                    throw new IgniteCheckedException("For REPLICATED cache flag 'excludeNeighbors' in " +
-                        "RendezvousAffinityFunction cannot be set [cacheName=" + U.maskName(cc.getName()) + ']');
-            }
-
             if (cc.getNearConfiguration() != null &&
                 ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName())) {
                 U.warn(log, "Near cache cannot be used with REPLICATED cache, " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a180027/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index ee1f4a1..f7d115f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -1811,4 +1812,53 @@ public class GridCacheUtils {
             }
         };
     }
+
+    /**
+     * Builds neighborhood map for all nodes in snapshot.
+     *
+     * @param topSnapshot Topology snapshot.
+     * @return Neighbors map.
+     */
+    public static Map<UUID, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) {
+        Map<String, Collection<ClusterNode>> macMap = new HashMap<>(topSnapshot.size(), 1.0f);
+
+        // Group by mac addresses.
+        for (ClusterNode node : topSnapshot) {
+            String macs = node.attribute(IgniteNodeAttributes.ATTR_MACS);
+
+            Collection<ClusterNode> nodes = macMap.get(macs);
+
+            if (nodes == null)
+                macMap.put(macs, nodes = new HashSet<>());
+
+            nodes.add(node);
+        }
+
+        Map<UUID, Collection<ClusterNode>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f);
+
+        for (Collection<ClusterNode> group : macMap.values())
+            for (ClusterNode node : group)
+                neighbors.put(node.id(), group);
+
+        return neighbors;
+    }
+
+    /**
+     * Returns neighbors for all {@code nodes}.
+     *
+     * @param neighborhood Neighborhood cache.
+     * @param nodes Nodes.
+     * @return All neighbors for given nodes.
+     */
+    public static Collection<ClusterNode> neighborsForNodes(Map<UUID, Collection<ClusterNode>> neighborhood,
+        Iterable<ClusterNode> nodes) {
+        Collection<ClusterNode> res = new HashSet<>();
+
+        for (ClusterNode node : nodes) {
+            if (!res.contains(node))
+                res.addAll(neighborhood.get(node.id()));
+        }
+
+        return res;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a180027/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
new file mode 100644
index 0000000..878d7d1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
+import org.apache.ignite.testframework.GridTestNode;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstractTest {
+    /** MAC prefix. */
+    private static final String MAC_PREF = "MAC";
+
+    /**
+     * Returns affinity function.
+     *
+     * @return Affinity function.
+     */
+    protected abstract AffinityFunction affinityFunction();
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNodeRemovedNoBackups() throws Exception {
+        checkNodeRemoved(0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNodeRemovedOneBackup() throws Exception {
+        checkNodeRemoved(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNodeRemovedTwoBackups() throws Exception {
+        checkNodeRemoved(2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNodeRemovedThreeBackups() throws Exception {
+        checkNodeRemoved(3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomReassignmentNoBackups() throws Exception {
+        checkRandomReassignment(0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomReassignmentOneBackup() throws Exception {
+        checkRandomReassignment(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomReassignmentTwoBackups() throws Exception {
+        checkRandomReassignment(2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomReassignmentThreeBackups() throws Exception {
+        checkRandomReassignment(3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    protected void checkNodeRemoved(int backups) throws Exception {
+        checkNodeRemoved(backups, 1, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    protected void checkNodeRemoved(int backups, int neighborsPerHost, int neighborsPeriod) throws Exception {
+
+        AffinityFunction aff = affinityFunction();
+
+        int nodesCnt = 50;
+
+        List<ClusterNode> nodes = new ArrayList<>(nodesCnt);
+
+        List<List<ClusterNode>> prev = null;
+
+        for (int i = 0; i < nodesCnt; i++) {
+            info("======================================");
+            info("Assigning partitions: " + i);
+            info("======================================");
+
+            ClusterNode node = new GridTestNode(UUID.randomUUID());
+
+            if (neighborsPerHost > 0)
+                node.attribute(MAC_PREF + ((i / neighborsPeriod) % (nodesCnt / neighborsPerHost)));
+
+            nodes.add(node);
+
+            DiscoveryEvent discoEvt = new DiscoveryEvent(node, "", EventType.EVT_NODE_JOINED, node);
+
+            GridAffinityFunctionContextImpl ctx =
+                new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i), backups);
+
+            List<List<ClusterNode>> assignment = aff.assignPartitions(ctx);
+
+            info("Assigned.");
+
+            verifyAssignment(assignment, backups, aff.partitions(), nodes.size());
+
+            prev = assignment;
+        }
+
+        info("======================================");
+        info("Will remove nodes.");
+        info("======================================");
+
+        for (int i = 0; i < nodesCnt - 1; i++) {
+            info("======================================");
+            info("Assigning partitions: " + i);
+            info("======================================");
+
+            ClusterNode rmv = nodes.remove(nodes.size() - 1);
+
+            DiscoveryEvent discoEvt = new DiscoveryEvent(rmv, "", EventType.EVT_NODE_LEFT, rmv);
+
+            List<List<ClusterNode>> assignment = aff.assignPartitions(
+                new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i),
+                    backups));
+
+            info("Assigned.");
+
+            verifyAssignment(assignment, backups, aff.partitions(), nodes.size());
+
+            prev = assignment;
+        }
+    }
+
+    /**
+     * @param backups Backups.
+     */
+    protected void checkRandomReassignment(int backups) {
+        AffinityFunction aff = affinityFunction();
+
+        Random rnd = new Random();
+
+        int maxNodes = 50;
+
+        List<ClusterNode> nodes = new ArrayList<>(maxNodes);
+
+        List<List<ClusterNode>> prev = null;
+
+        int state = 0;
+
+        int i = 0;
+
+        while (true) {
+            boolean add;
+
+            if (nodes.size() < 2) {
+                // Returned back to one node?
+                if (state == 1)
+                    return;
+
+                add = true;
+            }
+            else if (nodes.size() == maxNodes) {
+                if (state == 0)
+                    state = 1;
+
+                add = false;
+            }
+            else {
+                // Nodes size in [2, maxNodes - 1].
+                if (state == 0)
+                    add = rnd.nextInt(3) != 0; // 66% to add, 33% to remove.
+                else
+                    add = rnd.nextInt(3) == 0; // 33% to add, 66% to remove.
+            }
+
+            DiscoveryEvent discoEvt;
+
+            if (add) {
+                ClusterNode addedNode = new GridTestNode(UUID.randomUUID());
+
+                nodes.add(addedNode);
+
+                discoEvt = new DiscoveryEvent(addedNode, "", EventType.EVT_NODE_JOINED, addedNode);
+            }
+            else {
+                ClusterNode rmvNode = nodes.remove(rnd.nextInt(nodes.size()));
+
+                discoEvt = new DiscoveryEvent(rmvNode, "", EventType.EVT_NODE_LEFT, rmvNode);
+            }
+
+            info("======================================");
+            info("Assigning partitions [iter=" + i + ", discoEvt=" + discoEvt + ", nodesSize=" + nodes.size() + ']');
+            info("======================================");
+
+            List<List<ClusterNode>> assignment = aff.assignPartitions(
+                new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i),
+                    backups));
+
+            verifyAssignment(assignment, backups, aff.partitions(), nodes.size());
+
+            prev = assignment;
+
+            i++;
+        }
+    }
+
+
+    /**
+     * @param assignment Assignment to verify.
+     */
+    private void verifyAssignment(List<List<ClusterNode>> assignment, int keyBackups, int partsCnt, int topSize) {
+        Map<UUID, Collection<Integer>> mapping = new HashMap<>();
+
+        int ideal = Math.round((float)partsCnt / topSize * Math.min(keyBackups + 1, topSize));
+
+        for (int part = 0; part < assignment.size(); part++) {
+            for (ClusterNode node : assignment.get(part)) {
+                assert node != null;
+
+                Collection<Integer> parts = mapping.get(node.id());
+
+                if (parts == null) {
+                    parts = new HashSet<>();
+
+                    mapping.put(node.id(), parts);
+                }
+
+                assertTrue(parts.add(part));
+            }
+        }
+
+        int max = -1, min = Integer.MAX_VALUE;
+
+        for (Collection<Integer> parts : mapping.values()) {
+            max = Math.max(max, parts.size());
+            min = Math.min(min, parts.size());
+        }
+
+        log().warning("max=" + max + ", min=" + min + ", ideal=" + ideal + ", minDev=" + deviation(min, ideal) + "%, " +
+            "maxDev=" + deviation(max, ideal) + "%");
+    }
+
+    /**
+     * @param val Value.
+     * @param ideal Ideal.
+     */
+    private static int deviation(int val, int ideal) {
+        return Math.round(Math.abs(((float)val - ideal) / ideal * 100));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a180027/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java
new file mode 100644
index 0000000..24704ed
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import java.util.Collection;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * Checks that a client node is never part of the affinity assignment of a cache, as seen from every node.
+ */
+public class AffinityClientNodeSelfTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node configured by this test. */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of nodes started; the node with index {@code NODE_CNT - 1} is configured in client mode. */
+    private static final int NODE_CNT = 4;
+
+    /** Name of the cache with rendezvous affinity, one backup and a node filter. */
+    private static final String CACHE1 = "cache1";
+
+    /** Name of the cache with rendezvous affinity and one backup, without a node filter. */
+    private static final String CACHE2 = "cache2";
+
+    /** Name of the cache with fair affinity, one backup and a node filter. */
+    private static final String CACHE3 = "cache3";
+
+    /** Name of the replicated cache with a node filter. */
+    private static final String CACHE4 = "cache4";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        // Only the node with the highest index is configured as a client.
+        if (gridName.equals(getTestGridName(NODE_CNT - 1)))
+            cfg.setClientMode(true);
+
+        CacheConfiguration ccfg1 = new CacheConfiguration();
+
+        ccfg1.setBackups(1);
+        ccfg1.setName(CACHE1);
+        ccfg1.setAffinity(new RendezvousAffinityFunction());
+        ccfg1.setNodeFilter(new TestNodesFilter());
+
+        CacheConfiguration ccfg2 = new CacheConfiguration();
+
+        ccfg2.setBackups(1);
+        ccfg2.setName(CACHE2);
+        ccfg2.setAffinity(new RendezvousAffinityFunction());
+
+        CacheConfiguration ccfg3 = new CacheConfiguration();
+
+        ccfg3.setBackups(1);
+        ccfg3.setName(CACHE3);
+        ccfg3.setAffinity(new FairAffinityFunction());
+        ccfg3.setNodeFilter(new TestNodesFilter());
+
+        CacheConfiguration ccfg4 = new CacheConfiguration();
+
+        ccfg4.setCacheMode(REPLICATED);
+        ccfg4.setName(CACHE4);
+        ccfg4.setNodeFilter(new TestNodesFilter());
+
+        cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg3, ccfg4);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(NODE_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientNodeNotInAffinity() throws Exception {
+        checkCache(CACHE1, 2);
+
+        checkCache(CACHE2, 2);
+
+        checkCache(CACHE3, 2);
+
+        checkCache(CACHE4, 3);
+        // Repeat the check for an unnamed cache created from the client, without and then with a near cache.
+        Ignite client = ignite(NODE_CNT - 1);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setBackups(0);
+
+        ccfg.setNodeFilter(new TestNodesFilter());
+
+        IgniteCache<Integer, Integer> cache = client.createCache(ccfg);
+
+        try {
+            checkCache(null, 1);
+        }
+        finally {
+            cache.destroy();
+        }
+
+        cache = client.createCache(ccfg, new NearCacheConfiguration());
+
+        try {
+            checkCache(null, 1);
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param expNodes Expected number of nodes per partition.
+     */
+    private void checkCache(String cacheName, int expNodes) {
+        log.info("Test cache: " + cacheName);
+
+        Ignite client = ignite(NODE_CNT - 1);
+
+        assertTrue(client.configuration().isClientMode());
+
+        ClusterNode clientNode = client.cluster().localNode();
+        // The assignment is checked through the affinity API of every node, the client included.
+        for (int i = 0; i < NODE_CNT; i++) {
+            Ignite ignite = ignite(i);
+
+            Affinity<Integer> aff = ignite.affinity(cacheName);
+
+            for (int part = 0; part < aff.partitions(); part++) {
+                Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(part);
+
+                assertEquals(expNodes, nodes.size());
+
+                assertFalse(nodes.contains(clientNode));
+            }
+        }
+    }
+
+    /**
+     * Node filter that accepts every node passed to it, after asserting that the node is not in client mode.
+     */
+    private static class TestNodesFilter implements IgnitePredicate<ClusterNode> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode clusterNode) {
+            Boolean attr = clusterNode.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE);
+
+            assertNotNull(attr);
+
+            assertFalse(attr);
+
+            return true;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a180027/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
new file mode 100644
index 0000000..3bf41c1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import java.util.Collection;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+
+/**
+ * Base test for {@link AffinityFunction} implementations configured with a user-provided backup filter.
+ */
+public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Backup count. */
+    private static final int BACKUPS = 1;
+
+    /** Name of the user attribute that splits nodes into groups. */
+    private static final String SPLIT_ATTRIBUTE_NAME = "split-attribute";
+
+    /** Split attribute value put into the configuration of the next node to be configured. */
+    private String splitAttrVal;
+
+    /** Backup filter that accepts a backup only if its split attribute differs from the primary's. */
+    protected static final IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter =
+        new IgniteBiPredicate<ClusterNode, ClusterNode>() {
+            @Override public boolean apply(ClusterNode primary, ClusterNode backup) {
+                assert primary != null : "primary is null";
+                assert backup != null : "backup is null";
+
+                return !F.eq(primary.attribute(SPLIT_ATTRIBUTE_NAME), backup.attribute(SPLIT_ATTRIBUTE_NAME));
+            }
+        };
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setBackups(BACKUPS);
+        cacheCfg.setAffinity(affinityFunction());
+        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setRebalanceMode(SYNC);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+        spi.setIpFinder(IP_FINDER);
+
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCacheConfiguration(cacheCfg);
+        cfg.setDiscoverySpi(spi);
+        cfg.setUserAttributes(F.asMap(SPLIT_ATTRIBUTE_NAME, splitAttrVal));
+
+        return cfg;
+    }
+
+    /**
+     * @return Affinity function for test.
+     */
+    protected abstract AffinityFunction affinityFunction();
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionDistribution() throws Exception {
+        try {
+            for (int i = 0; i < 3; i++) {
+                splitAttrVal = "A";
+
+                startGrid(2 * i);
+
+                splitAttrVal = "B";
+
+                startGrid(2 * i + 1);
+
+                awaitPartitionMapExchange();
+                // Re-check the distribution after every added A/B pair of nodes.
+                checkPartitions();
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private void checkPartitions() throws Exception {
+        AffinityFunction aff = cacheConfiguration(grid(0).configuration(), null).getAffinity();
+
+        int partCnt = aff.partitions();
+
+        IgniteCache<Object, Object> cache = grid(0).cache(null);
+        // The loop index is a partition number, so it is mapped as a partition rather than as a cache key.
+        for (int part = 0; part < partCnt; part++) {
+            Collection<ClusterNode> nodes = affinity(cache).mapPartitionToPrimaryAndBackups(part);
+
+            assertEquals(BACKUPS + 1, nodes.size());
+
+            ClusterNode primary = F.first(nodes);
+            ClusterNode backup = F.last(nodes);
+
+            assertFalse(F.eq(primary.attribute(SPLIT_ATTRIBUTE_NAME), backup.attribute(SPLIT_ATTRIBUTE_NAME)));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a180027/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
new file mode 100644
index 0000000..10cb5a5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
+
+/**
+ * Base test checking that an {@link AffinityFunction} maps a key only to nodes with distinct MAC attributes.
+ */
+@SuppressWarnings({"PointlessArithmeticExpression", "FieldCanBeLocal"})
+public abstract class AffinityFunctionExcludeNeighborsAbstractSelfTest extends GridCommonAbstractTest {
+    /** Number of backups. */
+    private int backups = 2;
+
+    /** Number of grid instances whose node attributes have been set so far; drives the mock MAC assignment. */
+    private int gridInstanceNum;
+
+    /** IP finder. */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        // Override node attributes in discovery spi.
+        TcpDiscoverySpi spi = new TcpDiscoverySpi() {
+            @Override public void setNodeAttributes(Map<String, Object> attrs,
+                IgniteProductVersion ver) {
+                super.setNodeAttributes(attrs, ver);
+
+                // Set unique mac addresses for every group of three nodes.
+                String macAddrs = "MOCK_MACS_" + (gridInstanceNum / 3);
+
+                attrs.put(IgniteNodeAttributes.ATTR_MACS, macAddrs);
+
+                gridInstanceNum++;
+            }
+        };
+
+        spi.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(spi);
+
+        CacheConfiguration cc = defaultCacheConfiguration();
+
+        cc.setCacheMode(PARTITIONED);
+
+        cc.setBackups(backups);
+
+        cc.setAffinity(affinityFunction());
+
+        cc.setRebalanceMode(NONE);
+
+        c.setCacheConfiguration(cc);
+
+        return c;
+    }
+
+    /**
+     * @return Affinity function for test.
+     */
+    protected abstract AffinityFunction affinityFunction();
+
+    /**
+     * @param aff Affinity.
+     * @param key Key.
+     * @return Primary and backup nodes the key is mapped to.
+     */
+    private static Collection<? extends ClusterNode> nodes(Affinity<Object> aff, Object key) {
+        return aff.mapKeyToPrimaryAndBackups(key);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAffinityMultiNode() throws Exception {
+        int grids = 9;
+
+        try {
+            startGrids(grids);
+
+            Object key = 12345;
+
+            int copies = backups + 1;
+
+            for (int i = 0; i < grids; i++) {
+                final Ignite g = grid(i);
+
+                Affinity<Object> aff = g.affinity(null);
+
+                List<TcpDiscoveryNode> top = new ArrayList<>();
+
+                for (ClusterNode node : g.cluster().nodes())
+                    top.add((TcpDiscoveryNode) node);
+
+                Collections.sort(top);
+
+                assertEquals(grids, top.size());
+
+                int idx = 1;
+
+                for (ClusterNode n : top) {
+                    assertEquals(idx, n.order());
+
+                    idx++;
+                }
+
+                Collection<? extends ClusterNode> affNodes = nodes(aff, key);
+
+                info("Affinity picture for grid [i=" + i + ", aff=" + U.toShortString(affNodes) + ']');
+
+                assertEquals(copies, affNodes.size());
+
+                Set<String> macs = new HashSet<>();
+
+                for (ClusterNode node : affNodes)
+                    macs.add((String)node.attribute(IgniteNodeAttributes.ATTR_MACS));
+                // All copies of the key must sit on nodes with different MAC attributes.
+                assertEquals(copies, macs.size());
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAffinitySingleNode() throws Exception {
+        try {
+            Ignite g = startGrid();
+
+            Object key = 12345;
+
+            Collection<? extends ClusterNode> affNodes = nodes(g.affinity(null), key);
+
+            info("Affinity picture for grid: " + U.toShortString(affNodes));
+
+            assertEquals(1, affNodes.size());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message