helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: HELIX-200: helix controller send ERROR->DROPPED transition infinitely, rb=13462
Date Tue, 13 Aug 2013 20:34:02 GMT
Updated Branches:
  refs/heads/helix-0.6.1.5-release a97981813 -> fa7597e2d


HELIX-200: helix controller send ERROR->DROPPED transition infinitely, rb=13462


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/fa7597e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/fa7597e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/fa7597e2

Branch: refs/heads/helix-0.6.1.5-release
Commit: fa7597e2dd19cdf56884a085a5a94dd33679cd4a
Parents: a979818
Author: zzhang <zzhang@apache.org>
Authored: Tue Aug 13 13:33:15 2013 -0700
Committer: zzhang <zzhang@apache.org>
Committed: Tue Aug 13 13:33:15 2013 -0700

----------------------------------------------------------------------
 .../stages/BestPossibleStateCalcStage.java      |  95 ++++---
 .../helix/tools/ClusterStateVerifier.java       |  34 ++-
 .../cluster-manager-version.properties          |   1 +
 .../integration/TestDropErrorPartition.java     | 257 +++++++++++++++++++
 4 files changed, 346 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/fa7597e2/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index aca0e74..f1f9c43 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -32,6 +32,7 @@ import java.util.TreeMap;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerProperties;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -49,7 +50,7 @@ import org.apache.log4j.Logger;
 /**
  * For partition compute best possible (instance,state) pair based on
  * IdealState,StateModel,LiveInstance
- * 
+ *
  */
 // TODO: refactor this
 public class BestPossibleStateCalcStage extends AbstractBaseStage
@@ -131,7 +132,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
                                         currentStateOutput);
       }
 
-      
+
       for (Partition partition : resource.getPartitions())
       {
         Map<String, String> currentStateMap =
@@ -150,20 +151,22 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
                                                      stateModelDef,
                                                      idealStateMap,
                                                      currentStateMap,
-                                                     disabledInstancesForPartition);
+                                                     disabledInstancesForPartition,
+                                                     manager.getProperties());
         }
         else
         // both AUTO and AUTO_REBALANCE mode
         {
           List<String> instancePreferenceList =
               getPreferenceList(cache, partition, idealState, stateModelDef);
-          
+
           bestStateForPartition =
               computeAutoBestStateForPartition(cache,
                                                stateModelDef,
                                                instancePreferenceList,
                                                currentStateMap,
-                                               disabledInstancesForPartition);
+                                               disabledInstancesForPartition,
+                                               manager.getProperties());
         }
         output.setState(resourceName, partition, bestStateForPartition);
       }
@@ -175,7 +178,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
    * Compute best state for resource in AUTO_REBALANCE ideal state mode. the algorithm
    * will make sure that the master partition are evenly distributed; Also when instances
    * are added / removed, the amount of diff in master partitions are minimized
-   * 
+   *
    * @param cache
    * @param idealState
    * @param instancePreferenceList
@@ -191,7 +194,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
     String topStateValue = stateModelDef.getStatesPriorityList().get(0);
     Set<String> liveInstances = cache._liveInstanceMap.keySet();
     Set<String> taggedInstances = new HashSet<String>();
