helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject git commit: [HELIX-381] ClusterStateVerifier can now work with a subset of resources
Date Thu, 20 Feb 2014 01:32:43 GMT
Repository: helix
Updated Branches:
  refs/heads/master ef6e9bbe0 -> c8a644f4b


[HELIX-381] ClusterStateVerifier can now work with a subset of resources


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c8a644f4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c8a644f4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c8a644f4

Branch: refs/heads/master
Commit: c8a644f4b72871de0186cb85338ddd6c39929a51
Parents: ef6e9bb
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Tue Feb 18 13:41:16 2014 -0800
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Wed Feb 19 17:32:26 2014 -0800

----------------------------------------------------------------------
 .../helix/tools/ClusterStateVerifier.java       |  45 ++++++-
 .../helix/tools/TestClusterStateVerifier.java   | 128 +++++++++++++++++++
 2 files changed, 169 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c8a644f4/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index 1563769..da9d76e 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -53,10 +54,9 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -69,12 +69,15 @@ import org.apache.helix.model.builder.ResourceAssignmentBuilder;
 import org.apache.helix.util.ZKClientPool;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Sets;
+
 public class ClusterStateVerifier {
   public static String cluster = "cluster";
   public static String zkServerAddress = "zkSvr";
   public static String help = "help";
   public static String timeout = "timeout";
   public static String period = "period";
+  public static String resources = "resources";
 
   private static Logger LOG = Logger.getLogger(ClusterStateVerifier.class);
 
@@ -136,6 +139,7 @@ public class ClusterStateVerifier {
     private final String clusterName;
     private final Map<String, Map<String, String>> errStates;
     private final ZkClient zkClient;
+    private final Set<String> resources;
 
     public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName) {
       this(zkAddr, clusterName, null);
@@ -143,6 +147,11 @@ public class ClusterStateVerifier {
 
     public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName,
         Map<String, Map<String, String>> errStates) {
+      this(zkAddr, clusterName, errStates, null);
+    }
+
+    public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName,
+        Map<String, Map<String, String>> errStates, Set<String> resources) {
       if (zkAddr == null || clusterName == null) {
         throw new IllegalArgumentException("requires zkAddr|clusterName");
       }
@@ -150,6 +159,7 @@ public class ClusterStateVerifier {
       this.clusterName = clusterName;
       this.errStates = errStates;
       this.zkClient = ZKClientPool.getZkClient(zkAddr); // null;
+      this.resources = resources;
     }
 
     @Override
@@ -158,7 +168,8 @@ public class ClusterStateVerifier {
         HelixDataAccessor accessor =
             new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
 
-        return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates, clusterName);
+        return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates, clusterName,
+            resources);
       } catch (Exception e) {
         LOG.error("exception in verification", e);
       }
