helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject helix git commit: [HELIX-574] fix bucketize resource bug in current state carryover, rb=31970
Date Thu, 12 Mar 2015 07:31:54 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x d704af442 -> 3a4ff21b4


[HELIX-574] fix bucketize resource bug in current state carryover, rb=31970


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3a4ff21b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3a4ff21b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3a4ff21b

Branch: refs/heads/helix-0.6.x
Commit: 3a4ff21b4424be359205d53e16e9504217bcae8f
Parents: d704af4
Author: zzhang <zzhang5@uci.edu>
Authored: Thu Mar 12 00:31:28 2015 -0700
Committer: zzhang <zzhang5@uci.edu>
Committed: Thu Mar 12 00:31:28 2015 -0700

----------------------------------------------------------------------
 .../controller/rebalancer/CustomRebalancer.java |   4 +-
 .../util/ConstraintBasedAssignment.java         |   4 +-
 .../manager/zk/ParticipantManagerHelper.java    |  41 ++-
 .../helix/manager/zk/ZKHelixDataAccessor.java   |  10 +
 .../integration/TestBucketizedResource.java     | 100 ++++++
 .../helix/integration/TestSchedulerMessage.java | 330 +------------------
 .../integration/TestSchedulerMessage2.java      | 137 ++++++++
 .../integration/TestSchedulerMsgContraints.java | 190 +++++++++++
 .../integration/TestSchedulerMsgUsingQueue.java | 133 ++++++++
 9 files changed, 613 insertions(+), 336 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 0f3fbc4..21ad2ff 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -104,8 +104,8 @@ public class CustomRebalancer implements Rebalancer, MappingCalculator {
     if (currentStateMap != null) {
       for (String instance : currentStateMap.keySet()) {
         if ((idealStateMap == null || !idealStateMap.containsKey(instance))
-            && !disabledInstancesForPartition.contains(instance) && isResourceEnabled) {
-          // if dropped and not disabled, transit to DROPPED
+            && !disabledInstancesForPartition.contains(instance)) {
+          // if dropped and instance not disabled (resource enabled or not), transit to DROPPED
           instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
         } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
             HelixDefinedState.ERROR.name()))

http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index bab357b..a520803 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -80,8 +80,8 @@ public class ConstraintBasedAssignment {
     if (currentStateMap != null) {
       for (String instance : currentStateMap.keySet()) {
         if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
-            && !disabledInstancesForPartition.contains(instance) && isResourceEnabled) {
-          // if dropped and not disabled, transit to DROPPED
+            && !disabledInstancesForPartition.contains(instance)) {
+          // if dropped and instance not disabled (resource enabled or not), transit to DROPPED
           instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
         } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
             HelixDefinedState.ERROR.name()))

http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
index 1bee2fe..36a669a 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
@@ -20,11 +20,15 @@ package org.apache.helix.manager.zk;
  */
 
 import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixException;
@@ -33,8 +37,10 @@ import org.apache.helix.InstanceType;
 import org.apache.helix.LiveInstanceInfoProvider;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordBucketizer;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.CurrentState.CurrentStateProperty;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -242,12 +248,41 @@ public class ParticipantManagerHelper {
         StateModelDefinition stateModel =
             _dataAccessor.getProperty(_keyBuilder.stateModelDef(stateModelDefRef));
 
+        BaseDataAccessor<ZNRecord> baseAccessor = _dataAccessor.getBaseDataAccessor();
         String curStatePath =
             _keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName())
                 .getPath();
-        _dataAccessor.getBaseDataAccessor().update(curStatePath,
-            new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialState(), lastCurState),
-            AccessOption.PERSISTENT);
+
+        String initState = stateModel.getInitialState();
+        if (lastCurState.getBucketSize() > 0) {
+          // update parent node
+          ZNRecord metaRecord = new ZNRecord(lastCurState.getId());
+          metaRecord.setSimpleFields(lastCurState.getRecord().getSimpleFields());
+          DataUpdater<ZNRecord> metaRecordUpdater =
+              new CurStateCarryOverUpdater(_sessionId, initState, new CurrentState(metaRecord));
+          boolean success =
+              baseAccessor.update(curStatePath, metaRecordUpdater, AccessOption.PERSISTENT);
+          if (success) {
+            // update current state buckets
+            ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(lastCurState.getBucketSize());
+
+            Map<String, ZNRecord> map = bucketizer.bucketize(lastCurState.getRecord());
+            List<String> paths = new ArrayList<String>();
+            List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
+            for (String bucketName : map.keySet()) {
+              paths.add(curStatePath + "/" + bucketName);
+              updaters.add(new CurStateCarryOverUpdater(_sessionId, initState, new CurrentState(map
+                  .get(bucketName))));
+            }
+
+            baseAccessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
+          }
+
+        } else {
+          _dataAccessor.getBaseDataAccessor().update(curStatePath,
+              new CurStateCarryOverUpdater(_sessionId, initState, lastCurState),
+              AccessOption.PERSISTENT);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index 8c9fc8d..ed434a1 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -187,6 +187,11 @@ public class ZKHelixDataAccessor implements HelixDataAccessor {
 
           int bucketSize = property.getBucketSize();
           if (bucketSize > 0) {
+            // @see HELIX-574
+            // clean up list and map fields in case we write to parent node by mistake
+            property.getRecord().getMapFields().clear();
+            property.getRecord().getListFields().clear();
+
             List<ZNRecord> childRecords = _baseDataAccessor.getChildren(path, null, options);
             ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
 
@@ -239,6 +244,11 @@ public class ZKHelixDataAccessor implements HelixDataAccessor {
 
         int bucketSize = property.getBucketSize();
         if (bucketSize > 0) {
+          // @see HELIX-574
+          // clean up list and map fields in case we write to parent node by mistake
+          property.getRecord().getMapFields().clear();
+          property.getRecord().getListFields().clear();
+
           List<ZNRecord> childRecords = _baseDataAccessor.getChildren(path, null, options);
           ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
index 2b5e2bc..9b72e41 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
@@ -19,14 +19,18 @@ package org.apache.helix.integration;
  * under the License.
  */
 
+import java.util.Arrays;
 import java.util.Date;
+import java.util.List;
 
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.ExternalView;
@@ -34,6 +38,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.apache.helix.tools.DefaultIdealStateCalculator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -115,4 +120,99 @@ public class TestBucketizedResource extends ZkIntegrationTestBase {
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
+
+  @Test
+  public void testBounceDisableAndDrop() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    String dbName = "TestDB0";
+    int n = 5;
+    int r = 3;
+    List<String> instanceNames =
+        Arrays.asList("localhost_0", "localhost_1", "localhost_2", "localhost_3", "localhost_4");
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    // create cluster and add nodes to cluster
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    _gSetupTool.addCluster(clusterName, true);
+    _gSetupTool.addInstancesToCluster(clusterName,
+        instanceNames.toArray(new String[instanceNames.size()]));
+
+    // add a bucketized resource
+    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    Builder keyBuilder = accessor.keyBuilder();
+    ZNRecord idealStateRec =
+        DefaultIdealStateCalculator.calculateIdealState(instanceNames, 10, r - 1, dbName, "MASTER",
+            "SLAVE");
+    IdealState idealState = new IdealState(idealStateRec);
+    idealState.setBucketSize(2);
+    idealState.setStateModelDefRef("MasterSlave");
+    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+    idealState.setReplicas(Integer.toString(r));
+    accessor.setProperty(keyBuilder.idealStates(dbName), idealState);
+
+    // start controller
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // start participants
+    for (int i = 0; i < n; i++) {
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceNames.get(i));
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // bounce
+    participants[0].syncStop();
+    participants[0] = new MockParticipantManager(ZK_ADDR, clusterName, instanceNames.get(0));
+    participants[0].syncStart();
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // make sure participants[0]'s current state is bucketized correctly during carryover
+    String path =
+        keyBuilder.currentState(instanceNames.get(0), participants[0].getSessionId(), dbName)
+            .getPath();
+    ZNRecord record = baseAccessor.get(path, null, 0);
+    Assert.assertTrue(record.getMapFields().size() == 0);
+
+    // disable the bucketized resource
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.enableResource(clusterName, dbName, false);
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // drop the bucketized resource
+    _gSetupTool.dropResourceFromCluster(clusterName, dbName);
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // make sure external-view is cleaned up
+    path = keyBuilder.externalView(dbName).getPath();
+    result = baseAccessor.exists(path, 0);
+    Assert.assertFalse(result);
+
+    // clean up
+    controller.syncStop();
+    for (MockParticipantManager participant : participants) {
+      participant.syncStop();
+    }
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 70713f3..ef2e18d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -19,7 +19,6 @@ package org.apache.helix.integration;
  * under the License.
  */
 
-import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
@@ -55,8 +54,6 @@ import org.apache.helix.monitoring.ZKPathDataDumpTask;
 import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.SerializationConfig;
 import org.testng.Assert;
@@ -64,7 +61,7 @@ import org.testng.annotations.Test;
 
 public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
 
-  class MockAsyncCallback extends AsyncCallback {
+  public static class MockAsyncCallback extends AsyncCallback {
     Message _message;
 
     public MockAsyncCallback() {
@@ -189,95 +186,6 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
   }
 
   @Test()
-  public void testSchedulerMsgUsingQueue() throws Exception {
-    Logger.getRootLogger().setLevel(Level.INFO);
-    _factory._results.clear();
-    Thread.sleep(2000);
-    HelixManager manager = null;
-    for (int i = 0; i < NODE_NR; i++) {
-      _participants[i].getMessagingService().registerMessageHandlerFactory(
-          _factory.getMessageType(), _factory);
-
-      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
-    }
-
-    Message schedulerMessage =
-        new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
-    schedulerMessage.setTgtSessionId("*");
-    schedulerMessage.setTgtName("CONTROLLER");
-    // TODO: change it to "ADMIN" ?
-    schedulerMessage.setSrcName("CONTROLLER");
-    schedulerMessage.getRecord().setSimpleField(
-        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg");
-    // Template for the individual message sent to each participant
-    Message msg = new Message(_factory.getMessageType(), "Template");
-    msg.setTgtSessionId("*");
-    msg.setMsgState(MessageState.NEW);
-
-    // Criteria to send individual messages
-    Criteria cr = new Criteria();
-    cr.setInstanceName("localhost_%");
-    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    cr.setSessionSpecific(false);
-    cr.setResource("%");
-    cr.setPartition("%");
-
-    ObjectMapper mapper = new ObjectMapper();
-    SerializationConfig serializationConfig = mapper.getSerializationConfig();
-    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
-    StringWriter sw = new StringWriter();
-    mapper.writeValue(sw, cr);
-
-    String crString = sw.toString();
-
-    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
-    schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
-    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
-
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    helixDataAccessor.createProperty(keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
-        schedulerMessage);
-
-    for (int i = 0; i < 30; i++) {
-      Thread.sleep(2000);
-      if (_PARTITIONS == _factory._results.size()) {
-        break;
-      }
-    }
-
-    Assert.assertEquals(_PARTITIONS, _factory._results.size());
-    PropertyKey controllerTaskStatus =
-        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
-            schedulerMessage.getMsgId());
-
-    int messageResultCount = 0;
-    for (int i = 0; i < 10; i++) {
-      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
-      Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
-          .equals("" + (_PARTITIONS * 3)));
-      for (String key : statusUpdate.getMapFields().keySet()) {
-        if (key.startsWith("MessageResult ")) {
-          messageResultCount++;
-        }
-      }
-      if (messageResultCount == _PARTITIONS * 3) {
-        break;
-      } else {
-        Thread.sleep(2000);
-      }
-    }
-    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
-    int count = 0;
-    for (Set<String> val : _factory._results.values()) {
-      count += val.size();
-    }
-    Assert.assertEquals(count, _PARTITIONS * 3);
-
-  }
-
-  @Test()
   public void testSchedulerMsg() throws Exception {
     Logger.getRootLogger().setLevel(Level.INFO);
     _factory._results.clear();
@@ -418,98 +326,6 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
   }
 
   @Test()
-  public void testSchedulerMsg2() throws Exception {
-    _factory._results.clear();
-    Thread.sleep(2000);
-    HelixManager manager = null;
-    for (int i = 0; i < NODE_NR; i++) {
-      _participants[i].getMessagingService().registerMessageHandlerFactory(
-          _factory.getMessageType(), _factory);
-
-      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
-    }
-
-    Message schedulerMessage =
-        new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
-    schedulerMessage.setTgtSessionId("*");
-    schedulerMessage.setTgtName("CONTROLLER");
-    // TODO: change it to "ADMIN" ?
-    schedulerMessage.setSrcName("CONTROLLER");
-
-    // Template for the individual message sent to each participant
-    Message msg = new Message(_factory.getMessageType(), "Template");
-    msg.setTgtSessionId("*");
-    msg.setMsgState(MessageState.NEW);
-
-    // Criteria to send individual messages
-    Criteria cr = new Criteria();
-    cr.setInstanceName("localhost_%");
-    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    cr.setSessionSpecific(false);
-    cr.setResource("%");
-    cr.setPartition("%");
-
-    ObjectMapper mapper = new ObjectMapper();
-    SerializationConfig serializationConfig = mapper.getSerializationConfig();
-    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
-    StringWriter sw = new StringWriter();
-    mapper.writeValue(sw, cr);
-
-    String crString = sw.toString();
-
-    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
-    schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
-    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
-    schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
-
-    Criteria cr2 = new Criteria();
-    cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
-    cr2.setInstanceName("*");
-    cr2.setSessionSpecific(false);
-
-    schedulerMessage.getRecord().setSimpleField(
-        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg2");
-    MockAsyncCallback callback = new MockAsyncCallback();
-    manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
-    String msgId =
-        callback._message.getResultMap()
-            .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
-
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    for (int i = 0; i < 10; i++) {
-      Thread.sleep(200);
-      PropertyKey controllerTaskStatus =
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
-      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
-      if (statusUpdate.getMapFields().containsKey("Summary")) {
-        break;
-      }
-    }
-
-    Assert.assertEquals(_PARTITIONS, _factory._results.size());
-    PropertyKey controllerTaskStatus =
-        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
-    ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
-    Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
-        .equals("" + (_PARTITIONS * 3)));
-    int messageResultCount = 0;
-    for (String key : statusUpdate.getMapFields().keySet()) {
-      if (key.startsWith("MessageResult ")) {
-        messageResultCount++;
-      }
-    }
-    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
-
-    int count = 0;
-    for (Set<String> val : _factory._results.values()) {
-      count += val.size();
-    }
-    Assert.assertEquals(count, _PARTITIONS * 3);
-  }
-
-  @Test()
   public void testSchedulerZeroMsg() throws Exception {
     TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
     HelixManager manager = null;
@@ -849,148 +665,4 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
     // System.out.println(count);
     Assert.assertEquals(count, _PARTITIONS * 3 * 2);
   }
-
-  @Test
-  public void testSchedulerMsgContraints() throws JsonGenerationException, JsonMappingException,
-      IOException, InterruptedException {
-    TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch();
-    HelixManager manager = null;
-    for (int i = 0; i < NODE_NR; i++) {
-      _participants[i].getMessagingService().registerMessageHandlerFactory(
-          factory.getMessageType(), factory);
-
-      _participants[i].getMessagingService().registerMessageHandlerFactory(
-          factory.getMessageType(), factory);
-
-      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
-    }
-
-    Message schedulerMessage =
-        new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
-    schedulerMessage.setTgtSessionId("*");
-    schedulerMessage.setTgtName("CONTROLLER");
-    // TODO: change it to "ADMIN" ?
-    schedulerMessage.setSrcName("CONTROLLER");
-
-    // Template for the individual message sent to each participant
-    Message msg = new Message(factory.getMessageType(), "Template");
-    msg.setTgtSessionId("*");
-    msg.setMsgState(MessageState.NEW);
-
-    // Criteria to send individual messages
-    Criteria cr = new Criteria();
-    cr.setInstanceName("localhost_%");
-    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    cr.setSessionSpecific(false);
-    cr.setResource("%");
-    cr.setPartition("%");
-
-    ObjectMapper mapper = new ObjectMapper();
-    SerializationConfig serializationConfig = mapper.getSerializationConfig();
-    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
-    StringWriter sw = new StringWriter();
-    mapper.writeValue(sw, cr);
-
-    String crString = sw.toString();
-
-    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
-    schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
-    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
-    schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
-    schedulerMessage.getRecord().setSimpleField(
-        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgContraints");
-
-    Criteria cr2 = new Criteria();
-    cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
-    cr2.setInstanceName("*");
-    cr2.setSessionSpecific(false);
-
-    MockAsyncCallback callback = new MockAsyncCallback();
-    mapper = new ObjectMapper();
-    serializationConfig = mapper.getSerializationConfig();
-    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
-    sw = new StringWriter();
-    mapper.writeValue(sw, cr);
-
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-
-    // Set contraints that only 1 msg per participant
-    Map<String, String> constraints = new TreeMap<String, String>();
-    constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
-    constraints.put("TRANSITION", "OFFLINE-COMPLETED");
-    constraints.put("CONSTRAINT_VALUE", "1");
-    constraints.put("INSTANCE", ".*");
-    manager.getClusterManagmentTool().setConstraint(manager.getClusterName(),
-        ConstraintType.MESSAGE_CONSTRAINT, "constraint1", new ConstraintItem(constraints));
-
-    // Send scheduler message
-    crString = sw.toString();
-    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
-    manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
-    String msgId =
-        callback._message.getResultMap()
-            .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
-
-    for (int j = 0; j < 10; j++) {
-      Thread.sleep(200);
-      PropertyKey controllerTaskStatus =
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
-      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
-      if (statusUpdate.getMapFields().containsKey("SentMessageCount")) {
-        Assert.assertEquals(
-            statusUpdate.getMapFields().get("SentMessageCount").get("MessageCount"), ""
-                + (_PARTITIONS * 3));
-        break;
-      }
-    }
-
-    for (int i = 0; i < _PARTITIONS * 3 / 5; i++) {
-      for (int j = 0; j < 10; j++) {
-        Thread.sleep(300);
-        if (factory._messageCount == 5 * (i + 1))
-          break;
-      }
-      Thread.sleep(300);
-      Assert.assertEquals(factory._messageCount, 5 * (i + 1));
-      factory.signal();
-      // System.err.println(i);
-    }
-
-    for (int j = 0; j < 10; j++) {
-      Thread.sleep(200);
-      PropertyKey controllerTaskStatus =
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
-      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
-      if (statusUpdate.getMapFields().containsKey("Summary")) {
-        break;
-      }
-    }
-
-    Assert.assertEquals(_PARTITIONS, factory._results.size());
-    PropertyKey controllerTaskStatus =
-        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
-    ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
-    Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
-        .equals("" + (_PARTITIONS * 3)));
-    int messageResultCount = 0;
-    for (String key : statusUpdate.getMapFields().keySet()) {
-      if (key.startsWith("MessageResult ")) {
-        messageResultCount++;
-      }
-    }
-    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
-
-    int count = 0;
-    for (Set<String> val : factory._results.values()) {
-      count += val.size();
-    }
-    Assert.assertEquals(count, _PARTITIONS * 3);
-
-    manager.getClusterManagmentTool().removeConstraint(manager.getClusterName(),
-        ConstraintType.MESSAGE_CONSTRAINT, "constraint1");
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage2.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage2.java
new file mode 100644
index 0000000..b0ee961
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage2.java
@@ -0,0 +1,137 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.StringWriter;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestSchedulerMessage2 extends ZkStandAloneCMTestBase { // integration test: SCHEDULER_MSG sent via sendAndWait with a task queue name
+  TestSchedulerMessage.TestMessagingHandlerFactory _factory =
+      new TestSchedulerMessage.TestMessagingHandlerFactory();
+  // Sends a scheduler message to the controller, then checks the task status record and _factory._results.
+  @Test()
+  public void testSchedulerMsg2() throws Exception {
+    _factory._results.clear();
+    Thread.sleep(2000);
+    HelixManager manager = null; // after the loop below: _participants[NODE_NR - 1]
+    for (int i = 0; i < NODE_NR; i++) {
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
+          _factory.getMessageType(), _factory);
+
+      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
+    }
+    // Outer message addressed to the controller; the template and criteria are attached to it below.
+    Message schedulerMessage =
+        new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
+    schedulerMessage.setTgtSessionId("*");
+    schedulerMessage.setTgtName("CONTROLLER");
+    // TODO: change it to "ADMIN" ?
+    schedulerMessage.setSrcName("CONTROLLER");
+
+    // Template for the individual message sent to each participant
+    Message msg = new Message(_factory.getMessageType(), "Template");
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+
+    // Criteria to send individual messages
+    Criteria cr = new Criteria();
+    cr.setInstanceName("localhost_%");
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setResource("%");
+    cr.setPartition("%");
+    // Serialize the criteria with Jackson (indented output) so it can be stored as a simple field.
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    StringWriter sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+
+    String crString = sw.toString();
+
+    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+    schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
+    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+    schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
+    // Criteria used to deliver the scheduler message itself: controller recipients, instance name "*".
+    Criteria cr2 = new Criteria();
+    cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
+    cr2.setInstanceName("*");
+    cr2.setSessionSpecific(false);
+
+    schedulerMessage.getRecord().setSimpleField(
+        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg2");
+    TestSchedulerMessage.MockAsyncCallback callback = new TestSchedulerMessage.MockAsyncCallback();
+    manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+    String msgId =
+        callback._message.getResultMap()
+            .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
+
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
+    for (int i = 0; i < 10; i++) { // poll up to 10 x 200 ms for the "Summary" map field
+      Thread.sleep(200);
+      PropertyKey controllerTaskStatus =
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+      if (statusUpdate.getMapFields().containsKey("Summary")) {
+        break;
+      }
+    }
+    // NOTE(review): if "Summary" never appears the loop simply ends; the assertions below still run.
+    Assert.assertEquals(_PARTITIONS, _factory._results.size()); // NOTE(review): TestNG order is (actual, expected); swapped here
+    PropertyKey controllerTaskStatus =
+        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+    ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+    Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
+        .equals("" + (_PARTITIONS * 3)));
+    int messageResultCount = 0; // number of map fields whose key starts with "MessageResult "
+    for (String key : statusUpdate.getMapFields().keySet()) {
+      if (key.startsWith("MessageResult ")) {
+        messageResultCount++;
+      }
+    }
+    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+    // Sum the sizes of all result sets collected by the handler factory.
+    int count = 0;
+    for (Set<String> val : _factory._results.values()) {
+      count += val.size();
+    }
+    Assert.assertEquals(count, _PARTITIONS * 3);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
new file mode 100644
index 0000000..51a225e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
@@ -0,0 +1,190 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestSchedulerMsgContraints extends ZkStandAloneCMTestBase { // integration test: SCHEDULER_MSG sent while a MESSAGE_CONSTRAINT is set
+  // Sets "constraint1", sends a scheduler message, calls factory.signal() per batch, then checks the status record.
+  @Test
+  public void testSchedulerMsgContraints() throws Exception {
+    TestSchedulerMessage.TestMessagingHandlerFactoryLatch factory =
+        new TestSchedulerMessage.TestMessagingHandlerFactoryLatch();
+    HelixManager manager = null; // after the loop below: _participants[NODE_NR - 1]
+    for (int i = 0; i < NODE_NR; i++) {
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
+          factory.getMessageType(), factory);
+      // NOTE(review): the call below repeats the registration above with identical arguments.
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
+          factory.getMessageType(), factory);
+
+      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
+    }
+    // Outer message addressed to the controller; the template and criteria are attached to it below.
+    Message schedulerMessage =
+        new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
+    schedulerMessage.setTgtSessionId("*");
+    schedulerMessage.setTgtName("CONTROLLER");
+    // TODO: change it to "ADMIN" ?
+    schedulerMessage.setSrcName("CONTROLLER");
+
+    // Template for the individual message sent to each participant
+    Message msg = new Message(factory.getMessageType(), "Template");
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+
+    // Criteria to send individual messages
+    Criteria cr = new Criteria();
+    cr.setInstanceName("localhost_%");
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setResource("%");
+    cr.setPartition("%");
+    // Serialize the criteria with Jackson (indented output) so it can be stored as a simple field.
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    StringWriter sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+
+    String crString = sw.toString();
+
+    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+    schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
+    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+    schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
+    schedulerMessage.getRecord().setSimpleField(
+        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgContraints");
+    // Criteria used to deliver the scheduler message itself: controller recipients, instance name "*".
+    Criteria cr2 = new Criteria();
+    cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
+    cr2.setInstanceName("*");
+    cr2.setSessionSpecific(false);
+    // NOTE(review): the mapper/writer created below serialize the same `cr` a second time.
+    TestSchedulerMessage.MockAsyncCallback callback = new TestSchedulerMessage.MockAsyncCallback();
+    mapper = new ObjectMapper();
+    serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
+
+    // Set constraints that only 1 message per participant
+    Map<String, String> constraints = new TreeMap<String, String>();
+    constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
+    constraints.put("TRANSITION", "OFFLINE-COMPLETED");
+    constraints.put("CONSTRAINT_VALUE", "1");
+    constraints.put("INSTANCE", ".*");
+    manager.getClusterManagmentTool().setConstraint(manager.getClusterName(),
+        ConstraintType.MESSAGE_CONSTRAINT, "constraint1", new ConstraintItem(constraints));
+
+    // Send scheduler message
+    crString = sw.toString();
+    schedulerMessage.getRecord().setSimpleField("Criteria", crString); // same "Criteria" key as set earlier
+    manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+    String msgId =
+        callback._message.getResultMap()
+            .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
+    // Poll (up to 10 x 200 ms) until the status record contains "SentMessageCount", then check its value.
+    for (int j = 0; j < 10; j++) {
+      Thread.sleep(200);
+      PropertyKey controllerTaskStatus =
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+      if (statusUpdate.getMapFields().containsKey("SentMessageCount")) {
+        Assert.assertEquals(
+            statusUpdate.getMapFields().get("SentMessageCount").get("MessageCount"), ""
+                + (_PARTITIONS * 3));
+        break;
+      }
+    }
+    // Expect factory._messageCount to advance in steps of 5; factory.signal() is called after each step.
+    for (int i = 0; i < _PARTITIONS * 3 / 5; i++) {
+      for (int j = 0; j < 10; j++) { // wait up to 10 x 300 ms for the next step
+        Thread.sleep(300);
+        if (factory._messageCount == 5 * (i + 1))
+          break;
+      }
+      Thread.sleep(300); // extra pause before asserting the count is exactly 5 * (i + 1)
+      Assert.assertEquals(factory._messageCount, 5 * (i + 1));
+      factory.signal();
+      // System.err.println(i);
+    }
+    // Poll (up to 10 x 200 ms) for the "Summary" map field of the task status record.
+    for (int j = 0; j < 10; j++) {
+      Thread.sleep(200);
+      PropertyKey controllerTaskStatus =
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+      if (statusUpdate.getMapFields().containsKey("Summary")) {
+        break;
+      }
+    }
+
+    Assert.assertEquals(_PARTITIONS, factory._results.size()); // NOTE(review): TestNG order is (actual, expected); swapped here
+    PropertyKey controllerTaskStatus =
+        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+    ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+    Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
+        .equals("" + (_PARTITIONS * 3)));
+    int messageResultCount = 0; // number of map fields whose key starts with "MessageResult "
+    for (String key : statusUpdate.getMapFields().keySet()) {
+      if (key.startsWith("MessageResult ")) {
+        messageResultCount++;
+      }
+    }
+    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+
+    int count = 0; // total size of all result sets collected by the handler factory
+    for (Set<String> val : factory._results.values()) {
+      count += val.size();
+    }
+    Assert.assertEquals(count, _PARTITIONS * 3);
+    // NOTE(review): this cleanup is skipped if any assertion above fails (no try/finally).
+    manager.getClusterManagmentTool().removeConstraint(manager.getClusterName(),
+        ConstraintType.MESSAGE_CONSTRAINT, "constraint1");
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
new file mode 100644
index 0000000..d5b5680
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
@@ -0,0 +1,133 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.StringWriter;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestSchedulerMsgUsingQueue extends ZkStandAloneCMTestBase { // integration test: SCHEDULER_MSG with a task queue, written as a controller message
+  TestSchedulerMessage.TestMessagingHandlerFactory _factory =
+      new TestSchedulerMessage.TestMessagingHandlerFactory();
+  // Creates the scheduler message as a controller message property, then polls _factory._results and the status record.
+  @Test()
+  public void testSchedulerMsgUsingQueue() throws Exception {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    _factory._results.clear();
+    Thread.sleep(2000);
+    HelixManager manager = null; // after the loop below: _participants[NODE_NR - 1]
+    for (int i = 0; i < NODE_NR; i++) {
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
+          _factory.getMessageType(), _factory);
+
+      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
+    }
+    // Outer message addressed to the controller; the template and criteria are attached to it below.
+    Message schedulerMessage =
+        new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
+    schedulerMessage.setTgtSessionId("*");
+    schedulerMessage.setTgtName("CONTROLLER");
+    // TODO: change it to "ADMIN" ?
+    schedulerMessage.setSrcName("CONTROLLER");
+    schedulerMessage.getRecord().setSimpleField(
+        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg");
+    // Template for the individual message sent to each participant
+    Message msg = new Message(_factory.getMessageType(), "Template");
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+
+    // Criteria to send individual messages
+    Criteria cr = new Criteria();
+    cr.setInstanceName("localhost_%");
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setResource("%");
+    cr.setPartition("%");
+    // Serialize the criteria with Jackson (indented output) so it can be stored as a simple field.
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    StringWriter sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+
+    String crString = sw.toString();
+
+    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+    schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
+    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+    // Write the message directly under keyBuilder.controllerMessage(msgId) through the data accessor.
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
+    helixDataAccessor.createProperty(keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
+        schedulerMessage);
+    // Wait up to 30 x 2 s for _factory._results to reach _PARTITIONS entries.
+    for (int i = 0; i < 30; i++) {
+      Thread.sleep(2000);
+      if (_PARTITIONS == _factory._results.size()) {
+        break;
+      }
+    }
+
+    Assert.assertEquals(_PARTITIONS, _factory._results.size()); // NOTE(review): TestNG order is (actual, expected); swapped here
+    PropertyKey controllerTaskStatus =
+        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+            schedulerMessage.getMsgId());
+    // NOTE(review): messageResultCount is not reset per retry, so counts accumulate across iterations.
+    int messageResultCount = 0;
+    for (int i = 0; i < 10; i++) { // re-read the status record up to 10 times, 2 s apart
+      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+      Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
+          .equals("" + (_PARTITIONS * 3)));
+      for (String key : statusUpdate.getMapFields().keySet()) {
+        if (key.startsWith("MessageResult ")) {
+          messageResultCount++;
+        }
+      }
+      if (messageResultCount == _PARTITIONS * 3) {
+        break;
+      } else {
+        Thread.sleep(2000);
+      }
+    }
+    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+    int count = 0; // total size of all result sets collected by the handler factory
+    for (Set<String> val : _factory._results.values()) {
+      count += val.size();
+    }
+    Assert.assertEquals(count, _PARTITIONS * 3);
+
+  }
+}


Mime
View raw message