helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [6/7] git commit: [HELIX-94] Add the ability to enable and disable a resource, rb=20401
Date Fri, 11 Jul 2014 17:13:20 GMT
[HELIX-94] Add the ability to enable and disable a resource, rb=20401


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c1af744a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c1af744a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c1af744a

Branch: refs/heads/master
Commit: c1af744af6088ad98fe2a8a5074f2c56199b6f82
Parents: 96aef71
Author: zzhang <zzhang@apache.org>
Authored: Wed May 21 14:56:47 2014 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Fri Jul 11 10:04:37 2014 -0700

----------------------------------------------------------------------
 .../helix/webapp/resources/JsonParameters.java  |   4 +
 .../webapp/resources/ResourceGroupResource.java |   7 +
 .../helix/webapp/TestDisableResource.java       |  84 ++++++
 .../main/java/org/apache/helix/HelixAdmin.java  |   7 +
 .../controller/rebalancer/CustomRebalancer.java |   5 +-
 .../rebalancer/FallbackRebalancer.java          |   4 +-
 .../rebalancer/FullAutoRebalancer.java          |   5 +-
 .../rebalancer/SemiAutoRebalancer.java          |   5 +-
 .../config/SemiAutoRebalancerConfig.java        |   2 +-
 .../util/ConstraintBasedAssignment.java         |  26 +-
 .../stages/BestPossibleStateCalcStage.java      |   4 +-
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |  84 ++++--
 .../java/org/apache/helix/model/IdealState.java |  21 +-
 .../participant/HelixCustomCodeRunner.java      |  16 +-
 .../org/apache/helix/tools/ClusterSetup.java    |  17 +-
 .../strategy/TestAutoRebalanceStrategy.java     |   2 +-
 .../strategy/TestNewAutoRebalanceStrategy.java  |   2 +-
 .../TestDisableCustomCodeRunner.java            | 252 +++++++++++++++++
 .../helix/integration/TestDisableResource.java  | 267 +++++++++++++++++++
 .../helix/manager/zk/TestZkHelixAdmin.java      |  34 ++-
 .../apache/helix/tools/TestClusterSetup.java    |  39 ++-
 21 files changed, 826 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
