helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: HELIX-149: Allow clients to pass in preferred placement strategies
Date Thu, 01 Aug 2013 20:31:37 GMT
Updated Branches:
  refs/heads/master 9543a600e -> edd4db8b7


HELIX-149: Allow clients to pass in preferred placement strategies


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/edd4db8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/edd4db8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/edd4db8b

Branch: refs/heads/master
Commit: edd4db8b7a61e6f711938e475e28e11678ed4d40
Parents: 9543a60
Author: zzhang <zzhang5@uci.edu>
Authored: Thu Aug 1 13:31:32 2013 -0700
Committer: zzhang <zzhang5@uci.edu>
Committed: Thu Aug 1 13:31:32 2013 -0700

----------------------------------------------------------------------
 .../strategy/AutoRebalanceStrategy.java         | 90 +++++++++++++++-----
 .../strategy/TestAutoRebalanceStrategy.java     | 16 ++--
 2 files changed, 79 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/edd4db8b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 9a998eb..23b451f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -44,7 +44,6 @@ import org.apache.helix.model.IdealState.IdealStateModeProperty;
 import org.apache.log4j.Logger;
 
 public class AutoRebalanceStrategy implements Rebalancer {
-  @SuppressWarnings("unused")
   // These should be final, but are initialized in init rather than a constructor
   private HelixManager _manager;
   private AutoRebalanceModeAlgorithm _algorithm;
@@ -82,9 +81,10 @@ public class AutoRebalanceStrategy implements Rebalancer {
       LOG.info("allNodes: " + allNodes);
       LOG.info("maxPartition: " + maxPartition);
     }
-
+    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+    placementScheme.init(_manager);
     _algorithm = new AutoRebalanceModeAlgorithm(resourceName, partitions, stateCountMap,
-        maxPartition);
+        maxPartition, placementScheme);
     ZNRecord newMapping = _algorithm.computePartitionAssignment(liveNodes,currentMapping,
         allNodes);
 
