helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject [2/3] git commit: [HELIX-61]Support the concept of roles/groups for nodes
Date Tue, 26 Mar 2013 22:07:38 GMT
[HELIX-61]Support the concept of roles/groups for nodes

Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/8b13937a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/8b13937a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/8b13937a

Branch: refs/heads/master
Commit: 8b13937a00963538bb2116880c94b69c84bd1b5d
Parents: 72559b1
Author: slu2011 <lushi04@gmail.com>
Authored: Tue Mar 26 15:05:38 2013 -0700
Committer: slu2011 <lushi04@gmail.com>
Committed: Tue Mar 26 15:05:38 2013 -0700

----------------------------------------------------------------------
 .../helix/webapp/resources/IdealStateResource.java |   14 +-
 .../helix/webapp/resources/InstanceResource.java   |   20 +
 .../helix/webapp/resources/InstancesResource.java  |   21 +-
 .../helix/webapp/resources/JsonParameters.java     |   16 +
 .../helix/tools/TestHelixAdminScenariosRest.java   |   80 +
 .../src/main/java/org/apache/helix/HelixAdmin.java |   26 +-
 .../stages/BestPossibleStateCalcStage.java         |   15 +
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |  105 ++-
 .../java/org/apache/helix/model/IdealState.java    |   15 +-
 .../org/apache/helix/model/InstanceConfig.java     |   50 +-
 .../java/org/apache/helix/tools/ClusterSetup.java  |   76 +-
 .../helix/integration/TestHelixInstanceTag.java    |   81 +
 .../org/apache/helix/tools/TestClusterSetup.java   |    1 +
 .../org/apache/helix/tools/TestHelixAdminCli.java  | 1288 +++++++++------
 14 files changed, 1256 insertions(+), 552 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/8b13937a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/IdealStateResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/IdealStateResource.java
b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/IdealStateResource.java
index 1986889..80645b9 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/IdealStateResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/IdealStateResource.java
@@ -149,17 +149,15 @@ public class IdealStateResource extends Resource
       {
         int replicas = 
             Integer.parseInt(jsonParameters.getParameter(JsonParameters.REPLICAS));
-        if (jsonParameters.getParameter(JsonParameters.RESOURCE_KEY_PREFIX) != null)
-        {
+        String keyPrefix = jsonParameters.getParameter(JsonParameters.RESOURCE_KEY_PREFIX);
+        String groupTag = jsonParameters.getParameter(ClusterSetup.instanceGroupTag);
+        
           setupTool.rebalanceStorageCluster(clusterName,
                                             resourceName,
                                             replicas,
-                                            jsonParameters.getParameter(JsonParameters.RESOURCE_KEY_PREFIX));
-        }
-        else
-        {
-          setupTool.rebalanceStorageCluster(clusterName, resourceName, replicas);
-        }
+                                            keyPrefix,
+                                            groupTag);
+       
       }
       else if (command.equalsIgnoreCase(ClusterSetup.expandResource))
       {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/8b13937a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/InstanceResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/InstanceResource.java
b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/InstanceResource.java
index 20d66e3..7964dfc 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/InstanceResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/InstanceResource.java
@@ -190,6 +190,26 @@ public class InstanceResource extends Resource
         setupTool.getClusterManagementTool().resetInstance(clusterName,
                                                            Arrays.asList(instanceName));
       }
