helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject git commit: [HELIX-238] Updated accessors for a new cluster setup
Date Fri, 04 Oct 2013 00:28:53 GMT
Updated Branches:
  refs/heads/helix-logical-model dd701d6d6 -> 129590d45


[HELIX-238] Updated accessors for a new cluster setup


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/129590d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/129590d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/129590d4

Branch: refs/heads/helix-logical-model
Commit: 129590d456189a808349f4fb597ac107bba0c880
Parents: dd701d6
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Thu Oct 3 17:27:58 2013 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Thu Oct 3 17:27:58 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/api/Participant.java  |    2 +-
 .../helix/api/accessor/ClusterAccessor.java     |  249 +++--
 .../helix/api/accessor/ParticipantAccessor.java |   10 +
 .../helix/api/accessor/ResourceAccessor.java    |    9 +
 .../accessor/StateModelDefinitionAccessor.java  |   70 --
 .../rebalancer/context/RebalancerConfig.java    |    8 +
 .../util/NewConstraintBasedAssignment.java      |    2 +-
 .../apache/helix/model/HelixConfigScope.java    |   17 +
 .../org/apache/helix/tools/NewClusterSetup.java | 1041 ++++++++++++++++++
 9 files changed, 1238 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/129590d4/helix-core/src/main/java/org/apache/helix/api/Participant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Participant.java b/helix-core/src/main/java/org/apache/helix/api/Participant.java
