ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [07/12] incubator-ignite git commit: IGNITE-575 - Renaimings
Date Wed, 25 Mar 2015 06:39:02 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
new file mode 100644
index 0000000..0b9e33b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
@@ -0,0 +1,777 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.fair;
+
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Fair affinity function which tries to ensure that all nodes get equal number of partitions with
+ * minimum amount of reassignments between existing nodes.
+ * <p>
+ * Cache affinity can be configured for individual caches via {@link CacheConfiguration#getAffinity()} method.
+ */
+@CentralizedAffinityFunction
+public class FairAffinityFunction implements AffinityFunction {
+    /** Default partition count. */
+    public static final int DFLT_PART_CNT = 256;
+
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Ascending comparator. */
+    private static final Comparator<PartitionSet> ASC_CMP = new PartitionSetComparator();
+
+    /** Descending comparator. */
+    private static final Comparator<PartitionSet> DESC_CMP = Collections.reverseOrder(ASC_CMP);
+
+    /** Number of partitions. */
+    private final int parts;
+
+    /**
+     * Creates fair affinity with default partition count.
+     */
+    public FairAffinityFunction() {
+        this(DFLT_PART_CNT);
+    }
+
+    /**
+     * @param parts Number of partitions.
+     */
+    public FairAffinityFunction(int parts) {
+        this.parts = parts;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext ctx) {
+        List<ClusterNode> topSnapshot = ctx.currentTopologySnapshot();
+
+        if (topSnapshot.size() == 1) {
+            ClusterNode primary = topSnapshot.get(0);
+
+            return Collections.nCopies(parts, Collections.singletonList(primary));
+        }
+
+        List<List<ClusterNode>> assignment = createCopy(ctx);
+
+        int tiers = Math.min(ctx.backups() + 1, topSnapshot.size());
+
+        // Per tier pending partitions.
+        Map<Integer, Queue<Integer>> pendingParts = new HashMap<>();
+
+        FullAssignmentMap fullMap = new FullAssignmentMap(tiers, assignment, topSnapshot);
+
+        for (int tier = 0; tier < tiers; tier++) {
+            // Check if this is a new tier and add pending partitions.
+            Queue<Integer> pending = pendingParts.get(tier);
+
+            for (int part = 0; part < parts; part++) {
+                if (fullMap.assignments.get(part).size() < tier + 1) {
+                    if (pending == null) {
+                        pending = new LinkedList<>();
+
+                        pendingParts.put(tier, pending);
+                    }
+
+                    if (!pending.contains(part))
+                        pending.add(part);
+
+                }
+            }
+
+            // Assign pending partitions, if any.
+            assignPending(tier, pendingParts, fullMap, topSnapshot);
+
+            // Balance assignments.
+            balance(tier, pendingParts, fullMap, topSnapshot);
+        }
+
+        return fullMap.assignments;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reset() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partitions() {
+        return parts;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition(Object key) {
+        return U.safeAbs(hash(key.hashCode())) % parts;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeNode(UUID nodeId) {
+        // No-op.
+    }
+
+    /**
+     * Assigns pending (unassigned) partitions to nodes.
+     *
+     * @param tier Tier to assign (0 is primary, 1 - 1st backup,...).
+     * @param pendingMap Pending partitions per tier.
+     * @param fullMap Full assignment map to modify.
+     * @param topSnapshot Topology snapshot.
+     */
+    private void assignPending(int tier, Map<Integer, Queue<Integer>> pendingMap, FullAssignmentMap fullMap,
+        List<ClusterNode> topSnapshot) {
+        Queue<Integer> pending = pendingMap.get(tier);
+
+        if (F.isEmpty(pending))
+            return;
+
+        int idealPartCnt = parts / topSnapshot.size();
+
+        Map<UUID, PartitionSet> tierMapping = fullMap.tierMapping(tier);
+
+        PrioritizedPartitionMap underloadedNodes = filterNodes(tierMapping, idealPartCnt, false);
+
+        // First iterate over underloaded nodes.
+        assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, false);
+
+        if (!pending.isEmpty() && !underloadedNodes.isEmpty()) {
+            // Same, forcing updates.
+            assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, true);
+        }
+
+        if (!pending.isEmpty())
+            assignPendingToNodes(tier, pendingMap, fullMap, topSnapshot);
+
+        assert pending.isEmpty();
+
+        pendingMap.remove(tier);
+    }
+
+    /**
+     * Assigns pending partitions to underloaded nodes.
+     *
+     * @param tier Tier to assign.
+     * @param pendingMap Pending partitions per tier.
+     * @param fullMap Full assignment map to modify.
+     * @param underloadedNodes Underloaded nodes.
+     * @param topSnapshot Topology snapshot.
+     * @param force {@code True} if partitions should be moved.
+     */
+    private void assignPendingToUnderloaded(
+        int tier,
+        Map<Integer, Queue<Integer>> pendingMap,
+        FullAssignmentMap fullMap,
+        PrioritizedPartitionMap underloadedNodes,
+        Collection<ClusterNode> topSnapshot,
+        boolean force) {
+        Iterator<Integer> it = pendingMap.get(tier).iterator();
+
+        int ideal = parts / topSnapshot.size();
+
+        while (it.hasNext()) {
+            int part = it.next();
+
+            for (PartitionSet set : underloadedNodes.assignments()) {
+                ClusterNode node = set.node();
+
+                assert node != null;
+
+                if (fullMap.assign(part, tier, node, force, pendingMap)) {
+                    // Partition is assigned, remove it from pending.
+                    it.remove();
+                    // Node stays underloaded until it reaches the ideal partition count.
+                    if (set.size() >= ideal)
+                        underloadedNodes.remove(set.nodeId());
+                    else
+                        underloadedNodes.update();
+
+                    break; // for, continue to the next partition.
+                }
+            }
+
+            if (underloadedNodes.isEmpty())
+                return;
+        }
+    }
+
+    /**
+     * Spreads pending partitions equally to all nodes in topology snapshot.
+     *
+     * @param tier Tier to assign.
+     * @param pendingMap Pending partitions per tier.
+     * @param fullMap Full assignment map to modify.
+     * @param topSnapshot Topology snapshot.
+     */
+    private void assignPendingToNodes(int tier, Map<Integer, Queue<Integer>> pendingMap,
+        FullAssignmentMap fullMap, List<ClusterNode> topSnapshot) {
+        Iterator<Integer> it = pendingMap.get(tier).iterator();
+
+        int idx = 0;
+
+        while (it.hasNext()) {
+            int part = it.next();
+
+            int i = idx;
+
+            boolean assigned = false;
+
+            do {
+                ClusterNode node = topSnapshot.get(i);
+
+                if (fullMap.assign(part, tier, node, false, pendingMap)) {
+                    it.remove();
+
+                    assigned = true;
+                }
+
+                i = (i + 1) % topSnapshot.size();
+
+                if (assigned)
+                    idx = i;
+            } while (i != idx);
+
+            if (!assigned) {
+                do {
+                    ClusterNode node = topSnapshot.get(i);
+
+                    if (fullMap.assign(part, tier, node, true, pendingMap)) {
+                        it.remove();
+
+                        assigned = true;
+                    }
+
+                    i = (i + 1) % topSnapshot.size();
+
+                    if (assigned)
+                        idx = i;
+                } while (i != idx);
+            }
+
+            if (!assigned)
+                throw new IllegalStateException("Failed to find assignable node for partition.");
+        }
+    }
+
+    /**
+     * Tries to balance assignments between existing nodes in topology.
+     *
+     * @param tier Tier to assign.
+     * @param pendingParts Pending partitions per tier.
+     * @param fullMap Full assignment map to modify.
+     * @param topSnapshot Topology snapshot.
+     */
+    private void balance(int tier, Map<Integer, Queue<Integer>> pendingParts, FullAssignmentMap fullMap,
+        Collection<ClusterNode> topSnapshot) {
+        int idealPartCnt = parts / topSnapshot.size();
+
+        Map<UUID, PartitionSet> mapping = fullMap.tierMapping(tier);
+
+        PrioritizedPartitionMap underloadedNodes = filterNodes(mapping, idealPartCnt, false);
+        PrioritizedPartitionMap overloadedNodes = filterNodes(mapping, idealPartCnt, true);
+
+        do {
+            boolean retry = false;
+
+            for (PartitionSet overloaded : overloadedNodes.assignments()) {
+                for (Integer part : overloaded.partitions()) {
+                    boolean assigned = false;
+
+                    for (PartitionSet underloaded : underloadedNodes.assignments()) {
+                        if (fullMap.assign(part, tier, underloaded.node(), false, pendingParts)) {
+                            // Size of partition sets has changed.
+                            if (overloaded.size() <= idealPartCnt)
+                                overloadedNodes.remove(overloaded.nodeId());
+                            else
+                                overloadedNodes.update();
+
+                            if (underloaded.size() >= idealPartCnt)
+                                underloadedNodes.remove(underloaded.nodeId());
+                            else
+                                underloadedNodes.update();
+
+                            assigned = true;
+
+                            retry = true;
+
+                            break;
+                        }
+                    }
+
+                    if (!assigned) {
+                        for (PartitionSet underloaded : underloadedNodes.assignments()) {
+                            if (fullMap.assign(part, tier, underloaded.node(), true, pendingParts)) {
+                                // Size of partition sets has changed.
+                                if (overloaded.size() <= idealPartCnt)
+                                    overloadedNodes.remove(overloaded.nodeId());
+                                else
+                                    overloadedNodes.update();
+
+                                if (underloaded.size() >= idealPartCnt)
+                                    underloadedNodes.remove(underloaded.nodeId());
+                                else
+                                    underloadedNodes.update();
+
+                                retry = true;
+
+                                break;
+                            }
+                        }
+                    }
+
+                    if (retry)
+                        break; // for part.
+                }
+
+                if (retry)
+                    break; // for overloaded.
+            }
+
+            if (!retry)
+                break;
+        }
+        while (true);
+    }
+
+    /**
+     * Constructs underloaded or overloaded partition map.
+     *
+     * @param mapping Mapping to filter.
+     * @param idealPartCnt Ideal number of partitions per node.
+     * @param overloaded {@code True} if should create overloaded map, {@code false} for underloaded.
+     * @return Prioritized partition map.
+     */
+    private PrioritizedPartitionMap filterNodes(Map<UUID, PartitionSet> mapping, int idealPartCnt, boolean overloaded) {
+        assert mapping != null;
+
+        PrioritizedPartitionMap res = new PrioritizedPartitionMap(overloaded ? DESC_CMP : ASC_CMP);
+
+        for (PartitionSet set : mapping.values()) {
+            if ((overloaded && set.size() > idealPartCnt) || (!overloaded && set.size() < idealPartCnt))
+                res.add(set);
+        }
+
+        return res;
+    }
+
+    /**
+     * Creates copy of previous partition assignment.
+     *
+     * @param ctx Affinity function context.
+     * @return Copy of previous assignment with the node from a non-join discovery event removed.
+     */
+    private List<List<ClusterNode>> createCopy(AffinityFunctionContext ctx) {
+        DiscoveryEvent discoEvt = ctx.discoveryEvent();
+
+        UUID leftNodeId = (discoEvt == null || discoEvt.type() == EventType.EVT_NODE_JOINED)
+            ? null
+            : discoEvt.eventNode().id();
+
+        List<List<ClusterNode>> cp = new ArrayList<>(parts);
+
+        for (int part = 0; part < parts; part++) {
+            List<ClusterNode> partNodes = ctx.previousAssignment(part);
+
+            List<ClusterNode> partNodesCp;
+
+            if (partNodes == null)
+                partNodesCp = new ArrayList<>();
+            else {
+                if (leftNodeId == null) {
+                    partNodesCp = new ArrayList<>(partNodes.size() + 1); // Node joined.
+
+                    partNodesCp.addAll(partNodes);
+                }
+                else {
+                    partNodesCp = new ArrayList<>(partNodes.size());
+
+                    for (ClusterNode affNode : partNodes) {
+                        if (!affNode.id().equals(leftNodeId))
+                            partNodesCp.add(affNode);
+                    }
+                }
+            }
+
+            cp.add(partNodesCp);
+        }
+
+        return cp;
+    }
+
+    /**
+     * Orders partition sets by the number of partitions they hold (ascending).
+     */
+    private static class PartitionSetComparator implements Comparator<PartitionSet>, Serializable {
+        /** Serialization version. */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public int compare(PartitionSet o1, PartitionSet o2) {
+            return Integer.compare(o1.parts.size(), o2.parts.size());
+        }
+    }
+
+    /**
+     * Prioritized partition map. Ordered structure in which nodes are ordered in ascending or descending order
+     * by number of partitions assigned to a node.
+     */
+    private static class PrioritizedPartitionMap {
+        /** Comparator. */
+        private final Comparator<PartitionSet> cmp;
+
+        /** Assignment map. */
+        private final Map<UUID, PartitionSet> assignmentMap = new HashMap<>();
+
+        /** Assignment list, ordered according to comparator. */
+        private final List<PartitionSet> assignmentList = new ArrayList<>();
+
+        /**
+         * @param cmp Comparator.
+         */
+        private PrioritizedPartitionMap(Comparator<PartitionSet> cmp) {
+            this.cmp = cmp;
+        }
+
+        /**
+         * @param set Partition set to add.
+         */
+        public void add(PartitionSet set) {
+            PartitionSet old = assignmentMap.put(set.nodeId(), set);
+
+            if (old == null) {
+                assignmentList.add(set);
+
+                update();
+            }
+        }
+
+        /**
+         * Sorts assignment list.
+         */
+        public void update() {
+            Collections.sort(assignmentList, cmp);
+        }
+
+        /**
+         * @return Sorted assignment list.
+         */
+        public List<PartitionSet> assignments() {
+            return assignmentList;
+        }
+
+        /**
+         * @param uuid ID of the node whose partition set should be removed.
+         */
+        public void remove(UUID uuid) {
+            PartitionSet rmv = assignmentMap.remove(uuid);
+
+            assignmentList.remove(rmv);
+        }
+
+        /**
+         * @return {@code True} if the map holds no partition sets.
+         */
+        public boolean isEmpty() {
+            return assignmentList.isEmpty();
+        }
+    }
+
+    /**
+     * Constructs assignment map for specified tier.
+     *
+     * @param tier Tier number, -1 for all tiers altogether.
+     * @param assignment Assignment to construct map from.
+     * @param topSnapshot Topology snapshot.
+     * @return Assignment map.
+     */
+    private static Map<UUID, PartitionSet> assignments(int tier, List<List<ClusterNode>> assignment,
+        Collection<ClusterNode> topSnapshot) {
+        Map<UUID, PartitionSet> tmp = new LinkedHashMap<>();
+
+        for (int part = 0; part < assignment.size(); part++) {
+            List<ClusterNode> nodes = assignment.get(part);
+
+            assert nodes instanceof RandomAccess;
+
+            if (nodes.size() <= tier)
+                continue;
+
+            int start = tier < 0 ? 0 : tier;
+            int end = tier < 0 ? nodes.size() : tier + 1;
+
+            for (int i = start; i < end; i++) {
+                ClusterNode n = nodes.get(i);
+
+                PartitionSet set = tmp.get(n.id());
+
+                if (set == null) {
+                    set = new PartitionSet(n);
+
+                    tmp.put(n.id(), set);
+                }
+
+                set.add(part);
+            }
+        }
+
+        if (tmp.size() < topSnapshot.size()) {
+            for (ClusterNode node : topSnapshot) {
+                if (!tmp.containsKey(node.id()))
+                    tmp.put(node.id(), new PartitionSet(node));
+            }
+        }
+
+        return tmp;
+    }
+
+    /**
+     * Full assignment map. Auxiliary data structure which maintains resulting assignment and temporary
+     * maps consistent.
+     */
+    @SuppressWarnings("unchecked")
+    private static class FullAssignmentMap {
+        /** Per-tier assignment maps. */
+        private final Map<UUID, PartitionSet>[] tierMaps;
+
+        /** Full assignment map. */
+        private final Map<UUID, PartitionSet> fullMap;
+
+        /** Resulting assignment. */
+        private final List<List<ClusterNode>> assignments;
+
+        /**
+         * @param tiers Number of tiers.
+         * @param assignments Assignments to modify.
+         * @param topSnapshot Topology snapshot.
+         */
+        private FullAssignmentMap(int tiers, List<List<ClusterNode>> assignments, Collection<ClusterNode> topSnapshot) {
+            this.assignments = assignments;
+
+            tierMaps = new Map[tiers];
+
+            for (int tier = 0; tier < tiers; tier++)
+                tierMaps[tier] = assignments(tier, assignments, topSnapshot);
+
+            fullMap = assignments(-1, assignments, topSnapshot);
+        }
+
+        /**
+         * Tries to assign partition to given node on specified tier. If force is false, assignment will succeed
+         * only if this partition is not already assigned to a node. If force is true, then assignment will succeed
+         * only if partition is not assigned to a tier with number less than passed in. Assigned partition from
+         * greater tier will be moved to pending queue.
+         *
+         * @param part Partition to assign.
+         * @param tier Tier number to assign.
+         * @param node Node to move partition to.
+         * @param force Force flag.
+         * @param pendingParts Per-tier pending partitions map.
+         * @return {@code True} if assignment succeeded.
+         */
+        boolean assign(int part, int tier, ClusterNode node, boolean force, Map<Integer, Queue<Integer>> pendingParts) {
+            UUID nodeId = node.id();
+
+            if (!fullMap.get(nodeId).contains(part)) {
+                tierMaps[tier].get(nodeId).add(part);
+
+                fullMap.get(nodeId).add(part);
+
+                List<ClusterNode> assignment = assignments.get(part);
+
+                if (assignment.size() <= tier)
+                    assignment.add(node);
+                else {
+                    ClusterNode oldNode = assignment.set(tier, node);
+
+                    if (oldNode != null) {
+                        UUID oldNodeId = oldNode.id();
+
+                        tierMaps[tier].get(oldNodeId).remove(part);
+                        fullMap.get(oldNodeId).remove(part);
+                    }
+                }
+
+                return true;
+            }
+            else if (force) {
+                assert !tierMaps[tier].get(nodeId).contains(part);
+
+                // Check previous tiers first.
+                for (int t = 0; t < tier; t++) {
+                    if (tierMaps[t].get(nodeId).contains(part))
+                        return false;
+                }
+
+                // Partition is on some higher-numbered (lower priority) tier, switch it.
+                for (int t = tier + 1; t < tierMaps.length; t++) {
+                    if (tierMaps[t].get(nodeId).contains(part)) {
+                        ClusterNode oldNode = assignments.get(part).get(tier);
+
+                        // Move partition from level t to tier.
+                        assignments.get(part).set(tier, node);
+                        assignments.get(part).set(t, null);
+
+                        if (oldNode != null) {
+                            tierMaps[tier].get(oldNode.id()).remove(part);
+                            fullMap.get(oldNode.id()).remove(part);
+                        }
+
+                        tierMaps[tier].get(nodeId).add(part);
+                        tierMaps[t].get(nodeId).remove(part);
+
+                        Queue<Integer> pending = pendingParts.get(t);
+
+                        if (pending == null) {
+                            pending = new LinkedList<>();
+
+                            pendingParts.put(t, pending);
+                        }
+
+                        pending.add(part);
+
+                        return true;
+                    }
+                }
+
+                throw new IllegalStateException("Unable to assign partition to node while force is true.");
+            }
+
+            // !force.
+            return false;
+        }
+
+        /**
+         * Gets tier mapping.
+         *
+         * @param tier Tier to get mapping.
+         * @return Per node map.
+         */
+        public Map<UUID, PartitionSet> tierMapping(int tier) {
+            return tierMaps[tier];
+        }
+    }
+
+    /**
+     * Applies a supplemental hash function to a given hashCode, which
+     * defends against poor quality hash functions.
+     *
+     * @param h Hash code.
+     * @return Enhanced hash code.
+     */
+    private static int hash(int h) {
+        // Spread bits to regularize both segment and index locations,
+        // using variant of single-word Wang/Jenkins hash.
+        h += (h <<  15) ^ 0xffffcd7d;
+        h ^= (h >>> 10);
+        h += (h <<   3);
+        h ^= (h >>>  6);
+        h += (h <<   2) + (h << 14);
+        return h ^ (h >>> 16);
+    }
+
+    /**
+     * Set of partitions assigned to a single node on one tier (or on all tiers).
+     * Partitions are kept in insertion order without duplicates.
+     */
+    private static class PartitionSet {
+        /** Node owning the partitions. */
+        private final ClusterNode node;
+
+        /** Partitions, in insertion order; a hash set keeps lookups and removals O(1). */
+        private final Collection<Integer> parts = new LinkedHashSet<>();
+
+        /**
+         * @param node Node.
+         */
+        private PartitionSet(ClusterNode node) {
+            this.node = node;
+        }
+
+        /**
+         * @return Node.
+         */
+        private ClusterNode node() {
+            return node;
+        }
+
+        /**
+         * @return Node ID.
+         */
+        private UUID nodeId() {
+            return node.id();
+        }
+
+        /**
+         * @return Partition set size.
+         */
+        private int size() {
+            return parts.size();
+        }
+
+        /**
+         * Adds partition to partition set.
+         *
+         * @param part Partition to add.
+         * @return {@code True} if partition was added, {@code false} if partition already exists.
+         */
+        private boolean add(int part) {
+            if (!parts.contains(part)) {
+                parts.add(part);
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * @param part Partition to remove.
+         */
+        private void remove(Integer part) {
+            parts.remove(part); // Remove object, not index.
+        }
+
+        /**
+         * @return Partitions.
+         */
+        @SuppressWarnings("TypeMayBeWeakened")
+        private Collection<Integer> partitions() {
+            return parts;
+        }
+
+        /**
+         * Checks if partition set contains given partition.
+         *
+         * @param part Partition to check.
+         * @return {@code True} if partition set contains given partition.
+         */
+        private boolean contains(int part) {
+            return parts.contains(part);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "PartSet [nodeId=" + node.id() + ", size=" + parts.size() + ", parts=" + parts + ']';
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/CacheRendezvousAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/CacheRendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/CacheRendezvousAffinityFunction.java
deleted file mode 100644
index ac0ac5e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/CacheRendezvousAffinityFunction.java
+++ /dev/null
@@ -1,501 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.affinity.rendezvous;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.security.*;
-import java.util.*;
-
-/**
- * Affinity function for partitioned cache based on Highest Random Weight algorithm.
- * This function supports the following configuration:
- * <ul>
- * <li>
- *      {@code partitions} - Number of partitions to spread across nodes.
- * </li>
- * <li>
- *      {@code excludeNeighbors} - If set to {@code true}, will exclude same-host-neighbors
- *      from being backups of each other. Note that {@code backupFilter} is ignored if
- *      {@code excludeNeighbors} is set to {@code true}.
- * </li>
- * <li>
- *      {@code backupFilter} - Optional filter for backup nodes. If provided, then only
- *      nodes that pass this filter will be selected as backup nodes. If not provided, then
- *      primary and backup nodes will be selected out of all nodes available for this cache.
- * </li>
- * </ul>
- * <p>
- * Cache affinity can be configured for individual caches via {@link org.apache.ignite.configuration.CacheConfiguration#getAffinity()} method.
- */
-public class CacheRendezvousAffinityFunction implements CacheAffinityFunction, Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Default number of partitions. */
-    public static final int DFLT_PARTITION_COUNT = 1024;
-
-    /** Comparator used to sort (hash, node) pairs when ranking nodes for a partition. */
-    private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR =
-        new HashComparator();
-
-    /** Thread local MD5 message digest (MD5 availability is checked in the constructor). */
-    private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() {
-        @Override protected MessageDigest initialValue() {
-            try {
-                return MessageDigest.getInstance("MD5");
-            }
-            catch (NoSuchAlgorithmException e) {
-                assert false : "Should have failed in constructor";
-
-                throw new IgniteException("Failed to obtain message digest (digest was available in constructor)",
-                    e);
-            }
-        }
-    };
-
-    /** Number of partitions. */
-    private int parts;
-
-    /** Exclude neighbors flag. */
-    private boolean exclNeighbors;
-
-    /** Optional backup filter. First node is primary, second node is a node being tested. */
-    private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter;
-
-    /** Hash ID resolver. */
-    private CacheAffinityNodeHashResolver hashIdRslvr = new CacheAffinityNodeAddressHashResolver();
-
-    /** Ignite instance (injected); its marshaller serializes node hash IDs. */
-    @IgniteInstanceResource
-    private Ignite ignite;
-
-    /**
-     * Empty constructor with all defaults.
-     */
-    public CacheRendezvousAffinityFunction() {
-        this(false);
-    }
-
-    /**
-     * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other
-     * and default number of partitions.
-     * <p>
-     * Note that the backup filter is ignored if {@code exclNeighbors} is {@code true}.
-     *
-     * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
-     *      of each other.
-     */
-    public CacheRendezvousAffinityFunction(boolean exclNeighbors) {
-        this(exclNeighbors, DFLT_PARTITION_COUNT);
-    }
-
-    /**
-     * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other,
-     * and specified number of partitions.
-     * <p>
-     * Note that the backup filter is ignored if {@code exclNeighbors} is {@code true}.
-     *
-     * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
-     *      of each other.
-     * @param parts Total number of partitions.
-     */
-    public CacheRendezvousAffinityFunction(boolean exclNeighbors, int parts) {
-        this(exclNeighbors, parts, null);
-    }
-
-    /**
-     * Initializes affinity with specified number of partitions and optional backup filter.
-     * Neighbor exclusion is disabled.
-     *
-     * @param parts Total number of partitions.
-     * @param backupFilter Optional backup filter for nodes. If provided, backups will be selected
-     *      from all nodes that pass this filter. First argument for this filter is primary node, and second
-     *      argument is node being tested.
-     * <p>
-     * Note that the backup filter is ignored if neighbor exclusion is later enabled.
-     */
-    public CacheRendezvousAffinityFunction(int parts,
-                                           @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
-        this(false, parts, backupFilter);
-    }
-
-    /**
-     * Private constructor.
-     *
-     * @param exclNeighbors Exclude neighbors flag.
-     * @param parts Partitions count.
-     * @param backupFilter Backup filter.
-     */
-    private CacheRendezvousAffinityFunction(boolean exclNeighbors, int parts,
-                                            IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
-        A.ensure(parts != 0, "parts != 0");
-
-        this.exclNeighbors = exclNeighbors;
-        this.parts = parts;
-        this.backupFilter = backupFilter;
-
-        // Fail fast if MD5 is unavailable so the thread-local digest initializer cannot fail later.
-        try {
-            MessageDigest.getInstance("MD5");
-        }
-        catch (NoSuchAlgorithmException e) {
-            throw new IgniteException("Failed to obtain MD5 message digest instance.", e);
-        }
-    }
-
-    /**
-     * Gets total number of key partitions. To ensure that all partitions are
-     * equally distributed across all nodes, please make sure that this
-     * number is significantly larger than a number of nodes. Also, partition
-     * size should be relatively small. Try to avoid having partitions with more
-     * than quarter million keys.
-     * <p>
-     * Note that for fully replicated caches this method should always
-     * return {@code 1}.
-     *
-     * @return Total partition count.
-     */
-    public int getPartitions() {
-        return parts;
-    }
-
-    /**
-     * Sets total number of partitions.
-     *
-     * @param parts Total number of partitions.
-     */
-    public void setPartitions(int parts) {
-        this.parts = parts;
-    }
-
-    /**
-     * Gets hash ID resolver for nodes. This resolver is used to provide
-     * alternate hash ID, other than node ID.
-     * <p>
-     * Node IDs constantly change when nodes get restarted, which causes them to
-     * be placed on different locations in the hash ring, and hence causing
-     * repartitioning. Providing an alternate hash ID, which survives node restarts,
-     * puts node on the same location on the hash ring, hence minimizing required
-     * repartitioning.
-     *
-     * @return Hash ID resolver.
-     */
-    public CacheAffinityNodeHashResolver getHashIdResolver() {
-        return hashIdRslvr;
-    }
-
-    /**
-     * Sets hash ID resolver for nodes. This resolver is used to provide
-     * alternate hash ID, other than node ID.
-     * <p>
-     * Node IDs constantly change when nodes get restarted, which causes them to
-     * be placed on different locations in the hash ring, and hence causing
-     * repartitioning. Providing an alternate hash ID, which survives node restarts,
-     * puts node on the same location on the hash ring, hence minimizing required
-     * repartitioning.
-     *
-     * @param hashIdRslvr Hash ID resolver.
-     */
-    public void setHashIdResolver(CacheAffinityNodeHashResolver hashIdRslvr) {
-        this.hashIdRslvr = hashIdRslvr;
-    }
-
-    /**
-     * Gets optional backup filter. If not {@code null}, backups will be selected
-     * from all nodes that pass this filter. First node passed to this filter is primary node,
-     * and second node is a node being tested.
-     * <p>
-     * Note that the backup filter is ignored if {@code excludeNeighbors} is {@code true}.
-     *
-     * @return Optional backup filter.
-     */
-    @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() {
-        return backupFilter;
-    }
-
-    /**
-     * Sets optional backup filter. If provided, then backups will be selected from all
-     * nodes that pass this filter. First node being passed to this filter is primary node,
-     * and second node is a node being tested.
-     * <p>
-     * Note that the backup filter is ignored if {@code excludeNeighbors} is {@code true}.
-     *
-     * @param backupFilter Optional backup filter.
-     */
-    public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
-        this.backupFilter = backupFilter;
-    }
-
-    /**
-     * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
-     * <p>
-     * Note that when this flag is {@code true}, {@link #getBackupFilter()} is ignored.
-     *
-     * @return {@code True} if nodes residing on the same host may not act as backups of each other.
-     */
-    public boolean isExcludeNeighbors() {
-        return exclNeighbors;
-    }
-
-    /**
-     * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
-     * <p>
-     * Note that when this flag is {@code true}, {@link #getBackupFilter()} is ignored.
-     *
-     * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other.
-     */
-    public void setExcludeNeighbors(boolean exclNeighbors) {
-        this.exclNeighbors = exclNeighbors;
-    }
-
-    /**
-     * Returns collection of nodes (primary first) for specified partition.
-     *
-     * @param part Partition number.
-     * @param nodes Nodes to choose from.
-     * @param backups Number of backups; {@link Integer#MAX_VALUE} means no limit.
-     * @param neighborhoodCache Node ID to neighbors mapping; must be non-null when {@code excludeNeighbors}
-     *      is {@code true}, because {@code allNeighbors} dereferences it.
-     * @return Nodes for the partition, primary first. If {@code nodes} has at most one element,
-     *      {@code nodes} itself is returned.
-     */
-    public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, int backups,
-        @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) {
-        if (nodes.size() <= 1)
-            return nodes;
-
-        List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>();
-
-        MessageDigest d = digest.get();
-
-        for (ClusterNode node : nodes) {
-            Object nodeHash = hashIdRslvr.resolve(node);
-
-            try {
-                ByteArrayOutputStream out = new ByteArrayOutputStream();
-
-                byte[] nodeHashBytes = ignite.configuration().getMarshaller().marshal(nodeHash);
-
-                out.write(U.intToBytes(part), 0, 4); // Avoid IOException.
-                out.write(nodeHashBytes, 0, nodeHashBytes.length); // Avoid IOException.
-
-                d.reset();
-
-                byte[] bytes = d.digest(out.toByteArray());
-
-                // Rank: first 8 bytes of MD5(partition bytes + node hash bytes), assembled little-endian.
-                long hash =
-                      (bytes[0] & 0xFFL)
-                    | ((bytes[1] & 0xFFL) << 8)
-                    | ((bytes[2] & 0xFFL) << 16)
-                    | ((bytes[3] & 0xFFL) << 24)
-                    | ((bytes[4] & 0xFFL) << 32)
-                    | ((bytes[5] & 0xFFL) << 40)
-                    | ((bytes[6] & 0xFFL) << 48)
-                    | ((bytes[7] & 0xFFL) << 56);
-
-                lst.add(F.t(hash, node));
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        Collections.sort(lst, COMPARATOR);
-
-        int primaryAndBackups;
-
-        List<ClusterNode> res;
-
-        if (backups == Integer.MAX_VALUE) {
-            primaryAndBackups = Integer.MAX_VALUE;
-
-            res = new ArrayList<>();
-        }
-        else {
-            primaryAndBackups = backups + 1;
-
-            res = new ArrayList<>(primaryAndBackups);
-        }
-
-        ClusterNode primary = lst.get(0).get2();
-
-        res.add(primary);
-
-        // Select backups.
-        if (backups > 0) {
-            for (int i = 1; i < lst.size(); i++) {
-                IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
-
-                ClusterNode node = next.get2();
-
-                // The backup filter is not consulted in this branch.
-                if (exclNeighbors) {
-                    Collection<ClusterNode> allNeighbors = allNeighbors(neighborhoodCache, res);
-
-                    if (!allNeighbors.contains(node))
-                        res.add(node);
-                }
-                else {
-                    if (!res.contains(node) && (backupFilter == null || backupFilter.apply(primary, node)))
-                        res.add(next.get2());
-                }
-
-                if (res.size() == primaryAndBackups)
-                    break;
-            }
-        }
-
-        if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) {
-            // Need to iterate one more time in case if there are no nodes which pass exclude backups criteria.
-            for (int i = 1; i < lst.size(); i++) {
-                IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
-
-                ClusterNode node = next.get2();
-
-                if (!res.contains(node))
-                    res.add(next.get2());
-
-                if (res.size() == primaryAndBackups)
-                    break;
-            }
-        }
-
-        assert res.size() <= primaryAndBackups;
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void reset() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public int partitions() {
-        return parts;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int partition(Object key) {
-        return U.safeAbs(key.hashCode() % parts);
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<List<ClusterNode>> assignPartitions(CacheAffinityFunctionContext affCtx) {
-        List<List<ClusterNode>> assignments = new ArrayList<>(parts);
-
-        Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
-            neighbors(affCtx.currentTopologySnapshot()) : null;
-
-        for (int i = 0; i < parts; i++) {
-            List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(),
-                neighborhoodCache);
-
-            assignments.add(partAssignment);
-        }
-
-        return assignments;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void removeNode(UUID nodeId) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeInt(parts);
-        out.writeBoolean(exclNeighbors);
-        out.writeObject(hashIdRslvr);
-        out.writeObject(backupFilter);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        parts = in.readInt();
-        exclNeighbors = in.readBoolean();
-        hashIdRslvr = (CacheAffinityNodeHashResolver)in.readObject();
-        backupFilter = (IgniteBiPredicate<ClusterNode, ClusterNode>)in.readObject();
-    }
-
-    /**
-     * Builds neighborhood map for all nodes in snapshot. Nodes are grouped by their MAC addresses attribute.
-     *
-     * @param topSnapshot Topology snapshot.
-     * @return Neighbors map: node ID to the group of nodes sharing its MAC addresses attribute.
-     */
-    private Map<UUID, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) {
-        Map<String, Collection<ClusterNode>> macMap = new HashMap<>(topSnapshot.size(), 1.0f);
-
-        // Group by mac addresses.
-        for (ClusterNode node : topSnapshot) {
-            String macs = node.attribute(IgniteNodeAttributes.ATTR_MACS);
-
-            Collection<ClusterNode> nodes = macMap.get(macs);
-
-            if (nodes == null) {
-                nodes = new HashSet<>();
-
-                macMap.put(macs, nodes);
-            }
-
-            nodes.add(node);
-        }
-
-        Map<UUID, Collection<ClusterNode>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f);
-
-        for (Collection<ClusterNode> group : macMap.values()) {
-            for (ClusterNode node : group)
-                neighbors.put(node.id(), group);
-        }
-
-        return neighbors;
-    }
-
-    /**
-     * @param neighborhoodCache Neighborhood cache.
-     * @param nodes Nodes.
-     * @return All neighbors for given nodes.
-     */
-    private Collection<ClusterNode> allNeighbors(Map<UUID, Collection<ClusterNode>> neighborhoodCache,
-        Iterable<ClusterNode> nodes) {
-        Collection<ClusterNode> res = new HashSet<>();
-
-        for (ClusterNode node : nodes) {
-            // A node already in the result came in with its whole neighbor group, so skip re-adding it.
-            if (!res.contains(node))
-                res.addAll(neighborhoodCache.get(node.id()));
-        }
-
-        return res;
-    }
-
-    /**
-     * Orders (hash, node) pairs by ascending hash, breaking ties by node ID.
-     */
-    private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) {
-            return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 1 :
-                o1.get2().id().compareTo(o2.get2().id());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
new file mode 100644
index 0000000..2b26630
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.security.*;
+import java.util.*;
+
+/**
+ * Affinity function for partitioned cache based on Highest Random Weight algorithm.
+ * This function supports the following configuration:
+ * <ul>
+ * <li>
+ *      {@code partitions} - Number of partitions to spread across nodes.
+ * </li>
+ * <li>
+ *      {@code excludeNeighbors} - If set to {@code true}, will exclude same-host-neighbors
+ *      from being backups of each other. Note that {@code backupFilter} is ignored if
+ *      {@code excludeNeighbors} is set to {@code true}.
+ * </li>
+ * <li>
+ *      {@code backupFilter} - Optional filter for backup nodes. If provided, then only
+ *      nodes that pass this filter will be selected as backup nodes. If not provided, then
+ *      primary and backup nodes will be selected out of all nodes available for this cache.
+ * </li>
+ * </ul>
+ * <p>
+ * Cache affinity can be configured for individual caches via {@link org.apache.ignite.configuration.CacheConfiguration#getAffinity()} method.
+ */
+public class RendezvousAffinityFunction implements AffinityFunction, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Default number of partitions. */
+    public static final int DFLT_PARTITION_COUNT = 1024;
+
+    /** Comparator used to sort (hash, node) pairs when ranking nodes for a partition. */
+    private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR =
+        new HashComparator();
+
+    /**
+     * Thread local MD5 message digest. A {@link MessageDigest} keeps state between updates,
+     * so each thread gets its own instance. MD5 availability is checked in the constructor.
+     */
+    private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() {
+        @Override protected MessageDigest initialValue() {
+            try {
+                return MessageDigest.getInstance("MD5");
+            }
+            catch (NoSuchAlgorithmException e) {
+                // Unexpected: the constructor already obtained an MD5 digest successfully.
+                assert false : "Should have failed in constructor";
+
+                throw new IgniteException("Failed to obtain message digest (digest was available in constructor)",
+                    e);
+            }
+        }
+    };
+
+    /** Number of partitions. */
+    private int parts;
+
+    /** Exclude neighbors flag. When set, {@link #backupFilter} is not consulted. */
+    private boolean exclNeighbors;
+
+    /** Optional backup filter. First node is primary, second node is a node being tested. */
+    private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter;
+
+    /** Hash ID resolver, defaults to {@link AffinityNodeAddressHashResolver}. */
+    private AffinityNodeHashResolver hashIdRslvr = new AffinityNodeAddressHashResolver();
+
+    /** Ignite instance (injected); its configured marshaller serializes node hash IDs. */
+    @IgniteInstanceResource
+    private Ignite ignite;
+
+    /**
+     * Creates affinity function with default settings: neighbor exclusion disabled,
+     * {@link #DFLT_PARTITION_COUNT} partitions and no backup filter.
+     */
+    public RendezvousAffinityFunction() {
+        this(false, DFLT_PARTITION_COUNT);
+    }
+
+    /**
+     * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other
+     * and {@link #DFLT_PARTITION_COUNT default} number of partitions.
+     * <p>
+     * Note that the backup filter (see {@link #setBackupFilter(IgniteBiPredicate)}) is ignored if
+     * {@code exclNeighbors} is {@code true}.
+     *
+     * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
+     *      of each other.
+     */
+    public RendezvousAffinityFunction(boolean exclNeighbors) {
+        this(exclNeighbors, DFLT_PARTITION_COUNT);
+    }
+
+    /**
+     * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other,
+     * and specified number of partitions.
+     * <p>
+     * Note that the backup filter (see {@link #setBackupFilter(IgniteBiPredicate)}) is ignored if
+     * {@code exclNeighbors} is {@code true}.
+     *
+     * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
+     *      of each other.
+     * @param parts Total number of partitions.
+     */
+    public RendezvousAffinityFunction(boolean exclNeighbors, int parts) {
+        this(exclNeighbors, parts, null);
+    }
+
+    /**
+     * Initializes affinity with specified number of partitions and optional backup filter.
+     * Neighbor exclusion is disabled.
+     * <p>
+     * Note that the backup filter is ignored if neighbor exclusion is later enabled via
+     * {@link #setExcludeNeighbors(boolean)}.
+     *
+     * @param parts Total number of partitions.
+     * @param backupFilter Optional backup filter for nodes. If provided, backups will be selected
+     *      from all nodes that pass this filter. First argument for this filter is primary node, and second
+     *      argument is node being tested.
+     */
+    public RendezvousAffinityFunction(int parts, @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+        this(false, parts, backupFilter);
+    }
+
+    /**
+     * Private constructor.
+     *
+     * @param exclNeighbors Exclude neighbors flag.
+     * @param parts Partitions count, must be positive.
+     * @param backupFilter Optional backup filter.
+     * @throws IllegalArgumentException If {@code parts} is not positive.
+     * @throws IgniteException If MD5 message digest is not available.
+     */
+    private RendezvousAffinityFunction(boolean exclNeighbors, int parts,
+        @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+        // Reject negative counts as well as zero: a negative count makes partition() return values outside
+        // [0, partitions()) and makes assignPartitions() fail on a negative ArrayList capacity.
+        A.ensure(parts > 0, "parts > 0");
+
+        this.exclNeighbors = exclNeighbors;
+        this.parts = parts;
+        this.backupFilter = backupFilter;
+
+        // Fail fast if MD5 is unavailable so the thread-local digest initializer cannot fail later.
+        try {
+            MessageDigest.getInstance("MD5");
+        }
+        catch (NoSuchAlgorithmException e) {
+            throw new IgniteException("Failed to obtain MD5 message digest instance.", e);
+        }
+    }
+
+    /**
+     * Gets the total number of key partitions.
+     * <p>
+     * Keep this number well above the number of nodes so partitions spread evenly across them,
+     * and keep individual partitions small (ideally no more than about a quarter million keys each).
+     * <p>
+     * For fully replicated caches this should always be {@code 1}.
+     *
+     * @return Total partition count.
+     */
+    public int getPartitions() {
+        return this.parts;
+    }
+
+    /**
+     * Sets total number of partitions.
+     *
+     * @param parts Total number of partitions, must be positive.
+     * @throws IllegalArgumentException If {@code parts} is not positive.
+     */
+    public void setPartitions(int parts) {
+        // A zero count makes partition() divide by zero; a negative one breaks partition() ranges
+        // and assignPartitions() capacity. Reject both here instead of failing later.
+        A.ensure(parts > 0, "parts > 0");
+
+        this.parts = parts;
+    }
+
+    /**
+     * Gets the resolver that supplies each node's hash ID, used in place of the node ID when
+     * ranking nodes for a partition.
+     * <p>
+     * Node IDs change whenever a node restarts, so ranking by node ID would move a restarted node
+     * to a different position and cause repartitioning. A hash ID that survives restarts keeps the
+     * node's position stable and so minimizes repartitioning.
+     *
+     * @return Hash ID resolver.
+     */
+    public AffinityNodeHashResolver getHashIdResolver() {
+        return this.hashIdRslvr;
+    }
+
+    /**
+     * Sets the resolver that supplies each node's hash ID, used in place of the node ID when
+     * ranking nodes for a partition.
+     * <p>
+     * Node IDs change whenever a node restarts, so ranking by node ID would move a restarted node
+     * to a different position and cause repartitioning. A hash ID that survives restarts keeps the
+     * node's position stable and so minimizes repartitioning.
+     *
+     * @param hashIdRslvr Hash ID resolver.
+     */
+    public void setHashIdResolver(AffinityNodeHashResolver hashIdRslvr) {
+        this.hashIdRslvr = hashIdRslvr;
+    }
+
+    /**
+     * Gets optional backup filter. If not {@code null}, backups will be selected
+     * from all nodes that pass this filter. First node passed to this filter is primary node,
+     * and second node is a node being tested.
+     * <p>
+     * Note that this filter is ignored if {@code excludeNeighbors} is {@code true}.
+     *
+     * @return Optional backup filter.
+     */
+    @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() {
+        return backupFilter;
+    }
+
+    /**
+     * Sets optional backup filter. If provided, then backups will be selected from all
+     * nodes that pass this filter. First node being passed to this filter is primary node,
+     * and second node is a node being tested.
+     * <p>
+     * Note that this filter is ignored if {@code excludeNeighbors} is {@code true}.
+     *
+     * @param backupFilter Optional backup filter.
+     */
+    public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+        this.backupFilter = backupFilter;
+    }
+
+    /**
+     * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
+     * <p>
+     * Note that when this flag is {@code true}, {@link #getBackupFilter()} is ignored.
+     *
+     * @return {@code True} if nodes residing on the same host may not act as backups of each other.
+     */
+    public boolean isExcludeNeighbors() {
+        return exclNeighbors;
+    }
+
+    /**
+     * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
+     * <p>
+     * Note that when this flag is {@code true}, {@link #getBackupFilter()} is ignored.
+     *
+     * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other.
+     */
+    public void setExcludeNeighbors(boolean exclNeighbors) {
+        this.exclNeighbors = exclNeighbors;
+    }
+
+    /**
+     * Returns collection of nodes (primary first) for specified partition.
+     * <p>
+     * Each node is ranked by the MD5 digest of the partition number followed by the node's marshalled
+     * hash ID (see {@link #getHashIdResolver()}). Nodes are sorted by rank and the first one becomes primary.
+     * Backups are then taken in rank order. When {@code excludeNeighbors} is set, neighbors of already
+     * chosen nodes are skipped and the backup filter is not consulted; otherwise the optional backup filter
+     * decides. If neighbor exclusion leaves too few backups while enough nodes exist, the remaining slots
+     * are filled in rank order without the neighbor restriction.
+     *
+     * @param part Partition number.
+     * @param nodes Nodes to choose from.
+     * @param backups Number of backups, {@link Integer#MAX_VALUE} meaning no limit.
+     * @param neighborhoodCache Node ID to neighbors mapping used when {@code excludeNeighbors} is set.
+     *      If it is {@code null} in that case, it is built from {@code nodes}.
+     * @return Nodes for the partition, primary first. If {@code nodes} has at most one element,
+     *      {@code nodes} itself is returned.
+     */
+    public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, int backups,
+        @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) {
+        if (nodes.size() <= 1)
+            return nodes;
+
+        // This method is public: a caller may enable neighbor exclusion without passing the map,
+        // which would otherwise fail with NullPointerException when neighbors are looked up.
+        if (exclNeighbors && neighborhoodCache == null)
+            neighborhoodCache = neighbors(nodes);
+
+        List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size());
+
+        MessageDigest d = digest.get();
+
+        for (ClusterNode node : nodes) {
+            Object nodeHash = hashIdRslvr.resolve(node);
+
+            try {
+                ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+                byte[] nodeHashBytes = ignite.configuration().getMarshaller().marshal(nodeHash);
+
+                out.write(U.intToBytes(part), 0, 4); // Avoid IOException.
+                out.write(nodeHashBytes, 0, nodeHashBytes.length); // Avoid IOException.
+
+                d.reset();
+
+                byte[] bytes = d.digest(out.toByteArray());
+
+                // Rank: first 8 digest bytes assembled little-endian into a long.
+                long hash =
+                      (bytes[0] & 0xFFL)
+                    | ((bytes[1] & 0xFFL) << 8)
+                    | ((bytes[2] & 0xFFL) << 16)
+                    | ((bytes[3] & 0xFFL) << 24)
+                    | ((bytes[4] & 0xFFL) << 32)
+                    | ((bytes[5] & 0xFFL) << 40)
+                    | ((bytes[6] & 0xFFL) << 48)
+                    | ((bytes[7] & 0xFFL) << 56);
+
+                lst.add(F.t(hash, node));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        Collections.sort(lst, COMPARATOR);
+
+        int primaryAndBackups;
+
+        List<ClusterNode> res;
+
+        if (backups == Integer.MAX_VALUE) {
+            primaryAndBackups = Integer.MAX_VALUE;
+
+            res = new ArrayList<>();
+        }
+        else {
+            primaryAndBackups = backups + 1;
+
+            res = new ArrayList<>(primaryAndBackups);
+        }
+
+        ClusterNode primary = lst.get(0).get2();
+
+        res.add(primary);
+
+        // Select backups.
+        if (backups > 0) {
+            for (int i = 1; i < lst.size(); i++) {
+                ClusterNode node = lst.get(i).get2();
+
+                if (exclNeighbors) {
+                    // Backup filter is intentionally not consulted when excluding neighbors.
+                    Collection<ClusterNode> allNeighbors = allNeighbors(neighborhoodCache, res);
+
+                    if (!allNeighbors.contains(node))
+                        res.add(node);
+                }
+                else {
+                    if (!res.contains(node) && (backupFilter == null || backupFilter.apply(primary, node)))
+                        res.add(node);
+                }
+
+                if (res.size() == primaryAndBackups)
+                    break;
+            }
+        }
+
+        if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) {
+            // Not enough non-neighbor nodes: fill the remaining backup slots in rank order, ignoring neighborhood.
+            for (int i = 1; i < lst.size(); i++) {
+                ClusterNode node = lst.get(i).get2();
+
+                if (!res.contains(node))
+                    res.add(node);
+
+                if (res.size() == primaryAndBackups)
+                    break;
+            }
+        }
+
+        assert res.size() <= primaryAndBackups;
+
+        return res;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Nothing to reset: {@code assignPartitions(..)} derives every assignment from the context it is given.
+     */
+    @Override public void reset() {
+        // Intentionally empty.
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Value of the {@code parts} field.
+     */
+    @Override public int partitions() {
+        final int cnt = parts;
+
+        return cnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition(Object key) {
+        return U.safeAbs(key.hashCode() % parts);
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+        List<List<ClusterNode>> assignments = new ArrayList<>(parts);
+
+        Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
+            neighbors(affCtx.currentTopologySnapshot()) : null;
+
+        for (int i = 0; i < parts; i++) {
+            List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(),
+                neighborhoodCache);
+
+            assignments.add(partAssignment);
+        }
+
+        return assignments;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Node departures need no bookkeeping here: {@code assignPartitions(..)} always works from the
+     * context's current topology snapshot.
+     */
+    @Override public void removeNode(UUID nodeId) {
+        // Intentionally empty.
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Field order must match {@code readExternal(..)}: partition count, neighbor-exclusion flag,
+     * node hash resolver, backup filter.
+     */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        // Primitive settings first...
+        out.writeInt(parts);
+        out.writeBoolean(exclNeighbors);
+
+        // ...then the pluggable object settings.
+        out.writeObject(hashIdRslvr);
+        out.writeObject(backupFilter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        parts = in.readInt();
+        exclNeighbors = in.readBoolean();
+        hashIdRslvr = (AffinityNodeHashResolver)in.readObject();
+        backupFilter = (IgniteBiPredicate<ClusterNode, ClusterNode>)in.readObject();
+    }
+
+    /**
+     * Groups the snapshot's nodes by their {@code IgniteNodeAttributes.ATTR_MACS} attribute value and maps
+     * each node ID to the group that node belongs to (the group contains the node itself).
+     *
+     * @param topSnapshot Topology snapshot.
+     * @return Map from node ID to all nodes sharing that node's MAC attribute value.
+     */
+    private Map<UUID, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) {
+        int cnt = topSnapshot.size();
+
+        Map<String, Collection<ClusterNode>> byMacs = new HashMap<>(cnt, 1.0f);
+
+        for (ClusterNode n : topSnapshot) {
+            String macs = n.attribute(IgniteNodeAttributes.ATTR_MACS);
+
+            Collection<ClusterNode> grp = byMacs.get(macs);
+
+            if (grp == null) {
+                grp = new HashSet<>();
+
+                byMacs.put(macs, grp);
+            }
+
+            grp.add(n);
+        }
+
+        Map<UUID, Collection<ClusterNode>> res = new HashMap<>(cnt, 1.0f);
+
+        // Every member of a group shares the same (mutable) group instance.
+        for (Collection<ClusterNode> grp : byMacs.values())
+            for (ClusterNode member : grp)
+                res.put(member.id(), grp);
+
+        return res;
+    }
+
+    /**
+     * Builds the union of the neighborhoods recorded in {@code neighborhoodCache} for the given nodes.
+     *
+     * @param neighborhoodCache Map from node ID to that node's neighborhood.
+     * @param nodes Nodes whose neighborhoods are merged.
+     * @return Set containing every neighbor of every given node.
+     */
+    private Collection<ClusterNode> allNeighbors(Map<UUID, Collection<ClusterNode>> neighborhoodCache,
+        Iterable<ClusterNode> nodes) {
+        Collection<ClusterNode> union = new HashSet<>();
+
+        for (ClusterNode n : nodes) {
+            // Skip nodes already pulled in by an earlier neighborhood; with the shared MAC groups built by
+            // neighbors(), their own group has then already been merged.
+            if (union.contains(n))
+                continue;
+
+            union.addAll(neighborhoodCache.get(n.id()));
+        }
+
+        return union;
+    }
+
+    /**
+     *
+     */
+    private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) {
+            return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 1 :
+                o1.get2().id().compareTo(o2.get2().id());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictableEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictableEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictableEntry.java
deleted file mode 100644
index 568db12..0000000
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictableEntry.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.eviction;
-
-import org.jetbrains.annotations.*;
-
-import javax.cache.*;
-
-/**
- * Evictable cache entry passed into {@link CacheEvictionPolicy}.
- *
- * @author @java.author
- * @version @java.version
- */
-public interface CacheEvictableEntry<K, V> extends Cache.Entry<K, V> {
-    /**
-     * Evicts entry associated with given key from cache. Note, that entry will be evicted
-     * only if it's not used (not participating in any locks or transactions).
-     *
-     * @return {@code True} if entry could be evicted, {@code false} otherwise.
-     */
-    public boolean evict();
-
-    /**
-     * Checks whether entry is currently present in cache or not. If entry is not in
-     * cache (e.g. has been removed) {@code false} is returned. In this case all
-     * operations on this entry will cause creation of a new entry in cache.
-     *
-     * @return {@code True} if entry is in cache, {@code false} otherwise.
-     */
-    public boolean isCached();
-
-    /**
-     * Gets metadata added by eviction policy.
-     *
-     * @return Metadata value or {@code null}.
-     */
-    @Nullable public <T> T meta();
-
-    /**
-     * Adds a new metadata.
-     *
-     * @param val Metadata value.
-     * @return Metadata previously added, or
-     *      {@code null} if there was none.
-     */
-    @Nullable public <T> T addMeta(T val);
-
-    /**
-     * Adds given metadata value only if it was absent.
-     *
-     * @param val Value to add if it's not attached already.
-     * @return {@code null} if new value was put, or current value if put didn't happen.
-     */
-    @Nullable public <T> T putMetaIfAbsent(T val);
-
-    /**
-     * Replaces given metadata with new {@code newVal} value only if its current value
-     * is equal to {@code curVal}. Otherwise, it is no-op.
-     *
-     * @param curVal Current value to check.
-     * @param newVal New value.
-     * @return {@code true} if replacement occurred, {@code false} otherwise.
-     */
-    public <T> boolean replaceMeta(T curVal, T newVal);
-
-    /**
-     * Removes metadata by name.
-     *
-     * @return Value of removed metadata or {@code null}.
-     */
-    @Nullable public <T> T removeMeta();
-
-    /**
-     * Removes metadata only if its current value is equal to {@code val} passed in.
-     *
-     * @param val Value to compare.
-     * @return {@code True} if value was removed, {@code false} otherwise.
-     */
-    public <T> boolean removeMeta(T val);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictionFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictionFilter.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictionFilter.java
deleted file mode 100644
index 10f63ee..0000000
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictionFilter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.eviction;
-
-import org.apache.ignite.configuration.CacheConfiguration;
-
-import javax.cache.*;
-import java.io.*;
-
-/**
- * Eviction filter to specify which entries should not be evicted. Not applicable when
- * calling explicit evict via {@link CacheEvictableEntry#evict()}.
- * If {@link #evictAllowed(Cache.Entry)} method returns {@code false} then eviction
- * policy will not be notified and entry will never be evicted.
- * <p>
- * Eviction filter can be configured via {@link CacheConfiguration#getEvictionFilter()}
- * configuration property. Default value is {@code null} which means that all
- * cache entries will be tracked by eviction policy.
- */
-public interface CacheEvictionFilter<K, V> extends Serializable {
-    /**
-     * Checks if entry may be evicted from cache.
-     *
-     * @param entry Cache entry.
-     * @return {@code True} if it is allowed to evict this entry.
-     */
-    public boolean evictAllowed(Cache.Entry<K, V> entry);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictionPolicy.java
deleted file mode 100644
index b5f341a..0000000
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictionPolicy.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.eviction;
-
-/**
- * Pluggable cache eviction policy. Usually, implementations will internally order
- * cache entries based on {@link #onEntryAccessed(boolean, CacheEvictableEntry)} notifications and
- * whenever an element needs to be evicted, {@link CacheEvictableEntry#evict()}
- * method should be called.
- * <p>
- * Ignite comes with following eviction policies out-of-the-box:
- * <ul>
- * <li>{@link org.apache.ignite.cache.eviction.lru.CacheLruEvictionPolicy}</li>
- * <li>{@link org.apache.ignite.cache.eviction.random.CacheRandomEvictionPolicy}</li>
- * <li>{@link org.apache.ignite.cache.eviction.fifo.CacheFifoEvictionPolicy}</li>
- * </ul>
- * <p>
- * The eviction policy thread-safety is ensured by Ignition. Implementations of this interface should
- * not worry about concurrency and should be implemented as they were only accessed from one thread.
- * <p>
- * Note that implementations of all eviction policies provided by Ignite are very
- * light weight in a way that they are all lock-free (or very close to it), and do not
- * create any internal tables, arrays, or other expensive structures.
- * The eviction order is preserved by attaching light-weight meta-data to existing
- * cache entries.
- */
-public interface CacheEvictionPolicy<K, V> {
-    /**
-     * Callback for whenever entry is accessed.
-     *
-     * @param rmv {@code True} if entry has been removed, {@code false} otherwise.
-     * @param entry Accessed entry.
-     */
-    public void onEntryAccessed(boolean rmv, CacheEvictableEntry<K, V> entry);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java
new file mode 100644
index 0000000..d87109f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.eviction;
+
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+
+/**
+ * Evictable cache entry passed into {@link EvictionPolicy}. Besides the {@link Cache.Entry} view it lets
+ * the policy evict the entry and attach a single policy-specific metadata value to it.
+ *
+ * @author @java.author
+ * @version @java.version
+ */
+public interface EvictableEntry<K, V> extends Cache.Entry<K, V> {
+    /**
+     * Evicts this entry from cache. Note that the entry will be evicted
+     * only if it's not used (not participating in any locks or transactions).
+     *
+     * @return {@code True} if entry could be evicted, {@code false} otherwise.
+     */
+    public boolean evict();
+
+    /**
+     * Checks whether entry is currently present in cache or not. If entry is not in
+     * cache (e.g. has been removed) {@code false} is returned. In this case all
+     * operations on this entry will cause creation of a new entry in cache.
+     *
+     * @return {@code True} if entry is in cache, {@code false} otherwise.
+     */
+    public boolean isCached();
+
+    /**
+     * Gets metadata added by eviction policy.
+     *
+     * @param <T> Metadata type.
+     * @return Metadata value or {@code null}.
+     */
+    @Nullable public <T> T meta();
+
+    /**
+     * Adds a new metadata.
+     *
+     * @param val Metadata value.
+     * @param <T> Metadata type.
+     * @return Metadata previously added, or
+     *      {@code null} if there was none.
+     */
+    @Nullable public <T> T addMeta(T val);
+
+    /**
+     * Adds given metadata value only if it was absent.
+     *
+     * @param val Value to add if it's not attached already.
+     * @param <T> Metadata type.
+     * @return {@code null} if new value was put, or current value if put didn't happen.
+     */
+    @Nullable public <T> T putMetaIfAbsent(T val);
+
+    /**
+     * Replaces given metadata with new {@code newVal} value only if its current value
+     * is equal to {@code curVal}. Otherwise, it is no-op.
+     *
+     * @param curVal Current value to check.
+     * @param newVal New value.
+     * @param <T> Metadata type.
+     * @return {@code true} if replacement occurred, {@code false} otherwise.
+     */
+    public <T> boolean replaceMeta(T curVal, T newVal);
+
+    /**
+     * Removes the metadata attached to this entry.
+     *
+     * @param <T> Metadata type.
+     * @return Value of removed metadata or {@code null}.
+     */
+    @Nullable public <T> T removeMeta();
+
+    /**
+     * Removes metadata only if its current value is equal to {@code val} passed in.
+     *
+     * @param val Value to compare.
+     * @param <T> Metadata type.
+     * @return {@code True} if value was removed, {@code false} otherwise.
+     */
+    public <T> boolean removeMeta(T val);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionFilter.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionFilter.java
new file mode 100644
index 0000000..2b6ead1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionFilter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.eviction;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+
+import javax.cache.*;
+import java.io.*;
+
+/**
+ * Eviction filter to specify which entries should not be evicted. The filter is not consulted when
+ * eviction is requested explicitly via {@link EvictableEntry#evict()}.
+ * If {@link #evictAllowed(Cache.Entry)} returns {@code false}, the eviction policy is not notified
+ * about the entry, so the policy will never evict it (an explicit {@link EvictableEntry#evict()} call
+ * is not subject to this filter, as noted above).
+ * <p>
+ * Eviction filter can be configured via the eviction filter property of {@link CacheConfiguration}
+ * (see {@link CacheConfiguration#getEvictionFilter()}). Default value is {@code null}, which means that all
+ * cache entries will be tracked by eviction policy.
+ */
+public interface EvictionFilter<K, V> extends Serializable {
+    /**
+     * Checks if entry may be evicted from cache.
+     *
+     * @param entry Cache entry.
+     * @return {@code True} if it is allowed to evict this entry.
+     */
+    public boolean evictAllowed(Cache.Entry<K, V> entry);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java
new file mode 100644
index 0000000..f409e9b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.eviction;
+
+import org.apache.ignite.cache.eviction.fifo.*;
+import org.apache.ignite.cache.eviction.lru.*;
+import org.apache.ignite.cache.eviction.random.*;
+
+/**
+ * Pluggable cache eviction policy.
+ * <p>
+ * A typical implementation keeps its own ordering of cache entries, driven by
+ * {@link #onEntryAccessed(boolean, EvictableEntry)} notifications, and calls
+ * {@link EvictableEntry#evict()} whenever an entry has to go.
+ * <p>
+ * Policies shipped with Ignite:
+ * <ul>
+ * <li>{@link LruEvictionPolicy}</li>
+ * <li>{@link RandomEvictionPolicy}</li>
+ * <li>{@link FifoEvictionPolicy}</li>
+ * </ul>
+ * <p>
+ * Thread-safety of the eviction policy is ensured by Ignition, so implementations need not worry about
+ * concurrency and may be written as if they were only ever accessed from a single thread.
+ * <p>
+ * All eviction policies provided by Ignite are lightweight: they are lock-free (or very close to it)
+ * and create no internal tables, arrays or other expensive structures. Eviction order is kept by
+ * attaching lightweight metadata to existing cache entries.
+ */
+public interface EvictionPolicy<K, V> {
+    /**
+     * Notifies the policy that an entry was accessed.
+     *
+     * @param rmv {@code True} if the entry has been removed, {@code false} otherwise.
+     * @param entry Entry that was accessed.
+     */
+    public void onEntryAccessed(boolean rmv, EvictableEntry<K, V> entry);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/CacheFifoEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/CacheFifoEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/CacheFifoEvictionPolicy.java
deleted file mode 100644
index 5fa5d82..0000000
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/CacheFifoEvictionPolicy.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.eviction.fifo;
-
-import org.apache.ignite.cache.eviction.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jsr166.*;
-import org.jsr166.ConcurrentLinkedDeque8.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Eviction policy based on {@code First In First Out (FIFO)} algorithm. This
- * implementation is very efficient since it does not create any additional
- * table-like data structures. The {@code FIFO} ordering information is
- * maintained by attaching ordering metadata to cache entries.
- */
-public class CacheFifoEvictionPolicy<K, V> implements CacheEvictionPolicy<K, V>,
-    CacheFifoEvictionPolicyMBean, Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Maximum size. */
-    private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE;
-
-    /** FIFO queue. */
-    private final ConcurrentLinkedDeque8<CacheEvictableEntry<K, V>> queue =
-        new ConcurrentLinkedDeque8<>();
-
-    /**
-     * Constructs FIFO eviction policy with all defaults.
-     */
-    public CacheFifoEvictionPolicy() {
-        // No-op.
-    }
-
-    /**
-     * Constructs FIFO eviction policy with maximum size. Empty entries are allowed.
-     *
-     * @param max Maximum allowed size of cache before entry will start getting evicted.
-     */
-    public CacheFifoEvictionPolicy(int max) {
-        A.ensure(max > 0, "max > 0");
-
-        this.max = max;
-    }
-
-    /**
-     * Gets maximum allowed size of cache before entry will start getting evicted.
-     *
-     * @return Maximum allowed size of cache before entry will start getting evicted.
-     */
-    @Override public int getMaxSize() {
-        return max;
-    }
-
-    /**
-     * Sets maximum allowed size of cache before entry will start getting evicted.
-     *
-     * @param max Maximum allowed size of cache before entry will start getting evicted.
-     */
-    @Override public void setMaxSize(int max) {
-        A.ensure(max > 0, "max > 0");
-
-        this.max = max;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int getCurrentSize() {
-        return queue.size();
-    }
-
-    /**
-     * Gets read-only view on internal {@code FIFO} queue in proper order.
-     *
-     * @return Read-only view ono internal {@code 'FIFO'} queue.
-     */
-    public Collection<CacheEvictableEntry<K, V>> queue() {
-        return Collections.unmodifiableCollection(queue);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onEntryAccessed(boolean rmv, CacheEvictableEntry<K, V> entry) {
-        if (!rmv) {
-            if (!entry.isCached())
-                return;
-
-            // Shrink only if queue was changed.
-            if (touch(entry))
-                shrink();
-        }
-        else {
-            Node<CacheEvictableEntry<K, V>> node = entry.removeMeta();
-
-            if (node != null)
-                queue.unlinkx(node);
-        }
-    }
-
-    /**
-     * @param entry Entry to touch.
-     * @return {@code True} if queue has been changed by this call.
-     */
-    private boolean touch(CacheEvictableEntry<K, V> entry) {
-        Node<CacheEvictableEntry<K, V>> node = entry.meta();
-
-        // Entry has not been enqueued yet.
-        if (node == null) {
-            while (true) {
-                node = queue.offerLastx(entry);
-
-                if (entry.putMetaIfAbsent(node) != null) {
-                    // Was concurrently added, need to clear it from queue.
-                    queue.unlinkx(node);
-
-                    // Queue has not been changed.
-                    return false;
-                }
-                else if (node.item() != null) {
-                    if (!entry.isCached()) {
-                        // Was concurrently evicted, need to clear it from queue.
-                        queue.unlinkx(node);
-
-                        return false;
-                    }
-
-                    return true;
-                }
-                // If node was unlinked by concurrent shrink() call, we must repeat the whole cycle.
-                else if (!entry.removeMeta(node))
-                    return false;
-            }
-        }
-
-        // Entry is already in queue.
-        return false;
-    }
-
-    /**
-     * Shrinks FIFO queue to maximum allowed size.
-     */
-    private void shrink() {
-        int max = this.max;
-
-        int startSize = queue.sizex();
-
-        for (int i = 0; i < startSize && queue.sizex() > max; i++) {
-            CacheEvictableEntry<K, V> entry = queue.poll();
-
-            if (entry == null)
-                break;
-
-            if (!entry.evict()) {
-                entry.removeMeta();
-
-                touch(entry);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeInt(max);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        max = in.readInt();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CacheFifoEvictionPolicy.class, this);
-    }
-}


Mime
View raw message