+      else if (command.equalsIgnoreCase(ClusterSetup.addInstanceTag))
+      {
+        jsonParameters.verifyCommand(ClusterSetup.addInstanceTag);
+        String tag = 
+            jsonParameters.getParameter(ClusterSetup.instanceGroupTag);
+        ZkClient zkClient =
+            (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
+        ClusterSetup setupTool = new ClusterSetup(zkClient);
+        setupTool.getClusterManagementTool().addInstanceTag(clusterName, instanceName, tag);
+      }
+      else if (command.equalsIgnoreCase(ClusterSetup.removeInstanceTag))
+      {
+        jsonParameters.verifyCommand(ClusterSetup.removeInstanceTag);
+        String tag = 
+            jsonParameters.getParameter(ClusterSetup.instanceGroupTag);
+        ZkClient zkClient =
+            (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
+        ClusterSetup setupTool = new ClusterSetup(zkClient);
+        setupTool.getClusterManagementTool().removeInstanceTag(clusterName, instanceName,
tag);
+      }
       else
       {
         throw new HelixException("Unsupported command: " + command

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/8b13937a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/InstancesResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/InstancesResource.java
b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/InstancesResource.java
index 246633d..2042827 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/InstancesResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/InstancesResource.java
@@ -20,7 +20,10 @@ package org.apache.helix.webapp.resources;
  */
 
 import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
@@ -111,17 +114,31 @@ public class InstancesResource extends Resource
         accessor.getChildValuesMap(accessor.keyBuilder().liveInstances());
     Map<String, InstanceConfig> instanceConfigsMap =
         accessor.getChildValuesMap(accessor.keyBuilder().instanceConfigs());
-
+    
+    Map<String, List<String>> tagInstanceLists = new TreeMap<String, List<String>>();
+    
     for (String instanceName : instanceConfigsMap.keySet())
     {
       boolean isAlive = liveInstancesMap.containsKey(instanceName);
       instanceConfigsMap.get(instanceName)
                         .getRecord()
                         .setSimpleField("Alive", isAlive + "");
+      InstanceConfig config = instanceConfigsMap.get(instanceName);
+      for(String tag : config.getTags())
+      {
+        if(!tagInstanceLists.containsKey(tag))
+        {
+          tagInstanceLists.put(tag, new LinkedList<String>());
+        }
+        if(!tagInstanceLists.get(tag).contains(instanceName))
+        {
+          tagInstanceLists.get(tag).add(instanceName);
+        }
+      }
     }
 
     StringRepresentation representation =
-        new StringRepresentation(ClusterRepresentationUtil.ObjectToJson(instanceConfigsMap.values()),
+        new StringRepresentation(ClusterRepresentationUtil.ObjectToJson(instanceConfigsMap.values())
+ ClusterRepresentationUtil.ObjectToJson(tagInstanceLists),
                                  MediaType.APPLICATION_JSON);
 
     return representation;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/8b13937a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
index fc77dcf..b201ffa 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
@@ -70,6 +70,8 @@ public class JsonParameters
   public static final String             NEW_IDEAL_STATE     = "newIdealState";
   public static final String             NEW_STATE_MODEL_DEF = "newStateModelDef";
 
+  public static final String TAG = "tag";
+
   // aliases for ClusterSetup commands
   public static Map<String, Set<String>> CLUSTERSETUP_COMMAND_ALIASES;
   static
@@ -232,6 +234,20 @@ public class JsonParameters
         throw new HelixException("Missing Json parameters: '" + CONFIGS + "'");
       }
     }
+    else if (command.equalsIgnoreCase(ClusterSetup.addInstanceTag))
+    {
+      if (!_parameterMap.containsKey(ClusterSetup.instanceGroupTag))
+      {
+        throw new HelixException("Missing Json parameters: '" + ClusterSetup.instanceGroupTag
+ "'");
+      }
+    }
+    else if (command.equalsIgnoreCase(ClusterSetup.removeInstanceTag))
+    {
+      if (!_parameterMap.containsKey(ClusterSetup.instanceGroupTag))
+      {
+        throw new HelixException("Missing Json parameters: '" + ClusterSetup.instanceGroupTag
+ "'");
+      }
+    }
     else if (command.equalsIgnoreCase(ClusterSetup.addCluster))
     {
       if (!_parameterMap.containsKey(CLUSTER_NAME))

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/8b13937a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosRest.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosRest.java
b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosRest.java
index 073f35b..b0a87e5 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosRest.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosRest.java
@@ -39,6 +39,7 @@ import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.manager.zk.ZKUtil;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState.IdealStateProperty;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -71,6 +72,9 @@ public class TestHelixAdminScenariosRest extends AdminTestBase
   Map<String, StartCMResult> _startCMResultMap = new HashMap<String, StartCMResult>();
   RestAdminApplication       _adminApp;
   Component                  _component;
+  String _tag1 = "tag1123";
+  String _tag2 = "tag212334";
+  
 
   public static String ObjectToJson(Object object) throws JsonGenerationException,
       JsonMappingException,
@@ -354,6 +358,14 @@ public class TestHelixAdminScenariosRest extends AdminTestBase
     Assert.assertTrue(response.contains("db_11"));
 
     Assert.assertTrue(_gZkClient.exists("/clusterTest1/IDEALSTATES/db_11"));
+    
+    paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_33");
+    response = assertSuccessPostOperation(reourcesUrl, paraMap, false);
+    Assert.assertTrue(response.contains("db_33"));
+    
+    paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_44");
+    response = assertSuccessPostOperation(reourcesUrl, paraMap, false);
+    Assert.assertTrue(response.contains("db_44"));
   }
 
   private void testDeactivateCluster() throws Exception,
@@ -704,6 +716,56 @@ public class TestHelixAdminScenariosRest extends AdminTestBase
     Assert.assertTrue((((List<String>) (record.getListFields().values().toArray()[0]))).size()
== 2);
     Assert.assertTrue((((Map<String, String>) (record.getMapFields().values().toArray()[0]))).size()
== 2);
     Assert.assertTrue((((String) (record.getMapFields().keySet().toArray()[0]))).startsWith("alias_"));
+    Assert.assertFalse(response.contains(IdealStateProperty.INSTANCE_GROUP_TAG.toString()));
+    resourceUrl = getResourceUrl("clusterTest1", "db_33");
+    ISUrl = resourceUrl + "/idealState";
+    paraMap.put(JsonParameters.REPLICAS, "2");
+    paraMap.remove(JsonParameters.RESOURCE_KEY_PREFIX);
+    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.rebalance);
+    paraMap.put(ClusterSetup.instanceGroupTag,_tag1);
+    response = assertSuccessPostOperation(ISUrl, paraMap, false);
+
+    Assert.assertTrue(response.contains(IdealStateProperty.INSTANCE_GROUP_TAG.toString()));
+    Assert.assertTrue(response.contains(_tag1));
+    for (int i = 0; i < 6; i++)
+    {
+      String instance = "localhost_123"+i;
+      if(i<3)
+      {
+        Assert.assertTrue(response.contains(instance));
+      }
+      else
+      {
+        Assert.assertFalse(response.contains(instance));
+      }
+    }
+    
+    resourceUrl = getResourceUrl("clusterTest1", "db_44");
+    ISUrl = resourceUrl + "/idealState";
+    paraMap.put(JsonParameters.REPLICAS, "2");
+    paraMap.remove(JsonParameters.RESOURCE_KEY_PREFIX);
+    paraMap.put(JsonParameters.RESOURCE_KEY_PREFIX, "alias");
+    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.rebalance);
+    paraMap.put(ClusterSetup.instanceGroupTag,_tag1);
+    response = assertSuccessPostOperation(ISUrl, paraMap, false);
+    Assert.assertTrue(response.contains(IdealStateProperty.INSTANCE_GROUP_TAG.toString()));
+    Assert.assertTrue(response.contains(_tag1));
+
+    record = JsonToObject(ZNRecord.class, response);
+    Assert.assertTrue((((String) (record.getMapFields().keySet().toArray()[0]))).startsWith("alias_"));
+    
+    for (int i = 0; i < 6; i++)
+    {
+      String instance = "localhost_123"+i;
+      if(i<3)
+      {
+        Assert.assertTrue(response.contains(instance));
+      }
+      else
+      {
+        Assert.assertFalse(response.contains(instance));
+      }
+    }
   }
 
   private void testAddInstance() throws Exception
