helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: [HELIX-94] Add the ability to enable and disable a resource
Date Wed, 16 Apr 2014 18:38:07 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.2-release dea0f5be4 -> 731e32698


[HELIX-94] Add the ability to enable and disable a resource


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/731e3269
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/731e3269
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/731e3269

Branch: refs/heads/helix-0.6.2-release
Commit: 731e3269897f4382484b8739d535ec115d095afd
Parents: dea0f5b
Author: zzhang <zzhang5@uci.edu>
Authored: Wed Apr 16 11:32:29 2014 -0700
Committer: zzhang <zzhang5@uci.edu>
Committed: Wed Apr 16 11:32:29 2014 -0700

----------------------------------------------------------------------
 .../helix/webapp/resources/JsonParameters.java  |   4 +
 .../webapp/resources/ResourceGroupResource.java |   9 +
 .../helix/webapp/TestDisableResource.java       |  84 ++++++
 .../main/java/org/apache/helix/HelixAdmin.java  |   7 +
 .../controller/rebalancer/AutoRebalancer.java   |   3 +-
 .../controller/rebalancer/CustomRebalancer.java |  15 +-
 .../rebalancer/SemiAutoRebalancer.java          |   3 +-
 .../util/ConstraintBasedAssignment.java         |  19 +-
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |  93 +++++--
 .../java/org/apache/helix/model/IdealState.java |  20 +-
 .../participant/HelixCustomCodeRunner.java      |  24 +-
 .../org/apache/helix/tools/ClusterSetup.java    |  18 +-
 .../strategy/TestAutoRebalanceStrategy.java     |  24 +-
 .../TestDisableCustomCodeRunner.java            | 252 +++++++++++++++++
 .../helix/integration/TestDisableResource.java  | 268 +++++++++++++++++++
 .../helix/manager/zk/TestZkHelixAdmin.java      |  38 ++-
 .../mock/participant/MockMSModelFactory.java    |   2 -
 .../apache/helix/tools/TestClusterSetup.java    |  40 +++
 18 files changed, 852 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