index 0e0de9d..53f4038 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Participant.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Participant.java
@@ -111,7 +111,7 @@ public class Participant {
    * Get disabled partition id's
    * @return set of disabled partition id's, or empty set if none
    */
-  public Set<PartitionId> getDisablePartitionIds() {
+  public Set<PartitionId> getDisabledPartitionIds() {
     return _config.getDisabledPartitions();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/129590d4/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 8768d8e..5c7df85 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -63,6 +63,8 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 import org.testng.internal.annotations.Sets;
 
+import com.google.common.collect.Maps;
+
 public class ClusterAccessor {
   private static Logger LOG = Logger.getLogger(ClusterAccessor.class);
 
@@ -179,55 +181,81 @@ public class ClusterAccessor {
    * @return cluster snapshot
    */
   public Cluster readCluster() {
-    /**
-     * map of instance-id to instance-config
-     */
-    Map<String, InstanceConfig> instanceConfigMap =
-        _accessor.getChildValuesMap(_keyBuilder.instanceConfigs());
+    LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
 
     /**
-     * map of resource-id to ideal-state
+     * map of constraint-type to constraints
      */
-    Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
+    Map<String, ClusterConstraints> constraintMap =
+        _accessor.getChildValuesMap(_keyBuilder.constraints());
 
-    /**
-     * map of instance-id to live-instance
-     */
-    Map<String, LiveInstance> liveInstanceMap =
-        _accessor.getChildValuesMap(_keyBuilder.liveInstances());
+    // read all the resources
+    Map<ResourceId, Resource> resourceMap = readResources();
 
-    /**
-     * map of participant-id to map of message-id to message
-     */
-    Map<String, Map<String, Message>> messageMap = new HashMap<String, Map<String, Message>>();
-    for (String instanceName : liveInstanceMap.keySet()) {
-      Map<String, Message> instanceMsgMap =
-          _accessor.getChildValuesMap(_keyBuilder.messages(instanceName));
-      messageMap.put(instanceName, instanceMsgMap);
+    // read all the participants
+    Map<ParticipantId, Participant> participantMap = readParticipants();
+
+    // read the controllers
+    Map<ControllerId, Controller> controllerMap = new HashMap<ControllerId, Controller>();
+    ControllerId leaderId = null;
+    if (leader != null) {
+      leaderId = ControllerId.from(leader.getId());
+      controllerMap.put(leaderId, new Controller(leaderId, leader, true));
     }
 
-    /**
-     * map of participant-id to map of resource-id to current-state
-     */
-    Map<String, Map<String, CurrentState>> currentStateMap =
-        new HashMap<String, Map<String, CurrentState>>();
-    for (String participantName : liveInstanceMap.keySet()) {
-      LiveInstance liveInstance = liveInstanceMap.get(participantName);
-      SessionId sessionId = liveInstance.getSessionId();
-      Map<String, CurrentState> instanceCurStateMap =
-          _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
-              sessionId.stringify()));
+    // read the constraints
+    Map<ConstraintType, ClusterConstraints> clusterConstraintMap =
+        new HashMap<ConstraintType, ClusterConstraints>();
+    for (String constraintType : constraintMap.keySet()) {
+      clusterConstraintMap.put(ConstraintType.valueOf(constraintType),
+          constraintMap.get(constraintType));
+    }
 
-      currentStateMap.put(participantName, instanceCurStateMap);
+    // read the pause status
+    PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
+    boolean isPaused = pauseSignal != null;
+
+    ClusterConfiguration clusterUserConfig = _accessor.getProperty(_keyBuilder.clusterConfig());
+    boolean autoJoinAllowed = false;
+    UserConfig userConfig;
+    if (clusterUserConfig != null) {
+      userConfig = UserConfig.from(clusterUserConfig);
+      autoJoinAllowed = clusterUserConfig.autoJoinAllowed();
+    } else {
+      userConfig = new UserConfig(Scope.cluster(_clusterId));
     }
 
-    LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+    // read the state model definitions
+    Map<StateModelDefId, StateModelDefinition> stateModelMap = readStateModelDefinitions();
 
+    // create the cluster snapshot object
+    return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
+        clusterConstraintMap, stateModelMap, userConfig, isPaused, autoJoinAllowed);
+  }
+
+  /**
+   * Get all the state model definitions for this cluster
+   * @return map of state model def id to state model definition
+   */
+  public Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions() {
+    Map<StateModelDefId, StateModelDefinition> stateModelDefs = Maps.newHashMap();
+    List<StateModelDefinition> stateModelList =
+        _accessor.getChildValues(_keyBuilder.stateModelDefs());
+    for (StateModelDefinition stateModelDef : stateModelList) {
+      stateModelDefs.put(stateModelDef.getStateModelDefId(), stateModelDef);
+    }
+    return stateModelDefs;
+  }
+
+  /**
+   * Read all resource in the cluster
+   * @return map of resource id to resource
+   */
+  public Map<ResourceId, Resource> readResources() {
     /**
-     * map of constraint-type to constraints
+     * map of resource-id to ideal-state
      */
-    Map<String, ClusterConstraints> constraintMap =
-        _accessor.getChildValuesMap(_keyBuilder.constraints());
+    Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
 
     /**
      * Map of resource id to external view
@@ -251,7 +279,7 @@ public class ClusterAccessor {
     Set<String> allResources = Sets.newHashSet();
     allResources.addAll(idealStateMap.keySet());
     allResources.addAll(resourceConfigMap.keySet());
-    Map<ResourceId, Resource> resourceMap = new HashMap<ResourceId, Resource>();
+    Map<ResourceId, Resource> resourceMap = Maps.newHashMap();
     for (String resourceName : allResources) {
       ResourceId resourceId = ResourceId.from(resourceName);
       resourceMap.put(resourceId, ResourceAccessor.createResource(resourceId,
@@ -259,8 +287,53 @@ public class ClusterAccessor {
           externalViewMap.get(resourceName), resourceAssignmentMap.get(resourceName)));
     }
 
+    return resourceMap;
+  }
+
+  /**
+   * Read all participants in the cluster
+   * @return map of participant id to participant
+   */
+  public Map<ParticipantId, Participant> readParticipants() {
+    /**
+     * map of instance-id to instance-config
+     */
+    Map<String, InstanceConfig> instanceConfigMap =
+        _accessor.getChildValuesMap(_keyBuilder.instanceConfigs());
+
+    /**
+     * map of instance-id to live-instance
+     */
+    Map<String, LiveInstance> liveInstanceMap =
+        _accessor.getChildValuesMap(_keyBuilder.liveInstances());
+
+    /**
+     * map of participant-id to map of message-id to message
+     */
+    Map<String, Map<String, Message>> messageMap = new HashMap<String, Map<String, Message>>();
+    for (String instanceName : liveInstanceMap.keySet()) {
+      Map<String, Message> instanceMsgMap =
+          _accessor.getChildValuesMap(_keyBuilder.messages(instanceName));
+      messageMap.put(instanceName, instanceMsgMap);
+    }
+
+    /**
+     * map of participant-id to map of resource-id to current-state
+     */
+    Map<String, Map<String, CurrentState>> currentStateMap =
+        new HashMap<String, Map<String, CurrentState>>();
+    for (String participantName : liveInstanceMap.keySet()) {
+      LiveInstance liveInstance = liveInstanceMap.get(participantName);
+      SessionId sessionId = liveInstance.getSessionId();
+      Map<String, CurrentState> instanceCurStateMap =
+          _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
+              sessionId.stringify()));
+
+      currentStateMap.put(participantName, instanceCurStateMap);
+    }
+
     // read all the participants
-    Map<ParticipantId, Participant> participantMap = new HashMap<ParticipantId, Participant>();
+    Map<ParticipantId, Participant> participantMap = Maps.newHashMap();
     for (String participantName : instanceConfigMap.keySet()) {
       InstanceConfig instanceConfig = instanceConfigMap.get(participantName);
       UserConfig userConfig = UserConfig.from(instanceConfig);
@@ -274,45 +347,7 @@ public class ClusterAccessor {
           currentStateMap.get(participantName)));
     }
 
-    // read the controllers
-    Map<ControllerId, Controller> controllerMap = new HashMap<ControllerId, Controller>();
-    ControllerId leaderId = null;
-    if (leader != null) {
-      leaderId = ControllerId.from(leader.getId());
-      controllerMap.put(leaderId, new Controller(leaderId, leader, true));
-    }
-
-    // read the constraints
-    Map<ConstraintType, ClusterConstraints> clusterConstraintMap =
-        new HashMap<ConstraintType, ClusterConstraints>();
-    for (String constraintType : constraintMap.keySet()) {
-      clusterConstraintMap.put(ConstraintType.valueOf(constraintType),
-          constraintMap.get(constraintType));
-    }
-
-    // read the pause status
-    PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
-    boolean isPaused = pauseSignal != null;
-
-    ClusterConfiguration clusterUserConfig = _accessor.getProperty(_keyBuilder.clusterConfig());
-    boolean autoJoinAllowed = false;
-    UserConfig userConfig;
-    if (clusterUserConfig != null) {
-      userConfig = UserConfig.from(clusterUserConfig);
-      autoJoinAllowed = clusterUserConfig.autoJoinAllowed();
-    } else {
-      userConfig = new UserConfig(Scope.cluster(_clusterId));
-    }
-
-    // read the state model definitions
-    StateModelDefinitionAccessor stateModelDefAccessor =
-        new StateModelDefinitionAccessor(_accessor);
-    Map<StateModelDefId, StateModelDefinition> stateModelMap =
-        stateModelDefAccessor.readStateModelDefinitions();
-
-    // create the cluster snapshot object
-    return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
-        clusterConstraintMap, stateModelMap, userConfig, isPaused, autoJoinAllowed);
+    return participantMap;
   }
 
   /**
@@ -335,6 +370,14 @@ public class ClusterAccessor {
   }
 
   /**
+   * Clear any user-specified configuration from the cluster
+   * @return true if the config was cleared, false otherwise
+   */
+  public boolean dropUserConfig() {
+    return setUserConfig(new UserConfig(Scope.cluster(_clusterId)));
+  }
+
+  /**
    * Add user configuration to the existing cluster user configuration. Overwrites properties with
    * the same key
    * @param userConfig the user config key-value pairs to add
@@ -438,8 +481,18 @@ public class ClusterAccessor {
    * @return true if valid or false otherwise
    */
   public boolean isClusterStructureValid() {
-    List<String> paths = getRequiredPaths();
-    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+    return isClusterStructureValid(_clusterId, _accessor.getBaseDataAccessor());
+  }
+
+  /**
+   * check if cluster structure is valid
+   * @param clusterId the cluster to check
+   * @param baseAccessor a base data accessor
+   * @return true if valid or false otherwise
+   */
+  private static boolean isClusterStructureValid(ClusterId clusterId,
+      BaseDataAccessor<?> baseAccessor) {
+    List<String> paths = getRequiredPaths(clusterId);
     if (baseAccessor != null) {
       boolean[] existsResults = baseAccessor.exists(paths, 0);
       for (boolean exists : existsResults) {
@@ -456,7 +509,7 @@ public class ClusterAccessor {
    */
   private void initClusterStructure() {
     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
-    List<String> paths = getRequiredPaths();
+    List<String> paths = getRequiredPaths(_clusterId);
     for (String path : paths) {
       boolean status = baseAccessor.create(path, null, AccessOption.PERSISTENT);
       if (!status && LOG.isDebugEnabled()) {
@@ -467,25 +520,25 @@ public class ClusterAccessor {
 
   /**
    * Get all property paths that must be set for a cluster structure to be valid
+   * @param the cluster that the paths will be relative to
    * @return list of paths as strings
    */
-  private List<String> getRequiredPaths() {
+  private static List<String> getRequiredPaths(ClusterId clusterId) {
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterId.stringify());
     List<String> paths = new ArrayList<String>();
-    paths.add(_keyBuilder.cluster().getPath());
-    paths.add(_keyBuilder.idealStates().getPath());
-    paths.add(_keyBuilder.clusterConfigs().getPath());
-    paths.add(_keyBuilder.instanceConfigs().getPath());
-    paths.add(_keyBuilder.resourceConfigs().getPath());
-    paths.add(_keyBuilder.propertyStore().getPath());
-    paths.add(_keyBuilder.liveInstances().getPath());
-    paths.add(_keyBuilder.instances().getPath());
-    paths.add(_keyBuilder.externalViews().getPath());
-    paths.add(_keyBuilder.controller().getPath());
-    paths.add(_keyBuilder.stateModelDefs().getPath());
-    paths.add(_keyBuilder.controllerMessages().getPath());
-    paths.add(_keyBuilder.controllerTaskErrors().getPath());
-    paths.add(_keyBuilder.controllerTaskStatuses().getPath());
-    paths.add(_keyBuilder.controllerLeaderHistory().getPath());
+    paths.add(keyBuilder.cluster().getPath());
+    paths.add(keyBuilder.clusterConfigs().getPath());
+    paths.add(keyBuilder.instanceConfigs().getPath());
+    paths.add(keyBuilder.propertyStore().getPath());
+    paths.add(keyBuilder.liveInstances().getPath());
+    paths.add(keyBuilder.instances().getPath());
+    paths.add(keyBuilder.externalViews().getPath());
+    paths.add(keyBuilder.controller().getPath());
+    paths.add(keyBuilder.stateModelDefs().getPath());
+    paths.add(keyBuilder.controllerMessages().getPath());
+    paths.add(keyBuilder.controllerTaskErrors().getPath());
+    paths.add(keyBuilder.controllerTaskStatuses().getPath());
+    paths.add(keyBuilder.controllerLeaderHistory().getPath());
     return paths;
   }
 
@@ -578,8 +631,8 @@ public class ClusterAccessor {
       return false;
     }
 
-    StateModelDefinitionAccessor smdAccessor = new StateModelDefinitionAccessor(_accessor);
-    return smdAccessor.setStateModelDefinition(stateModelDef);
+    return _accessor
+        .createProperty(_keyBuilder.stateModelDef(stateModelDef.getId()), stateModelDef);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/129590d4/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index 7e74fc7..e139c2e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -36,6 +36,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.RunningInstance;
+import org.apache.helix.api.Scope;
 import org.apache.helix.api.config.ParticipantConfig;
 import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.MessageId;
@@ -296,6 +297,15 @@ public class ParticipantAccessor {
   }
 
   /**
+   * Clear any user-specified configuration from the participant
+   * @param participantId the participant to update
+   * @return true if the config was cleared, false otherwise
+   */
+  public boolean dropUserConfig(ParticipantId participantId) {
+    return setUserConfig(participantId, new UserConfig(Scope.participant(participantId)));
+  }
+
+  /**
    * Update a participant configuration
    * @param participantId the participant to update
    * @param participantDelta changes to the participant

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/129590d4/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index 0b39d36..e5c9443 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -196,6 +196,15 @@ public class ResourceAccessor {
   }
 
   /**
+   * Clear any user-specified configuration from the resource
+   * @param resourceId the resource to update
+   * @return true if the config was cleared, false otherwise
+   */
+  public boolean dropUserConfig(ResourceId resourceId) {
+    return setUserConfig(resourceId, new UserConfig(Scope.resource(resourceId)));
+  }
+
+  /**
    * Persist an existing resource's logical configuration
    * @param resourceConfig logical resource configuration
    * @return true if resource is set, false otherwise

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/129590d4/helix-core/src/main/java/org/apache/helix/api/accessor/StateModelDefinitionAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/StateModelDefinitionAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/StateModelDefinitionAccessor.java
deleted file mode 100644
index 3816507..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/StateModelDefinitionAccessor.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package org.apache.helix.api.accessor;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.model.StateModelDefinition;
-
-import com.google.common.collect.ImmutableMap;
-
-public class StateModelDefinitionAccessor {
-  private final HelixDataAccessor _accessor;
-  private final PropertyKey.Builder _keyBuilder;
-
-  /**
-   * @param accessor
-   */
-  public StateModelDefinitionAccessor(HelixDataAccessor accessor) {
-    _accessor = accessor;
-    _keyBuilder = accessor.keyBuilder();
-  }
-
-  /**
-   * Get all of the state model definitions available to the cluster
-   * @return map of state model ids to state model definition objects
-   */
-  public Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions() {
-    Map<String, StateModelDefinition> stateModelDefs =
-        _accessor.getChildValuesMap(_keyBuilder.stateModelDefs());
-    Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
-        new HashMap<StateModelDefId, StateModelDefinition>();
-
-    for (String stateModelDefName : stateModelDefs.keySet()) {
-      stateModelDefMap.put(StateModelDefId.from(stateModelDefName),
-          stateModelDefs.get(stateModelDefName));
-    }
-
-    return ImmutableMap.copyOf(stateModelDefMap);
-  }
-
-  /**
-   * Set a state model definition. Adds the state model definition if it does not exist
-   * @param stateModelDef fully initialized state model definition
-   * @return true if the model is persisted, false otherwise
-   */
-  public boolean setStateModelDefinition(StateModelDefinition stateModelDef) {
-    return _accessor.setProperty(_keyBuilder.stateModelDef(stateModelDef.getId()), stateModelDef);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/129590d4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
index e3ba6f0..c64941c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
@@ -142,6 +142,14 @@ public final class RebalancerConfig {
   }
 
   /**
+   * Get the rebalancer context serialized as a string
+   * @return string representing the context
+   */
+  public String getSerializedContext() {
+    return _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
+  }
+
+  /**
    * Convert this to a namespaced config
    * @return NamespacedConfig
    */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/129590d4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
index 4f2e10c..7bc2769 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
@@ -66,7 +66,7 @@ public class NewConstraintBasedAssignment {
           public boolean apply(ParticipantId participantId) {
             Participant participant = participantMap.get(participantId);
             return !participant.isEnabled()
-                || participant.getDisablePartitionIds().contains(partitionId);
+                || participant.getDisabledPartitionIds().contains(partitionId);
           }
         });
     return disabledParticipantsForPartition;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/129590d4/helix-core/src/main/java/org/apache/helix/model/HelixConfigScope.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/HelixConfigScope.java b/helix-core/src/main/java/org/apache/helix/model/HelixConfigScope.java
index 9d3b41a..95c9848 100644
--- a/helix-core/src/main/java/org/apache/helix/model/HelixConfigScope.java
+++ b/helix-core/src/main/java/org/apache/helix/model/HelixConfigScope.java
@@ -92,6 +92,8 @@ public class HelixConfigScope {
    */
   final String _participantName;
 
+  final String _resourceName;
+
   final String _zkPath;
   final String _mapKey;
 
@@ -131,6 +133,13 @@ public class HelixConfigScope {
       _participantName = null;
     }
 
+    // init resourceName
+    if (type == ConfigScopeProperty.RESOURCE && _isFullKey) {
+      _resourceName = zkPathKeys.get(1);
+    } else {
+      _resourceName = null;
+    }
+
     _zkPath = template.instantiate(type, zkPathKeys.toArray(new String[0]));
     _mapKey = mapKey;
   }
@@ -160,6 +169,14 @@ public class HelixConfigScope {
   }
 
   /**
+   * Get the resource name if it exists
+   * @return the resource name if the type is RESOURCE, or null
+   */
+  public String getResourceName() {
+    return _resourceName;
+  }
+
+  /**
    * Get the path to the corresponding ZNode
    * @return a Zookeeper path
    */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/129590d4/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
new file mode 100644
index 0000000..1f44e69
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
@@ -0,0 +1,1041 @@
+package org.apache.helix.tools;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.RunningInstance;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.Scope.ScopeType;
+import org.apache.helix.api.State;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.accessor.ParticipantAccessor;
+import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ConstraintId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Parse command line and call helix-admin
+ */
+public class NewClusterSetup {
+
+  private static Logger LOG = Logger.getLogger(NewClusterSetup.class);
+
+  /**
+   * List all helix cluster setup options
+   */
+  public enum HelixOption {
+    // help
+    help(0, "", "Print command-line options"),
+
+    // zookeeper address
+    zkSvr(1, true, "zookeeperServerAddress", "Zookeeper address (host:port, required)"),
+
+    // list cluster/resource/instances
+    listClusters(0, "", "List clusters"),
+    listResources(1, "clusterId", "List resources in a cluster"),
+    listInstances(1, "clusterId", "List instances in a cluster"),
+
+    // add, drop, and rebalance cluster
+    addCluster(1, "clusterId", "Add a new cluster"),
+    activateCluster(3, "clusterId grandClusterId true/false",
+        "Enable/disable a cluster in distributed controller mode"),
+    dropCluster(1, "clusterId", "Delete a cluster"),
+    dropResource(2, "clusterId resourceId", "Drop a resource from a cluster"),
+    addInstance(2, "clusterId instanceId", "Add an instance to a cluster"),
+    addResource(4, "clusterId resourceId partitionNumber stateModelDefId",
+        "Add a resource to a cluster"),
+    addStateModelDef(2, "clusterId jsonFileName", "Add a state model definition to a cluster"),
+    addIdealState(2, "clusterId resourceId jsonfileName",
+        "Add an ideal state of a resource in cluster"),
+    swapInstance(3, "clusterId oldInstanceId newInstanceId",
+        "Swap an old instance in cluster with a new instance"),
+    dropInstance(2, "clusterId instanceId", "Drop an instance from a cluster"),
+    rebalance(3, "clusterId resourceId replicas", "Rebalance a resource in cluster"),
+    expandCluster(1, "clusterId", "Expand a cluster"),
+    expandResource(2, "clusterId resourceId", "Expand resource to additional nodes"),
+    @Deprecated
+    mode(1, "rebalancerMode", "Specify rebalancer mode, used with " + addResource + " command"),
+    rebalancerMode(1, "rebalancerMode", "Specify rebalancer mode, used with " + addResource
+        + " command"),
+    instanceGroupTag(1, "instanceGroupTag", "Specify instance group tag, used with " + rebalance
+        + " command"),
+    bucketSize(1, "bucketSize", "Specify bucket size, used with " + addResource + " command"),
+    resourceKeyPrefix(1, "resourceKeyPrefix", "Specify resource key prefix, used with " + rebalance
+        + " command"),
+    maxPartitionsPerNode(1, "maxPartitionsPerNode", "Specify max partitions per node, used with "
+        + addResource + " command"),
+    addResourceProperty(4, "clusterId resourceId propertyName propertyValue",
+        "Add a resource property"),
+    removeResourceProperty(3, "clusterId resourceId propertyName", "Remove a resource property"),
+    addInstanceTag(3, "clusterId instanceId tag", "Add a tag to instance"),
+    removeInstanceTag(3, "clusterId instanceId tag", "Remove a tag from instance"),
+
+    // query info
+    listClusterInfo(1, "clusterId", "Query informaton of a cluster"),
+    listInstanceInfo(2, "clusterId instanceId", "Query information of an instance in cluster"),
+    listResourceInfo(2, "clusterId resourceId", "Query information of a resource"),
+    listPartitionInfo(3, "clusterId resourceId partitionId", "Query information of a partition"),
+    listStateModels(1, "clusterId", "Query information of state models in a cluster"),
+    listStateModel(2, "clusterId stateModelDefId", "Query information of a state model in cluster"),
+
+    // enable/disable/reset instances/cluster/resource/partition
+    enableInstance(3, "clusterId instanceId true/false", "Enable/disable an instance"),
+    enablePartition(-1, "true/false clusterId instanceId resourceId partitionId...",
+        "Enable/disable partitions"),
+    enableCluster(2, "clusterId true/false", "Pause/resume the controller of a cluster"),
+    resetPartition(4, "clusterId instanceId resourceId partitionName",
+        "Reset a partition in error state"),
+    resetInstance(2, "clusterId instanceId", "Reset all partitions in error state for an instance"),
+    resetResource(2, "clusterId resourceId", "Reset all partitions in error state for a resource"),
+
+    // stats/alerts
+    addStat(2, "clusterId statName", "Add a persistent stat"),
+    addAlert(2, "clusterId alertName", "Add an alert"),
+    dropStat(2, "clusterId statName", "Drop a persistent stat"),
+    dropAlert(2, "clusterId alertName", "Drop an alert"),
+
+    // set/set/remove configs
+    getConfig(3, "scope(e.g. RESOURCE) configScopeArgs(e.g. myCluster,testDB) keys(e.g. k1,k2)",
+        "Get configs"),
+    setConfig(3,
+        "scope(e.g. RESOURCE) configScopeArgs(e.g. myCluster,testDB) keyValues(e.g. k1=v1,k2=v2)",
+        "Set configs"),
+    removeConfig(3, "scope(e.g. RESOURCE) configScopeArgs(e.g. myCluster,testDB) keys(e.g. k1,k2)",
+        "Remove configs"),
+
+    // get/set/remove constraints
+    getConstraints(2, "clusterId constraintType(e.g. MESSAGE_CONSTRAINT)", "Get constraints"),
+    setConstraint(
+        4,
+        "clusterId constraintType(e.g. MESSAGE_CONSTRAINT) constraintId keyValues(e.g. k1=v1,k2=v2)",
+        "Set a constraint, create if not exist"),
+    removeConstraint(3, "clusterId constraintType(e.g. MESSAGE_CONSTRAINT) constraintId",
+        "Remove a constraint");
+
+    final int _argNum;
+    final boolean _isRequired;
+    final String _argName;
+    final String _description;
+
+    private HelixOption(int argNum, boolean isRequired, String argName, String description) {
+      _argNum = argNum;
+      _isRequired = isRequired;
+      _argName = argName;
+      _description = description;
+    }
+
+    private HelixOption(int argNum, String argName, String description) {
+      this(argNum, false, argName, description);
+    }
+  }
+
+  private final ZkClient _zkclient;
+  private final BaseDataAccessor<ZNRecord> _baseAccessor;
+
+  private NewClusterSetup(ZkClient zkclient) {
+    _zkclient = zkclient;
+    _baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkclient);
+  }
+
+  @SuppressWarnings("static-access")
+  static Options constructCommandLineOptions() {
+    Options options = new Options();
+
+    OptionGroup optionGroup = new OptionGroup();
+    for (HelixOption option : HelixOption.values()) {
+      Option opt =
+          OptionBuilder.withLongOpt(option.name()).hasArgs(option._argNum)
+              .isRequired(option._isRequired).withArgName(option._argName)
+              .withDescription(option._description).create();
+      if (option == HelixOption.help || option == HelixOption.zkSvr) {
+        options.addOption(opt);
+      } else {
+        optionGroup.addOption(opt);
+      }
+    }
+    options.addOptionGroup(optionGroup);
+    return options;
+  }
+
+  /**
+   * Check if we have the right number of arguments
+   * @param opt
+   * @param optValues
+   */
+  static void checkArgNum(HelixOption opt, String[] optValues) {
+
+    if (opt._argNum != -1 && opt._argNum < optValues.length) {
+      throw new IllegalArgumentException(opt + " should have no less than " + opt._argNum
+          + " arguments, but was: " + optValues.length + ", " + Arrays.asList(optValues));
+    }
+  }
+
+  static void printUsage(Options cliOptions) {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.setWidth(1000);
+    helpFormatter.printHelp("java " + NewClusterSetup.class.getName(), cliOptions);
+  }
+
+  ClusterAccessor clusterAccessor(String clusterName) {
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+    return new ClusterAccessor(ClusterId.from(clusterName), accessor);
+  }
+
+  ParticipantAccessor participantAccessor(String clusterName) {
+    return new ParticipantAccessor(new ZKHelixDataAccessor(clusterName, _baseAccessor));
+  }
+
+  ResourceAccessor resourceAccessor(String clusterName) {
+    return new ResourceAccessor(new ZKHelixDataAccessor(clusterName, _baseAccessor));
+  }
+
+  void addCluster(String[] optValues) {
+    String clusterName = optValues[0];
+
+    List<StateModelDefinition> defaultStateModelDefs = new ArrayList<StateModelDefinition>();
+    defaultStateModelDefs.add(new StateModelDefinition(StateModelConfigGenerator
+        .generateConfigForMasterSlave()));
+
+    ClusterConfig.Builder builder =
+        new ClusterConfig.Builder(ClusterId.from(clusterName))
+            .addStateModelDefinitions(defaultStateModelDefs);
+
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    accessor.createCluster(builder.build());
+  }
+
+  void addResource(String[] optValues, String[] rebalancerModeValues, String[] bucketSizeValues,
+      String[] maxPartitionsPerNodeValues) {
+    String clusterName = optValues[0];
+    String resourceName = optValues[1];
+    int partitionNumber = Integer.parseInt(optValues[2]);
+    String stateModelDefName = optValues[3];
+    RebalanceMode rebalancerMode =
+        rebalancerModeValues == null ? RebalanceMode.SEMI_AUTO : RebalanceMode
+            .valueOf(rebalancerModeValues[0]);
+
+    int bucketSize = bucketSizeValues == null ? 0 : Integer.parseInt(bucketSizeValues[0]);
+
+    int maxPartitionsPerNode =
+        maxPartitionsPerNodeValues == null ? -1 : Integer.parseInt(maxPartitionsPerNodeValues[0]);
+
+    ResourceId resourceId = ResourceId.from(resourceName);
+    StateModelDefId stateModelDefId = StateModelDefId.from(stateModelDefName);
+
+    IdealState idealState = new IdealState(resourceName);
+    idealState.setRebalanceMode(rebalancerMode);
+    idealState.setNumPartitions(partitionNumber);
+    idealState.setMaxPartitionsPerInstance(maxPartitionsPerNode);
+    idealState.setStateModelDefId(stateModelDefId);
+
+    RebalancerContext rebalancerCtx = PartitionedRebalancerContext.from(idealState);
+    ResourceConfig.Builder builder =
+        new ResourceConfig.Builder(resourceId).rebalancerContext(rebalancerCtx).bucketSize(
+            bucketSize);
+
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    accessor.addResourceToCluster(builder.build());
+
+  }
+
+  void addInstance(String[] optValues) {
+    String clusterName = optValues[0];
+    String[] instanceIds = optValues[1].split(";");
+
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    for (String instanceId : instanceIds) {
+      ParticipantConfig.Builder builder =
+          new ParticipantConfig.Builder(ParticipantId.from(instanceId));
+
+      accessor.addParticipantToCluster(builder.build());
+    }
+  }
+
+  void dropCluster(String[] optValues) {
+    String clusterName = optValues[0];
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    accessor.dropCluster();
+  }
+
+  void dropResource(String[] optValues) {
+    String clusterName = optValues[0];
+    String resourceName = optValues[1];
+
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    accessor.dropResourceFromCluster(ResourceId.from(resourceName));
+  }
+
+  void dropInstance(String[] optValues) {
+    String clusterName = optValues[0];
+    String[] instanceIds = optValues[1].split(";");
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    for (String instanceId : instanceIds) {
+      accessor.dropParticipantFromCluster(ParticipantId.from(instanceId));
+    }
+
+  }
+
+  private static byte[] readFile(String filePath) throws IOException {
+    File file = new File(filePath);
+
+    int size = (int) file.length();
+    byte[] bytes = new byte[size];
+    DataInputStream dis = null;
+    try {
+      dis = new DataInputStream(new FileInputStream(file));
+      int read = 0;
+      int numRead = 0;
+      while (read < bytes.length && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) {
+        read = read + numRead;
+      }
+      return bytes;
+    } finally {
+      if (dis != null) {
+        dis.close();
+      }
+    }
+  }
+
+  void addStateModelDef(String[] optValues) {
+    String clusterName = optValues[0];
+    String stateModelDefJsonFile = optValues[1];
+
+    try {
+      StateModelDefinition stateModelDef =
+          new StateModelDefinition(
+              (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(stateModelDefJsonFile))));
+      ClusterAccessor accessor = clusterAccessor(clusterName);
+      accessor.addStateModelDefinitionToCluster(stateModelDef);
+
+    } catch (IOException e) {
+      LOG.error("Could not parse the state model", e);
+    }
+
+  }
+
+  void addIdealState(String[] optValues) {
+    String clusterName = optValues[0];
+    String resourceName = optValues[1];
+    String idealStateJsonFile = optValues[2];
+
+    try {
+      IdealState idealState =
+          new IdealState(
+              (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(idealStateJsonFile))));
+
+      RebalancerContext rebalancerCtx = PartitionedRebalancerContext.from(idealState);
+      ResourceConfig.Builder builder =
+          new ResourceConfig.Builder(ResourceId.from(resourceName))
+              .rebalancerContext(rebalancerCtx).bucketSize(idealState.getBucketSize());
+
+      ClusterAccessor accessor = clusterAccessor(clusterName);
+      accessor.addResourceToCluster(builder.build());
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+  }
+
+  void addInstanceTag(String[] optValues) {
+    String clusterName = optValues[0];
+    String participantName = optValues[1];
+    String tag = optValues[2];
+
+    ParticipantAccessor accessor = participantAccessor(clusterName);
+    ParticipantId participantId = ParticipantId.from(participantName);
+
+    ParticipantConfig.Delta delta = new ParticipantConfig.Delta(participantId);
+    delta.addTag(tag);
+    accessor.updateParticipant(participantId, delta);
+  }
+
+  void removeInstanceTag(String[] optValues) {
+    String clusterName = optValues[0];
+    String participantName = optValues[1];
+    String tag = optValues[2];
+
+    ParticipantAccessor accessor = participantAccessor(clusterName);
+    ParticipantId participantId = ParticipantId.from(participantName);
+
+    ParticipantConfig.Delta delta = new ParticipantConfig.Delta(participantId);
+    delta.removeTag(tag);
+    accessor.updateParticipant(participantId, delta);
+  }
+
+  void listPartitionInfo(String[] optValues) {
+    String clusterName = optValues[0];
+    String resourceName = optValues[1];
+    String partitionName = optValues[2];
+
+    ResourceId resourceId = ResourceId.from(resourceName);
+    PartitionId partitionId = PartitionId.from(partitionName);
+    ResourceAccessor accessor = resourceAccessor(clusterName);
+    Resource resource = accessor.readResource(resourceId);
+
+    StringBuilder sb = new StringBuilder();
+    Map<ParticipantId, State> stateMap = resource.getExternalView().getStateMap(partitionId);
+    sb.append(resourceName + "/" + partitionName + ", externalView: " + stateMap);
+    PartitionedRebalancerContext partitionedContext =
+        resource.getRebalancerConfig().getRebalancerContext(PartitionedRebalancerContext.class);
+    if (partitionedContext != null) {
+      // for partitioned contexts, check the mode and apply mode-specific information if possible
+      if (partitionedContext.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+        SemiAutoRebalancerContext semiAutoContext =
+            resource.getRebalancerConfig().getRebalancerContext(SemiAutoRebalancerContext.class);
+        sb.append(", preferenceList: " + semiAutoContext.getPreferenceList(partitionId));
+      } else if (partitionedContext.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+        CustomRebalancerContext customContext =
+            resource.getRebalancerConfig().getRebalancerContext(CustomRebalancerContext.class);
+        sb.append(", preferenceMap: " + customContext.getPreferenceMap(partitionId));
+      }
+      if (partitionedContext.anyLiveParticipant()) {
+        sb.append(", anyLiveParticipant: " + partitionedContext.anyLiveParticipant());
+      } else {
+        sb.append(", replicaCount: " + partitionedContext.getReplicaCount());
+      }
+    }
+
+    System.out.println(sb.toString());
+  }
+
+  void enableInstance(String[] optValues) {
+    String clusterName = optValues[0];
+    String instanceId = optValues[1];
+    if (instanceId.indexOf(":") != -1) {
+      instanceId = instanceId.replaceAll(":", "_");
+    }
+    boolean enabled = Boolean.parseBoolean(optValues[2].toLowerCase());
+
+    ParticipantAccessor accessor = participantAccessor(clusterName);
+    if (enabled) {
+      accessor.enableParticipant(ParticipantId.from(instanceId));
+    } else {
+      accessor.disableParticipant(ParticipantId.from(instanceId));
+    }
+  }
+
+  void enablePartition(String[] optValues) {
+    boolean enabled = Boolean.parseBoolean(optValues[0].toLowerCase());
+    String clusterName = optValues[1];
+    ParticipantId participantId = ParticipantId.from(optValues[2]);
+    ResourceId resourceId = ResourceId.from(optValues[3]);
+
+    Set<PartitionId> partitionIdSet = new HashSet<PartitionId>();
+    for (int i = 4; i < optValues.length; i++) {
+      partitionIdSet.add(PartitionId.from(optValues[i]));
+    }
+
+    ParticipantAccessor accessor = participantAccessor(clusterName);
+    if (enabled) {
+      accessor.enablePartitionsForParticipant(participantId, resourceId, partitionIdSet);
+    } else {
+      accessor.disablePartitionsForParticipant(participantId, resourceId, partitionIdSet);
+    }
+  }
+
+  void enableCluster(String[] optValues) {
+    String clusterName = optValues[0];
+    boolean enabled = Boolean.parseBoolean(optValues[1].toLowerCase());
+
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    if (enabled) {
+      accessor.resumeCluster();
+    } else {
+      accessor.pauseCluster();
+    }
+  }
+
+  /**
+   * Convert user config to key value map
+   * @param userConfig
+   * @param mapKey
+   * @param keys
+   * @return
+   */
+  private Map<String, String> keyValueMap(UserConfig userConfig, String mapKey, String[] keys) {
+    Map<String, String> results = new HashMap<String, String>();
+
+    for (String key : keys) {
+      if (mapKey == null) {
+        results.put(key, userConfig.getSimpleField(key));
+      } else {
+        results.put(key, userConfig.getMapField(mapKey).get(key));
+      }
+    }
+    return results;
+  }
+
+  void getConfig(String[] optValues) {
+    ScopeType scopeType = ScopeType.valueOf(optValues[0].toUpperCase());
+    String[] scopeArgs = optValues[1].split("[\\s,]");
+    String[] keys = optValues[2].split("[\\s,]");
+
+    String clusterName = scopeArgs[0];
+    Map<String, String> results = null;
+    switch (scopeType) {
+    case CLUSTER: {
+      ClusterAccessor accessor = clusterAccessor(clusterName);
+      results = keyValueMap(accessor.readUserConfig(), null, keys);
+      break;
+    }
+    case PARTICIPANT: {
+      ParticipantId participantId = ParticipantId.from(scopeArgs[1]);
+      ParticipantAccessor accessor = participantAccessor(clusterName);
+      results = keyValueMap(accessor.readUserConfig(participantId), null, keys);
+      break;
+    }
+    case RESOURCE: {
+      ResourceId resourceId = ResourceId.from(scopeArgs[1]);
+      ResourceAccessor accessor = resourceAccessor(clusterName);
+      results = keyValueMap(accessor.readUserConfig(resourceId), null, keys);
+      break;
+    }
+    case PARTITION: {
+      ResourceId resourceId = ResourceId.from(scopeArgs[1]);
+      String partitionId = scopeArgs[2];
+      ResourceAccessor accessor = resourceAccessor(clusterName);
+      results = keyValueMap(accessor.readUserConfig(resourceId), partitionId, keys);
+      break;
+    }
+    default:
+      System.err.println("Non-recognized scopeType: " + scopeType);
+      break;
+    }
+
+    System.out.println(results);
+  }
+
+  /**
+   * Convert key-value map to user-config
+   * @param scope
+   * @param mapKey
+   * @param keyValues
+   * @return
+   */
+  private UserConfig userConfig(Scope<?> scope, String mapKey, String[] keyValues) {
+    UserConfig userConfig = new UserConfig(scope);
+
+    for (String keyValue : keyValues) {
+      String[] splits = keyValue.split("=");
+      String key = splits[0];
+      String value = splits[1];
+      if (mapKey == null) {
+        userConfig.setSimpleField(key, value);
+      } else {
+        if (userConfig.getMapField(mapKey) == null) {
+          userConfig.setMapField(mapKey, new TreeMap<String, String>());
+        }
+        userConfig.getMapField(mapKey).put(key, value);
+      }
+    }
+    return userConfig;
+  }
+
+  void setConfig(String[] optValues) {
+    ScopeType scopeType = ScopeType.valueOf(optValues[0].toUpperCase());
+    String[] scopeArgs = optValues[1].split("[\\s,]");
+    String[] keyValues = optValues[2].split("[\\s,]");
+
+    String clusterName = scopeArgs[0];
+    Map<String, String> results = new HashMap<String, String>();
+    switch (scopeType) {
+    case CLUSTER: {
+      ClusterAccessor accessor = clusterAccessor(clusterName);
+      Scope<ClusterId> scope = Scope.cluster(ClusterId.from(clusterName));
+      UserConfig userConfig = userConfig(scope, null, keyValues);
+      accessor.setUserConfig(userConfig);
+      break;
+    }
+    case PARTICIPANT: {
+      ParticipantId participantId = ParticipantId.from(scopeArgs[1]);
+      ParticipantAccessor accessor = participantAccessor(clusterName);
+      Scope<ParticipantId> scope = Scope.participant(participantId);
+      UserConfig userConfig = userConfig(scope, null, keyValues);
+      accessor.setUserConfig(participantId, userConfig);
+      break;
+    }
+    case RESOURCE: {
+      ResourceId resourceId = ResourceId.from(scopeArgs[1]);
+      ResourceAccessor accessor = resourceAccessor(clusterName);
+      Scope<ResourceId> scope = Scope.resource(resourceId);
+      UserConfig userConfig = userConfig(scope, null, keyValues);
+      accessor.setUserConfig(resourceId, userConfig);
+      break;
+    }
+    case PARTITION: {
+      ResourceId resourceId = ResourceId.from(scopeArgs[1]);
+      String partitionId = scopeArgs[2];
+      ResourceAccessor accessor = resourceAccessor(clusterName);
+      Scope<ResourceId> scope = Scope.resource(resourceId);
+      UserConfig userConfig = userConfig(scope, partitionId, keyValues);
+      accessor.setUserConfig(resourceId, userConfig);
+      break;
+    }
+    default:
+      System.err.println("Non-recognized scopeType: " + scopeType);
+      break;
+    }
+
+    System.out.println(results);
+  }
+
+  void setConstraint(String[] optValues) {
+    String clusterName = optValues[0];
+    String constraintType = optValues[1];
+    String constraintId = optValues[2];
+    String constraintAttributesMap = optValues[3];
+    if (clusterName == null || constraintType == null || constraintId == null
+        || constraintAttributesMap == null) {
+      System.err
+          .println("fail to set constraint. missing clusterName|constraintType|constraintId|constraintAttributesMap");
+      return;
+    }
+    ClusterId clusterId = ClusterId.from(clusterName);
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    Map<String, String> constraintAttributes =
+        HelixUtil.parseCsvFormatedKeyValuePairs(constraintAttributesMap);
+    ConstraintItem item = new ConstraintItem(constraintAttributes);
+    ClusterConfig.Delta delta =
+        new ClusterConfig.Delta(clusterId).addConstraintItem(
+            ConstraintType.valueOf(constraintType), ConstraintId.from(constraintId), item);
+    accessor.updateCluster(delta);
+  }
+
+  void listClusterInfo(String[] optValues) {
+    String clusterName = optValues[0];
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    Set<ResourceId> resources = accessor.readResources().keySet();
+    StringBuilder sb =
+        new StringBuilder("Existing resources in cluster ").append(clusterName).append(":\n");
+    for (ResourceId resourceId : resources) {
+      sb.append(resourceId.stringify()).append('\n');
+    }
+    Set<ParticipantId> participants = accessor.readParticipants().keySet();
+    sb.append("Participants in cluster ").append(clusterName).append(":\n");
+    for (ParticipantId participantId : participants) {
+      sb.append(participantId.stringify()).append('\n');
+    }
+    System.out.print(sb.toString());
+  }
+
+  void listParticipantInfo(String[] optValues) {
+    String clusterName = optValues[0];
+    String participantName = optValues[1];
+    ParticipantAccessor accessor = participantAccessor(clusterName);
+    ParticipantId participantId = ParticipantId.from(participantName);
+    Participant participant = accessor.readParticipant(participantId);
+    StringBuilder sb =
+        new StringBuilder("Participant ").append(participantName).append(" in cluster ")
+            .append(clusterName).append(":\n").append("hostName: ")
+            .append(participant.getHostName()).append(", port: ").append(participant.getPort())
+            .append(", enabled: ").append(participant.isEnabled()).append(", disabledPartitions: ")
+            .append(participant.getDisabledPartitionIds().toString()).append(", tags:")
+            .append(participant.getTags().toString()).append(", currentState: ")
+            .append(", messages: ").append(participant.getMessageMap().toString())
+            .append(participant.getCurrentStateMap().toString()).append(", alive: ")
+            .append(participant.isAlive()).append(", userConfig: ")
+            .append(participant.getUserConfig().toString());
+    if (participant.isAlive()) {
+      RunningInstance runningInstance = participant.getRunningInstance();
+      sb.append(", sessionId: ").append(runningInstance.getSessionId().stringify())
+          .append(", processId: ").append(runningInstance.getPid().stringify())
+          .append(", helixVersion: ").append(runningInstance.getVersion().toString());
+    }
+    System.out.println(sb.toString());
+  }
+
+  void listResourceInfo(String[] optValues) {
+    String clusterName = optValues[0];
+    String resourceName = optValues[1];
+    ResourceAccessor accessor = resourceAccessor(clusterName);
+    ResourceId resourceId = ResourceId.from(resourceName);
+    Resource resource = accessor.readResource(resourceId);
+    StringBuilder sb =
+        new StringBuilder("Resource ").append(resourceName).append(" in cluster ")
+            .append(clusterName).append(":\n").append("externalView: ")
+            .append(resource.getExternalView()).append(", userConfig: ")
+            .append(resource.getUserConfig()).append(", rebalancerContext: ")
+            .append(resource.getRebalancerConfig().getSerializedContext());
+    System.out.println(sb.toString());
+  }
+
+  void listResources(String[] optValues) {
+    String clusterName = optValues[0];
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    Set<ResourceId> resources = accessor.readResources().keySet();
+    StringBuilder sb =
+        new StringBuilder("Existing resources in cluster ").append(clusterName).append(":\n");
+    for (ResourceId resourceId : resources) {
+      sb.append(resourceId.stringify()).append('\n');
+    }
+    System.out.print(sb.toString());
+  }
+
+  void listParticipants(String[] optValues) {
+    String clusterName = optValues[0];
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    Set<ParticipantId> participants = accessor.readParticipants().keySet();
+    StringBuilder sb =
+        new StringBuilder("Participants in cluster ").append(clusterName).append(":\n");
+    for (ParticipantId participantId : participants) {
+      sb.append(participantId.stringify()).append('\n');
+    }
+    System.out.print(sb.toString());
+  }
+
+  void listStateModels(String[] optValues) {
+    String clusterName = optValues[0];
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    Set<StateModelDefId> stateModelDefs = accessor.readStateModelDefinitions().keySet();
+    StringBuilder sb =
+        new StringBuilder("State models in cluster ").append(clusterName).append(":\n");
+    for (StateModelDefId stateModelDefId : stateModelDefs) {
+      sb.append(stateModelDefId.stringify()).append('\n');
+    }
+    System.out.print(sb.toString());
+  }
+
+  void listStateModel(String[] optValues) {
+    String clusterName = optValues[0];
+    String stateModel = optValues[1];
+    StateModelDefId stateModelDefId = StateModelDefId.from(stateModel);
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    Map<StateModelDefId, StateModelDefinition> stateModelDefs =
+        accessor.readStateModelDefinitions();
+    StateModelDefinition stateModelDef = stateModelDefs.get(stateModelDefId);
+    StringBuilder sb = new StringBuilder("StateModelDefinition: ").append(stateModelDef.toString());
+    System.out.println(sb.toString());
+  }
+
+  void listClusters(String[] optValues) {
+    List<ClusterId> result = Lists.newArrayList();
+    List<String> clusterNames = _baseAccessor.getChildNames("/", 0);
+    for (String clusterName : clusterNames) {
+      ClusterAccessor accessor = clusterAccessor(clusterName);
+      if (accessor.isClusterStructureValid()) {
+        result.add(ClusterId.from(clusterName));
+      }
+    }
+    System.out.println("Existing clusters: " + result);
+  }
+
+  void removeConfig(String[] optValues) {
+    ScopeType type = ScopeType.valueOf(optValues[0].toUpperCase());
+    String[] scopeArgs = optValues[1].split("[\\s,]");
+    String[] keys = optValues[2].split("[\\s,]");
+    String clusterName = scopeArgs[0];
+    UserConfig userConfig;
+    switch (type) {
+    case CLUSTER:
+      ClusterAccessor clusterAccessor = clusterAccessor(clusterName);
+      userConfig = clusterAccessor.readUserConfig();
+      removeKeysFromUserConfig(userConfig, keys);
+      clusterAccessor.setUserConfig(userConfig);
+      break;
+    case RESOURCE:
+      ResourceAccessor resourceAccessor = resourceAccessor(clusterName);
+      ResourceId resourceId = ResourceId.from(scopeArgs[1]);
+      userConfig = resourceAccessor.readUserConfig(resourceId);
+      removeKeysFromUserConfig(userConfig, keys);
+      resourceAccessor.setUserConfig(resourceId, userConfig);
+      break;
+    case PARTICIPANT:
+      ParticipantAccessor participantAccessor = participantAccessor(clusterName);
+      ParticipantId participantId = ParticipantId.from(scopeArgs[1]);
+      userConfig = participantAccessor.readUserConfig(participantId);
+      removeKeysFromUserConfig(userConfig, keys);
+      participantAccessor.setUserConfig(participantId, userConfig);
+      break;
+    case PARTITION:
+      ResourceAccessor resourcePartitionAccessor = resourceAccessor(clusterName);
+      PartitionId partitionId = PartitionId.from(scopeArgs[1]);
+      userConfig = resourcePartitionAccessor.readUserConfig(partitionId.getResourceId());
+      removePartitionFromResourceUserConfig(userConfig, partitionId, keys);
+      resourcePartitionAccessor.setUserConfig(partitionId.getResourceId(), userConfig);
+      break;
+    }
+  }
+
+  private void removeKeysFromUserConfig(UserConfig userConfig, String[] keys) {
+    Map<String, String> simpleFields = Maps.newHashMap(userConfig.getSimpleFields());
+    for (String key : keys) {
+      simpleFields.remove(key);
+    }
+    userConfig.setSimpleFields(simpleFields);
+  }
+
+  private void removePartitionFromResourceUserConfig(UserConfig userConfig,
+      PartitionId partitionId, String[] keys) {
+    Map<String, String> fields = Maps.newHashMap(userConfig.getMapField(partitionId.stringify()));
+    for (String key : keys) {
+      fields.remove(key);
+    }
+    userConfig.setMapField(partitionId.stringify(), fields);
+  }
+
+  static int processCommandLineArgs(String[] cliArgs) {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+    CommandLine cmd = null;
+
+    try {
+      cmd = cliParser.parse(cliOptions, cliArgs);
+    } catch (ParseException pe) {
+      System.err.println("CommandLineClient: failed to parse command-line options: " + pe);
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+
+    String zkAddr = cmd.getOptionValue(HelixOption.zkSvr.name());
+    ZkClient zkclient = null;
+
+    try {
+      zkclient =
+          new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_SESSION_TIMEOUT,
+              new ZNRecordSerializer());
+
+      NewClusterSetup setup = new NewClusterSetup(zkclient);
+
+      Option[] options = cmd.getOptions();
+
+      for (Option option : options) {
+        if (option.getLongOpt().equals(HelixOption.zkSvr.name())) {
+          continue;
+        }
+
+        HelixOption opt = HelixOption.valueOf(option.getLongOpt());
+        String[] optValues = cmd.getOptionValues(option.getLongOpt());
+
+        checkArgNum(opt, optValues);
+
+        switch (opt) {
+        case listClusters:
+          setup.listClusters(optValues);
+          break;
+        case listResources:
+          setup.listResources(optValues);
+          break;
+        case listInstances:
+          setup.listParticipants(optValues);
+          break;
+        case addCluster:
+          setup.addCluster(optValues);
+          break;
+        case activateCluster:
+          break;
+        case dropCluster:
+          setup.dropCluster(optValues);
+          break;
+        case dropResource:
+          setup.dropResource(optValues);
+          break;
+        case addInstance:
+          setup.addInstance(optValues);
+          break;
+        case addResource:
+          String[] rebalancerModeValues = null;
+          if (cmd.hasOption(HelixOption.rebalancerMode.name())) {
+            rebalancerModeValues = cmd.getOptionValues(HelixOption.rebalancerMode.name());
+            checkArgNum(HelixOption.rebalancerMode, rebalancerModeValues);
+          }
+          String[] bucketSizeValues = null;
+          if (cmd.hasOption(HelixOption.bucketSize.name())) {
+            bucketSizeValues = cmd.getOptionValues(HelixOption.bucketSize.name());
+            checkArgNum(HelixOption.bucketSize, bucketSizeValues);
+          }
+          String[] maxPartitionsPerNodeValues = null;
+          if (cmd.hasOption(HelixOption.maxPartitionsPerNode.name())) {
+            maxPartitionsPerNodeValues =
+                cmd.getOptionValues(HelixOption.maxPartitionsPerNode.name());
+            checkArgNum(HelixOption.maxPartitionsPerNode, maxPartitionsPerNodeValues);
+          }
+          setup.addResource(optValues, rebalancerModeValues, bucketSizeValues,
+              maxPartitionsPerNodeValues);
+          break;
+        case addStateModelDef:
+          setup.addStateModelDef(optValues);
+          break;
+        case addIdealState:
+          setup.addIdealState(optValues);
+          break;
+        case swapInstance:
+          // TODO impl ClusterAccessor#swapParticipantsInCluster()
+          break;
+        case dropInstance:
+          setup.dropInstance(optValues);
+          break;
+        case rebalance:
+          // TODO impl this using ResourceAccessor
+          break;
+        case expandCluster:
+          // TODO impl this
+          break;
+        case expandResource:
+          // TODO impl this
+          break;
+        case mode:
+        case rebalancerMode:
+        case bucketSize:
+        case maxPartitionsPerNode:
+          // always used with addResource command
+          continue;
+        case instanceGroupTag:
+        case resourceKeyPrefix:
+          // always used with rebalance command
+          continue;
+        case addResourceProperty:
+          throw new UnsupportedOperationException(HelixOption.addResourceProperty
+              + " is not supported, please use setConfig");
+        case removeResourceProperty:
+          throw new UnsupportedOperationException(HelixOption.removeResourceProperty
+              + " is not supported, please use removeConfig");
+        case addInstanceTag:
+          setup.addInstanceTag(optValues);
+          break;
+        case removeInstanceTag:
+          setup.removeInstanceTag(optValues);
+          break;
+        case listClusterInfo:
+          setup.listClusterInfo(optValues);
+          break;
+        case listInstanceInfo:
+          setup.listParticipantInfo(optValues);
+          break;
+        case listResourceInfo:
+          setup.listResourceInfo(optValues);
+          break;
+        case listPartitionInfo:
+          setup.listPartitionInfo(optValues);
+          break;
+        case listStateModels:
+          setup.listStateModels(optValues);
+          break;
+        case listStateModel:
+          setup.listStateModel(optValues);
+          break;
+        case enableInstance:
+          setup.enableInstance(optValues);
+          break;
+        case enablePartition:
+          setup.enablePartition(optValues);
+          break;
+        case enableCluster:
+          setup.enableCluster(optValues);
+          break;
+        case resetPartition:
+          // TODO impl ResoourceAccessor#resetPartitions()
+          break;
+        case resetInstance:
+          // TODO impl ParticipantAccessor#resetInstance()
+          break;
+        case resetResource:
+          // TODO impl ResourceAccessor#resetResource()
+          break;
+        case addStat:
+          // TODO impl ClusterAccessor.addStat()
+          break;
+        case addAlert:
+          // TODO impl ClusterAccessor#addAlert()
+          break;
+        case dropStat:
+          // TODO impl ClusterAccessor.dropStat()
+          break;
+        case dropAlert:
+          // TODO impl ClusterAccessor#dropAlert()
+          break;
+        case getConfig:
+          setup.getConfig(optValues);
+          break;
+        case setConfig:
+          setup.setConfig(optValues);
+          break;
+        case removeConfig:
+          setup.removeConfig(optValues);
+          break;
+        case getConstraints:
+          break;
+        case setConstraint:
+          setup.setConstraint(optValues);
+          break;
+        case removeConstraint:
+          break;
+        default:
+          System.err.println("Non-recognized option: " + opt);
+          break;
+        }
+
+        // process 1 option only
+        break;
+      }
+
+      return 0;
+    } finally {
+      if (zkclient != null) {
+        zkclient.close();
+      }
+    }
+  }
+
+  public static void main(String[] args) {
+    // if (args.length == 1 && args[0].equals("setup-test-cluster")) {
+    // System.out
+    // .println("By default setting up TestCluster with 6 instances, 10 partitions, Each partition will have 3 replicas");
+    // new ClusterSetup("localhost:2181").setupTestCluster("TestCluster");
+    // System.exit(0);
+    // }
+
+    int ret = processCommandLineArgs(args);
+    System.exit(ret);
+
+  }
+}


Mime
View raw message