@@ -765,5 +827,23 @@ public class TestHelixAdminScenariosRest extends AdminTestBase
     paraMap.remove(JsonParameters.INSTANCE_NAMES);
     paraMap.put(JsonParameters.INSTANCE_NAME, "localhost:1234");
     response = assertSuccessPostOperation(instancesUrl, paraMap, true);
+    
+    // add tags
+    
+    paraMap.clear();
+    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstanceTag);
+    paraMap.put(ClusterSetup.instanceGroupTag, _tag1);
+    for (int i = 0; i < 4; i++)
+    {
+      instanceUrl = instancesUrl + "/localhost_123"+i;
+      response = assertSuccessPostOperation(instanceUrl, paraMap, false);
+      Assert.assertTrue(response.contains(_tag1));
+      
+    }
+    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.removeInstanceTag);
+    instanceUrl = instancesUrl + "/localhost_1233";
+    response = assertSuccessPostOperation(instanceUrl, paraMap, false);
+    Assert.assertFalse(response.contains(_tag1));
+    
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/8b13937a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 903516d..1c8a7f7 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -440,5 +440,29 @@ public interface HelixAdmin
    * @param keyPrefix
    */
   void rebalance(String clusterName, String resourceName, int replica,
-      String keyPrefix);
+      String keyPrefix, String group);
+  /**
+   * 
+   * @param clusterName
+   * @param tag
+   */
+  List<String> getInstancesInClusterWithTag(String clusterName, String tag);
+  
+  /**
+   * 
+   * @param clusterName
+   * @param instanceNames
+   * @param tag
+   * @return
+   */
+  void addInstanceTag(String clusterName, String instanceName, String tag);
+  
+  /**
+   * 
+   * @param clusterName
+   * @param instanceNames
+   * @param tag
+   * @return
+   */
+  void removeInstanceTag(String clusterName, String instanceName, String tag);
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/8b13937a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 84092e4..11557d7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -188,6 +188,21 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
   {
     String topStateValue = stateModelDef.getStatesPriorityList().get(0);
     Set<String> liveInstances = cache._liveInstanceMap.keySet();
+    Set<String> taggedInstances = new HashSet<String>();
+    
+    // If there are instances tagged with resource name, use only those instances
+    for(String instanceName : liveInstances)
+    {
+      if(cache._instanceConfigMap.get(instanceName).containsTag(idealState.getResourceName()))
+      {
+        taggedInstances.add(instanceName);
+      }
+    }
+    if(taggedInstances.size() > 0)
+    {
+      logger.info("found the following instances with tag " + idealState.getResourceName()
+ " " + taggedInstances);
+      liveInstances = taggedInstances;
+    }
     // Obtain replica number
     int replicas = 1;
     try

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/8b13937a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 3dee34e..f0722a4 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -45,6 +46,7 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
+import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
@@ -628,6 +630,28 @@ public class ZKHelixAdmin implements HelixAdmin
     String memberInstancesPath = HelixUtil.getMemberInstancesPath(clusterName);
     return _zkClient.getChildren(memberInstancesPath);
   }