index c8e8fc6..6b33ec4 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
@@ -165,6 +165,10 @@ public class JsonParameters {
       if (!_parameterMap.containsKey(ENABLED)) {
         throw new HelixException("Missing Json parameters: '" + ENABLED + "'");
       }
+    } else if (command.equalsIgnoreCase(ClusterSetup.enableResource)) {
+      if (!_parameterMap.containsKey(ENABLED)) {
+        throw new HelixException("Missing Json parameters: '" + ENABLED + "'");
+      }
     } else if (command.equalsIgnoreCase(ClusterSetup.enablePartition)) {
       if (!_parameterMap.containsKey(ENABLED)) {
         throw new HelixException("Missing Json parameters: '" + ENABLED + "'");

http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
index 365b212..ba3584a 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
@@ -136,6 +136,15 @@ public class ResourceGroupResource extends Resource {
         ClusterSetup setupTool = new ClusterSetup(zkClient);
         setupTool.getClusterManagementTool()
             .resetResource(clusterName, Arrays.asList(resourceName));
+      } else if (command.equalsIgnoreCase(ClusterSetup.enableResource)) {
+        jsonParameters.verifyCommand(ClusterSetup.enableResource);
+
+        boolean enabled = Boolean.parseBoolean(jsonParameters.getParameter(JsonParameters.ENABLED));
+
+        ZkClient zkClient =
+            (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
+        ClusterSetup setupTool = new ClusterSetup(zkClient);
+        setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled);
       } else {
         throw new HelixException("Unsupported command: " + command + ". Should be one of ["
             + ClusterSetup.resetResource + "]");

http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
new file mode 100644
index 0000000..9800179
--- /dev/null
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
@@ -0,0 +1,84 @@
+package org.apache.helix.webapp;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.webapp.resources.JsonParameters;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestDisableResource extends AdminTestBase {
+
+  @Test
+  public void test() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        n, // number of nodes
+        3, // replicas
+        "MasterSlave", true); // do rebalance
+
+    String instanceUrl =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups/"
+            + "TestDB0";
+
+    // Disable TestDB0
+    Map<String, String> paramMap = new HashMap<String, String>();
+    paramMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enableResource);
+    paramMap.put(JsonParameters.ENABLED, Boolean.toString(false));
+    TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl, paramMap, false);
+
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    Assert.assertFalse(idealState.isEnabled());
+
+    // Re-enable TestDB0
+    paramMap.put(JsonParameters.ENABLED, Boolean.toString(true));
+    TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl, paramMap, false);
+
+    idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    Assert.assertTrue(idealState.isEnabled());
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index d5c62fa..f38123e 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -170,6 +170,13 @@ public interface HelixAdmin {
   void enableInstance(String clusterName, String instanceName, boolean enabled);
 
   /**
+   * Disable or enable a resource
+   * @param clusterName
+   * @param resourceName
+   */
+  void enableResource(String clusterName, String resourceName, boolean enabled);
+
+  /**
    * Disable or enable a list of partitions on an instance
    * @param enabled
    * @param clusterName

http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 745a9c9..5d43989 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -236,7 +236,8 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
           ConstraintBasedAssignment.getPreferenceList(cache, partition, idealState, stateModelDef);
       Map<String, String> bestStateForPartition =
           ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, stateModelDef,
-              preferenceList, currentStateMap, disabledInstancesForPartition);
+              preferenceList, currentStateMap, disabledInstancesForPartition,
+              idealState.isEnabled());
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;

http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 69037d9..6e9471b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -77,7 +77,7 @@ public class CustomRebalancer implements Rebalancer, MappingCalculator {
           idealState.getInstanceStateMap(partition.getPartitionName());
       Map<String, String> bestStateForPartition =
           computeCustomizedBestStateForPartition(cache, stateModelDef, idealStateMap,
-              currentStateMap, disabledInstancesForPartition);
+              currentStateMap, disabledInstancesForPartition, idealState.isEnabled());
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
@@ -90,11 +90,13 @@ public class CustomRebalancer implements Rebalancer, MappingCalculator {
    * @param idealStateMap
    * @param currentStateMap
    * @param disabledInstancesForPartition
+   * @param isResourceEnabled
    * @return
    */
   private Map<String, String> computeCustomizedBestStateForPartition(ClusterDataCache cache,
       StateModelDefinition stateModelDef, Map<String, String> idealStateMap,
-      Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
+      Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition,
+      boolean isResourceEnabled) {
     Map<String, String> instanceStateMap = new HashMap<String, String>();
 
     // if the ideal state is deleted, idealStateMap will be null/empty and
@@ -102,12 +104,12 @@ public class CustomRebalancer implements Rebalancer, MappingCalculator {
     if (currentStateMap != null) {
       for (String instance : currentStateMap.keySet()) {
         if ((idealStateMap == null || !idealStateMap.containsKey(instance))
-            && !disabledInstancesForPartition.contains(instance)) {
+            && !disabledInstancesForPartition.contains(instance) && isResourceEnabled) {
           // if dropped and not disabled, transit to DROPPED
           instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
         } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
             HelixDefinedState.ERROR.toString()))
-            && disabledInstancesForPartition.contains(instance)) {
+            && (!isResourceEnabled || disabledInstancesForPartition.contains(instance))) {
           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
           instanceStateMap.put(instance, stateModelDef.getInitialState());
         }
@@ -125,8 +127,9 @@ public class CustomRebalancer implements Rebalancer, MappingCalculator {
           currentStateMap == null || currentStateMap.get(instance) == null
               || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString());
 
-      if (liveInstancesMap.containsKey(instance) && notInErrorState
-          && !disabledInstancesForPartition.contains(instance)) {
+      boolean enabled = !disabledInstancesForPartition.contains(instance) && isResourceEnabled;
+
+      if (liveInstancesMap.containsKey(instance) && notInErrorState && enabled) {
         instanceStateMap.put(instance, idealStateMap.get(instance));
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index 420e7ab..685357f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -76,7 +76,8 @@ public class SemiAutoRebalancer implements Rebalancer, MappingCalculator {
           ConstraintBasedAssignment.getPreferenceList(cache, partition, idealState, stateModelDef);
       Map<String, String> bestStateForPartition =
           ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, stateModelDef,
-              preferenceList, currentStateMap, disabledInstancesForPartition);
+              preferenceList, currentStateMap, disabledInstancesForPartition,
+              idealState.isEnabled());
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;

http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index 3fd52f4..1891c6a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -43,9 +43,9 @@ import org.apache.log4j.Logger;
 public class ConstraintBasedAssignment {
   private static Logger logger = Logger.getLogger(ConstraintBasedAssignment.class);
 
-  public static List<String> getPreferenceList(ClusterDataCache cache, Partition resource,
+  public static List<String> getPreferenceList(ClusterDataCache cache, Partition partition,
       IdealState idealState, StateModelDefinition stateModelDef) {
-    List<String> listField = idealState.getPreferenceList(resource.getPartitionName());
+    List<String> listField = idealState.getPreferenceList(partition.getPartitionName());
 
     if (listField != null && listField.size() == 1
         && StateModelToken.ANY_LIVEINSTANCE.toString().equals(listField.get(0))) {
@@ -66,11 +66,13 @@ public class ConstraintBasedAssignment {
    * @param currentStateMap
    *          : instance->state for each partition
    * @param disabledInstancesForPartition
+   * @param isResourceEnabled
    * @return
    */
   public static Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache,
       StateModelDefinition stateModelDef, List<String> instancePreferenceList,
-      Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
+      Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition,
+      boolean isResourceEnabled) {
     Map<String, String> instanceStateMap = new HashMap<String, String>();
 
     // if the ideal state is deleted, instancePreferenceList will be empty and
@@ -78,12 +80,12 @@ public class ConstraintBasedAssignment {
     if (currentStateMap != null) {
       for (String instance : currentStateMap.keySet()) {
         if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
-            && !disabledInstancesForPartition.contains(instance)) {
+            && !disabledInstancesForPartition.contains(instance) && isResourceEnabled) {
           // if dropped and not disabled, transit to DROPPED
           instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
         } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
             HelixDefinedState.ERROR.toString()))
-            && disabledInstancesForPartition.contains(instance)) {
+            && (disabledInstancesForPartition.contains(instance) || !isResourceEnabled)) {
           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
           instanceStateMap.put(instance, stateModelDef.getInitialState());
         }
@@ -106,7 +108,7 @@ public class ConstraintBasedAssignment {
       if ("N".equals(num)) {
         Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet());
         liveAndEnabled.removeAll(disabledInstancesForPartition);
-        stateCount = liveAndEnabled.size();
+        stateCount = isResourceEnabled ? liveAndEnabled.size() : 0;
       } else if ("R".equals(num)) {
         stateCount = instancePreferenceList.size();
       } else {
@@ -125,8 +127,11 @@ public class ConstraintBasedAssignment {
               currentStateMap == null || currentStateMap.get(instanceName) == null
                   || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString());
 
+          boolean enabled =
+              !disabledInstancesForPartition.contains(instanceName) && isResourceEnabled;
+
           if (liveInstancesMap.containsKey(instanceName) && !assigned[i] && notInErrorState
-              && !disabledInstancesForPartition.contains(instanceName)) {
+              && enabled) {
             instanceStateMap.put(instanceName, state);
             count = count + 1;
             assigned[i] = true;

http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 169b2bb..4e9f642 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -33,13 +33,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixConstants;
@@ -162,7 +162,7 @@ public class ZKHelixAdmin implements HelixAdmin {
           + clusterName);
     }
 
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -176,7 +176,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
             ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
 
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
     if (!baseAccessor.exists(path, 0)) {
       throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName
           + ", instance config does not exist");
@@ -198,13 +198,39 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
+  public void enableResource(final String clusterName, final String resourceName,
+      final boolean enabled) {
+    String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, resourceName);
+
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    if (!baseAccessor.exists(path, 0)) {
+      throw new HelixException("Cluster " + clusterName + ", resource: " + resourceName
+          + ", ideal-state does not exist");
+    }
+
+    baseAccessor.update(path, new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData == null) {
+          throw new HelixException("Cluster: " + clusterName + ", resource: " + resourceName
+              + ", ideal-state is null");
+        }
+
+        IdealState idealState = new IdealState(currentData);
+        idealState.enable(enabled);
+        return idealState.getRecord();
+      }
+    }, AccessOption.PERSISTENT);
+  }
+
+  @Override
   public void enablePartition(final boolean enabled, final String clusterName,
       final String instanceName, final String resourceName, final List<String> partitionNames) {
     String path =
         PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
             ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
 
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
 
     // check instanceConfig exists
     if (!baseAccessor.exists(path, 0)) {
@@ -295,7 +321,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void resetPartition(String clusterName, String instanceName, String resourceName,
       List<String> partitionNames) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -398,7 +424,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void resetInstance(String clusterName, List<String> instanceNames) {
     // TODO: not mp-safe
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
     List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
@@ -424,7 +450,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void resetResource(String clusterName, List<String> resourceNames) {
     // TODO: not mp-safe
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
     List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
@@ -655,7 +681,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public IdealState getResourceIdealState(String clusterName, String resourceName) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -664,7 +690,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public void setResourceIdealState(String clusterName, String resourceName, IdealState idealState) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -673,7 +699,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public ExternalView getResourceExternalView(String clusterName, String resourceName) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
     return accessor.getProperty(keyBuilder.externalView(resourceName));
@@ -692,7 +718,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       throw new HelixException("State model path " + stateModelPath + " already exists.");
     }
 
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.stateModelDef(stateModel.getId()), stateModel);
@@ -700,7 +726,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public void dropResource(String clusterName, String resourceName) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -715,7 +741,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public StateModelDefinition getStateModelDef(String clusterName, String stateModelName) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -730,7 +756,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     String persistentStatsPath =
         PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName);
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
 
     baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>() {
 
@@ -761,7 +787,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       throw new HelixException("cluster " + clusterName + " is not setup yet");
     }
 
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
 
     String alertsPath = PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName);
 
@@ -798,7 +824,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void dropCluster(String clusterName) {
     logger.info("Deleting cluster " + clusterName);
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -823,7 +849,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     String persistentStatsPath =
         PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName);
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
 
     baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>() {
 
@@ -857,7 +883,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     String alertsPath = PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName);
 
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
 
     if (!baseAccessor.exists(alertsPath, 0)) {
       throw new HelixException("No alerts node in ZK, nothing to drop");
@@ -906,7 +932,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       idealState.setPartitionState(clusterName, controllers.get(i), "STANDBY");
     }
 
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -1059,15 +1085,23 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     int size = (int) file.length();
     byte[] bytes = new byte[size];
-    DataInputStream dis = new DataInputStream(new FileInputStream(file));
-    int read = 0;
-    int numRead = 0;
-    while (read < bytes.length && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) {
-      read = read + numRead;
+    DataInputStream dis = null;
+    try {
+      dis = new DataInputStream(new FileInputStream(file));
+      int read = 0;
+      int numRead = 0;
+      while (read < bytes.length && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) {
+        read = read + numRead;
+      }
+      return bytes;
+    } finally {
+      if (dis != null) {
+        dis.close();
+      }
     }
-    return bytes;
   }
 
+  @Override
   public void addStateModelDef(String clusterName, String stateModelDefName,
       String stateModelDefFile) throws IOException {
     ZNRecord record =
@@ -1083,7 +1117,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void setConstraint(String clusterName, final ConstraintType constraintType,
       final String constraintId, final ConstraintItem constraintItem) {
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
 
     Builder keyBuilder = new Builder(clusterName);
     String path = keyBuilder.constraint(constraintType.toString()).getPath();
@@ -1104,7 +1138,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void removeConstraint(String clusterName, final ConstraintType constraintType,
       final String constraintId) {
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
 
     Builder keyBuilder = new Builder(clusterName);
     String path = keyBuilder.constraint(constraintType.toString()).getPath();
@@ -1202,7 +1236,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       throw new HelixException("cluster " + clusterName + " instance " + instanceName
           + " is not setup yet");
     }
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -1221,7 +1255,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       throw new HelixException("cluster " + clusterName + " instance " + instanceName
           + " is not setup yet");
     }
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -1230,6 +1264,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     accessor.setProperty(keyBuilder.instanceConfig(instanceName), config);
   }
 
+  @Override
   public void close() {
     if (_zkClient != null) {
       _zkClient.close();

http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 7a4fcad..ded9a8f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -50,7 +50,8 @@ public class IdealState extends HelixProperty {
     REBALANCE_TIMER_PERIOD,
     MAX_PARTITIONS_PER_INSTANCE,
     INSTANCE_GROUP_TAG,
-    REBALANCER_CLASS_NAME
+    REBALANCER_CLASS_NAME,
+    HELIX_ENABLED
   }
 
   public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
@@ -505,4 +506,21 @@ public class IdealState extends HelixProperty {
     }
     return rebalanceMode;
   }
+
+  /**
+   * Check whether the resource is enabled
+   * A resource is enabled by default
+   * @return true if enabled; false otherwise
+   */
+  public boolean isEnabled() {
+    return _record.getBooleanField(IdealStateProperty.HELIX_ENABLED.name(), true);
+  }
+
+  /**
+   * Enable/Disable the resource
+   * @param enabled true to enable the resource; false to disable it
+   */
+  public void enable(boolean enabled) {
+    _record.setSimpleField(IdealStateProperty.HELIX_ENABLED.name(), Boolean.toString(enabled));
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
index 6a2490a..6fea5ff 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
@@ -28,6 +28,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -48,7 +49,7 @@ import org.apache.log4j.Logger;
  *  .invoke(_callback)
  *  .on(ChangeType.LIVE_INSTANCE, ChangeType.IdealState)
  *  .usingLeaderStandbyModel("someUniqueId")
- *  .run()
+ *  .start()
  * </code>
  */
 public class HelixCustomCodeRunner {
@@ -103,6 +104,15 @@ public class HelixCustomCodeRunner {
   }
 
   /**
+   * Get resource name for the custom-code runner
+   * Used for retrieving the external view for the custom-code runner resource
+   * @return resource name for the custom-code runner
+   */
+  public String getResourceName() {
+    return _resourceName;
+  }
+
+  /**
    * This method will be invoked when there is a change in any subscribed
    * notificationTypes
    * @throws Exception
@@ -121,13 +131,13 @@ public class HelixCustomCodeRunner {
     stateMach.registerStateModelFactory(LEADER_STANDBY, _stateModelFty, _resourceName);
     ZkClient zkClient = null;
     try {
-      // manually add ideal state for participant leader using LeaderStandby
-      // model
-
-      zkClient = new ZkClient(_zkAddr, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
-      zkClient.setZkSerializer(new ZNRecordSerializer());
+      // manually add ideal state for participant leader using LeaderStandby model
+      zkClient =
+          new ZkClient(_zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
+              ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
       HelixDataAccessor accessor =
-          new ZKHelixDataAccessor(_manager.getClusterName(), new ZkBaseDataAccessor(zkClient));
+          new ZKHelixDataAccessor(_manager.getClusterName(), new ZkBaseDataAccessor<ZNRecord>(
+              zkClient));
       Builder keyBuilder = accessor.keyBuilder();
 
       IdealState idealState = new IdealState(_resourceName);

http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index a39e571..59b10ae 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -98,6 +98,8 @@ public class ClusterSetup {
   public static final String addInstanceTag = "addInstanceTag";
   public static final String removeInstanceTag = "removeInstanceTag";
 
+  public static final String enableResource = "enableResource";
+
   // Query info (TBD in V2)
   public static final String listClusterInfo = "listClusterInfo";
   public static final String listInstanceInfo = "listInstanceInfo";
@@ -741,6 +743,11 @@ public class ClusterSetup {
     dropResourceOption.setRequired(false);
     dropResourceOption.setArgName("clusterName resourceName");
 
+    Option enableResourceOption =
+        OptionBuilder.withLongOpt(enableResource).withDescription("Enable/disable a resource")
+            .hasArgs(3).isRequired(false).withArgName("clusterName resourceName true/false")
+            .create();
+
     Option rebalanceOption =
         OptionBuilder.withLongOpt(rebalance).withDescription("Rebalance a resource in a cluster")
             .create();
@@ -791,11 +798,11 @@ public class ClusterSetup {
     partitionInfoOption.setArgName("clusterName resourceName partitionName");
 
     Option enableInstanceOption =
-        OptionBuilder.withLongOpt(enableInstance).withDescription("Enable/disable a Instance")
+        OptionBuilder.withLongOpt(enableInstance).withDescription("Enable/disable an instance")
             .create();
     enableInstanceOption.setArgs(3);
     enableInstanceOption.setRequired(false);
-    enableInstanceOption.setArgName("clusterName InstanceName true/false");
+    enableInstanceOption.setArgName("clusterName instanceName true/false");
 
     Option enablePartitionOption =
         OptionBuilder.hasArgs().withLongOpt(enablePartition)
@@ -951,6 +958,7 @@ public class ClusterSetup {
     group.addOption(dropInstanceOption);
     group.addOption(swapInstanceOption);
     group.addOption(dropResourceOption);
+    group.addOption(enableResourceOption);
     group.addOption(instanceInfoOption);
     group.addOption(clusterInfoOption);
     group.addOption(resourceInfoOption);
@@ -1254,6 +1262,12 @@ public class ClusterSetup {
 
       setupTool.getClusterManagementTool().enableInstance(clusterName, instanceName, enabled);
       return 0;
+    } else if (cmd.hasOption(enableResource)) {
+      String clusterName = cmd.getOptionValues(enableResource)[0];
+      String resourceName = cmd.getOptionValues(enableResource)[1];
+      boolean enabled = Boolean.parseBoolean(cmd.getOptionValues(enableResource)[2].toLowerCase());
+
+      setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled);
     } else if (cmd.hasOption(enablePartition)) {
       String[] args = cmd.getOptionValues(enablePartition);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index c93c51e..728e3ef 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -147,18 +147,18 @@ public class TestAutoRebalanceStrategy {
     private static final double P_RESURRECT = 0.45;
     private static final String RESOURCE_NAME = "resource";
 
-    private List<String> _partitions;
-    private LinkedHashMap<String, Integer> _states;
-    private List<String> _liveNodes;
-    private Set<String> _liveSet;
-    private Set<String> _removedSet;
-    private Set<String> _nonLiveSet;
+    private final List<String> _partitions;
+    private final LinkedHashMap<String, Integer> _states;
+    private final List<String> _liveNodes;
+    private final Set<String> _liveSet;
+    private final Set<String> _removedSet;
+    private final Set<String> _nonLiveSet;
     private Map<String, Map<String, String>> _currentMapping;
-    private List<String> _allNodes;
-    private int _maxPerNode;
-    private StateModelDefinition _stateModelDef;
-    private ReplicaPlacementScheme _placementScheme;
-    private Random _random;
+    private final List<String> _allNodes;
+    private final int _maxPerNode;
+    private final StateModelDefinition _stateModelDef;
+    private final ReplicaPlacementScheme _placementScheme;
+    private final Random _random;
 
     public AutoRebalanceTester(List<String> partitions, LinkedHashMap<String, Integer> states,
         List<String> liveNodes, Map<String, Map<String, String>> currentMapping,
@@ -230,7 +230,7 @@ public class TestAutoRebalanceStrategy {
         Set<String> disabled = Collections.emptySet();
         Map<String, String> assignment =
             ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, _stateModelDef,
-                preferenceList, currentStateMap, disabled);
+                preferenceList, currentStateMap, disabled, true);
         mapResult.put(partition, assignment);
       }
       return mapResult;

http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
new file mode 100644
index 0000000..3223e48
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
@@ -0,0 +1,252 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.participant.CustomCodeCallbackHandler;
+import org.apache.helix.participant.HelixCustomCodeRunner;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestDisableCustomCodeRunner extends ZkUnitTestBase {
+
+  private static final int N = 2;
+  private static final int PARTITION_NUM = 1;
+
+  class DummyCallback implements CustomCodeCallbackHandler {
+    private final Map<NotificationContext.Type, Boolean> _callbackInvokeMap =
+        new HashMap<NotificationContext.Type, Boolean>();
+
+    @Override
+    public void onCallback(NotificationContext context) {
+      NotificationContext.Type type = context.getType();
+      _callbackInvokeMap.put(type, Boolean.TRUE);
+    }
+
+    public void reset() {
+      _callbackInvokeMap.clear();
+    }
+
+    public boolean isInitTypeInvoked() {
+      return _callbackInvokeMap.containsKey(NotificationContext.Type.INIT);
+    }
+
+    public boolean isCallbackTypeInvoked() {
+      return _callbackInvokeMap.containsKey(NotificationContext.Type.CALLBACK);
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        PARTITION_NUM, // partitions per resource
+        N, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // start participants
+    Map<String, MockParticipantManager> participants =
+        new HashMap<String, MockParticipantManager>();
+    Map<String, HelixCustomCodeRunner> customCodeRunners =
+        new HashMap<String, HelixCustomCodeRunner>();
+    Map<String, DummyCallback> callbacks =
+        new HashMap<String, DummyCallback>();
+    for (int i = 0; i < N; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants
+          .put(instanceName, new MockParticipantManager(ZK_ADDR, clusterName, instanceName));
+
+      customCodeRunners.put(instanceName, new HelixCustomCodeRunner(participants.get(instanceName),
+          ZK_ADDR));
+      callbacks.put(instanceName, new DummyCallback());
+
+      customCodeRunners.get(instanceName).invoke(callbacks.get(instanceName))
+          .on(ChangeType.LIVE_INSTANCE)
+          .usingLeaderStandbyModel("TestParticLeader").start();
+      participants.get(instanceName).syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // Make sure callback is registered
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    final String customCodeRunnerResource =
+        customCodeRunners.get("localhost_12918").getResourceName();
+    ExternalView extView = accessor.getProperty(keyBuilder.externalView(customCodeRunnerResource));
+    Map<String, String> instanceStates = extView.getStateMap(customCodeRunnerResource + "_0");
+    String leader = null;
+    for (String instance : instanceStates.keySet()) {
+      String state = instanceStates.get(instance);
+      if ("LEADER".equals(state)) {
+        leader = instance;
+        break;
+      }
+    }
+    Assert.assertNotNull(leader);
+    for (String instance : callbacks.keySet()) {
+      DummyCallback callback = callbacks.get(instance);
+      if (instance.equals(leader)) {
+        Assert.assertTrue(callback.isInitTypeInvoked());
+      } else {
+        Assert.assertFalse(callback.isInitTypeInvoked());
+      }
+      callback.reset();
+    }
+
+    // Disable custom-code runner resource
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.enableResource(clusterName, customCodeRunnerResource, false);
+
+    // Verify that states of custom-code runner are all OFFLINE
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+        ExternalView extView =
+            accessor.getProperty(keyBuilder.externalView(customCodeRunnerResource));
+        if (extView == null) {
+          return false;
+        }
+        Set<String> partitionSet = extView.getPartitionSet();
+        if (partitionSet == null || partitionSet.size() != PARTITION_NUM) {
+          return false;
+        }
+        for (String partition : partitionSet) {
+          Map<String, String> instanceStates = extView.getStateMap(partition);
+          for (String state : instanceStates.values()) {
+            if (!"OFFLINE".equals(state)) {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    }, 10 * 1000);
+    Assert.assertTrue(result);
+
+    // A live-instance change should not invoke any custom-code runner
+    LiveInstance fakeInstance = new LiveInstance("fakeInstance");
+    fakeInstance.setSessionId("fakeSessionId");
+    fakeInstance.setHelixVersion("0.6");
+    accessor.setProperty(keyBuilder.liveInstance("fakeInstance"), fakeInstance);
+    Thread.sleep(1000);
+
+    for (DummyCallback callback : callbacks.values()) {
+      Assert.assertFalse(callback.isInitTypeInvoked());
+      Assert.assertFalse(callback.isCallbackTypeInvoked());
+    }
+
+    // Remove fake instance
+    accessor.removeProperty(keyBuilder.liveInstance("fakeInstance"));
+
+    // Re-enable custom-code runner
+    admin.enableResource(clusterName, customCodeRunnerResource, true);
+    result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Verify that the custom-code runner callback is invoked again
+    extView = accessor.getProperty(keyBuilder.externalView(customCodeRunnerResource));
+    instanceStates = extView.getStateMap(customCodeRunnerResource + "_0");
+    leader = null;
+    for (String instance : instanceStates.keySet()) {
+      String state = instanceStates.get(instance);
+      if ("LEADER".equals(state)) {
+        leader = instance;
+        break;
+      }
+    }
+    Assert.assertNotNull(leader);
+    for (String instance : callbacks.keySet()) {
+      DummyCallback callback = callbacks.get(instance);
+      if (instance.equals(leader)) {
+        Assert.assertTrue(callback.isInitTypeInvoked());
+      } else {
+        Assert.assertFalse(callback.isInitTypeInvoked());
+      }
+      callback.reset();
+    }
+
+    // Adding a fake instance should invoke the leader's custom-code runner
+    accessor.setProperty(keyBuilder.liveInstance("fakeInstance"), fakeInstance);
+    Thread.sleep(1000);
+    for (String instance : callbacks.keySet()) {
+      DummyCallback callback = callbacks.get(instance);
+      if (instance.equals(leader)) {
+        Assert.assertTrue(callback.isCallbackTypeInvoked());
+      } else {
+        Assert.assertFalse(callback.isCallbackTypeInvoked());
+      }
+    }
+
+    // Clean up
+    controller.syncStop();
+    for (MockParticipantManager participant : participants.values()) {
+      participant.syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
new file mode 100644
index 0000000..8a0fe5a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
@@ -0,0 +1,268 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestDisableResource extends ZkUnitTestBase {
+  private static final int N = 2;
+  private static final int PARTITION_NUM = 1;
+
+  @Test
+  public void testDisableResourceInSemiAutoMode() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        PARTITION_NUM, // partitions per resource
+        N, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // start participants
+    MockParticipantManager participants[] = new MockParticipantManager[N];
+    for (int i = 0; i < N; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Disable TestDB0
+    enableResource(clusterName, false);
+    checkExternalView(clusterName);
+
+    // Re-enable TestDB0
+    enableResource(clusterName, true);
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+            ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // Clean up
+    controller.syncStop();
+    for (int i = 0; i < N; i++) {
+      participants[i].syncStop();
+    }
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testDisableResourceInFullAutoMode() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        PARTITION_NUM, // partitions per resource
+        N, // number of nodes
+        2, // replicas
+        "MasterSlave", RebalanceMode.FULL_AUTO, true); // do rebalance
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // start participants
+    MockParticipantManager participants[] = new MockParticipantManager[N];
+    for (int i = 0; i < N; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // disable TestDB0
+    enableResource(clusterName, false);
+    checkExternalView(clusterName);
+
+    // Re-enable TestDB0
+    enableResource(clusterName, true);
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+            ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // Clean up
+    controller.syncStop();
+    for (int i = 0; i < N; i++) {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testDisableResourceInCustomMode() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        PARTITION_NUM, // partitions per resource
+        N, // number of nodes
+        2, // replicas
+        "MasterSlave", RebalanceMode.CUSTOMIZED, true); // do rebalance
+
+    // set up custom ideal-state
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    idealState.setPartitionState("TestDB0_0", "localhost_12918", "SLAVE");
+    idealState.setPartitionState("TestDB0_0", "localhost_12919", "SLAVE");
+    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // start participants
+    MockParticipantManager participants[] = new MockParticipantManager[N];
+    for (int i = 0; i < N; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Disable TestDB0
+    enableResource(clusterName, false);
+    checkExternalView(clusterName);
+
+    // Re-enable TestDB0
+    enableResource(clusterName, true);
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+            ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Clean up
+    controller.syncStop();
+    for (int i = 0; i < N; i++) {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  private void enableResource(String clusterName, boolean enabled) {
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.enableResource(clusterName, "TestDB0", enabled);
+  }
+
+  /**
+   * Check all partitions are in OFFLINE state
+   * @param clusterName name of the cluster whose TestDB0 external view is checked
+   * @throws Exception
+   */
+  private void checkExternalView(String clusterName) throws Exception {
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+
+    // verify that states of TestDB0 are all OFFLINE
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+        ExternalView extView = accessor.getProperty(keyBuilder.externalView("TestDB0"));
+        if (extView == null) {
+          return false;
+        }
+        Set<String> partitionSet = extView.getPartitionSet();
+        if (partitionSet == null || partitionSet.size() != PARTITION_NUM) {
+          return false;
+        }
+        for (String partition : partitionSet) {
+          Map<String, String> instanceStates = extView.getStateMap(partition);
+          for (String state : instanceStates.values()) {
+            if (!"OFFLINE".equals(state)) {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    }, 10 * 1000);
+    Assert.assertTrue(result);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index 4b3764f..658d22e 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -34,6 +34,7 @@ import org.apache.helix.model.ConfigScope;
 import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.ConfigScopeBuilder;
@@ -56,7 +57,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
       _gZkClient.deleteRecursive(rootPath);
     }
 
-    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+    HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
     tool.addCluster(clusterName, true);
     Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
     tool.addCluster(clusterName, true);
@@ -199,7 +200,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+    HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
     tool.addCluster(clusterName, true);
     Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
 
@@ -235,7 +236,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+    HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
     tool.addCluster(clusterName, true);
     Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
 
@@ -288,4 +289,35 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
+
+  @Test
+  public void testDisableResource() { // enableResource must toggle IdealState.isEnabled()
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addCluster(clusterName, true);
+    Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
+
+    String resourceName = "TestDB";
+    admin.addStateModelDef(clusterName, "MasterSlave", new StateModelDefinition(
+        StateModelConfigGenerator.generateConfigForMasterSlave()));
+    admin.addResource(clusterName, resourceName, 4, "MasterSlave");
+    admin.enableResource(clusterName, resourceName, false); // disable the resource
+
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
+    Assert.assertFalse(idealState.isEnabled()); // ideal state read back must report disabled
+
+    admin.enableResource(clusterName, resourceName, true); // enable the resource again
+    idealState = accessor.getProperty(keyBuilder.idealStates(resourceName)); // re-read after change
+    Assert.assertTrue(idealState.isEnabled()); // ideal state must now report enabled
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
index ff7a455..9325934 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
@@ -19,8 +19,6 @@ package org.apache.helix.mock.participant;
  * under the License.
  */
 
-import java.util.Map;
-
 import org.apache.helix.participant.statemachine.StateModelFactory;
 
 // mock master slave state model factory

http://git-wip-us.apache.org/repos/asf/helix/blob/731e3269/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
index 7190968..66e8870 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Date;
 
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathConfig;
@@ -36,6 +38,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.tools.ClusterSetup;
@@ -440,4 +443,41 @@ public class TestClusterSetup extends ZkUnitTestBase {
 
   }
 
+  @Test
+  public void testDisableResource() throws Exception { // exercises --enableResource end to end
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        5, // number of nodes
+        3, // replicas
+        "MasterSlave", true); // do rebalance
+
+    // disable "TestDB0" resource through the ClusterSetup --enableResource command-line option
+    ClusterSetup.processCommandLineArgs(new String[] {
+        "--zkSvr", ZK_ADDR, "--enableResource", clusterName, "TestDB0", "false"
+    });
+
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    Assert.assertFalse(idealState.isEnabled()); // ideal state read back must report disabled
+
+    // enable "TestDB0" resource again through the same command-line option
+    ClusterSetup.processCommandLineArgs(new String[] {
+        "--zkSvr", ZK_ADDR, "--enableResource", clusterName, "TestDB0", "true"
+    });
+    idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0")); // re-read after change
+    Assert.assertTrue(idealState.isEnabled()); // ideal state must now report enabled
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
 }


Mime
View raw message