helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject helix git commit: [HELIX-596] fix throttled messages still take constraints' quota
Date Thu, 21 May 2015 19:00:02 GMT
Repository: helix
Updated Branches:
  refs/heads/master a29ac9fb3 -> 9ddbefcac


[HELIX-596] fix throttled messages still take constraints' quota


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9ddbefca
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9ddbefca
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9ddbefca

Branch: refs/heads/master
Commit: 9ddbefcacff6b8e229e6413299d53d89f1cbcd43
Parents: a29ac9f
Author: Hang Qi <hangq.1985@gmail.com>
Authored: Sun May 17 23:06:20 2015 -0700
Committer: Hang Qi <hangqi@yahoo-inc.com>
Committed: Mon May 18 00:51:24 2015 -0700

----------------------------------------------------------------------
 .../controller/stages/MessageThrottleStage.java |   7 +-
 .../stages/TestMessageThrottleStage.java        | 126 +++++++++++++++++++
 .../apache/helix/testutil/HelixTestUtil.java    |   8 ++
 3 files changed, 140 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/9ddbefca/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
index 39bb228..635ac17 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
@@ -173,13 +173,14 @@ public class MessageThrottleStage extends AbstractBaseStage {
       matches = selectConstraints(matches, msgAttr);
 
       boolean msgThrottled = false;
+      Map<String, Integer> perMessageThrottleQuotaMap = new HashMap<String, Integer>();
       for (ConstraintItem item : matches) {
         String key = item.filter(msgAttr).toString();
         if (!throttleMap.containsKey(key)) {
           throttleMap.put(key, valueOf(item.getConstraintValue()));
         }
         int value = throttleMap.get(key);
-        throttleMap.put(key, --value);
+        perMessageThrottleQuotaMap.put(key, --value);
 
         if (needThrottle && value < 0) {
           msgThrottled = true;
@@ -193,6 +194,10 @@ public class MessageThrottleStage extends AbstractBaseStage {
 
       if (!msgThrottled) {
         throttleOutputMsgs.add(message);
+        // copy back perMessageThrottleQuotaMap to throttleMap
+        for (Map.Entry<String, Integer> entry: perMessageThrottleQuotaMap.entrySet()) {
+          throttleMap.put(entry.getKey(), entry.getValue());
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddbefca/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index 29228e4..40823a8 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -329,6 +329,132 @@ public class TestMessageThrottleStage extends ZkTestBase {
 
   }
 
+  /**
+   * Checks that a throttled message does not use up constraint quota (HELIX-596). With a limit
+   * of 1 per instance and 1 per partition only msg1 and msg4 should pass; msg4 presumably can
+   * do so only because the throttled msg2 and msg3 did not consume the quotas they matched.
+   */
+  @Test()
+  public void testMsgThrottleConstraintsQuota() throws Exception {
+    String clusterName = "CLUSTER_" + _className + "_constraints_quota";
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+    TestHelper.setupEmptyCluster(_zkclient, clusterName);
+
+    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+
+    // ideal state: node0 is MASTER, node1 is SLAVE
+    // replica=2 means 1 master and 1 slave
+    List<IdealState> idealStates =
+        HelixTestUtil.setupIdealState(_baseAccessor, clusterName, new int[] {
+            0, 1
+        }, new String[] {
+          "TestDB"
+        }, 2, 2);
+    HelixTestUtil.setupLiveInstances(_baseAccessor, clusterName, new int[] {
+        0, 1
+    });
+    HelixTestUtil.setupStateModel(_baseAccessor, clusterName);
+
+    // setup constraints
+    ZNRecord record = new ZNRecord(ConstraintType.MESSAGE_CONSTRAINT.toString());
+
+    // constraint 0 & 1, per instance constraint
+    record.setMapField("constraint0", new TreeMap<String, String>());
+    record.getMapField("constraint0").put("MESSAGE_TYPE", "STATE_TRANSITION");
+    record.getMapField("constraint0").put("INSTANCE", "localhost_0");
+    record.getMapField("constraint0").put("CONSTRAINT_VALUE", "1");
+    ConstraintItem constraint0 = new ConstraintItem(record.getMapField("constraint0"));
+
+    record.setMapField("constraint1", new TreeMap<String, String>());
+    record.getMapField("constraint1").put("MESSAGE_TYPE", "STATE_TRANSITION");
+    record.getMapField("constraint1").put("INSTANCE", "localhost_1");
+    record.getMapField("constraint1").put("CONSTRAINT_VALUE", "1");
+    ConstraintItem constraint1 = new ConstraintItem(record.getMapField("constraint1"));
+
+    // constraint 2 & 3, per partition constraint
+    record.setMapField("constraint2", new TreeMap<String, String>());
+    record.getMapField("constraint2").put("MESSAGE_TYPE", "STATE_TRANSITION");
+    record.getMapField("constraint2").put("PARTITION", "TestDB_0");
+    record.getMapField("constraint2").put("CONSTRAINT_VALUE", "1");
+    ConstraintItem constraint2 = new ConstraintItem(record.getMapField("constraint2"));
+
+    record.setMapField("constraint3", new TreeMap<String, String>());
+    record.getMapField("constraint3").put("MESSAGE_TYPE", "STATE_TRANSITION");
+    record.getMapField("constraint3").put("PARTITION", "TestDB_1");
+    record.getMapField("constraint3").put("CONSTRAINT_VALUE", "1");
+    ConstraintItem constraint3 = new ConstraintItem(record.getMapField("constraint3"));
+
+    Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()),
+        new ClusterConstraints(record));
+
+    ClusterConstraints constraint =
+        accessor.getProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()));
+
+    MessageThrottleStage throttleStage = new MessageThrottleStage();
+
+    // test messageThrottleStage
+    ClusterEvent event = new ClusterEvent("testEvent");
+    event.addAttribute("helixmanager", manager);
+
+    // get an empty best possible output for the partitions
+    BestPossibleStateOutput bestPossOutput = getEmptyBestPossibleStateOutput(idealStates);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossOutput);
+
+    Pipeline dataRefresh = new Pipeline();
+    dataRefresh.addStage(new ReadClusterDataStage());
+    HelixTestUtil.runPipeline(event, dataRefresh);
+    HelixTestUtil.runStage(event, new ResourceComputationStage());
+    MessageOutput msgSelectOutput = new MessageOutput();
+
+    Message msg1 =
+        HelixTestUtil.newMessage(MessageType.STATE_TRANSITION, MessageId.from("msgId-001"),
+            "OFFLINE", "SLAVE", "TestDB", "localhost_0", "TestDB_0");
+
+    Message msg2 =
+        HelixTestUtil.newMessage(MessageType.STATE_TRANSITION, MessageId.from("msgId-002"),
+            "OFFLINE", "SLAVE", "TestDB", "localhost_0", "TestDB_1");
+
+    Message msg3 =
+        HelixTestUtil.newMessage(MessageType.STATE_TRANSITION, MessageId.from("msgId-003"),
+            "OFFLINE", "SLAVE", "TestDB", "localhost_1", "TestDB_0");
+
+    Message msg4 =
+        HelixTestUtil.newMessage(MessageType.STATE_TRANSITION, MessageId.from("msgId-004"),
+            "OFFLINE", "SLAVE", "TestDB", "localhost_1", "TestDB_1");
+
+    List<Message> selectMessages0 = new ArrayList<Message>();
+    selectMessages0.add(msg1);
+    selectMessages0.add(msg2);
+    List<Message> selectMessages1 = new ArrayList<Message>();
+    selectMessages1.add(msg3);
+    selectMessages1.add(msg4);
+
+    msgSelectOutput.setMessages(ResourceId.from("TestDB"), PartitionId.from("TestDB_0"),
+        selectMessages0);
+    msgSelectOutput.setMessages(ResourceId.from("TestDB"), PartitionId.from("TestDB_1"),
+        selectMessages1);
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
+
+    HelixTestUtil.runStage(event, throttleStage);
+
+    MessageOutput msgThrottleOutput =
+        event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+    List<Message> throttleMessages =
+        msgThrottleOutput.getMessages(ResourceId.from("TestDB"), PartitionId.from("TestDB_0"));
+    Assert.assertEquals(throttleMessages.size(), 1);
+    Assert.assertTrue(throttleMessages.contains(msg1));
+
+    throttleMessages = msgThrottleOutput.getMessages(ResourceId.from("TestDB"), PartitionId.from("TestDB_1"));
+    Assert.assertEquals(throttleMessages.size(), 1);
+    Assert.assertTrue(throttleMessages.contains(msg4));
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+  }
+
   private boolean containsConstraint(Set<ConstraintItem> constraints, ConstraintItem constraint) {
     for (ConstraintItem item : constraints) {
       if (item.toString().equals(constraint.toString())) {

http://git-wip-us.apache.org/repos/asf/helix/blob/9ddbefca/helix-core/src/test/java/org/apache/helix/testutil/HelixTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/testutil/HelixTestUtil.java b/helix-core/src/test/java/org/apache/helix/testutil/HelixTestUtil.java
index 32106e1..edab4c4 100644
--- a/helix-core/src/test/java/org/apache/helix/testutil/HelixTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/testutil/HelixTestUtil.java
@@ -30,6 +30,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.MessageId;
+import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.Stage;
@@ -237,4 +238,11 @@ public class HelixTestUtil {
     msg.setTgtName(tgtName);
     return msg;
   }
+
+  public static Message newMessage(MessageType type, MessageId msgId, String fromState,
+      String toState, String resourceName, String tgtName, String partitionId) {
+    Message msg = newMessage(type, msgId, fromState, toState, resourceName, tgtName); // delegate
+    msg.setPartitionId(PartitionId.from(partitionId)); // then attach the partition id
+    return msg;
+  }
 }


Mime
View raw message