+  
+  @Override
+  public List<String> getInstancesInClusterWithTag(String clusterName, String tag)
+  {
+    String memberInstancesPath = HelixUtil.getMemberInstancesPath(clusterName);
+    List<String> instances =  _zkClient.getChildren(memberInstancesPath);
+    List<String> result = new ArrayList<String>();
+    
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    
+    for(String instanceName : instances)
+    {
+      InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
+      if(config.containsTag(tag))
+      {
+        result.add(instanceName);
+      }
+    }
+    return result;
+  }
 
   @Override
   public void addResource(String clusterName,
@@ -1114,19 +1138,38 @@ public class ZKHelixAdmin implements HelixAdmin
   @Override
   public void rebalance(String clusterName, String resourceName, int replica)
   {
-    List<String> instanceNames = getInstancesInCluster(clusterName);
-    rebalance(clusterName, resourceName, replica, resourceName, instanceNames);
+    rebalance(clusterName, resourceName, replica, resourceName, "");
   }
+  
   @Override
-  public void rebalance(String clusterName, String resourceName, int replica, String keyPrefix)
+  public void rebalance(String clusterName, String resourceName, int replica, String keyPrefix,
String group)
   {
-    List<String> instanceNames = getInstancesInCluster(clusterName);
-    rebalance(clusterName, resourceName, replica, keyPrefix, instanceNames);
+    List<String> instanceNames = new LinkedList<String>();
+    if(keyPrefix == null || keyPrefix.length() == 0)
+    {
+      keyPrefix = resourceName;
+    }
+    if(group != null && group.length() > 0)
+    {
+      instanceNames = getInstancesInClusterWithTag(clusterName, group);
+    }
+    if(instanceNames.size() == 0)
+    {
+      logger.info("No tags found for resource " + resourceName + ", use all instances");
+      instanceNames = getInstancesInCluster(clusterName);
+      group = "";
+    }
+    else
+    {
+      logger.info("Found instances with tag for " + resourceName + " " + instanceNames);
+    }
+    rebalance(clusterName, resourceName, replica, keyPrefix, instanceNames, group);
   }
+  
   @Override
   public void rebalance(String clusterName, String resourceName, int replica, List<String>
instances)
   {
-    rebalance(clusterName, resourceName, replica, resourceName, instances);
+    rebalance(clusterName, resourceName, replica, resourceName, instances, "");
   }
   
   
@@ -1134,7 +1177,8 @@ public class ZKHelixAdmin implements HelixAdmin
                  String resourceName, 
                  int replica, 
                  String keyPrefix, 
-                 List<String> instanceNames)
+                 List<String> instanceNames,
+                 String groupId)
   {
     // ensure we get the same idealState with the same set of instances
     Collections.sort(instanceNames);
@@ -1145,6 +1189,10 @@ public class ZKHelixAdmin implements HelixAdmin
       throw new HelixException("Resource: " + resourceName + " has NOT been added yet");
     }
 