@@ -225,6 +236,11 @@ public class ClusterStateVerifier {
 
   static boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
       Map<String, Map<String, String>> errStates, String clusterName) {
+    return verifyBestPossAndExtView(accessor, errStates, clusterName, null);
+  }
+
+  static boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
+      Map<String, Map<String, String>> errStates, String clusterName, Set<String> resources) {
     try {
       Builder keyBuilder = accessor.keyBuilder();
 
@@ -239,6 +255,12 @@ public class ClusterStateVerifier {
         extViews = Collections.emptyMap();
       }
 
+      // Filter resources if requested
+      if (resources != null && !resources.isEmpty()) {
+        idealStates.keySet().retainAll(resources);
+        extViews.keySet().retainAll(resources);
+      }
+
       // if externalView is not empty and idealState doesn't exist
       // add empty idealState for the resource
       for (String resource : extViews.keySet()) {
@@ -624,12 +646,15 @@ public class ClusterStateVerifier {
     long timeoutValue = 0;
     long periodValue = 1000;
 
+    Set<String> resourceSet = null;
     if (args.length > 0) {
       CommandLine cmd = processCommandLineArgs(args);
       zkServer = cmd.getOptionValue(zkServerAddress);
       clusterName = cmd.getOptionValue(cluster);
       String timeoutStr = cmd.getOptionValue(timeout);
       String periodStr = cmd.getOptionValue(period);
+      String resourceStr = cmd.getOptionValue(resources);
+
       if (timeoutStr != null) {
         try {
           timeoutValue = Long.parseLong(timeoutStr);
@@ -647,12 +672,24 @@ public class ClusterStateVerifier {
         }
       }
 
+      // Allow specifying resources explicitly; the names are split on a delimiter, not on the value itself
+      if (resourceStr != null) {
+        String[] resourceNames = resourceStr.split("[\\s,]+"); // NOTE(review): comma/whitespace delimiter assumed - confirm
+        resourceSet = Sets.newHashSet(resourceNames);
+      }
+
     }
     // return verifyByPolling(new BestPossAndExtViewZkVerifier(zkServer, clusterName),
     // timeoutValue,
     // periodValue);
 
-    return verifyByZkCallback(new BestPossAndExtViewZkVerifier(zkServer, clusterName), timeoutValue);
+    ZkVerifier verifier;
+    if (resourceSet == null) {
+      verifier = new BestPossAndExtViewZkVerifier(zkServer, clusterName);
+    } else {
+      verifier = new BestPossAndExtViewZkVerifier(zkServer, clusterName, null, resourceSet);
+    }
+    return verifyByZkCallback(verifier, timeoutValue);
   }
 
   public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/helix/blob/c8a644f4/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
new file mode 100644
index 0000000..9276dcd
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
@@ -0,0 +1,128 @@
+package org.apache.helix.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestClusterStateVerifier extends ZkUnitTestBase {
+  final String[] RESOURCES = {
+      "resource0", "resource1"
+  };
+  private HelixAdmin _admin;
+  private MockParticipantManager[] _participants;
+  private ClusterControllerManager _controller;
+  private String _clusterName;
+
+  @BeforeMethod
+  public void beforeMethod() throws InterruptedException {
+    final int NUM_PARTITIONS = 1;
+    final int NUM_REPLICAS = 1;
+
+    // Create a cluster named <test class>_<test method> and add both OnlineOffline resources to it
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    _clusterName = className + "_" + methodName;
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    _admin = setupTool.getClusterManagementTool();
+    setupTool.addCluster(_clusterName, true);
+    setupTool.addResourceToCluster(_clusterName, RESOURCES[0], NUM_PARTITIONS, "OnlineOffline",
+        RebalanceMode.SEMI_AUTO.toString());
+    setupTool.addResourceToCluster(_clusterName, RESOURCES[1], NUM_PARTITIONS, "OnlineOffline",
+        RebalanceMode.SEMI_AUTO.toString());
+
+    // Add and start one participant per resource: localhost_12918 and localhost_12919
+    _participants = new MockParticipantManager[RESOURCES.length];
+    for (int i = 0; i < _participants.length; i++) {
+      String host = "localhost";
+      int port = 12918 + i;
+      String id = host + '_' + port;
+      setupTool.addInstanceToCluster(_clusterName, id);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, id);
+      _participants[i].syncStart();
+    }
+
+    // Set the preference list of partition "<resource i>_0" to participant i only
+    for (int i = 0; i < RESOURCES.length; i++) {
+      IdealState idealState = _admin.getResourceIdealState(_clusterName, RESOURCES[i]);
+      idealState.setReplicas(Integer.toString(NUM_REPLICAS));
+      idealState.setPreferenceList(RESOURCES[i] + "_0",
+          Arrays.asList(_participants[i].getInstanceName()));
+      _admin.setResourceIdealState(_clusterName, RESOURCES[i], idealState);
+    }
+
+    // Start the controller
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
+    _controller.syncStart();
+    Thread.sleep(1000); // fixed 1s wait; presumably gives the controller time to act - TODO confirm
+  }
+
+  @AfterMethod
+  public void afterMethod() {
+    // Cleanup: stop the controller and every participant, then drop the cluster
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    _admin.dropCluster(_clusterName);
+  }
+
+  @Test
+  public void testEntireCluster() {
+    // No resource filter is passed, so the verifier covers the entire cluster;
+    // verification is expected to succeed once the external view converges
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            _clusterName));
+    Assert.assertTrue(result);
+  }
+
+  @Test
+  public void testResourceSubset() throws InterruptedException {
+    // Disable localhost_12918 (the only instance in resource0's preference list); resource1 alone must still verify
+    _admin.enableInstance(_clusterName, "localhost_12918", false);
+    Thread.sleep(1000);
+    _admin.enableCluster(_clusterName, false); // presumably keeps resource0 down after the re-enable below - confirm
+    _admin.enableInstance(_clusterName, "localhost_12918", true);
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            _clusterName, null, Sets.newHashSet(RESOURCES[1])));
+    Assert.assertTrue(result);
+
+    // But the full cluster verification should fail
+    boolean fullResult = new BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName).verify();
+    Assert.assertFalse(fullResult);
+    _admin.enableCluster(_clusterName, true);
+  }
+}


Mime
View raw message