ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dma...@apache.org
Subject [7/8] ignite git commit: IGNITE-2655: AffinityFunction: primary and backup copies in different locations Reviewed and merged by Denis Magda (dmagda@gridgain.com)
Date Thu, 02 Jun 2016 13:26:12 GMT
IGNITE-2655: AffinityFunction: primary and backup copies in different locations
Reviewed and merged by Denis Magda (dmagda@gridgain.com)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f175d3c6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f175d3c6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f175d3c6

Branch: refs/heads/master
Commit: f175d3c670025bd619ec347dba2a5c5f68f4cc32
Parents: 3ca9feb
Author: Vladislav Pyatkov <vldpyatkov@gmail.com>
Authored: Thu Jun 2 16:14:10 2016 +0300
Committer: Denis Magda <dmagda@gridgain.com>
Committed: Thu Jun 2 16:14:10 2016 +0300

----------------------------------------------------------------------
 .../affinity/fair/FairAffinityFunction.java     |  81 +++++++++++-
 .../rendezvous/RendezvousAffinityFunction.java  |  39 +++++-
 ...ityFunctionBackupFilterAbstractSelfTest.java | 131 ++++++++++++++++++-
 ...airAffinityFunctionBackupFilterSelfTest.java |   9 ++
 ...ousAffinityFunctionBackupFilterSelfTest.java |   9 ++
 5 files changed, 260 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f175d3c6/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