+    if(groupId != null && groupId.length() > 0)
+    {
+      idealState.setInstanceGroupTag(groupId);
+    }
     idealState.setReplicas(Integer.toString(replica));
     int partitions = idealState.getNumPartitions();
     String stateModelName = idealState.getStateModelDefRef();
@@ -1393,6 +1441,49 @@ public class ZKHelixAdmin implements HelixAdmin
     setResourceIdealState(clusterName, newIdealStateRecord.getId(), newIdealState);
   }
 
+  @Override
+  public void addInstanceTag(String clusterName, String instanceName, String tag)
+  {
+    if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
+    {
+      throw new HelixException("cluster " + clusterName + " is not setup yet");
+    }
+    
+    if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT))
+    {
+      throw new HelixException("cluster " + clusterName + " instance "+instanceName +" is
not setup yet");
+    }
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    InstanceConfig config =  accessor.getProperty(keyBuilder.instanceConfig(instanceName));
+    config.addTag(tag);
+    accessor.setProperty(keyBuilder.instanceConfig(instanceName), config);
+  }
+
+  @Override
+  public void removeInstanceTag(String clusterName, String instanceName,
+      String tag)
+  {
+    if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
+    {
+      throw new HelixException("cluster " + clusterName + " is not setup yet");
+    }
+    
+    if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT))
+    {
+      throw new HelixException("cluster " + clusterName + " instance "+instanceName +" is
not setup yet");
+    }
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    
+    InstanceConfig config =  accessor.getProperty(keyBuilder.instanceConfig(instanceName));
+    config.removeTag(tag);
+    accessor.setProperty(keyBuilder.instanceConfig(instanceName), config);
+  }
+
   
  
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/8b13937a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index a2d2dc0..879010c 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -39,7 +39,7 @@ public class IdealState extends HelixProperty
 {
   public enum IdealStateProperty
   {
-    NUM_PARTITIONS, STATE_MODEL_DEF_REF, STATE_MODEL_FACTORY_NAME, REPLICAS, IDEAL_STATE_MODE,
REBALANCE_TIMER_PERIOD, MAX_PARTITONS_PER_INSTANCE
+    NUM_PARTITIONS, STATE_MODEL_DEF_REF, STATE_MODEL_FACTORY_NAME, REPLICAS, IDEAL_STATE_MODE,
REBALANCE_TIMER_PERIOD, MAX_PARTITONS_PER_INSTANCE, INSTANCE_GROUP_TAG
   }
 
   public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
@@ -356,5 +356,16 @@ public class IdealState extends HelixProperty
 
     return true;
   }
