helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject git commit: [HELIX-72] Allow pluggable rebalancer in controller
Date Wed, 03 Apr 2013 01:42:56 GMT
Updated Branches:
  refs/heads/master ab5c9b091 -> 24fd28c9c


[HELIX-72] Allow pluggable rebalancer in controller

Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/24fd28c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/24fd28c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/24fd28c9

Branch: refs/heads/master
Commit: 24fd28c9cc126e83af611fb0c2540c187d0ebf24
Parents: ab5c9b0
Author: slu2011 <lushi04@gmail.com>
Authored: Tue Apr 2 18:42:44 2013 -0700
Committer: slu2011 <lushi04@gmail.com>
Committed: Tue Apr 2 18:42:44 2013 -0700

----------------------------------------------------------------------
 .../helix/controller/GenericHelixController.java   |    2 +
 .../helix/controller/rebalancer/Rebalancer.java    |    7 +-
 .../stages/RebalanceIdealStateStage.java           |   79 ++++++
 .../java/org/apache/helix/model/IdealState.java    |   13 +-
 .../TestCustomizedIdealStateRebalancer.java        |  212 +++++++++++++++
 5 files changed, 309 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/24fd28c9/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 9a414cd..ff45c74 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -53,6 +53,7 @@ import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.RebalanceIdealStateStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.controller.stages.TaskAssignmentStage;
 import org.apache.helix.model.CurrentState;
@@ -204,6 +205,7 @@ public class GenericHelixController implements
       Pipeline rebalancePipeline = new Pipeline();
       rebalancePipeline.addStage(new ResourceComputationStage());
       rebalancePipeline.addStage(new CurrentStateComputationStage());
+      rebalancePipeline.addStage(new RebalanceIdealStateStage());
       rebalancePipeline.addStage(new BestPossibleStateCalcStage());
       rebalancePipeline.addStage(new MessageGenerationPhase());
       rebalancePipeline.addStage(new MessageSelectionStage());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/24fd28c9/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
index ac3edfd..62a73b3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
@@ -40,6 +40,7 @@ public interface Rebalancer
    * This allows one to compute the newIdealState according to app specific requirement.
    * 
    * @param resourceName Name of the resource to be rebalanced
+   * @param currentIdealState
    * @param currentStateOutput
    *          Provides the current state and pending state transition for all
    *          partitions