-    
+
     // If there are instances tagged with resource name, use only those instances
     if(idealState.getInstanceGroupTag() != null)
     {
@@ -275,7 +278,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
   /**
    * Given the current master assignment map and the partitions not hosted, generate an
    * evenly distributed partition assignment map
-   * 
+   *
    * @param masterAssignmentMap
    *          current master assignment map
    * @param orphanPartitions
@@ -342,7 +345,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
   /**
    * Generate full preference list from the master assignment map evenly distribute the
    * slave partitions mastered on a host to other hosts
-   * 
+   *
    * @param masterAssignmentMap
    *          current master assignment map
    * @param orphanPartitions
@@ -387,8 +390,36 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
   }
 
   /**
+   * Is participant version support error->dropped transition
+   */
+  private boolean isDropErrorSupported(HelixManagerProperties properties,
+                                       ClusterDataCache cache,
+                                       String instance)
+  {
+    if (properties == null)
+    {
+      return false;
+    }
+
+    LiveInstance liveInstance = cache.getLiveInstances().get(instance);
+    String participantVersion = null;
+    if (liveInstance != null) {
+      participantVersion = liveInstance.getHelixVersion();
+    }
+
+    return properties.isFeatureSupported("drop_error_partition", participantVersion);
+  }
+
+  private boolean isNotError(Map<String, String> currentStateMap, String instance)
+  {
+    return currentStateMap == null
+        || currentStateMap.get(instance) == null
+        || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString());
+  }
+
+  /**
    * compute best state for resource in AUTO ideal state mode
-   * 
+   *
    * @param cache
    * @param stateModelDef
    * @param instancePreferenceList
@@ -401,7 +432,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
                                                                StateModelDefinition stateModelDef,
                                                                List<String> instancePreferenceList,
                                                               Map<String, String> currentStateMap,
-                                                               Set<String> disabledInstancesForPartition)
+                                                               Set<String> disabledInstancesForPartition,
+                                                               HelixManagerProperties properties)
   {
     Map<String, String> instanceStateMap = new HashMap<String, String>();
 
@@ -411,15 +443,19 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
     {
       for (String instance : currentStateMap.keySet())
       {
+        boolean isDropErrorSupported = isDropErrorSupported(properties, cache, instance);
+        boolean isNotError = isNotError(currentStateMap, instance);
+
         if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
             && !disabledInstancesForPartition.contains(instance))
         {
-          // if dropped and not disabled, transit to DROPPED
-          instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
+          if (isDropErrorSupported || isNotError)
+          {
+            // if dropped and not disabled, transit to DROPPED
+            instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
+          }
         }
-        else if ( (currentStateMap.get(instance) == null 
-            || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString()))
-            && disabledInstancesForPartition.contains(instance))
+        else if ( isNotError && disabledInstancesForPartition.contains(instance))
         {
           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
           instanceStateMap.put(instance, stateModelDef.getInitialState());
@@ -470,9 +506,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
         {
           String instanceName = instancePreferenceList.get(i);
 
-          boolean notInErrorState = currentStateMap == null 
-              || currentStateMap.get(instanceName) == null
-              || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString());
+          boolean notInErrorState = isNotError(currentStateMap, instanceName);
 
           if (liveInstancesMap.containsKey(instanceName) && !assigned[i]
               && notInErrorState && !disabledInstancesForPartition.contains(instanceName))
@@ -493,7 +527,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
 
   /**
    * compute best state for resource in CUSTOMIZED ideal state mode
-   * 
+   *
    * @param cache
    * @param stateModelDef
    * @param idealStateMap
@@ -505,7 +539,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
                                                                     StateModelDefinition stateModelDef,
                                                                     Map<String, String> idealStateMap,
                                                                     Map<String, String> currentStateMap,
-                                                                     Set<String> disabledInstancesForPartition)
+                                                                     Set<String> disabledInstancesForPartition,
+                                                                     HelixManagerProperties properties)
   {
     Map<String, String> instanceStateMap = new HashMap<String, String>();
 
@@ -515,15 +550,19 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
     {
       for (String instance : currentStateMap.keySet())
       {
+        boolean isDropErrorSupported = isDropErrorSupported(properties, cache, instance);
+        boolean isNotError = isNotError(currentStateMap, instance);
+
         if ((idealStateMap == null || !idealStateMap.containsKey(instance))
             && !disabledInstancesForPartition.contains(instance))
         {
-          // if dropped and not disabled, transit to DROPPED
-          instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
+          if (isDropErrorSupported || isNotError)
+          {
+            // if dropped and not disabled, transit to DROPPED
+            instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
+          }
         }
-        else if ( (currentStateMap.get(instance) == null 
-            || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString()))
-            && disabledInstancesForPartition.contains(instance))
+        else if (isNotError && disabledInstancesForPartition.contains(instance))
         {
           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
           instanceStateMap.put(instance, stateModelDef.getInitialState());
@@ -540,9 +579,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
     Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
     for (String instance : idealStateMap.keySet())
     {
-      boolean notInErrorState = currentStateMap == null 
-          || currentStateMap.get(instance) == null
-          || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString());
+      boolean notInErrorState = isNotError(currentStateMap, instance);
 
       if (liveInstancesMap.containsKey(instance) && notInErrorState
           && !disabledInstancesForPartition.contains(instance))

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/fa7597e2/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index eaada16..6c55a56 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -41,6 +41,8 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
@@ -55,6 +57,7 @@ import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ExternalView;
@@ -179,7 +182,10 @@ public class ClusterStateVerifier
             new ZKHelixDataAccessor(clusterName,
                                     new ZkBaseDataAccessor<ZNRecord>(zkClient));
 
-        return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates);
+        return ClusterStateVerifier.verifyBestPossAndExtView(zkAddr,
+                                                             clusterName,
+                                                             accessor,
+                                                             errStates);
       }
       catch (Exception e)
       {
@@ -259,7 +265,9 @@ public class ClusterStateVerifier
 
   }
 
-  static boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
+  static boolean verifyBestPossAndExtView(String zkAddr,
+                                          String clusterName,
+                                          HelixDataAccessor accessor,
                                           Map<String, Map<String, String>> errStates)
   {
     try
@@ -292,10 +300,10 @@ public class ClusterStateVerifier
           idealStates.put(resource, new IdealState(resource));
         }
       }
-      
+
       // calculate best possible state
       BestPossibleStateOutput bestPossOutput =
-          ClusterStateVerifier.calcBestPossState(cache);
+          ClusterStateVerifier.calcBestPossState(zkAddr, clusterName, cache);
       Map<String, Map<Partition, Map<String, String>>> bestPossStateMap = bestPossOutput.getStateMap();
 
       // set error states
@@ -307,7 +315,7 @@ public class ClusterStateVerifier
           for (String partitionName : partErrStates.keySet())
           {
             String instanceName = partErrStates.get(partitionName);
-            
+
             if (!bestPossStateMap.containsKey(resourceName)) {
               bestPossStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
             }
@@ -319,10 +327,10 @@ public class ClusterStateVerifier
           }
         }
       }
-      
+
       // System.out.println("stateMap: " + bestPossStateMap);
 
-      
+
       for (String resourceName : idealStates.keySet())
       {
         ExternalView extView = extViews.get(resourceName);
@@ -355,7 +363,7 @@ public class ClusterStateVerifier
               if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.toString()))
               {
                 insIter.remove();
-              }   
+              }
             }
           }
         }
@@ -370,7 +378,7 @@ public class ClusterStateVerifier
           LOG.info("exterView size (" + extViewSize
               + ") is different from bestPossState size (" + bestPossStateSize
               + ") for resource: " + resourceName);
-          
+
 //          System.err.println("exterView size (" + extViewSize
 //              + ") is different from bestPossState size (" + bestPossStateSize
 //              + ") for resource: " + resourceName);
@@ -395,7 +403,7 @@ public class ClusterStateVerifier
           {
             LOG.info("externalView is different from bestPossibleState for partition:"
                 + partition);
-            
+
 //            System.err.println("externalView is different from bestPossibleState for partition: "
 //                + partition + ", actual: " + evInstanceStateMap + ", bestPoss: " + bpInstanceStateMap);
             return false;
@@ -477,16 +485,18 @@ public class ClusterStateVerifier
   /**
    * calculate the best possible state note that DROPPED states are not checked since when
    * kick off the BestPossibleStateCalcStage we are providing an empty current state map
-   * 
+   *
    * @param cache
    * @return
    * @throws Exception
    */
 