-
+  
+  public void setInstanceGroupTag(String groupTag)
+  {
+    _record.setSimpleField(
+        IdealStateProperty.INSTANCE_GROUP_TAG.toString(), groupTag);
+  }
+  
+  public String getInstanceGroupTag()
+  {
+    return _record.getSimpleField(
+        IdealStateProperty.INSTANCE_GROUP_TAG.toString());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/8b13937a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 816f3b9..ebde5df 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -41,7 +41,8 @@ public class InstanceConfig extends HelixProperty
     HELIX_HOST,
     HELIX_PORT,
     HELIX_ENABLED,
-    HELIX_DISABLED_PARTITION
+    HELIX_DISABLED_PARTITION,
+    TAG_LIST
   }
   private static final Logger _logger = Logger.getLogger(InstanceConfig.class.getName());
 
@@ -74,6 +75,53 @@ public class InstanceConfig extends HelixProperty
   {
     _record.setSimpleField(InstanceConfigProperty.HELIX_PORT.toString(), port);
   }
+  
+  public List<String> getTags()
+  {
+    List<String> tags = getRecord().getListField(InstanceConfigProperty.TAG_LIST.toString());
+    if(tags == null)
+    {
+      tags = new ArrayList<String>(0);
+    }
+    return tags;
+  }
+  
+  public void addTag(String tag)
+  {
+    List<String> tags = getRecord().getListField(InstanceConfigProperty.TAG_LIST.toString());
+    if(tags == null)
+    {
+      tags = new ArrayList<String>(0);
+    }
+    if(!tags.contains(tag))
+    {
+      tags.add(tag);
+    }
+    getRecord().setListField(InstanceConfigProperty.TAG_LIST.toString(), tags);
+  }
+  
+  public void removeTag(String tag)
+  {
+    List<String> tags = getRecord().getListField(InstanceConfigProperty.TAG_LIST.toString());
+    if(tags == null)
+    {
+      return;
+    }
+    if(tags.contains(tag))
+    {
+      tags.remove(tag);
+    }
+  }
+  
+  public boolean containsTag(String tag)
+  {
+    List<String> tags = getRecord().getListField(InstanceConfigProperty.TAG_LIST.toString());
+    if(tags == null)
+    {
+      return false;
+    }
+    return tags.contains(tag);
+  }
 
   public boolean getInstanceEnabled()
   {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/8b13937a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index b4dec19..8c7cfbf 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -88,12 +88,16 @@ public class ClusterSetup
   public static final String expandCluster = "expandCluster";
   public static final String expandResource = "expandResource";
   public static final String mode = "mode";
+  public static final String instanceGroupTag = "instanceGroupTag";
   public static final String bucketSize = "bucketSize";
   public static final String resourceKeyPrefix = "key";
   public static final String maxPartitionsPerNode = "maxPartitionsPerNode";
   
   public static final String addResourceProperty = "addResourceProperty";
   public static final String removeResourceProperty = "removeResourceProperty";
+  
+  public static final String addInstanceTag = "addInstanceTag";
+  public static final String removeInstanceTag = "removeInstanceTag";
 
   // Query info (TBD in V2)
   public static final String listClusterInfo = "listClusterInfo";
@@ -537,7 +541,23 @@ public class ClusterSetup
                                       int replica,
                                       String keyPrefix)
   {
-    _admin.rebalance(clusterName, resourceName, replica, keyPrefix);
+    _admin.rebalance(clusterName, resourceName, replica, keyPrefix, "");
+  }
+  
+  
+  public void rebalanceStorageCluster(String clusterName,
+      String resourceName,
+      int replica,
+      String keyPrefix, String group)
+  {
+    _admin.rebalance(clusterName, resourceName, replica, keyPrefix, group);
+  }
+  
+  public void rebalanceStorageCluster(String clusterName,
+      String resourceName, String group,
+      int replica)
+  {
+    _admin.rebalance(clusterName, resourceName, replica, resourceName, group);
   }
 
   /**
@@ -770,6 +790,14 @@ public class ClusterSetup
     resourceKeyOption.setArgs(1);
     resourceKeyOption.setRequired(false);
     resourceKeyOption.setArgName("Resource key prefix");
+    
+    Option instanceGroupTagOption =
+        OptionBuilder.withLongOpt(instanceGroupTag)
+                     .withDescription("Specify instance group tag, used with rebalance command")
+                     .create();
+    instanceGroupTagOption.setArgs(1);
+    instanceGroupTagOption.setRequired(false);
+    instanceGroupTagOption.setArgName("Instance group tag");
 
     Option addStateModelDefOption =
         OptionBuilder.withLongOpt(addStateModelDef)
@@ -943,7 +971,20 @@ public class ClusterSetup
     addAlertOption.setArgs(2);
     addAlertOption.setRequired(false);
     addAlertOption.setArgName("clusterName alertName");
-
+    
+    Option addInstanceTagOption =
+        OptionBuilder.withLongOpt(addInstanceTag)
+                     .withDescription("Add a tag to instance")
+                     .create();
+    addInstanceTagOption.setArgs(3);
+    addInstanceTagOption.setRequired(false);
+    addInstanceTagOption.setArgName("clusterName instanceName tag");
+    Option removeInstanceTagOption =
+        OptionBuilder.withLongOpt(removeInstanceTag).withDescription("Remove tag from instance").create();
+    removeInstanceTagOption.setArgs(3);
+    removeInstanceTagOption.setRequired(false);
+    removeInstanceTagOption.setArgName("clusterName instanceName tag");
+    
     Option dropStatOption =
         OptionBuilder.withLongOpt(dropStat)
                      .withDescription("Drop a persistent stat")
@@ -1013,6 +1054,9 @@ public class ClusterSetup
     group.addOption(getConfOption);
     group.addOption(addResourcePropertyOption);
     group.addOption(removeResourcePropertyOption);
+    group.addOption(addInstanceTagOption);
+    group.addOption(removeInstanceTagOption);
+    group.addOption(instanceGroupTagOption);
 
     Options options = new Options();
     options.addOption(helpOption);
@@ -1129,15 +1173,17 @@ public class ClusterSetup
       String clusterName = cmd.getOptionValues(rebalance)[0];
       String resourceName = cmd.getOptionValues(rebalance)[1];
       int replicas = Integer.parseInt(cmd.getOptionValues(rebalance)[2]);
+      String keyPrefixVal = "";
+      String instanceGroupTagVal = "";
       if (cmd.hasOption(resourceKeyPrefix))
       {
-        setupTool.rebalanceStorageCluster(clusterName,
-                                          resourceName,
-                                          replicas,
-                                          cmd.getOptionValue(resourceKeyPrefix));
-        return 0;
+        keyPrefixVal = cmd.getOptionValue(resourceKeyPrefix);
       }
-      setupTool.rebalanceStorageCluster(clusterName, resourceName, replicas);
+      if (cmd.hasOption(instanceGroupTag))
+      {
+        instanceGroupTagVal = cmd.getOptionValue(instanceGroupTag);
+      }
+      setupTool.rebalanceStorageCluster(clusterName, resourceName, replicas, keyPrefixVal,
instanceGroupTagVal);
       return 0;
     }
 
@@ -1521,6 +1567,20 @@ public class ClusterSetup
       String keysStr = cmd.getOptionValues(getConfig)[1];
       setupTool.getConfig(scopeStr, keysStr);
     }
+    else if (cmd.hasOption(addInstanceTag))
+    {
+      String clusterName = cmd.getOptionValues(addInstanceTag)[0];
+      String instanceName = cmd.getOptionValues(addInstanceTag)[1];
+      String tag = cmd.getOptionValues(addInstanceTag)[2];
+      setupTool.getClusterManagementTool().addInstanceTag(clusterName, instanceName, tag);
+    }
+    else if (cmd.hasOption(removeInstanceTag))
+    {
+      String clusterName = cmd.getOptionValues(removeInstanceTag)[0];
+      String instanceName = cmd.getOptionValues(removeInstanceTag)[1];
+      String tag = cmd.getOptionValues(removeInstanceTag)[2];
+      setupTool.getClusterManagementTool().removeInstanceTag(clusterName, instanceName, tag);
+    }
     else if (cmd.hasOption(help))
     {
       printUsage(cliOptions);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/8b13937a/helix-core/src/test/java/org/apache/helix/integration/TestHelixInstanceTag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixInstanceTag.java
b/helix-core/src/test/java/org/apache/helix/integration/TestHelixInstanceTag.java
new file mode 100644
index 0000000..27e2c6c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixInstanceTag.java
@@ -0,0 +1,81 @@
+package org.apache.helix.integration;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestHelixInstanceTag extends ZkStandAloneCMTestBase
+{
+  @Test
+  public void testInstanceTag() throws Exception
+  {
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    
+    String DB2 = "TestDB2";
+    int partitions = 100;
+    String DB2tag = "TestDB2_tag";
+    int replica = 2;
+    for(int i = 0; i < 2; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+      _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, instanceName, DB2tag);
+    }
+    _setupTool.addResourceToCluster(CLUSTER_NAME, DB2, partitions, STATE_MODEL);
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, DB2, DB2tag, replica);
+    
+    boolean result = ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+        CLUSTER_NAME)));
+    Assert.assertTrue(result, "Cluster verification fails");
+    
+    ExternalView ev = accessor.getProperty(accessor.keyBuilder().externalView(DB2));
+    Set<String> hosts = new HashSet<String>();
+    for(String p : ev.getPartitionSet())
+    {
+      for(String hostName : ev.getStateMap(p).keySet())
+      {
+        InstanceConfig config = accessor.getProperty(accessor.keyBuilder().instanceConfig(hostName));
+        Assert.assertTrue(config.containsTag(DB2tag));
+        hosts.add(hostName);
+      }
+    }
+    Assert.assertEquals(hosts.size(), 2);
+    
+    String DB3 = "TestDB3";
+    String DB3Tag = "TestDB3_tag";
+    partitions = 10;
+    replica = 3;
+    for(int i = 1; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+      _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, instanceName, DB3Tag);
+    }
+    _setupTool.addResourceToCluster(CLUSTER_NAME, DB3, partitions, STATE_MODEL);
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, DB3, DB3Tag, replica);
+    
+    result = ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+        CLUSTER_NAME)));
+    Assert.assertTrue(result, "Cluster verification fails");
+    
+    ev = accessor.getProperty(accessor.keyBuilder().externalView(DB3));
+    hosts = new HashSet<String>();
+    for(String p : ev.getPartitionSet())
+    {
+      for(String hostName : ev.getStateMap(p).keySet())
+      {
+        InstanceConfig config = accessor.getProperty(accessor.keyBuilder().instanceConfig(hostName));
+        Assert.assertTrue(config.containsTag(DB3Tag));
+        hosts.add(hostName);
+      }
+    }
+    Assert.assertEquals(hosts.size(), 4);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/8b13937a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
index cb4b686..5b9c350 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
@@ -495,4 +495,5 @@ public class TestClusterSetup extends ZkUnitTestBase
         + new Date(System.currentTimeMillis()));
 
   }
+  
 }


Mime
View raw message