helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/3] helix git commit: Add state transition throttling logic into intermediateStateCalcStage.
Date Tue, 03 Oct 2017 06:01:24 GMT
Repository: helix
Updated Branches:
  refs/heads/master 79ebc0469 -> 55b844657


http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
new file mode 100644
index 0000000..7a87a0f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
@@ -0,0 +1,283 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.api.config.StateTransitionThrottleConfig;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.mock.participant.MockTransition;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestPartitionMovementThrottle extends ZkStandAloneCMTestBase {
+  ConfigAccessor _configAccessor;
+
+  @Override
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(_gZkClient);
+
+    // setup storage cluster
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NODE_NR; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // start dummy participants
+    for (int i = 0; i < NODE_NR - 2; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      participant.setTransition(new DelayedTransition());
+      participant.syncStart();
+      _participants[i] = participant;
+    }
+
+    _configAccessor = new ConfigAccessor(_gZkClient);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+  }
+
+  @Test()
+  public void testResourceThrottle() throws Exception {
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+
+    StateTransitionThrottleConfig resourceThrottle =
+        new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.RESOURCE, 2);
+
+    StateTransitionThrottleConfig clusterThrottle =
+        new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 100);
+
+    clusterConfig.setStateTransitionThrottleConfigs(
+        Arrays.asList(resourceThrottle, clusterThrottle));
+    clusterConfig.setPersistIntermediateAssignment(true);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    List<String> dbs = new ArrayList<String>();
+
+    for (int i = 0; i < 5; i++) {
+      String dbName = "TestDB-" + i;
+      _setupTool.addResourceToCluster(CLUSTER_NAME, dbName, 10, STATE_MODEL,
+          RebalanceMode.FULL_AUTO + "");
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica);
+      dbs.add(dbName);
+    }
+
+    HelixClusterVerifier _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    _clusterVerifier.verify(10000);
+
+    DelayedTransition.setDelay(50);
+    DelayedTransition.enableThrottleRecord();
+
+    // add 2 nodes
+    for (int i = NODE_NR - 2; i < NODE_NR; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (1000 + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':',
'_'));
+      participant.syncStart();
+      _participants[i] = participant;
+    }
+
+    _clusterVerifier.verify(20000);
+
+    for (String db : dbs) {
+      validateThrottle(DelayedTransition.getResourcePatitionTransitionTimes(), db, 2);
+    }
+  }
+
+  private void validateThrottle(
+      Map<String, List<PartitionTransitionTime>> partitionTransitionTimesMap,
+      String throttledItemName, int maxPendingTransition) {
+    List<PartitionTransitionTime> pTimeList = partitionTransitionTimesMap.get(throttledItemName);
+
+    Map<Long, List<PartitionTransitionTime>> startMap =
+        new HashMap<Long, List<PartitionTransitionTime>>();
+    Map<Long, List<PartitionTransitionTime>> endMap =
+        new HashMap<Long, List<PartitionTransitionTime>>();
+    List<Long> startEndPoints = new ArrayList<Long>();
+
+    if (pTimeList == null) {
+      System.out.println("no throttle result for :" + throttledItemName);
+      return;
+    }
+    Collections.sort(pTimeList, new Comparator<PartitionTransitionTime>() {
+      @Override
+      public int compare(PartitionTransitionTime o1, PartitionTransitionTime o2) {
+        return (int) (o1.start - o2.start);
+      }
+    });
+
+    for (PartitionTransitionTime interval : pTimeList) {
+      if (!startMap.containsKey(interval.start)) {
+        startMap.put(interval.start, new ArrayList<PartitionTransitionTime>());
+      }
+      startMap.get(interval.start).add(interval);
+
+      if (!endMap.containsKey(interval.end)) {
+        endMap.put(interval.end, new ArrayList<PartitionTransitionTime>());
+      }
+      endMap.get(interval.end).add(interval);
+      startEndPoints.add(interval.start);
+      startEndPoints.add(interval.end);
+    }
+
+    Collections.sort(startEndPoints);
+
+    List<PartitionTransitionTime> temp = new ArrayList<PartitionTransitionTime>();
+
+    int maxInParallel = 0;
+    for (long point : startEndPoints) {
+      if (startMap.containsKey(point)) {
+        temp.addAll(startMap.get(point));
+      }
+      int curSize = size(temp);
+      if (curSize > maxInParallel) {
+        maxInParallel = curSize;
+      }
+      if (endMap.containsKey(point)) {
+        temp.removeAll(endMap.get(point));
+      }
+    }
+
+    System.out.println(
+        "MaxInParallel: " + maxInParallel + " maxPendingTransition: " + maxPendingTransition);
+    Assert.assertTrue(maxInParallel <= maxPendingTransition,
+        "Throttle condition does not meet for " + throttledItemName);
+  }
+
+
+  private int size(List<PartitionTransitionTime> timeList) {
+    Set<String> partitions = new HashSet<String>();
+    for (PartitionTransitionTime p : timeList) {
+      partitions.add(p.partition);
+    }
+    return partitions.size();
+  }
+
+  private static class PartitionTransitionTime {
+    String partition;
+    long start;
+    long end;
+
+    public PartitionTransitionTime(String partition, long start, long end) {
+      this.partition = partition;
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override public String toString() {
+      return "[" +
+          "partition='" + partition + '\'' +
+          ", start=" + start +
+          ", end=" + end +
+          ']';
+    }
+  }
+
+  private static class DelayedTransition extends MockTransition {
+    private static long _delay = 0;
+    private static Map<String, List<PartitionTransitionTime>> resourcePatitionTransitionTimes
=
+        new HashMap<String, List<PartitionTransitionTime>>();
+    private static Map<String, List<PartitionTransitionTime>> instancePatitionTransitionTimes
=
+        new HashMap<String, List<PartitionTransitionTime>>();
+    private static boolean _recordThrottle = false;
+
+    public static void setDelay(long delay) {
+      _delay = delay;
+    }
+
+    public static Map<String, List<PartitionTransitionTime>> getResourcePatitionTransitionTimes()
{
+      return resourcePatitionTransitionTimes;
+    }
+
+    public static Map<String, List<PartitionTransitionTime>> getInstancePatitionTransitionTimes()
{
+      return instancePatitionTransitionTimes;
+    }
+
+    public static void enableThrottleRecord() {
+      _recordThrottle = true;
+    }
+
+    @Override public void doTransition(Message message, NotificationContext context)
+        throws InterruptedException {
+      long start = System.currentTimeMillis();
+      if (_delay > 0) {
+        Thread.sleep(_delay);
+      }
+      long end = System.currentTimeMillis();
+      if (_recordThrottle) {
+        PartitionTransitionTime partitionTransitionTime =
+            new PartitionTransitionTime(message.getPartitionName(), start, end);
+
+        /*System.out.println(String
+            .format("Transit resource %s partition %s from %s to %s at instance %s: %s",
+                message.getResourceName(), message.getPartitionName(), message.getFromState(),
+                message.getToState(), message.getTgtName(), partitionTransitionTime));
+        */
+        if (!resourcePatitionTransitionTimes.containsKey(message.getResourceName())) {
+          resourcePatitionTransitionTimes
+              .put(message.getResourceName(), new ArrayList<PartitionTransitionTime>());
+        }
+        resourcePatitionTransitionTimes.get(message.getResourceName()).add(partitionTransitionTime);
+
+        if (!instancePatitionTransitionTimes.containsKey(message.getTgtName())) {
+          instancePatitionTransitionTimes
+              .put(message.getTgtName(), new ArrayList<PartitionTransitionTime>());
+        }
+        instancePatitionTransitionTimes.get(message.getTgtName()).add(partitionTransitionTime);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index d89533a..641f13a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -286,7 +286,7 @@ public class TaskTestUtil {
       runStage(event, stage);
     }
 
-    return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+    return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
   }
 
   /**


Mime
View raw message