@@ -47,8 +48,8 @@ public interface Rebalancer
    * @return
    */
   IdealState computeNewIdealState(String resourceName,
-                                  String currentIdealState,
-                                  CurrentStateOutput currentStateOutput, 
-                                  ClusterDataCache clusterData);
+                                  IdealState currentIdealState,
+                                  final CurrentStateOutput currentStateOutput, 
+                                  final ClusterDataCache clusterData);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/24fd28c9/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
new file mode 100644
index 0000000..9cd38c0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
@@ -0,0 +1,79 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+import org.apache.log4j.Logger;
+
+/**
+ * Check and invoke custom implementation idealstate rebalancers.<br/>
+ * If the resourceConfig has specified className of the customized rebalancer, <br/>
+ * the rebalancer will be invoked to re-write the idealstate of the resource<br/>
+ * 
+ */
+public class RebalanceIdealStateStage extends AbstractBaseStage
+{
+  private static final Logger LOG =
+      Logger.getLogger(RebalanceIdealStateStage.class.getName());
+
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    HelixManager manager = event.getAttribute("helixmanager");
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    Map<String, IdealState> idealStateMap = cache.getIdealStates();
+    CurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    
+    Map<String, IdealState> updatedIdealStates = new HashMap<String, IdealState>();
+    for(String resourceName : idealStateMap.keySet())
+    {
+      IdealState currentIdealState = idealStateMap.get(resourceName);
+      if(currentIdealState.getRecord().getSimpleFields().containsKey(IdealStateProperty.REBALANCER_CLASS_NAME.toString()))
+      {
+        String rebalancerClassName = currentIdealState.getRebalancerClassName();
+        LOG.info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
+        try
+        {
+          Rebalancer balancer = (Rebalancer) (Class.forName(rebalancerClassName).newInstance());
+          balancer.init(manager);
+          IdealState newIdealState = balancer.computeNewIdealState(resourceName, idealStateMap.get(resourceName), currentStateOutput, cache);
+          updatedIdealStates.put(resourceName, newIdealState);
+        }
+        catch(Exception e)
+        {
+          LOG.error("", e);
+        }
+      }
+    }
+    if(updatedIdealStates.size() > 0)
+    {
+      cache.getIdealStates().putAll(updatedIdealStates);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/24fd28c9/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 879010c..39c669a 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -39,7 +39,7 @@ public class IdealState extends HelixProperty
 {
   public enum IdealStateProperty
   {
-    NUM_PARTITIONS, STATE_MODEL_DEF_REF, STATE_MODEL_FACTORY_NAME, REPLICAS, IDEAL_STATE_MODE, REBALANCE_TIMER_PERIOD, MAX_PARTITONS_PER_INSTANCE, INSTANCE_GROUP_TAG
+    NUM_PARTITIONS, STATE_MODEL_DEF_REF, STATE_MODEL_FACTORY_NAME, REPLICAS, IDEAL_STATE_MODE, REBALANCE_TIMER_PERIOD, MAX_PARTITONS_PER_INSTANCE, INSTANCE_GROUP_TAG, REBALANCER_CLASS_NAME
   }
 
   public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
@@ -90,6 +90,17 @@ public class IdealState extends HelixProperty
     return Integer.MAX_VALUE;
   }
   
+  public void setRebalancerClassName(String rebalancerClassName)
+  {
+    _record
+    .setSimpleField(IdealStateProperty.REBALANCER_CLASS_NAME.toString(), rebalancerClassName);
+  }
+  
+  public String getRebalancerClassName()
+  {
+    return _record.getSimpleField(IdealStateProperty.REBALANCER_CLASS_NAME.toString());
+  }
+  
   public void setMaxPartitionsPerInstance(int max)
   {
     _record

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/24fd28c9/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
new file mode 100644
index 0000000..df32c4e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -0,0 +1,212 @@
+package org.apache.helix.integration;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestCustomizedIdealStateRebalancer extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+  String db2 = TEST_DB+"2";
+  static boolean testRebalancerCreated = false;
+  public static class TestRebalancer implements Rebalancer
+  {
+
+    @Override
+    public void init(HelixManager manager)
+    {
+      testRebalancerCreated = true;
+    }
+
+    @Override
+    public IdealState computeNewIdealState(String resourceName,
+        IdealState currentIdealState, CurrentStateOutput currentStateOutput,
+        ClusterDataCache clusterData)
+    {
+      for(String partition : currentIdealState.getPartitionSet())
+      {
+        String instance = currentIdealState.getPreferenceList(partition).get(0);
+        currentIdealState.getPreferenceList(partition).clear();
+        currentIdealState.getPreferenceList(partition).add(instance);
+        
+        currentIdealState.getInstanceStateMap(partition).clear();
+        currentIdealState.getInstanceStateMap(partition).put(instance, "MASTER");
+      }
+      currentIdealState.setReplicas("1");
+      return currentIdealState;
+    }
+  }
+  
+  @Test
+  public void testCustomizedIdealStateRebalancer() throws InterruptedException
+  {
+    _setupTool.addResourceToCluster(CLUSTER_NAME, db2, 60, "MasterSlave");
+    _setupTool.addResourceProperty(CLUSTER_NAME, db2, IdealStateProperty.REBALANCER_CLASS_NAME.toString(),
+        TestCustomizedIdealStateRebalancer.TestRebalancer.class.getName()
+        );
+
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, 3);
+    
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+                                                                              CLUSTER_NAME, db2));
+    Assert.assertTrue(result);
+    Thread.sleep(1000);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor( CLUSTER_NAME, new ZkBaseDataAccessor(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
+    Assert.assertEquals(ev.getPartitionSet().size(), 60);
+    for(String partition: ev.getPartitionSet())
+    {
+      Assert.assertEquals(ev.getStateMap(partition).size(), 1);
+    }
+    IdealState is = accessor.getProperty(keyBuilder.idealStates(db2));
+    for(String partition: is.getPartitionSet())
+    {
+      Assert.assertEquals(is.getPreferenceList(partition).size(), 3);
+      Assert.assertEquals(is.getInstanceStateMap(partition).size(), 3);
+    }
+    Assert.assertTrue(testRebalancerCreated);
+  }
+  
+  public static class ExternalViewBalancedVerifier implements ZkVerifier
+  {
+    ZkClient _client;
+    String _clusterName;
+    String _resourceName;
+    
+    public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName)
+    {
+      _client = client;
+      _clusterName = clusterName;
+      _resourceName = resourceName;
+    }
+    @Override
+    public boolean verify()
+    {
+      try
+      {
+        HelixDataAccessor accessor = new ZKHelixDataAccessor( _clusterName, new ZkBaseDataAccessor(_client));
+        Builder keyBuilder = accessor.keyBuilder();
+        int numberOfPartitions = accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields().size();
+        ClusterDataCache cache = new ClusterDataCache();
+        cache.refresh(accessor);
+        String masterValue = cache.getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefRef()).getStatesPriorityList().get(0);
+        int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
+        String instanceGroupTag = cache.getIdealState(_resourceName).getInstanceGroupTag();
+        int instances = 0;
+        for(String  liveInstanceName : cache.getLiveInstances().keySet())
+        {
+          if(cache.getInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag))
+          {
+            instances ++;
+          }
+        }
+        if(instances == 0)
+        {
+          instances = cache.getLiveInstances().size();
+        }
+        return verifyBalanceExternalView(accessor.getProperty(keyBuilder.externalView(_resourceName)).getRecord(), numberOfPartitions, masterValue, replicas, instances);
+      }
+      catch(Exception e)
+      {
+        return false;
+      }
+    }
+
+    @Override
+    public ZkClient getZkClient()
+    {
+      return _client;
+    }
+
+    @Override
+    public String getClusterName()
+    {
+      return _clusterName;
+    }
+    
+  }
+  
+
+  static boolean verifyBalanceExternalView(ZNRecord externalView, int partitionCount, String masterState, int replica, int instances)
+  {
+    Map<String, Integer> masterPartitionsCountMap = new HashMap<String, Integer>();
+    for(String partitionName : externalView.getMapFields().keySet())
+    {
+      Map<String, String> assignmentMap = externalView.getMapField(partitionName);
+      //Assert.assertTrue(assignmentMap.size() >= replica);
+      for(String instance : assignmentMap.keySet())
+      {
+        if(assignmentMap.get(instance).equals(masterState))
+        {
+          if(!masterPartitionsCountMap.containsKey(instance))
+          {
+            masterPartitionsCountMap.put(instance, 0);
+          }
+          masterPartitionsCountMap.put(instance, masterPartitionsCountMap.get(instance) + 1);
+        }
+      }
+    }
+    
+    int perInstancePartition = partitionCount / instances;
+    
+    int totalCount = 0;
+    for(String instanceName : masterPartitionsCountMap.keySet())
+    {
+      int instancePartitionCount = masterPartitionsCountMap.get(instanceName);
+      totalCount += instancePartitionCount;
+      if(!(instancePartitionCount == perInstancePartition || instancePartitionCount == perInstancePartition +1 ))
+      {
+        return false;
+      }
+      if(instancePartitionCount == perInstancePartition +1)
+      {
+        if(partitionCount % instances == 0)
+        {
+          return false;
+        }
+      }
+    }
+    if(partitionCount != totalCount)
+    {
+      return false;
+    }
+    return true;
+    
+  }
+}


Mime
View raw message