index 5f405d8..19ac71a 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
@@ -165,6 +165,10 @@ public class JsonParameters {
       if (!_parameterMap.containsKey(ENABLED)) {
         throw new HelixException("Missing Json parameters: '" + ENABLED + "'");
       }
+    } else if (command.equalsIgnoreCase(ClusterSetup.enableResource)) {
+      if (!_parameterMap.containsKey(ENABLED)) {
+        throw new HelixException("Missing Json parameters: '" + ENABLED + "'");
+      }
     } else if (command.equalsIgnoreCase(ClusterSetup.enablePartition)) {
       if (!_parameterMap.containsKey(ENABLED)) {
         throw new HelixException("Missing Json parameters: '" + ENABLED + "'");

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
index 055f64a..6dc721d 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
@@ -114,6 +114,13 @@ public class ResourceGroupResource extends ServerResource {
         ClusterSetup setupTool = new ClusterSetup(zkClient);
         setupTool.getClusterManagementTool()
             .resetResource(clusterName, Arrays.asList(resourceName));
+      } else if (command.equalsIgnoreCase(ClusterSetup.enableResource)) {
+        jsonParameters.verifyCommand(ClusterSetup.enableResource);
+        boolean enabled = Boolean.parseBoolean(jsonParameters.getParameter(JsonParameters.ENABLED));
+        ZkClient zkClient =
+            (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
+        ClusterSetup setupTool = new ClusterSetup(zkClient);
+        setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled);
       } else {
         throw new HelixException("Unsupported command: " + command + ". Should be one of ["
             + ClusterSetup.resetResource + "]");

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
new file mode 100644
index 0000000..9800179
--- /dev/null
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
@@ -0,0 +1,84 @@
+package org.apache.helix.webapp;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.webapp.resources.JsonParameters;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestDisableResource extends AdminTestBase {
+
+  @Test
+  public void test() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        n, // number of nodes
+        3, // replicas
+        "MasterSlave", true); // do rebalance
+
+    String instanceUrl =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups/"
+            + "TestDB0";
+
+    // Disable TestDB0
+    Map<String, String> paramMap = new HashMap<String, String>();
+    paramMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enableResource);
+    paramMap.put(JsonParameters.ENABLED, Boolean.toString(false));
+    TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl, paramMap, false);
+
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    Assert.assertFalse(idealState.isEnabled());
+
+    // Re-enable TestDB0
+    paramMap.put(JsonParameters.ENABLED, Boolean.toString(true));
+    TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl, paramMap, false);
+
+    idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    Assert.assertTrue(idealState.isEnabled());
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 892bada..dce3893 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -177,6 +177,13 @@ public interface HelixAdmin {
   void enableInstance(String clusterName, String instanceName, boolean enabled);
 
   /**
+   * Disable or enable a resource ({@code enabled}: true to enable, false to disable)
+   * @param clusterName
+   * @param resourceName
+   */
+  void enableResource(String clusterName, String resourceName, boolean enabled);
+
+  /**
    * Disable or enable a list of partitions on an instance
    * @param enabled
    * @param clusterName

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 6629bec..4d5c373 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -14,6 +14,7 @@ import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -51,6 +52,8 @@ public class CustomRebalancer implements HelixRebalancer {
       ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
     CustomRebalancerConfig config =
         BasicRebalancerConfig.convert(rebalancerConfig, CustomRebalancerConfig.class);
+    IdealState idealState = cluster.getResource(rebalancerConfig.getResourceId()).getIdealState();
+    boolean isEnabled = (idealState != null) ? idealState.isEnabled() : true;
     StateModelDefinition stateModelDef =
         cluster.getStateModelMap().get(config.getStateModelDefId());
     if (LOG.isDebugEnabled()) {
@@ -65,7 +68,7 @@ public class CustomRebalancer implements HelixRebalancer {
       Map<ParticipantId, State> bestStateForPartition =
           ConstraintBasedAssignment.computeCustomizedBestStateForPartition(cluster
               .getLiveParticipantMap().keySet(), stateModelDef, config.getPreferenceMap(partition),
-              currentStateMap, disabledInstancesForPartition);
+              currentStateMap, disabledInstancesForPartition, isEnabled);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
index 3aa41d7..e00e57c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
@@ -160,7 +160,7 @@ public class FallbackRebalancer implements HelixRebalancer {
             ConstraintBasedAssignment.computeCustomizedBestStateForPartition(cluster
                 .getLiveParticipantMap().keySet(), stateModelDef, newIdealState
                 .getParticipantStateMap(partitionId), currentState.getCurrentStateMap(resourceId,
-                partitionId), disabledParticipants);
+                partitionId), disabledParticipants, idealState.isEnabled());
         assignment.addReplicaMap(partitionId, replicaMap);
       }
     } else {
@@ -176,7 +176,7 @@ public class FallbackRebalancer implements HelixRebalancer {
             ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
                 .getLiveParticipantMap().keySet(), stateModelDef, newIdealState
                 .getPreferenceList(partitionId), currentState.getCurrentStateMap(resourceId,
-                partitionId), disabledParticipants);
+                partitionId), disabledParticipants, idealState.isEnabled());
         assignment.addReplicaMap(partitionId, replicaMap);
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
index 13616b3..4bf030b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@ -25,6 +25,7 @@ import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -67,6 +68,8 @@ public class FullAutoRebalancer implements HelixRebalancer {
       ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
     FullAutoRebalancerConfig config =
         BasicRebalancerConfig.convert(rebalancerConfig, FullAutoRebalancerConfig.class);
+    IdealState idealState = cluster.getResource(rebalancerConfig.getResourceId()).getIdealState();
+    boolean isEnabled = (idealState != null) ? idealState.isEnabled() : true;
     StateModelDefinition stateModelDef =
         cluster.getStateModelMap().get(config.getStateModelDefId());
     // Compute a preference list based on the current ideal state
@@ -176,7 +179,7 @@ public class FullAutoRebalancer implements HelixRebalancer {
           ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
               liveParticipants.keySet(), stateModelDef, preferenceList,
               currentState.getCurrentStateMap(config.getResourceId(), partition),
-              disabledParticipantsForPartition);
+              disabledParticipantsForPartition, isEnabled);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index 51ca463..07f6337 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -15,6 +15,7 @@ import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -55,6 +56,8 @@ public class SemiAutoRebalancer implements HelixRebalancer {
       ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
     SemiAutoRebalancerConfig config =
         BasicRebalancerConfig.convert(rebalancerConfig, SemiAutoRebalancerConfig.class);
+    IdealState idealState = cluster.getResource(rebalancerConfig.getResourceId()).getIdealState();
+    boolean isEnabled = (idealState != null) ? idealState.isEnabled() : true;
     StateModelDefinition stateModelDef =
         cluster.getStateModelMap().get(config.getStateModelDefId());
     if (LOG.isDebugEnabled()) {
@@ -77,7 +80,7 @@ public class SemiAutoRebalancer implements HelixRebalancer {
       Map<ParticipantId, State> bestStateForPartition =
           ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
               .getLiveParticipantMap().keySet(), stateModelDef, preferenceList, currentStateMap,
-              disabledInstancesForPartition);
+              disabledInstancesForPartition, isEnabled);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
index 727c3df..60e30f4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
@@ -116,7 +116,7 @@ public final class SemiAutoRebalancerConfig extends PartitionedRebalancerConfig
         Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
         Map<ParticipantId, State> initialMap =
             ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, participantSet,
-                stateModelDef, preferenceList, emptyCurrentState, disabledParticipants);
+                stateModelDef, preferenceList, emptyCurrentState, disabledParticipants, true);
         currentMapping.put(partitionId, initialMap);
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index 7951784..addd652 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -117,23 +117,24 @@ public class ConstraintBasedAssignment {
    * @param participants participants selected to serve the partition
    * @param disabledParticipants participants that have been disabled for this partition
    * @param initialState the initial state of the resource state model
+   * @param isEnabled true if resource is enabled, false otherwise
    * @return map of participant id to state of dropped and disabled partitions
    */
   public static Map<ParticipantId, State> dropAndDisablePartitions(
       Map<ParticipantId, State> currentStateMap, Collection<ParticipantId> participants,
-      Set<ParticipantId> disabledParticipants, State initialState) {
+      Set<ParticipantId> disabledParticipants, boolean isEnabled, State initialState) {
     Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
     // if the resource is deleted, instancePreferenceList will be empty and
     // we should drop all resources.
     if (currentStateMap != null) {
       for (ParticipantId participantId : currentStateMap.keySet()) {
         if ((participants == null || !participants.contains(participantId))
-            && !disabledParticipants.contains(participantId)) {
+            && !disabledParticipants.contains(participantId) && isEnabled) {
           // if dropped and not disabled, transit to DROPPED
           participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
         } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
             participantId).equals(State.from(HelixDefinedState.ERROR)))
-            && disabledParticipants.contains(participantId)) {
+            && (disabledParticipants.contains(participantId) || !isEnabled)) {
           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
           participantStateMap.put(participantId, initialState);
         }
@@ -151,16 +152,18 @@ public class ConstraintBasedAssignment {
    * @param currentStateMap
    *          : participant->state for each partition
    * @param disabledParticipantsForPartition
+   * @param isEnabled true if resource is enabled, false if disabled
    * @return
    */
   public static Map<ParticipantId, State> computeAutoBestStateForPartition(
       Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
       StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
-      Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
+      Map<ParticipantId, State> currentStateMap,
+      Set<ParticipantId> disabledParticipantsForPartition, boolean isEnabled) {
     // drop and disable participants if necessary
     Map<ParticipantId, State> participantStateMap =
         dropAndDisablePartitions(currentStateMap, participantPreferenceList,
-            disabledParticipantsForPartition, stateModelDef.getTypedInitialState());
+            disabledParticipantsForPartition, isEnabled, stateModelDef.getTypedInitialState());
 
     // resource is deleted
     if (participantPreferenceList == null) {
@@ -176,7 +179,7 @@ public class ConstraintBasedAssignment {
       if ("N".equals(num)) {
         Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantSet);
         liveAndEnabled.removeAll(disabledParticipantsForPartition);
-        stateCount = liveAndEnabled.size();
+        stateCount = isEnabled ? liveAndEnabled.size() : 0;
       } else if ("R".equals(num)) {
         stateCount = participantPreferenceList.size();
       } else {
@@ -198,7 +201,7 @@ public class ConstraintBasedAssignment {
                       .equals(State.from(HelixDefinedState.ERROR));
 
           if (liveParticipantSet.contains(participantId) && !assigned[i] && notInErrorState
-              && !disabledParticipantsForPartition.contains(participantId)) {
+              && !disabledParticipantsForPartition.contains(participantId) && isEnabled) {
             participantStateMap.put(participantId, state);
             count = count + 1;
             assigned[i] = true;
@@ -268,12 +271,13 @@ public class ConstraintBasedAssignment {
    * @param preferenceMap
    * @param currentStateMap
    * @param disabledParticipantsForPartition
+   * @param isEnabled true if resource is enabled, false if disabled
    * @return
    */
   public static Map<ParticipantId, State> computeCustomizedBestStateForPartition(
       Set<ParticipantId> liveParticipantSet, StateModelDefinition stateModelDef,
       Map<ParticipantId, State> preferenceMap, Map<ParticipantId, State> currentStateMap,
-      Set<ParticipantId> disabledParticipantsForPartition) {
+      Set<ParticipantId> disabledParticipantsForPartition, boolean isEnabled) {
     Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
 
     // if the resource is deleted, idealStateMap will be null/empty and
@@ -281,12 +285,12 @@ public class ConstraintBasedAssignment {
     if (currentStateMap != null) {
       for (ParticipantId participantId : currentStateMap.keySet()) {
         if ((preferenceMap == null || !preferenceMap.containsKey(participantId))
-            && !disabledParticipantsForPartition.contains(participantId)) {
+            && !disabledParticipantsForPartition.contains(participantId) && isEnabled) {
           // if dropped and not disabled, transit to DROPPED
           participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
         } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
             participantId).equals(State.from(HelixDefinedState.ERROR)))
-            && disabledParticipantsForPartition.contains(participantId)) {
+            && (disabledParticipantsForPartition.contains(participantId) || !isEnabled)) {
           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
           participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
         }
@@ -304,7 +308,7 @@ public class ConstraintBasedAssignment {
               || !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
 
       if (liveParticipantSet.contains(participantId) && notInErrorState
-          && !disabledParticipantsForPartition.contains(participantId)) {
+          && !disabledParticipantsForPartition.contains(participantId) && isEnabled) {
         participantStateMap.put(participantId, preferenceMap.get(participantId));
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 2f93b7f..8c34f6b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -120,7 +120,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       partitionMapping.addReplicaMap(partitionId, ConstraintBasedAssignment
           .computeAutoBestStateForPartition(upperBounds, cluster.getLiveParticipantMap().keySet(),
               stateModelDef, null, currentStateOutput.getCurrentStateMap(resourceId, partitionId),
-              disabledParticipantsForPartition));
+              disabledParticipantsForPartition, true));
     }
     return partitionMapping;
   }
@@ -163,7 +163,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       Set<ParticipantId> participants = participantStateMap.keySet();
       Map<ParticipantId, State> droppedAndDisabledMap =
           ConstraintBasedAssignment.dropAndDisablePartitions(currentStateMap, participants,
-              disabledParticipants, initialState);
+              disabledParticipants, true, initialState);
 
       // don't map error participants
       for (ParticipantId participantId : errorParticipants) {

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 6dc5541..da0c80c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixConstants;
@@ -130,7 +131,6 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public void dropInstance(String clusterName, InstanceConfig instanceConfig) {
-    // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
     String instanceConfigsPath =
         PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
             ConfigScopeProperty.PARTICIPANT.toString());
@@ -157,9 +157,6 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public InstanceConfig getInstanceConfig(String clusterName, String instanceName) {
-    // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
-
-    // String instanceConfigPath = instanceConfigsPath + "/" + instanceName;
     String instanceConfigPath =
         PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
             ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
@@ -168,7 +165,7 @@ public class ZKHelixAdmin implements HelixAdmin {
           + clusterName);
     }
 
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -182,7 +179,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
             ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
 
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
     if (!baseAccessor.exists(path, 0)) {
       throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName
           + ", instance config does not exist");
@@ -204,13 +201,36 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
+  public void enableResource(final String clusterName, final String resourceName,
+      final boolean enabled) {
+    String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, resourceName);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    if (!baseAccessor.exists(path, 0)) {
+      throw new HelixException("Cluster " + clusterName + ", resource: " + resourceName
+          + ", ideal-state does not exist");
+    }
+    baseAccessor.update(path, new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData == null) {
+          throw new HelixException("Cluster: " + clusterName + ", resource: " + resourceName
+              + ", ideal-state is null");
+        }
+        IdealState idealState = new IdealState(currentData);
+        idealState.enable(enabled);
+        return idealState.getRecord();
+      }
+    }, AccessOption.PERSISTENT);
+  }
+
+  @Override
   public void enablePartition(final boolean enabled, final String clusterName,
       final String instanceName, final String resourceName, final List<String> partitionNames) {
     String path =
         PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
             ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
 
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
 
     // check instanceConfig exists
     if (!baseAccessor.exists(path, 0)) {
@@ -299,7 +319,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void resetPartition(String clusterName, String instanceName, String resourceName,
       List<String> partitionNames) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -404,7 +424,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void resetInstance(String clusterName, List<String> instanceNames) {
     // TODO: not mp-safe
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
     List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
@@ -430,7 +450,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void resetResource(String clusterName, List<String> resourceNames) {
     // TODO: not mp-safe
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
     List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
@@ -508,7 +528,6 @@ public class ZKHelixAdmin implements HelixAdmin {
     // IDEAL STATE
     _zkClient.createPersistent(HelixUtil.getIdealStatePath(clusterName));
     // CONFIGURATIONS
-    // _zkClient.createPersistent(HelixUtil.getConfigPath(clusterName));
     path =
         PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
             ConfigScopeProperty.CLUSTER.toString(), clusterName);
@@ -564,7 +583,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     List<String> instances = _zkClient.getChildren(memberInstancesPath);
     List<String> result = new ArrayList<String>();
 
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -662,7 +681,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public IdealState getResourceIdealState(String clusterName, String resourceName) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -671,7 +690,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public void setResourceIdealState(String clusterName, String resourceName, IdealState idealState) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -680,7 +699,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public ExternalView getResourceExternalView(String clusterName, String resourceName) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
     return accessor.getProperty(keyBuilder.externalView(resourceName));
@@ -699,7 +718,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       throw new HelixException("State model path " + stateModelPath + " already exists.");
     }
 
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.stateModelDef(stateModel.getId()), stateModel);
@@ -707,7 +726,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public void dropResource(String clusterName, String resourceName) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -722,7 +741,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public StateModelDefinition getStateModelDef(String clusterName, String stateModelName) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -805,7 +824,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void dropCluster(String clusterName) {
     logger.info("Deleting cluster " + clusterName);
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -1067,15 +1086,23 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     int size = (int) file.length();
     byte[] bytes = new byte[size];
-    DataInputStream dis = new DataInputStream(new FileInputStream(file));
-    int read = 0;
-    int numRead = 0;
-    while (read < bytes.length && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) {
-      read = read + numRead;
+    DataInputStream dis = null;
+    try {
+      dis = new DataInputStream(new FileInputStream(file));
+      int read = 0;
+      int numRead = 0;
+      while (read < bytes.length && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) {
+        read = read + numRead;
+      }
+      return bytes;
+    } finally {
+      if (dis != null) {
+        dis.close();
+      }
     }
-    return bytes;
   }
 
+  @Override
   public void addStateModelDef(String clusterName, String stateModelDefName,
       String stateModelDefFile) throws IOException {
     ZNRecord record =
@@ -1091,7 +1118,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void setConstraint(String clusterName, final ConstraintType constraintType,
       final String constraintId, final ConstraintItem constraintItem) {
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
 
     Builder keyBuilder = new Builder(clusterName);
     String path = keyBuilder.constraint(constraintType.toString()).getPath();
@@ -1112,7 +1139,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void removeConstraint(String clusterName, final ConstraintType constraintType,
       final String constraintId) {
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
 
     Builder keyBuilder = new Builder(clusterName);
     String path = keyBuilder.constraint(constraintType.toString()).getPath();
@@ -1210,7 +1237,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       throw new HelixException("cluster " + clusterName + " instance " + instanceName
           + " is not setup yet");
     }
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -1238,6 +1265,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     accessor.setProperty(keyBuilder.instanceConfig(instanceName), config);
   }
 
+  @Override
   public void close() {
     if (_zkClient != null) {
       _zkClient.close();

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index ed17c5b..dc42a52 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -81,7 +81,8 @@ public class IdealState extends HelixProperty {
     MAX_PARTITIONS_PER_INSTANCE,
     INSTANCE_GROUP_TAG,
     REBALANCER_CLASS_NAME,
-    REBALANCER_CONFIG_NAME
+    REBALANCER_CONFIG_NAME,
+    HELIX_ENABLED
   }
 
   public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
@@ -821,6 +822,7 @@ public class IdealState extends HelixProperty {
   }
 
   /**
+   * <<<<<<< HEAD
    * Get the non-Helix simple fields from this property and add them to a UserConfig
    * @param userConfig the user config to update
    */
@@ -950,4 +952,21 @@ public class IdealState extends HelixProperty {
       Map<PartitionId, Map<ParticipantId, State>> participantStateMaps) {
     return ResourceAssignment.stringMapsFromReplicaMaps(participantStateMaps);
   }
+
+  /**
+   * Check whether the resource is enabled.
+   * A resource is enabled by default.
+   * @return true if enabled; false otherwise
+   */
+  public boolean isEnabled() {
+    return _record.getBooleanField(IdealStateProperty.HELIX_ENABLED.name(), true);
+  }
+
+  /**
+   * Enable/Disable the resource
+   * @param enabled true to enable the resource; false to disable it
+   */
+  public void enable(boolean enabled) {
+    _record.setSimpleField(IdealStateProperty.HELIX_ENABLED.name(), Boolean.toString(enabled));
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
index bddd8d1..2f169cc 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
@@ -51,7 +51,7 @@ import org.apache.log4j.Logger;
  *  .invoke(_callback)
  *  .on(ChangeType.LIVE_INSTANCE, ChangeType.IdealState)
  *  .usingLeaderStandbyModel("someUniqueId")
- *  .run()
+ *  .start()
  * </code>
  */
 public class HelixCustomCodeRunner {
@@ -106,6 +106,15 @@ public class HelixCustomCodeRunner {
   }
 
   /**
+   * Get the resource name for the custom-code runner.
+   * Used for retrieving the external view for the custom-code runner resource.
+   * @return resource name for the custom-code runner
+   */
+  public String getResourceName() {
+    return _resourceName;
+  }
+
+  /**
    * This method will be invoked when there is a change in any subscribed
    * notificationTypes
    * @throws Exception
@@ -127,8 +136,9 @@ public class HelixCustomCodeRunner {
       // manually add ideal state for participant leader using LeaderStandby
       // model
 
-      zkClient = new ZkClient(_zkAddr, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
-      zkClient.setZkSerializer(new ZNRecordSerializer());
+      zkClient =
+          new ZkClient(_zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
+              ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
       HelixDataAccessor accessor =
           new ZKHelixDataAccessor(_manager.getClusterName(), new ZkBaseDataAccessor<ZNRecord>(
               zkClient));

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index d97a853..4479f97 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -98,6 +98,8 @@ public class ClusterSetup {
   public static final String addInstanceTag = "addInstanceTag";
   public static final String removeInstanceTag = "removeInstanceTag";
 
+  public static final String enableResource = "enableResource";
+
   // Query info (TBD in V2)
   public static final String listClusterInfo = "listClusterInfo";
   public static final String listInstanceInfo = "listInstanceInfo";
@@ -745,6 +747,11 @@ public class ClusterSetup {
     dropResourceOption.setRequired(false);
     dropResourceOption.setArgName("clusterName resourceName");
 
+    Option enableResourceOption =
+        OptionBuilder.withLongOpt(enableResource).withDescription("Enable/disable a resource")
+            .hasArgs(3).isRequired(false).withArgName("clusterName resourceName true/false")
+            .create();
+
     Option rebalanceOption =
         OptionBuilder.withLongOpt(rebalance).withDescription("Rebalance a resource in a cluster")
             .create();
@@ -795,11 +802,11 @@ public class ClusterSetup {
     partitionInfoOption.setArgName("clusterName resourceName partitionName");
 
     Option enableInstanceOption =
-        OptionBuilder.withLongOpt(enableInstance).withDescription("Enable/disable a Instance")
+        OptionBuilder.withLongOpt(enableInstance).withDescription("Enable/disable an instance")
             .create();
     enableInstanceOption.setArgs(3);
     enableInstanceOption.setRequired(false);
-    enableInstanceOption.setArgName("clusterName InstanceName true/false");
+    enableInstanceOption.setArgName("clusterName instanceName true/false");
 
     Option enablePartitionOption =
         OptionBuilder.hasArgs().withLongOpt(enablePartition)
@@ -955,6 +962,7 @@ public class ClusterSetup {
     group.addOption(dropInstanceOption);
     group.addOption(swapInstanceOption);
     group.addOption(dropResourceOption);
+    group.addOption(enableResourceOption);
     group.addOption(instanceInfoOption);
     group.addOption(clusterInfoOption);
     group.addOption(resourceInfoOption);
@@ -1258,6 +1266,11 @@ public class ClusterSetup {
 
       setupTool.getClusterManagementTool().enableInstance(clusterName, instanceName, enabled);
       return 0;
+    } else if (cmd.hasOption(enableResource)) {
+      String clusterName = cmd.getOptionValues(enableResource)[0];
+      String resourceName = cmd.getOptionValues(enableResource)[1];
+      boolean enabled = Boolean.parseBoolean(cmd.getOptionValues(enableResource)[2].toLowerCase());
+      setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled);
     } else if (cmd.hasOption(enablePartition)) {
       String[] args = cmd.getOptionValues(enablePartition);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index 7c74035..1322b40 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -239,7 +239,7 @@ public class TestAutoRebalanceStrategy {
         Map<ParticipantId, State> assignment =
             ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
                 liveParticipantSet, _stateModelDef, preferenceList, currentStateMap,
-                disabledParticipantsForPartition);
+                disabledParticipantsForPartition, true);
         mapResult.put(partition, IdealState.stringMapFromParticipantStateMap(assignment));
       }
       return mapResult;

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
index 2c26d5d..c8ec90a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
@@ -265,7 +265,7 @@ public class TestNewAutoRebalanceStrategy {
         Map<ParticipantId, State> assignment =
             ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
                 liveParticipantMap.keySet(), _stateModelDef, participantPreferenceList, replicaMap,
-                disabledParticipantsForPartition);
+                disabledParticipantsForPartition, true);
         mapResult.put(partitionId, assignment);
       }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
new file mode 100644
index 0000000..3223e48
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
@@ -0,0 +1,252 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.participant.CustomCodeCallbackHandler;
+import org.apache.helix.participant.HelixCustomCodeRunner;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestDisableCustomCodeRunner extends ZkUnitTestBase {
+
+  private static final int N = 2;
+  private static final int PARTITION_NUM = 1;
+
+  class DummyCallback implements CustomCodeCallbackHandler {
+    private final Map<NotificationContext.Type, Boolean> _callbackInvokeMap =
+        new HashMap<NotificationContext.Type, Boolean>();
+
+    @Override
+    public void onCallback(NotificationContext context) {
+      NotificationContext.Type type = context.getType();
+      _callbackInvokeMap.put(type, Boolean.TRUE);
+    }
+
+    public void reset() {
+      _callbackInvokeMap.clear();
+    }
+
+    public boolean isInitTypeInvoked() {
+      return _callbackInvokeMap.containsKey(NotificationContext.Type.INIT);
+    }
+
+    public boolean isCallbackTypeInvoked() {
+      return _callbackInvokeMap.containsKey(NotificationContext.Type.CALLBACK);
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        PARTITION_NUM, // partitions per resource
+        N, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // start participants
+    Map<String, MockParticipantManager> participants =
+        new HashMap<String, MockParticipantManager>();
+    Map<String, HelixCustomCodeRunner> customCodeRunners =
+        new HashMap<String, HelixCustomCodeRunner>();
+    Map<String, DummyCallback> callbacks =
+        new HashMap<String, DummyCallback>();
+    for (int i = 0; i < N; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants
+          .put(instanceName, new MockParticipantManager(ZK_ADDR, clusterName, instanceName));
+
+      customCodeRunners.put(instanceName, new HelixCustomCodeRunner(participants.get(instanceName),
+          ZK_ADDR));
+      callbacks.put(instanceName, new DummyCallback());
+
+      customCodeRunners.get(instanceName).invoke(callbacks.get(instanceName))
+          .on(ChangeType.LIVE_INSTANCE)
+          .usingLeaderStandbyModel("TestParticLeader").start();
+      participants.get(instanceName).syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // Make sure the callback is registered: only the leader should see an INIT callback
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    final String customCodeRunnerResource =
+        customCodeRunners.get("localhost_12918").getResourceName();
+    ExternalView extView = accessor.getProperty(keyBuilder.externalView(customCodeRunnerResource));
+    Map<String, String> instanceStates = extView.getStateMap(customCodeRunnerResource + "_0");
+    String leader = null;
+    for (String instance : instanceStates.keySet()) {
+      String state = instanceStates.get(instance);
+      if ("LEADER".equals(state)) {
+        leader = instance;
+        break;
+      }
+    }
+    Assert.assertNotNull(leader);
+    for (String instance : callbacks.keySet()) {
+      DummyCallback callback = callbacks.get(instance);
+      if (instance.equals(leader)) {
+        Assert.assertTrue(callback.isInitTypeInvoked());
+      } else {
+        Assert.assertFalse(callback.isInitTypeInvoked());
+      }
+      callback.reset();
+    }
+
+    // Disable custom-code runner resource
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.enableResource(clusterName, customCodeRunnerResource, false);
+
+    // Verify that states of custom-code runner are all OFFLINE
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+        ExternalView extView =
+            accessor.getProperty(keyBuilder.externalView(customCodeRunnerResource));
+        if (extView == null) {
+          return false;
+        }
+        Set<String> partitionSet = extView.getPartitionSet();
+        if (partitionSet == null || partitionSet.size() != PARTITION_NUM) {
+          return false;
+        }
+        for (String partition : partitionSet) {
+          Map<String, String> instanceStates = extView.getStateMap(partition);
+          for (String state : instanceStates.values()) {
+            if (!"OFFLINE".equals(state)) {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    }, 10 * 1000);
+    Assert.assertTrue(result);
+
+    // Changing live instances should not invoke any custom-code runner
+    LiveInstance fakeInstance = new LiveInstance("fakeInstance");
+    fakeInstance.setSessionId("fakeSessionId");
+    fakeInstance.setHelixVersion("0.6");
+    accessor.setProperty(keyBuilder.liveInstance("fakeInstance"), fakeInstance);
+    Thread.sleep(1000);
+
+    for (DummyCallback callback : callbacks.values()) {
+      Assert.assertFalse(callback.isInitTypeInvoked());
+      Assert.assertFalse(callback.isCallbackTypeInvoked());
+    }
+
+    // Remove fake instance
+    accessor.removeProperty(keyBuilder.liveInstance("fakeInstance"));
+
+    // Re-enable custom-code runner
+    admin.enableResource(clusterName, customCodeRunnerResource, true);
+    result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Verify that the custom-code runner is invoked again
+    extView = accessor.getProperty(keyBuilder.externalView(customCodeRunnerResource));
+    instanceStates = extView.getStateMap(customCodeRunnerResource + "_0");
+    leader = null;
+    for (String instance : instanceStates.keySet()) {
+      String state = instanceStates.get(instance);
+      if ("LEADER".equals(state)) {
+        leader = instance;
+        break;
+      }
+    }
+    Assert.assertNotNull(leader);
+    for (String instance : callbacks.keySet()) {
+      DummyCallback callback = callbacks.get(instance);
+      if (instance.equals(leader)) {
+        Assert.assertTrue(callback.isInitTypeInvoked());
+      } else {
+        Assert.assertFalse(callback.isInitTypeInvoked());
+      }
+      callback.reset();
+    }
+
+    // Adding a fake instance should invoke the custom-code runner
+    accessor.setProperty(keyBuilder.liveInstance("fakeInstance"), fakeInstance);
+    Thread.sleep(1000);
+    for (String instance : callbacks.keySet()) {
+      DummyCallback callback = callbacks.get(instance);
+      if (instance.equals(leader)) {
+        Assert.assertTrue(callback.isCallbackTypeInvoked());
+      } else {
+        Assert.assertFalse(callback.isCallbackTypeInvoked());
+      }
+    }
+
+    // Clean up
+    controller.syncStop();
+    for (MockParticipantManager participant : participants.values()) {
+      participant.syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
new file mode 100644
index 0000000..ea8974d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
@@ -0,0 +1,267 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for disabling and re-enabling a resource.
+ * <p>
+ * Each test sets up a cluster with a single resource (name prefix "TestDB", addressed below as
+ * "TestDB0"), starts a controller and {@code N} participants, disables the resource through
+ * {@code HelixAdmin.enableResource}, checks that every state in the resource's external view is
+ * OFFLINE, and finally re-enables the resource and verifies the cluster again.
+ */
+public class TestDisableResource extends ZkUnitTestBase {
+  // Number of nodes passed to setupCluster and number of participants started by each test
+  private static final int N = 2;
+  // Partitions per resource; also the partition count expected in the external view
+  private static final int PARTITION_NUM = 1;
+
+  /**
+   * Disables and re-enables TestDB0 in a cluster created with the {@code setupCluster} overload
+   * that takes no {@code RebalanceMode}.
+   * NOTE(review): the method name implies that overload yields SEMI_AUTO - confirm against
+   * TestHelper.
+   * @throws Exception
+   */
+  @Test
+  public void testDisableResourceInSemiAutoMode() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        PARTITION_NUM, // partitions per resource
+        N, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // start participants
+    MockParticipantManager participants[] = new MockParticipantManager[N];
+    for (int i = 0; i < N; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    // The verifier must report success before the resource is disabled
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Disable TestDB0
+    enableResource(clusterName, false);
+    checkExternalView(clusterName);
+
+    // Re-enable TestDB0
+    enableResource(clusterName, true);
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+            ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // Clean up
+    // NOTE(review): not in a finally block, so the controller and participants are not stopped
+    // when an assertion above fails
+    controller.syncStop();
+    for (int i = 0; i < N; i++) {
+      participants[i].syncStop();
+    }
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Disables and re-enables TestDB0 in a cluster set up with {@code RebalanceMode.FULL_AUTO}.
+   * @throws Exception
+   */
+  @Test
+  public void testDisableResourceInFullAutoMode() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        PARTITION_NUM, // partitions per resource
+        N, // number of nodes
+        2, // replicas
+        "MasterSlave", RebalanceMode.FULL_AUTO, true); // do rebalance
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // start participants
+    MockParticipantManager participants[] = new MockParticipantManager[N];
+    for (int i = 0; i < N; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    // The verifier must report success before the resource is disabled
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Disable TestDB0
+    enableResource(clusterName, false);
+    checkExternalView(clusterName);
+
+    // Re-enable TestDB0
+    enableResource(clusterName, true);
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+            ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // Clean up
+    // NOTE(review): not in a finally block, so the controller and participants are not stopped
+    // when an assertion above fails
+    controller.syncStop();
+    for (int i = 0; i < N; i++) {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Disables and re-enables TestDB0 in a cluster set up with {@code RebalanceMode.CUSTOMIZED},
+   * after writing an ideal state that assigns SLAVE to both instances for partition TestDB0_0.
+   * @throws Exception
+   */
+  @Test
+  public void testDisableResourceInCustomMode() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        PARTITION_NUM, // partitions per resource
+        N, // number of nodes
+        2, // replicas
+        "MasterSlave", RebalanceMode.CUSTOMIZED, true); // do rebalance
+
+    // set up custom ideal-state (written before the controller is started)
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    idealState.setPartitionState("TestDB0_0", "localhost_12918", "SLAVE");
+    idealState.setPartitionState("TestDB0_0", "localhost_12919", "SLAVE");
+    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // start participants
+    MockParticipantManager participants[] = new MockParticipantManager[N];
+    for (int i = 0; i < N; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    // The verifier must report success before the resource is disabled
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Disable TestDB0
+    enableResource(clusterName, false);
+    checkExternalView(clusterName);
+
+    // Re-enable TestDB0
+    enableResource(clusterName, true);
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+            ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // Clean up
+    // NOTE(review): not in a finally block, so the controller and participants are not stopped
+    // when an assertion above fails
+    controller.syncStop();
+    for (int i = 0; i < N; i++) {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Enable or disable resource TestDB0 through a newly created ZKHelixAdmin
+   * @param clusterName name of the cluster that owns TestDB0
+   * @param enabled value passed on to HelixAdmin.enableResource
+   */
+  private void enableResource(String clusterName, boolean enabled) {
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.enableResource(clusterName, "TestDB0", enabled);
+  }
+
+  /**
+   * Check that all states in the external view of TestDB0 are OFFLINE, and assert that
+   * TestHelper.verify reports success for that check
+   * @param clusterName name of the cluster whose external view of TestDB0 is read
+   * @throws Exception
+   */
+  private void checkExternalView(String clusterName) throws Exception {
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+
+    // verify that states of TestDB0 are all OFFLINE
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+        // A missing external view or an unexpected partition count fails the check
+        ExternalView extView = accessor.getProperty(keyBuilder.externalView("TestDB0"));
+        if (extView == null) {
+          return false;
+        }
+        Set<String> partitionSet = extView.getPartitionSet();
+        if (partitionSet == null || partitionSet.size() != PARTITION_NUM) {
+          return false;
+        }
+        // NOTE(review): the state map is not null-checked, and a partition whose state map is
+        // empty passes this loop without any state being compared
+        for (String partition : partitionSet) {
+          Map<String, String> instanceStates = extView.getStateMap(partition);
+          for (String state : instanceStates.values()) {
+            if (!"OFFLINE".equals(state)) {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    }, 10 * 1000); // presumably a timeout in milliseconds - confirm against TestHelper.verify
+    Assert.assertTrue(result);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index 89a947f..d3925be 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
@@ -41,6 +43,7 @@ import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.ConstraintItemBuilder;
@@ -62,7 +65,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
       _gZkClient.deleteRecursive(rootPath);
     }
 
-    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+    HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
     tool.addCluster(clusterName, true);
     Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
     tool.addCluster(clusterName, true);
@@ -205,7 +208,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+    HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
     tool.addCluster(clusterName, true);
     Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
 
@@ -241,7 +244,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+    HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
     tool.addCluster(clusterName, true);
     Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
 
@@ -294,4 +297,29 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
+
+  /**
+   * Verifies that {@code HelixAdmin.enableResource} changes the enabled flag reported by the
+   * resource's ideal state: false after disabling, true after re-enabling.
+   */
+  @Test
+  public void testDisableResource() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addCluster(clusterName, true);
+    Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
+    // Register the MasterSlave state model and add a resource that uses it
+    // (the argument 4 is presumably the partition count - confirm against HelixAdmin.addResource)
+    String resourceName = "TestDB";
+    admin.addStateModelDef(clusterName, "MasterSlave", new StateModelDefinition(
+        StateModelConfigGenerator.generateConfigForMasterSlave()));
+    admin.addResource(clusterName, resourceName, 4, "MasterSlave");
+    // Disable the resource and read its ideal state back
+    admin.enableResource(clusterName, resourceName, false);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
+    Assert.assertFalse(idealState.isEnabled());
+    // Re-enable the resource; the ideal state is fetched again before asserting
+    admin.enableResource(clusterName, resourceName, true);
+    idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
+    Assert.assertTrue(idealState.isEnabled());
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
index d3d6736..a528a20 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
@@ -23,7 +23,10 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Date;
 
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
@@ -36,8 +39,8 @@ import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
-import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.annotations.AfterClass;
@@ -46,8 +49,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class TestClusterSetup extends ZkUnitTestBase {
-  private static Logger LOG = Logger.getLogger(TestClusterSetup.class);
-
   protected static final String CLUSTER_NAME = "TestClusterSetup";
   protected static final String TEST_DB = "TestDB";
   protected static final String INSTANCE_PREFIX = "instance_";
@@ -437,4 +438,36 @@ public class TestClusterSetup extends ZkUnitTestBase {
 
   }
 
+  /**
+   * Verifies the {@code --enableResource} command-line option of ClusterSetup: after passing
+   * "false" the ideal state of TestDB0 reports disabled, and after passing "true" it reports
+   * enabled again.
+   * @throws Exception
+   */
+  @Test
+  public void testDisableResource() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        5, // number of nodes
+        3, // replicas
+        "MasterSlave", true); // do rebalance
+    // disable "TestDB0" resource
+    ClusterSetup.processCommandLineArgs(new String[] {
+        "--zkSvr", ZK_ADDR, "--enableResource", clusterName, "TestDB0", "false"
+    });
+    // Read the ideal state back to check the flag (no controller or participant is started here)
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    Assert.assertFalse(idealState.isEnabled());
+    // enable "TestDB0" resource
+    ClusterSetup.processCommandLineArgs(new String[] {
+        "--zkSvr", ZK_ADDR, "--enableResource", clusterName, "TestDB0", "true"
+    });
+    // The ideal state is fetched again before asserting
+    idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    Assert.assertTrue(idealState.isEnabled());
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
 }


Mime
View raw message