@@ -107,6 +107,7 @@ public class AutoRebalanceStrategy implements Rebalancer {
     private final List<String> _partitions;
     private final LinkedHashMap<String, Integer> _states;
     private final int _maximumPerNode;
+    private final ReplicaPlacementScheme _placementScheme;
 
     private Map<String, Node> _nodeMap;
     private List<Node> _liveNodesList;
@@ -116,16 +117,22 @@ public class AutoRebalanceStrategy implements Rebalancer {
     private Set<Replica> _orphaned;
 
     public AutoRebalanceModeAlgorithm(String resourceName, final List<String> partitions,
-        final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+        final LinkedHashMap<String, Integer> states, int maximumPerNode,
+        ReplicaPlacementScheme placementScheme) {
       _resourceName = resourceName;
       _partitions = partitions;
       _states = states;
       _maximumPerNode = maximumPerNode;
+      if (placementScheme != null){
+        _placementScheme = placementScheme;
+      } else {
+        _placementScheme = new DefaultPlacementScheme();
+      }
     }
 
     public AutoRebalanceModeAlgorithm(String resourceName, final List<String> partitions,
         final LinkedHashMap<String, Integer> states) {
-      this(resourceName, partitions, states, Integer.MAX_VALUE);
+      this(resourceName, partitions, states, Integer.MAX_VALUE, new DefaultPlacementScheme());
     }
 
     public ZNRecord computePartitionAssignment(final List<String> liveNodes,
@@ -251,13 +258,11 @@ public class AutoRebalanceStrategy implements Rebalancer {
           while (it.hasNext()) {
             Replica replica = it.next();
             int startIndex = (replica.hashCode() & 0x7FFFFFFF) % _liveNodesList.size();
-
             for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
               Node receiver = _liveNodesList.get(index % _liveNodesList.size());
               if (receiver.canAdd(replica)) {
                 receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
                 receiver.nonPreferred.add(replica);
-
                 donor.currentlyAssigned = donor.currentlyAssigned - 1;
                 it.remove();
                 break;
@@ -348,7 +353,8 @@ public class AutoRebalanceStrategy implements Rebalancer {
           for (int i = 0; i < count; i++) {
             int replicaId = _replicaIdMap.get(state).get(i);
             Replica replica = new Replica(partition, state, replicaId);
-            if (_preferredAssignment.get(replica).id != node.id) {
+            if (_preferredAssignment.get(replica).id != node.id
+                && !existingNonPreferredAssignment.containsKey(replica)) {
               existingNonPreferredAssignment.put(replica, node);
               node.nonPreferred.add(replica);
               break;
@@ -429,24 +435,15 @@ public class AutoRebalanceStrategy implements Rebalancer {
       Map<Replica, Node> preferredMapping;
       preferredMapping = new HashMap<Replica, Node>();
       int partitionId = 0;
+      int numReplicas = countStateReplicas();
       for (String partition : _partitions) {
         int replicaId = 0;
         for (String state : _states.keySet()) {
           for (int i = 0; i < _states.get(state); i++) {
             Replica replica = new Replica(partition, state, replicaId);
-            int index;
-            if (allNodes.size() > _partitions.size()) {
-              // assign replicas in partition order in case there are more nodes than partitions
-              index = (partitionId + replicaId * _partitions.size()) % allNodes.size();
-            } else if (allNodes.size() == _partitions.size()) {
-              // need a replica offset in case the sizes of these sets are the same
-              index = ((partitionId + replicaId * _partitions.size()) % allNodes.size()
-                  + replicaId) % allNodes.size();
-            } else {
-              // in all other cases, assigning a replica at a time for each partition is reasonable
-              index = (partitionId + replicaId) % allNodes.size();
-            }
-            preferredMapping.put(replica, _nodeMap.get(allNodes.get(index)));
+            String nodeName = _placementScheme.getLocation(partitionId, replicaId,
+                _partitions.size(), numReplicas, allNodes);
+            preferredMapping.put(replica, _nodeMap.get(nodeName));
             replicaId = replicaId + 1;
           }
         }
@@ -664,4 +661,55 @@ public class AutoRebalanceStrategy implements Rebalancer {
     return map;
   }
 
+  /**
+   * Interface for providing a custom approach to computing a replica's affinity to a node.
+   */
+  public interface ReplicaPlacementScheme {
+    /**
+     * Initialize global state
+     * @param manager The instance to which this placement is associated
+     */
+    public void init(final HelixManager manager);
+
+    /**
+     * Given properties of this replica, determine the node it would prefer to be served by
+     * @param partitionId The current partition
+     * @param replicaId The current replica with respect to the current partition
+     * @param numPartitions The total number of partitions
+     * @param numReplicas The total number of replicas per partition
+     * @param nodeNames A list of identifiers of all nodes, live and non-live
+     * @return The name of the node that would prefer to serve this replica
+     */
+    public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
+        final List<String> nodeNames);
+  }
+
+  /**
+   * Compute preferred placements based on a default strategy that assigns replicas to nodes as
+   * evenly as possible while avoiding placing two replicas of the same partition on any node.
+   */
+  public static class DefaultPlacementScheme implements ReplicaPlacementScheme {
+    @Override
+    public void init(final HelixManager manager) {
+      // do nothing since this is independent of the manager
+    }
+
+    @Override
+    public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
+        final List<String> nodeNames) {
+      int index;
+      if (nodeNames.size() > numPartitions) {
+        // assign replicas in partition order in case there are more nodes than partitions
+        index = (partitionId + replicaId * numPartitions) % nodeNames.size();
+      } else if (nodeNames.size() == numPartitions) {
+        // need a replica offset in case the sizes of these sets are the same
+        index = ((partitionId + replicaId * numPartitions) % nodeNames.size()
+            + replicaId) % nodeNames.size();
+      } else {
+        // in all other cases, assigning a replica at a time for each partition is reasonable
+        index = (partitionId + replicaId) % nodeNames.size();
+      }
+      return nodeNames.get(index);
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/edd4db8b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index 29b92e1..50a2f81 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -34,6 +34,7 @@ import java.util.TreeSet;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.AutoRebalanceModeAlgorithm;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
 import org.apache.log4j.Logger;
 import org.testng.annotations.Test;
 
@@ -75,7 +76,8 @@ public class TestAutoRebalanceStrategy {
     Map<String, Map<String, String>> currentMapping = new TreeMap<String, Map<String, String>>();
 
     new AutoRebalanceTester(partitions, states, liveNodes, currentMapping,
-        allNodes, MAX_PER_NODE).runRepeatedly(NUM_ITERATIONS);
+        allNodes, MAX_PER_NODE,
+        new AutoRebalanceStrategy.DefaultPlacementScheme()).runRepeatedly(NUM_ITERATIONS);
   }
 
   class AutoRebalanceTester {
@@ -93,12 +95,13 @@ public class TestAutoRebalanceStrategy {
     private Map<String, Map<String, String>> _currentMapping;
     private List<String> _allNodes;
     private int _maxPerNode;
+    private ReplicaPlacementScheme _placementScheme;
     private Random _random;
 
     public AutoRebalanceTester(List<String> partitions,
         LinkedHashMap<String, Integer> states, List<String> liveNodes,
         Map<String, Map<String, String>> currentMapping, List<String> allNodes,
-        int maxPerNode) {
+        int maxPerNode, ReplicaPlacementScheme placementScheme) {
       _partitions = partitions;
       _states = states;
       _liveNodes = liveNodes;
@@ -116,6 +119,7 @@ public class TestAutoRebalanceStrategy {
         }
       }
       _maxPerNode = maxPerNode;
+      _placementScheme = placementScheme;
       _random = new Random();
     }
 
@@ -128,7 +132,7 @@ public class TestAutoRebalanceStrategy {
     public void runRepeatedly(int numIterations) {
       logger.info("~~~~ Initial State ~~~~~");
       ZNRecord initialResult = new AutoRebalanceStrategy.AutoRebalanceModeAlgorithm(
-          RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          RESOURCE_NAME, _partitions, _states, _maxPerNode, _placementScheme)
           .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
       _currentMapping = initialResult.getMapFields();
       logger.info(_currentMapping);
@@ -426,7 +430,7 @@ public class TestAutoRebalanceStrategy {
       _nonLiveSet.remove(node);
 
       return new AutoRebalanceStrategy.AutoRebalanceModeAlgorithm(
-          RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          RESOURCE_NAME, _partitions, _states, _maxPerNode, _placementScheme)
           .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
     }
 
@@ -462,7 +466,7 @@ public class TestAutoRebalanceStrategy {
       }
 
       return new AutoRebalanceStrategy.AutoRebalanceModeAlgorithm(
-          RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          RESOURCE_NAME, _partitions, _states, _maxPerNode, _placementScheme)
           .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
     }
 
@@ -490,7 +494,7 @@ public class TestAutoRebalanceStrategy {
       _liveSet.add(node);
 
       return new AutoRebalanceStrategy.AutoRebalanceModeAlgorithm(
-          RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          RESOURCE_NAME, _partitions, _states, _maxPerNode, _placementScheme)
           .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
     }
 


Mime
View raw message