-  static BestPossibleStateOutput calcBestPossState(ClusterDataCache cache) throws Exception
+  static BestPossibleStateOutput calcBestPossState(String zkAddr, String clusterName, ClusterDataCache cache) throws Exception
   {
     ClusterEvent event = new ClusterEvent("sampleEvent");
     event.addAttribute("ClusterDataCache", cache);
+    HelixManager manager = new ZKHelixManager(clusterName, "verifier", InstanceType.SPECTATOR, zkAddr);
+    event.addAttribute("helixmanager", manager);
 
     ResourceComputationStage rcState = new ResourceComputationStage();
     CurrentStateComputationStage csStage = new CurrentStateComputationStage();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/fa7597e2/helix-core/src/main/resources/cluster-manager-version.properties
----------------------------------------------------------------------
diff --git a/helix-core/src/main/resources/cluster-manager-version.properties b/helix-core/src/main/resources/cluster-manager-version.properties
index 8a6db4c..c715beb 100644
--- a/helix-core/src/main/resources/cluster-manager-version.properties
+++ b/helix-core/src/main/resources/cluster-manager-version.properties
@@ -19,5 +19,6 @@
 
 clustermanager.version=${project.version}
 
+minimum_supported_version.drop_error_partition=0.6.1
 minimum_supported_version.batch_message=0.6.1
 minimum_supported_version.participant=0.4

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/fa7597e2/helix-core/src/test/java/org/apache/helix/integration/TestDropErrorPartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDropErrorPartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestDropErrorPartition.java
new file mode 100644
index 0000000..6366c6f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDropErrorPartition.java
@@ -0,0 +1,257 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.Verifier;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.participant.ErrTransition;
+import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestDropErrorPartition extends ZkUnitTestBase
+{
+  @Test
+  public void testDropErrorSupported() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName,
+                            ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            8, // partitions per resource
+                            n, // number of nodes
+                            2, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // enable batch message
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    // start participants
+    MockParticipant[] participants = new MockParticipant[n];
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      if (i == 0)
+      {
+        Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+        errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_0"));
+        participants[i] =
+            new MockParticipant(clusterName,
+                                instanceName,
+                                ZK_ADDR,
+                                new ErrTransition(errPartitions));
+
+      } else
+      {
+        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      }
+      participants[i].syncStart();
+    }
+
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    Map<String, Map<String, String>> errStates = new HashMap<String, Map<String, String>>();
+    errStates.put("TestDB0", new HashMap<String, String>());
+    errStates.get("TestDB0").put("TestDB0_0", "localhost_12918");
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName,
+                                                                                 errStates));
+
+    Assert.assertTrue(result);
+
+    // drop resource
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.dropResource(clusterName, "TestDB0");
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+
+    // check dropping partition in ERROR state
+    PropertyKey key = keyBuilder.externalView("TestDB0");
+    ExternalView externalView = accessor.getProperty(key);
+    Assert.assertNotNull(externalView);
+    Assert.assertEquals(externalView.getPartitionSet().size(), 0);
+
+    // clean up
+    // wait for all zk callbacks done
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < n; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testDropErrorNotSupported() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName,
+                            ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            8, // partitions per resource
+                            n, // number of nodes
+                            2, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // enable batch message
+    final ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    final Builder keyBuilder = accessor.keyBuilder();
+
+    // start participants
+    MockParticipant[] participants = new MockParticipant[n];
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      if (i == 0)
+      {
+        Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+        errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_0"));
+        participants[i] =
+            new MockParticipant(clusterName,
+                                instanceName,
+                                ZK_ADDR,
+                                new ErrTransition(errPartitions));
+
+      } else
+      {
+        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      }
+      participants[i].syncStart();
+
+      // change version to < 0.6.1
+      PropertyKey key = keyBuilder.liveInstance(instanceName);
+      LiveInstance liveInstance = accessor.getProperty(key);
+      liveInstance.setHelixVersion("0.5.32");
+      accessor.setProperty(key, liveInstance);
+    }
+
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    Map<String, Map<String, String>> errStates = new HashMap<String, Map<String, String>>();
+    errStates.put("TestDB0", new HashMap<String, String>());
+    errStates.get("TestDB0").put("TestDB0_0", "localhost_12918");
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName,
+                                                                                 errStates));
+
+    Assert.assertTrue(result);
+
+    // drop resource
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.dropResource(clusterName, "TestDB0");
+
+    // check not dropping partition in ERROR state
+    result = TestHelper.verify(new Verifier()
+    {
+
+      @Override
+      public boolean verify() throws Exception
+      {
+        PropertyKey key = keyBuilder.externalView("TestDB0");
+        ExternalView externalView = accessor.getProperty(key);
+        if (externalView == null)
+        {
+          return false;
+        }
+
+        if (externalView.getPartitionSet().size() != 1)
+        {
+          return false;
+        }
+
+        if (!externalView.getStateMap("TestDB0_0").get("localhost_12918").equals("ERROR"))
+        {
+          return false;
+        }
+
+        return true;
+      }
+    }, 3000);
+    Assert.assertTrue(result);
+
+    // clean up
+    // wait for all zk callbacks done
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < n; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+}


Mime
View raw message