index b42b683..b6b14ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
@@ -102,6 +102,9 @@ public class FairAffinityFunction implements AffinityFunction {
     /** Optional backup filter. First node is primary, second node is a node being tested.
*/
     private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter;
 
+    /** Optional affinity backups filter. The first node is a node being tested, the second
is a list of nodes that are already assigned for a given partition (primary node is the first
in the list). */
+    private IgniteBiPredicate<ClusterNode, List<ClusterNode>> affinityBackupFilter;
+
     /**
      * Empty constructor with all defaults.
      */
@@ -220,12 +223,40 @@ public class FairAffinityFunction implements AffinityFunction {
      * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code
true}.
      *
      * @param backupFilter Optional backup filter.
+     * @deprecated Use {@code affinityBackupFilter} instead.
      */
+    @Deprecated
     public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode>
backupFilter) {
         this.backupFilter = backupFilter;
     }
 
     /**
+     * Gets optional backup filter. If not {@code null}, backups will be selected
+     * from all nodes that pass this filter. First node passed to this filter is a node being
tested,
+     * and the second parameter is a list of nodes that are already assigned for a given
partition (primary node is the first in the list).
+     * <p>
+     * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set
to {@code true}.
+     *
+     * @return Optional backup filter.
+     */
+    @Nullable public IgniteBiPredicate<ClusterNode, List<ClusterNode>> getAffinityBackupFilter()
{
+        return affinityBackupFilter;
+    }
+
+    /**
+     * Sets optional backup filter. If provided, then backups will be selected from all
+     * nodes that pass this filter. First node being passed to this filter is a node being
tested,
+     * and the second parameter is a list of nodes that are already assigned for a given
partition (primary node is the first in the list).
+     * <p>
+     * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set
to {@code true}.
+     *
+     * @param affinityBackupFilter Optional backup filter.
+     */
+    public void setAffinityBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, List<ClusterNode>>
affinityBackupFilter) {
+        this.affinityBackupFilter = affinityBackupFilter;
+    }
+
+    /**
      * Checks flag to exclude same-host-neighbors from being backups of each other (default
is {@code false}).
      * <p>
      * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code
true}.
@@ -865,6 +896,7 @@ public class FairAffinityFunction implements AffinityFunction {
          * @param tier Tier.
          * @param node Node.
          * @param allowNeighbors Allow neighbors.
+         * @return {@code true} if the partition is assignable to the node.
          */
         private boolean isAssignable(int part, int tier, final ClusterNode node, boolean
allowNeighbors) {
             if (containsPartition(part, node))
@@ -872,9 +904,50 @@ public class FairAffinityFunction implements AffinityFunction {
 
             if (exclNeighbors)
                 return allowNeighbors || !neighborsContainPartition(node, part);
-            else if (backupFilter == null)
-                return true;
-            else {
+            else if (affinityBackupFilter != null) {
+                List<ClusterNode> assigment = assignments.get(part);
+
+                assert assigment.size() > 0;
+
+                List<ClusterNode> newAssignment;
+
+                if (tier == 0) {
+                    for (int t = 1; t < assigment.size(); t++) {
+                        newAssignment = new ArrayList<>(assigment.size() - 1);
+
+                        newAssignment.add(node);
+
+                        if (t != 1)
+                            newAssignment.addAll(assigment.subList(1, t));
+
+                        if (t + 1 < assigment.size())
+                            newAssignment.addAll(assigment.subList(t + 1, assigment.size()));
+
+                        if (!affinityBackupFilter.apply(assigment.get(t), newAssignment))
+                            return false;
+
+                    }
+
+                    return true;
+                }
+                else if (tier < assigment.size()) {
+                    newAssignment = new ArrayList<>(assigment.size() - 1);
+
+                    int i = 0;
+
+                    for (ClusterNode assignmentNode: assigment) {
+                        if (i != tier)
+                            newAssignment.add(assignmentNode);
+
+                        i++;
+                    }
+                }
+                else
+                    newAssignment = assigment;
+
+                return affinityBackupFilter.apply(node, newAssignment);
+            }
+            else if (backupFilter != null) {
                 if (tier == 0) {
                     List<ClusterNode> assigment = assignments.get(part);
 
@@ -891,6 +964,8 @@ public class FairAffinityFunction implements AffinityFunction {
                 else
                     return (backupFilter.apply(assignments.get(part).get(0), node));
             }
+            else
+                return true;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f175d3c6/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index 37258d4..990eba1 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -111,6 +111,11 @@ public class RendezvousAffinityFunction implements AffinityFunction,
Externaliza
     /** Optional backup filter. First node is primary, second node is a node being tested.
*/
     private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter;
 
+    /** Optional affinity backups filter. The first node is a node being tested,
+     *  the second is a list of nodes that are already assigned for a given partition (the
first node in the list
+     *  is primary). */
+    private IgniteBiPredicate<ClusterNode, List<ClusterNode>> affinityBackupFilter;
+
     /** Hash ID resolver. */
     private AffinityNodeHashResolver hashIdRslvr = null;
 
@@ -277,12 +282,40 @@ public class RendezvousAffinityFunction implements AffinityFunction,
Externaliza
      * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code
true}.
      *
      * @param backupFilter Optional backup filter.
+     * @deprecated Use {@code affinityBackupFilter} instead.
      */
+    @Deprecated
     public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode>
backupFilter) {
         this.backupFilter = backupFilter;
     }
 
     /**
+     * Gets optional backup filter. If not {@code null}, backups will be selected
+     * from all nodes that pass this filter. First node passed to this filter is a node being
tested,
+     * and the second parameter is a list of nodes that are already assigned for a given
partition (primary node is the first in the list).
+     * <p>
+     * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set
to {@code true}.
+     *
+     * @return Optional backup filter.
+     */
+    @Nullable public IgniteBiPredicate<ClusterNode, List<ClusterNode>> getAffinityBackupFilter()
{
+        return affinityBackupFilter;
+    }
+
+    /**
+     * Sets optional backup filter. If provided, then backups will be selected from all
+     * nodes that pass this filter. First node being passed to this filter is a node being
tested,
+     * and the second parameter is a list of nodes that are already assigned for a given
partition (primary node is the first in the list).
+     * <p>
+     * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set
to {@code true}.
+     *
+     * @param affinityBackupFilter Optional backup filter.
+     */
+    public void setAffinityBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, List<ClusterNode>>
affinityBackupFilter) {
+        this.affinityBackupFilter = affinityBackupFilter;
+    }
+
+    /**
      * Checks flag to exclude same-host-neighbors from being backups of each other (default
is {@code false}).
      * <p>
      * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code
true}.
@@ -384,7 +417,11 @@ public class RendezvousAffinityFunction implements AffinityFunction,
Externaliza
                     if (!allNeighbors.contains(node))
                         res.add(node);
                 }
-                else if (backupFilter == null || backupFilter.apply(primary, node))
+                else if (affinityBackupFilter != null && affinityBackupFilter.apply(node,
res))
+                    res.add(next.get2());
+                else if (backupFilter != null && backupFilter.apply(primary, node))
+                    res.add(next.get2());
+                else if (affinityBackupFilter == null && backupFilter == null)
                     res.add(next.get2());
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f175d3c6/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
index 3bf41c1..f01f5d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
@@ -18,6 +18,10 @@
 package org.apache.ignite.cache.affinity;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
@@ -29,6 +33,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -41,15 +46,18 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends
GridC
     /** Ip finder. */
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
-    /** Backup count. */
-    private static final int BACKUPS = 1;
-
     /** Split attribute name. */
     private static final String SPLIT_ATTRIBUTE_NAME = "split-attribute";
 
     /** Split attribute value. */
     private String splitAttrVal;
 
+    /** Attribute value for first node group. */
+    public static final String FIRST_NODE_GROUP = "A";
+
+    /** Backup count. */
+    private int backups = 1;
+
     /** Test backup filter. */
     protected static final IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter
=
         new IgniteBiPredicate<ClusterNode, ClusterNode>() {
@@ -61,13 +69,63 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends
GridC
             }
         };
 
+    /** Test backup filter. */
+    protected static final IgniteBiPredicate<ClusterNode, List<ClusterNode>>
affinityBackupFilter =
+        new IgniteBiPredicate<ClusterNode, List<ClusterNode>>() {
+            @Override public boolean apply(ClusterNode node, List<ClusterNode> assigned)
{
+                assert node != null : "primary is null";
+                assert assigned != null : "backup is null";
+
+                Map<String, Integer> backupAssignedAttribute = getAttributeStatistic(assigned);
+
+                String nodeAttributeValue = node.attribute(SPLIT_ATTRIBUTE_NAME);
+
+                if (FIRST_NODE_GROUP.equals(nodeAttributeValue)
+                    && backupAssignedAttribute.get(FIRST_NODE_GROUP) < 2)
+                    return true;
+
+                return backupAssignedAttribute.get(nodeAttributeValue).equals(0);
+            }
+        };
+
+    /**
+     * @param nodes List of cluster nodes.
+     * @return Statistic.
+     */
+    @NotNull private static Map<String, Integer> getAttributeStatistic(Collection<ClusterNode>
nodes) {
+        Map<String, Integer> backupAssignedAttribute = new HashMap<>();
+
+        backupAssignedAttribute.put(FIRST_NODE_GROUP, 0);
+
+        backupAssignedAttribute.put("B", 0);
+
+        backupAssignedAttribute.put("C", 0);
+
+        for (ClusterNode assignedNode: nodes) {
+            if (assignedNode == null)
+                continue;
+
+            String val = assignedNode.attribute(SPLIT_ATTRIBUTE_NAME);
+
+            Integer count = backupAssignedAttribute.get(val);
+
+            backupAssignedAttribute.put(val, count + 1);
+        }
+        return backupAssignedAttribute;
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
         CacheConfiguration cacheCfg = defaultCacheConfiguration();
 
         cacheCfg.setCacheMode(PARTITIONED);
-        cacheCfg.setBackups(BACKUPS);
-        cacheCfg.setAffinity(affinityFunction());
+        cacheCfg.setBackups(backups);
+
+        if (backups < 2)
+            cacheCfg.setAffinity(affinityFunction());
+        else
+            cacheCfg.setAffinity(affinityFunctionWithAffinityBackupFilter());
+
         cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
         cacheCfg.setRebalanceMode(SYNC);
         cacheCfg.setAtomicityMode(TRANSACTIONAL);
@@ -90,9 +148,15 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends
GridC
     protected abstract AffinityFunction affinityFunction();
 
     /**
+     * @return Affinity function for test.
+     */
+    protected abstract AffinityFunction affinityFunctionWithAffinityBackupFilter();
+
+    /**
      * @throws Exception If failed.
      */
     public void testPartitionDistribution() throws Exception {
+        backups = 1;
         try {
             for (int i = 0; i < 3; i++) {
                 splitAttrVal = "A";
@@ -135,4 +199,61 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends
GridC
             assertFalse(F.eq(primary.attribute(SPLIT_ATTRIBUTE_NAME), backup.attribute(SPLIT_ATTRIBUTE_NAME)));
         }
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionDistributionWithAffinityBackupFilter() throws Exception {
+        backups = 3;
+        try {
+            for (int i = 0; i < 2; i++) {
+                splitAttrVal = FIRST_NODE_GROUP;
+
+                startGrid(4 * i);
+
+                startGrid(4 * i + 3);
+
+                splitAttrVal = "B";
+
+                startGrid(4 * i + 1);
+
+                splitAttrVal = "C";
+
+                startGrid(4 * i + 2);
+
+                awaitPartitionMapExchange();
+
+                checkPartitionsWithAffinityBackupFilter();
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private void checkPartitionsWithAffinityBackupFilter() throws Exception {
+        AffinityFunction aff = cacheConfiguration(grid(0).configuration(), null).getAffinity();
+
+        int partCnt = aff.partitions();
+
+        IgniteCache<Object, Object> cache = grid(0).cache(null);
+
+        for (int i = 0; i < partCnt; i++) {
+            Collection<ClusterNode> nodes = affinity(cache).mapKeyToPrimaryAndBackups(i);
+
+            assertEquals(backups + 1, nodes.size());
+
+            Map<String, Integer> stat = getAttributeStatistic(nodes);
+
+            assertEquals(stat.get(FIRST_NODE_GROUP), new Integer(2));
+
+            assertEquals(stat.get("B"), new Integer(1));
+
+            assertEquals(stat.get("C"), new Integer(1));
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f175d3c6/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java
index eedc9e4..7fddf30 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java
@@ -32,4 +32,13 @@ public class FairAffinityFunctionBackupFilterSelfTest extends AffinityFunctionBa
 
         return aff;
     }
+
+    /** {@inheritDoc} */
+    @Override protected AffinityFunction affinityFunctionWithAffinityBackupFilter() {
+        FairAffinityFunction aff = new FairAffinityFunction(false);
+
+        aff.setAffinityBackupFilter(affinityBackupFilter);
+
+        return aff;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f175d3c6/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionBackupFilterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionBackupFilterSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionBackupFilterSelfTest.java
index d5d8b8f..a78c383 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionBackupFilterSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionBackupFilterSelfTest.java
@@ -32,4 +32,13 @@ public class RendezvousAffinityFunctionBackupFilterSelfTest extends AffinityFunc
 
         return aff;
     }
+
+    /** {@inheritDoc} */
+    @Override protected AffinityFunction affinityFunctionWithAffinityBackupFilter() {
+        RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false);
+
+        aff.setAffinityBackupFilter(affinityBackupFilter);
+
+        return aff;
+    }
 }


Mime
View raw message