helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject [1/2] [HELIX-319] refactor MonitoringClient to accommodate distributed monitoring server
Date Wed, 19 Mar 2014 02:06:39 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-monitoring db4c10a28 -> c0b1780dc


http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannMonitoringServer.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannMonitoringServer.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannMonitoringServer.java
new file mode 100644
index 0000000..63e0f65
--- /dev/null
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannMonitoringServer.java
@@ -0,0 +1,74 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.monitoring.MonitoringServer;
+import org.apache.log4j.Logger;
+
+import clojure.lang.RT;
+import clojure.lang.Symbol;
+
+/**
+ * A monitoring server implementation that uses Riemann
+ */
+public class RiemannMonitoringServer implements MonitoringServer {
+  private static final Logger LOG = Logger.getLogger(RiemannMonitoringServer.class);
+
+  private volatile boolean _isStarted;
+  private final RiemannConfigs _config;
+
+  /**
+   * Create a monitoring server
+   * @param config Riemann configs for this server; persistConfigs() is invoked on it here
+   */
+  public RiemannMonitoringServer(RiemannConfigs config) {
+    LOG.info("Construct RiemannMonitoringServer with configDir: " + config.getConfigDir());
+    _config = config;
+    config.persistConfigs();
+    _isStarted = false;
+  }
+
+  @Override
+  public synchronized void start() {
+    LOG.info("Starting Riemann server with configDir: " + _config.getConfigDir());
+
+    // start Riemann
+    RT.var("clojure.core", "require").invoke(Symbol.intern("riemann.bin"));
+    RT.var("clojure.core", "require").invoke(Symbol.intern(RiemannConfigs.DEFAULT_RIEMANN_CONFIG));
+    RT.var("riemann.bin", "-main").invoke(_config.getConfigDir());
+    _isStarted = true;
+  }
+
+  @Override
+  public synchronized void stop() {
+    if (!_isStarted) {
+      LOG.error("Tried to stop Riemann when not started!");
+      return;
+    }
+    LOG.info("Stopping Riemann server");
+    RT.var("riemann.config", "stop!").invoke();
+    _isStarted = false;
+  }
+
+  @Override
+  public boolean isStarted() {
+    return _isStarted;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/test/java/org/apache/helix/monitoring/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/IntegrationTest.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/IntegrationTest.java
deleted file mode 100644
index 9d28dc3..0000000
--- a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/IntegrationTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-import java.util.List;
-
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.MonitoringTestHelper;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.alert.AlertAction;
-import org.apache.helix.controller.alert.AlertName;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.manager.zk.DefaultAlertMsgHandlerFactory;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
-import org.apache.helix.model.AlertConfig;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.MonitoringConfig;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.Lists;
-
-public class IntegrationTest extends ZkUnitTestBase {
-  @Test
-  public void testBasic() throws Exception {
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
-    // Set up monitoring cluster
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
-        "localhost", // participant name prefix
-        "MonitoringService", // resource name prefix
-        1, // resources
-        8, // partitions per resource
-        0, // number of nodes
-        1, // replicas
-        "OnlineOffline", // pick a built-in state model
-        RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
-        true); // do rebalance
-
-    // Enable auto-join
-    HelixConfigScope scope =
-        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
-    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
-    configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "" + true);
-
-    // Start controller
-    ClusterControllerManager controller =
-        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
-    controller.syncStart();
-
-    // Start helix proxy
-    int proxyPort = MonitoringTestHelper.availableTcpPort();
-    final BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
-    final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
-    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    RiemannAlertProxy proxy = new RiemannAlertProxy(proxyPort, baseAccessor);
-    proxy.start();
-
-    // Start monitoring server
-    int riemannPort = MonitoringTestHelper.availableTcpPort();
-    MonitoringConfig riemannConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
-    riemannConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(riemannPort));
-
-    MonitoringConfig latencyCheckConfig = new MonitoringConfig("check_latency_config.clj");
-    latencyCheckConfig.setConfig(MonitoringTestHelper.getLatencyCheckConfigString(proxyPort));
-
-    // Set monitoring config on zk
-    accessor.setProperty(keyBuilder.monitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG),
-        riemannConfig);
-    accessor.setProperty(keyBuilder.monitoringConfig("check_latency_config.clj"),
-        latencyCheckConfig);
-
-    RiemannConfigs.Builder riemannConfigBuilder =
-        new RiemannConfigs.Builder().addConfigs(Lists.newArrayList(riemannConfig,
-            latencyCheckConfig));
-    RiemannMonitoringServer server = new RiemannMonitoringServer(riemannConfigBuilder.build());
-    server.start();
-
-    // Start Riemann agent
-    RiemannAgent agent = new RiemannAgent(ZK_ADDR, clusterName, riemannPort);
-    agent.start();
-
-    // Check live-instance
-    List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
-    Assert.assertNotNull(liveInstances);
-    Assert.assertEquals(liveInstances.size(), 1);
-
-    // Check external-view
-    boolean result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                clusterName));
-    Assert.assertTrue(result);
-
-    // Setup mock storage cluster to be monitored
-    String storageClusterName = clusterName + "_storage";
-    TestHelper.setupCluster(storageClusterName, ZK_ADDR, 12918, // participant port
-        "localhost", // participant name prefix
-        "TestDB", // resource name prefix
-        1, // resources
-        8, // partitions per resource
-        2, // number of nodes
-        1, // replicas
-        "MasterSlave", // pick a built-in state model
-        RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
-        true); // do rebalance
-
-    // Add alert config
-    AlertConfig alertConfig = new AlertConfig("default");
-    AlertName alertName =
-        new AlertName.Builder().cluster(ClusterId.from(storageClusterName)).metric("latency95")
-            .largerThan("1000").build();
-    AlertAction alertAction =
-        new AlertAction.Builder().cmd("enableInstance").args("{cluster}", "{node}", "false")
-            .build();
-    alertConfig.putConfig(alertName, alertAction);
-    final HelixDataAccessor storageAccessor =
-        new ZKHelixDataAccessor(storageClusterName, baseAccessor);
-    final PropertyKey.Builder storageKeyBuilder = storageAccessor.keyBuilder();
-    storageAccessor.setProperty(storageKeyBuilder.alertConfig("default"), alertConfig);
-
-    // Start another controller for mock storage cluster
-    ClusterControllerManager storageController =
-        new ClusterControllerManager(ZK_ADDR, storageClusterName, "controller");
-    MessageHandlerFactory fty = new DefaultAlertMsgHandlerFactory();
-    storageController.getMessagingService()
-        .registerMessageHandlerFactory(fty.getMessageType(), fty);
-    storageController.syncStart();
-
-    // Check localhost_12918 is enabled
-    InstanceConfig instanceConfig =
-        storageAccessor.getProperty(storageKeyBuilder.instanceConfig("localhost_12918"));
-    Assert.assertTrue(instanceConfig.getInstanceEnabled());
-
-    // Connect monitoring client
-    final RiemannMonitoringClient rclient =
-        new RiemannMonitoringClient(ZK_ADDR, ClusterId.from(clusterName),
-            ResourceId.from("MonitoringService0"), 1);
-    rclient.connect();
-
-    MonitoringEvent event =
-        new MonitoringEvent().participant(ParticipantId.from("localhost_12918"))
-            .name("LatencyReport").attribute("latency95", "" + 2)
-            .attribute("cluster", storageClusterName);
-    rclient.send(ResourceId.from("TestDB0"), event, false);
-
-    // Check localhost_12918 is disabled
-    result = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        InstanceConfig instanceConfig =
-            storageAccessor.getProperty(storageKeyBuilder.instanceConfig("localhost_12918"));
-        return instanceConfig.getInstanceEnabled() == false;
-      }
-    }, 10 * 1000);
-    Assert.assertTrue(result, "localhost_12918 should be disabled");
-
-    // Cleanup
-    rclient.disconnect();
-    storageController.syncStop();
-    controller.syncStop();
-
-    agent.shutdown();
-    server.stop();
-    proxy.shutdown();
-    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/test/java/org/apache/helix/monitoring/MonitoringTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/MonitoringTestHelper.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/MonitoringTestHelper.java
new file mode 100644
index 0000000..cfaa787
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/MonitoringTestHelper.java
@@ -0,0 +1,136 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+import org.I0Itec.zkclient.NetworkUtil;
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.helix.monitoring.riemann.RiemannConfigs;
+import org.apache.helix.monitoring.riemann.RiemannMonitoringServer;
+
+public class MonitoringTestHelper {
+  static final int MAX_PORT = 65535;
+
+  /**
+   * generate a default riemann.config
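+   * @param riemannPort port written into the tcp-server line of the generated config
+   * @return the generated Riemann config as a string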
+   */
+  public static String getRiemannConfigString(int riemannPort) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("(logging/init :file \"/dev/null\")\n\n")
+        .append("(tcp-server :host \"0.0.0.0\" :port " + riemannPort + ")\n\n")
+        .append("(instrumentation {:interval 1})\n\n")
+        .append("; (udp-server :host \"0.0.0.0\" :port " + riemannPort + ")\n")
+        .append("; (ws-server :host \"0.0.0.0\")\n")
+        .append("; (repl-server :host \"0.0.0.0\")\n\n")
+        .append("(periodically-expire 1)\n\n")
+        .append(
+            "(let [index (default :ttl 3 (update-index (index)))]\n  (streams\n    (expired prn)\n    index))\n");
+
+    return sb.toString();
+  }
+
+  /**
+   * generate a test config for checking latency
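+   * @param zkAddr address passed to the HelixAlertMessenger constructed by the generated config
+   * @return the generated latency-check config as a string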
+   */
+  public static String getLatencyCheckConfigString(String zkAddr) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("(require 'riemann.config)\n")
+        .append(
+            "(def alert-proxy (new org.apache.helix.monitoring.riemann.HelixAlertMessenger \""
+                + zkAddr + "\"))\n\n")
+        .append("(defn parse-double\n  \"Convert a string into a double\"\n  ")
+        .append("[instr]\n  (Double/parseDouble instr))\n\n")
+        .append(
+            "(defn check-95th-latency\n  \"Check if the 95th percentile latency is within expectations\"\n  ")
+        .append("[e]\n  (let [latency (parse-double (:latency95 e))]\n    ")
+        .append(
+            "(if (> latency 1.0) \n      ; Report if the 95th percentile latency exceeds 1.0s\n      ")
+        .append(
+            "(do (prn (:host e) \"has an unacceptable 95th percentile latency of\" latency)\n      ")
+        .append(
+            "(let [alert-name-str (str \"(\" (:cluster e) \".%.\" (:host e) \")(latency95)>(1000)\" )]\n        ")
+        .append("(.onAlert alert-proxy alert-name-str))))))\n\n")
+        .append("(streams\n  (where\n    ; Only process services containing LatencyReport\n    ")
+        .append("(and (service #\".*LatencyReport.*\") (not (state \"expired\")))\n    ")
+        .append("check-95th-latency))\n");
+
+    return sb.toString();
+  }
+
+  /**
+   * find an available tcp port
+   * @return the local port of a temporarily opened server socket, or -1 on IOException
+   */
+  public static int availableTcpPort() {
+    ServerSocket ss = null;
+    try {
+      ss = new ServerSocket(0);
+      ss.setReuseAddress(true);
+      return ss.getLocalPort();
+    } catch (IOException e) {
+      // ok
+    } finally {
+      if (ss != null) {
+        try {
+          ss.close();
+        } catch (IOException e) {
+          // should not be thrown
+        }
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * find the first available port starting from startPort inclusive
+   * @param startPort
+   * @return the first free port in [startPort, MAX_PORT], or -1 if none is free
+   */
+  public static int availableTcpPort(int startPort) {
+    int port = startPort;
+    for (; port <= MAX_PORT; port++) {
+      if (NetworkUtil.isPortFree(port))
+        break;
+    }
+
+    return port > MAX_PORT ? -1 : port;
+  }
+
+  public static RiemannMonitoringServer startRiemannServer(int port) {
+    MonitoringConfig monitoringConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
+    monitoringConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(port));
+
+    String configDir =
+        String.format("%s/%s_%d", System.getProperty("java.io.tmpdir"),
+            RiemannConfigs.DEFAULT_CONFIG_DIR, port);
+    RiemannConfigs.Builder builder =
+        new RiemannConfigs.Builder(configDir).addConfig(monitoringConfig);
+    RiemannMonitoringServer server = new RiemannMonitoringServer(builder.build());
+    server.start();
+
+    return server;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
deleted file mode 100644
index 588bc35..0000000
--- a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
+++ /dev/null
@@ -1,188 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.MonitoringTestHelper;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.MonitoringConfig;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.junit.Assert;
-import org.testng.annotations.Test;
-
-import com.aphyr.riemann.Proto.Event;
-import com.aphyr.riemann.client.RiemannClient;
-
-public class TestClientServerMonitoring extends ZkUnitTestBase {
-  @Test
-  public void testMonitoring() throws Exception {
-    final int NUM_PARTICIPANTS = 0;
-    final int NUM_PARTITIONS = 8;
-    final int NUM_REPLICAS = 1;
-
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
-    // Set up monitoring cluster
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
-        "localhost", // participant name prefix
-        "MonitoringService", // resource name prefix
-        1, // resources
-        NUM_PARTITIONS, // partitions per resource
-        NUM_PARTICIPANTS, // number of nodes
-        NUM_REPLICAS, // replicas
-        "OnlineOffline", // pick a built-in state model
-        RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
-        true); // do rebalance
-
-    // Enable auto-join
-    HelixConfigScope scope =
-        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
-    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
-    configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "" + true);
-
-    // Start controller
-    ClusterControllerManager controller =
-        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
-    controller.syncStart();
-
-    // Start monitoring server
-    int port = MonitoringTestHelper.availableTcpPort();
-    MonitoringConfig monitoringConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
-    monitoringConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(port));
-
-    RiemannConfigs.Builder builder = new RiemannConfigs.Builder().addConfig(monitoringConfig);
-    RiemannMonitoringServer server = new RiemannMonitoringServer(builder.build());
-    server.start();
-
-    // Start Riemann agent
-    RiemannAgent agent = new RiemannAgent(ZK_ADDR, clusterName, port);
-    agent.start();
-
-    // Check live-instance
-    final HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
-    Assert.assertNotNull(liveInstances);
-    Assert.assertEquals(liveInstances.size(), 1);
-
-    // Check external-view
-    boolean result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                clusterName));
-    Assert.assertTrue(result);
-
-    // Connect monitoring client
-    final RiemannMonitoringClient client =
-        new RiemannMonitoringClient(ZK_ADDR, ClusterId.from(clusterName),
-            ResourceId.from("MonitoringService0"), 1);
-    client.connect();
-
-    final RiemannClient rclient = RiemannClient.tcp("localhost", port);
-    rclient.connect();
-
-    // Test MonitoringEvent#send()
-    MonitoringEvent event = new MonitoringEvent().tag("test").ttl(5);
-    client.send(ResourceId.from("TestDB"), event, false);
-
-    // Check monitoring server has received the event with tag="test"
-    result = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        List<Event> events = rclient.query("tagged \"test\"");
-        return (events.size() == 1) && (events.get(0).getTagsCount() == 1)
-            && (events.get(0).getTags(0).equals("test"));
-      }
-    }, 5 * 1000);
-    Assert.assertTrue(result);
-
-    // Test MonitoringEvent#sendAndFlush()
-    MonitoringEvent event2 = new MonitoringEvent().tag("test2").ttl(5);
-    client.sendAndFlush(ResourceId.from("TestDB2"), event2);
-
-    // Check monitoring server has received the event with tag="test2"
-    result = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        List<Event> events = rclient.query("tagged \"test2\"");
-        return (events.size() == 1) && (events.get(0).getTagsCount() == 1)
-            && (events.get(0).getTags(0).equals("test2"));
-      }
-    }, 5 * 1000);
-    Assert.assertTrue(result);
-
-    // Test MonitoringEvent#every()
-    client.every(ResourceId.from("TestDB3"), 1, 0, TimeUnit.SECONDS, new Runnable() {
-
-      @Override
-      public void run() {
-        MonitoringEvent event3 =
-            new MonitoringEvent().tag("test3").resource(ResourceId.from("db" + System.currentTimeMillis())).ttl(5);
-        client.send(ResourceId.from("TestDB3"), event3, false);
-      }
-    });
-
-    // Check monitoring server has received at least 2 event2 with tag="test3"
-    result = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        List<Event> events = rclient.query("tagged \"test3\"");
-        return (events.size() > 2) && (events.get(0).getTagsCount() == 1)
-            && (events.get(0).getTags(0).equals("test3"));
-      }
-    }, 10 * 1000);
-    Assert.assertTrue(result);
-
-    // Stop client
-    client.disconnect();
-    rclient.disconnect();
-
-    // Stop controller
-    controller.syncStop();
-
-    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
deleted file mode 100644
index 39ece64..0000000
--- a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-import java.util.List;
-
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.MonitoringTestHelper;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.MonitoringConfig;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.monitoring.RiemannConfigs.Builder;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestRiemannAgent extends ZkUnitTestBase {
-  @Test
-  public void testStartAndStop() throws Exception {
-    final int NUM_PARTICIPANTS = 0;
-    final int NUM_PARTITIONS = 4;
-    final int NUM_REPLICAS = 1;
-
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
-    // Set up monitoring cluster
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
-        "localhost", // participant name prefix
-        "MonitoringService", // resource name prefix
-        1, // resources
-        NUM_PARTITIONS, // partitions per resource
-        NUM_PARTICIPANTS, // number of nodes
-        NUM_REPLICAS, // replicas
-        "OnlineOffline", // pick a built-in state model
-        RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
-        true); // do rebalance
-
-    // Enable auto-join
-    HelixConfigScope scope =
-        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
-    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
-    configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "" + true);
-
-    // start controller
-    ClusterControllerManager controller =
-        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
-    controller.syncStart();
-
-    // Start monitoring server
-    int port = MonitoringTestHelper.availableTcpPort();
-    MonitoringConfig monitoringConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
-    monitoringConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(port));
-
-    RiemannConfigs.Builder builder = new Builder().addConfig(monitoringConfig);
-    RiemannMonitoringServer server = new RiemannMonitoringServer(builder.build());
-    server.start();
-
-    // Start Riemann agent
-    RiemannAgent agent = new RiemannAgent(ZK_ADDR, clusterName, port);
-    agent.start();
-
-    // Check live-instance
-    final HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
-    Assert.assertNotNull(liveInstances);
-    Assert.assertEquals(liveInstances.size(), 1);
-
-    // Check external-view
-    boolean result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                clusterName));
-    Assert.assertTrue(result);
-
-    // Stop monitoring server
-    server.stop();
-
-    result = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
-        return liveInstances != null && liveInstances.size() == 0;
-      }
-    }, 15 * 1000);
-    Assert.assertTrue(result, "RiemannAgent should be disconnected if RiemannServer is stopped");
-
-    // Stop controller
-    controller.syncStop();
-
-    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAlertProxy.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAlertProxy.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAlertProxy.java
deleted file mode 100644
index 700682a..0000000
--- a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAlertProxy.java
+++ /dev/null
@@ -1,105 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-import java.util.List;
-
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.MonitoringTestHelper;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.Attributes;
-import org.apache.helix.model.Message.MessageType;
-import org.eclipse.jetty.client.ContentExchange;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpExchange;
-import org.eclipse.jetty.io.ByteArrayBuffer;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestRiemannAlertProxy extends ZkUnitTestBase {
-  void sendAlert(int proxyPort, String alertNameStr) throws Exception {
-    HttpClient client = new HttpClient();
-    client.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
-    client.start();
-
-    ContentExchange exchange = new ContentExchange(true);
-    exchange.setMethod("POST");
-    exchange.setURL("http://localhost:" + proxyPort);
-    exchange.setRequestContent(new ByteArrayBuffer(alertNameStr));
-
-    client.send(exchange);
-
-    // Waits until the exchange is terminated
-    int exchangeState = exchange.waitForDone();
-    Assert.assertTrue(exchangeState == HttpExchange.STATUS_COMPLETED);
-
-    client.stop();
-  }
-
-  @Test
-  public void testBasic() throws Exception {
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-
-    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
-    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
-    admin.addCluster(clusterName);
-
-    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
-    int proxyPort = MonitoringTestHelper.availableTcpPort();
-
-    RiemannAlertProxy proxy = new RiemannAlertProxy(proxyPort, baseAccessor);
-
-    proxy.start();
-
-    // Send a valid alert
-    String alertNameStr = String.format("(%s.%%.node1)(latency95)>(1000)", clusterName);
-    sendAlert(proxyPort, alertNameStr);
-
-    // Send an invalid alert
-    String inValidAlertNameStr = "IGNORABLE: invalid alert";
-    sendAlert(proxyPort, inValidAlertNameStr);
-
-    // Check only 1 alert controller message is sent
-    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    List<Message> messages = accessor.getChildValues(keyBuilder.controllerMessages());
-
-    Assert.assertEquals(messages.size(), 1);
-    Message message = messages.get(0);
-    Assert.assertEquals(message.getMsgType(), MessageType.ALERT.toString());
-    Assert.assertEquals(message.getAttribute(Attributes.ALERT_NAME), alertNameStr);
-
-    proxy.shutdown();
-    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannMonitoringServer.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannMonitoringServer.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannMonitoringServer.java
deleted file mode 100644
index f7d0585..0000000
--- a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannMonitoringServer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.util.Date;
-
-import org.apache.helix.MonitoringTestHelper;
-import org.apache.helix.TestHelper;
-import org.apache.helix.model.MonitoringConfig;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.aphyr.riemann.client.RiemannClient;
-
-public class TestRiemannMonitoringServer {
-
-  @Test
-  public void testBasic() throws IOException {
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String testName = className + "_" + methodName;
-
-    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
-
-    int port = MonitoringTestHelper.availableTcpPort();
-    MonitoringConfig monitoringConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
-    monitoringConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(port));
-
-    RiemannConfigs.Builder builder = new RiemannConfigs.Builder().addConfig(monitoringConfig);
-    RiemannMonitoringServer server = new RiemannMonitoringServer(builder.build());
-
-    // Check server starts
-    server.start();
-    Assert.assertTrue(server.isStarted());
-
-    RiemannClient rclient = null;
-    try {
-      rclient = RiemannClient.tcp("localhost", port);
-      rclient.connect();
-    } catch (IOException e) {
-      Assert.fail("Riemann server should start on port: " + port);
-    }
-
-    // Check server stops
-    Assert.assertNotNull(rclient);
-    rclient.disconnect();
-    server.stop();
-    Assert.assertFalse(server.isStarted());
-
-    try {
-      rclient = RiemannClient.tcp("localhost", port);
-      rclient.connect();
-      Assert.fail("Riemann server should be stopped on port: " + port);
-    } catch (IOException e) {
-      // ok
-    }
-
-    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/IntegrationTest.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/IntegrationTest.java
new file mode 100644
index 0000000..f356d18
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/IntegrationTest.java
@@ -0,0 +1,199 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.controller.alert.AlertAction;
+import org.apache.helix.controller.alert.AlertName;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.DefaultAlertMsgHandlerFactory;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.AlertConfig;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.monitoring.MonitoringEvent;
+import org.apache.helix.monitoring.MonitoringTestHelper;
+import org.apache.helix.monitoring.riemann.RiemannAgent;
+import org.apache.helix.monitoring.riemann.RiemannConfigs;
+import org.apache.helix.monitoring.riemann.RiemannMonitoringServer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+public class IntegrationTest extends ZkUnitTestBase {
+  @Test
+  public void testBasic() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up monitoring cluster
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "MonitoringService", // resource name prefix
+        1, // resources
+        8, // partitions per resource
+        0, // number of nodes
+        1, // replicas
+        "OnlineOffline", // pick a built-in state model
+        RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
+        true); // do rebalance
+
+    // Enable auto-join
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "" + true);
+
+    // Start controller
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // Create ZK data accessors for the monitoring cluster
+    final BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    // Start monitoring server
+    int riemannPort = MonitoringTestHelper.availableTcpPort();
+    MonitoringConfig riemannConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
+    riemannConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(riemannPort));
+
+    MonitoringConfig latencyCheckConfig = new MonitoringConfig("check_latency_config.clj");
+    latencyCheckConfig.setConfig(MonitoringTestHelper.getLatencyCheckConfigString(ZK_ADDR));
+
+    // Set monitoring config on zk
+    accessor.setProperty(keyBuilder.monitoringConfig(riemannConfig.getId()),
+        riemannConfig);
+    accessor.setProperty(keyBuilder.monitoringConfig(latencyCheckConfig.getId()),
+        latencyCheckConfig);
+
+    RiemannConfigs.Builder riemannConfigBuilder =
+        new RiemannConfigs.Builder().addConfigs(Lists.newArrayList(riemannConfig,
+            latencyCheckConfig));
+    RiemannMonitoringServer server = new RiemannMonitoringServer(riemannConfigBuilder.build());
+    server.start();
+
+    // Start Riemann agent
+    RiemannAgent agent = new RiemannAgent(ZK_ADDR, clusterName, riemannPort);
+    agent.start();
+
+    // Check live-instance
+    List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
+    Assert.assertNotNull(liveInstances);
+    Assert.assertEquals(liveInstances.size(), 1);
+
+    boolean result;
+
+    // Set up mock storage cluster to be monitored
+    String storageClusterName = clusterName + "_storage";
+    TestHelper.setupCluster(storageClusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        8, // partitions per resource
+        2, // number of nodes
+        1, // replicas
+        "MasterSlave", // pick a built-in state model
+        RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
+        true); // do rebalance
+
+    // Add alert config
+    AlertConfig alertConfig = new AlertConfig("default");
+    AlertName alertName =
+        new AlertName.Builder().cluster(ClusterId.from(storageClusterName)).metric("latency95")
+            .largerThan("1000").build();
+    AlertAction alertAction =
+        new AlertAction.Builder().cmd("enableInstance").args("{cluster}", "{node}", "false")
+            .build();
+    alertConfig.putConfig(alertName, alertAction);
+    final HelixDataAccessor storageAccessor =
+        new ZKHelixDataAccessor(storageClusterName, baseAccessor);
+    final PropertyKey.Builder storageKeyBuilder = storageAccessor.keyBuilder();
+    storageAccessor.setProperty(storageKeyBuilder.alertConfig("default"), alertConfig);
+
+    // Start another controller for mock storage cluster
+    ClusterControllerManager storageController =
+        new ClusterControllerManager(ZK_ADDR, storageClusterName, "controller");
+    MessageHandlerFactory fty = new DefaultAlertMsgHandlerFactory();
+    storageController.getMessagingService()
+        .registerMessageHandlerFactory(fty.getMessageType(), fty);
+    storageController.syncStart();
+
+    // Check localhost_12918 is enabled
+    InstanceConfig instanceConfig =
+        storageAccessor.getProperty(storageKeyBuilder.instanceConfig("localhost_12918"));
+    Assert.assertTrue(instanceConfig.getInstanceEnabled());
+
+    // Connect monitoring client
+    final RiemannClientWrapper rclient =
+        new RiemannClientWrapper(Arrays.asList("localhost:" + riemannPort));
+    rclient.connect();
+
+    MonitoringEvent event =
+        new MonitoringEvent().participant(ParticipantId.from("localhost_12918"))
+            .name("LatencyReport").attribute("latency95", "" + 2)
+            .attribute("cluster", storageClusterName);
+    rclient.send(event);
+
+    // Check localhost_12918 is disabled
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        InstanceConfig instanceConfig =
+            storageAccessor.getProperty(storageKeyBuilder.instanceConfig("localhost_12918"));
+        return instanceConfig.getInstanceEnabled() == false;
+      }
+    }, 10 * 1000);
+    Assert.assertTrue(result, "localhost_12918 should be disabled");
+
+    // Cleanup
+    rclient.disconnect();
+    storageController.syncStop();
+    controller.syncStop();
+
+    agent.shutdown();
+    server.stop();
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestClientServerMonitoring.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestClientServerMonitoring.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestClientServerMonitoring.java
new file mode 100644
index 0000000..0f1fb5f
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestClientServerMonitoring.java
@@ -0,0 +1,181 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.monitoring.MonitoringEvent;
+import org.apache.helix.monitoring.MonitoringTestHelper;
+import org.apache.helix.monitoring.riemann.RiemannAgent;
+import org.apache.helix.monitoring.riemann.RiemannMonitoringServer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.aphyr.riemann.Proto.Event;
+import com.aphyr.riemann.client.RiemannClient;
+
+public class TestClientServerMonitoring extends ZkUnitTestBase {
+  @Test
+  public void test() throws Exception {
+    final int NUM_PARTICIPANTS = 0;
+    final int NUM_PARTITIONS = 8;
+    final int NUM_REPLICAS = 1;
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up monitoring cluster
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "MonitoringService", // resource name prefix
+        1, // resources
+        NUM_PARTITIONS, // partitions per resource
+        NUM_PARTICIPANTS, // number of nodes
+        NUM_REPLICAS, // replicas
+        "OnlineOffline", // pick a built-in state model
+        RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
+        true); // do rebalance
+
+    // Enable auto-join
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "" + true);
+
+    // Start controller
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // Start monitoring server
+    int port = MonitoringTestHelper.availableTcpPort();
+    RiemannMonitoringServer server = MonitoringTestHelper.startRiemannServer(port);
+
+    // Start Riemann agent
+    RiemannAgent agent = new RiemannAgent(ZK_ADDR, clusterName, port);
+    agent.start();
+
+    // Check live-instance
+    final HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
+    Assert.assertNotNull(liveInstances);
+    Assert.assertEquals(liveInstances.size(), 1);
+
+    // Connect monitoring client
+    final RiemannClientWrapper client =
+        new RiemannClientWrapper(Arrays.asList("localhost:" + port));
+    client.connect();
+
+    final RiemannClient rclient = RiemannClient.tcp("localhost", port);
+    rclient.connect();
+
+    // Test RiemannClientWrapper#send()
+    MonitoringEvent event = new MonitoringEvent().tag("test").ttl(5);
+    boolean result = client.send(event);
+    Assert.assertTrue(result);
+
+    // Check monitoring server has received the event with tag="test"
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        List<Event> events = rclient.query("tagged \"test\"");
+        System.out.println("events=" + events);
+        return (events != null) && (events.size() == 1) && (events.get(0).getTagsCount() == 1)
+            && (events.get(0).getTags(0).equals("test"));
+      }
+    }, 5 * 1000);
+    System.out.println("result=" + result);
+    Assert.assertTrue(result);
+
+    // Test RiemannClientWrapper#sendAndFlush()
+    MonitoringEvent event2 = new MonitoringEvent().tag("test2").ttl(5);
+    client.sendAndFlush(event2);
+
+    // Check monitoring server has received the event with tag="test2"
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        List<Event> events = rclient.query("tagged \"test2\"");
+        return (events != null) && (events.size() == 1) && (events.get(0).getTagsCount() == 1)
+            && (events.get(0).getTags(0).equals("test2"));
+      }
+    }, 5 * 1000);
+    Assert.assertTrue(result);
+
+    // Test RiemannClientWrapper#every()
+    client.every(1, 0, TimeUnit.SECONDS, new Runnable() {
+
+      @Override
+      public void run() {
+        MonitoringEvent event3 =
+            new MonitoringEvent().tag("test3").resource(ResourceId.from("db" + System.currentTimeMillis())).ttl(5);
+        client.send(event3);
+      }
+    });
+
+    // Check monitoring server has received more than 2 events with tag="test3"
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        List<Event> events = rclient.query("tagged \"test3\"");
+        return (events.size() > 2) && (events.get(0).getTagsCount() == 1)
+            && (events.get(0).getTags(0).equals("test3"));
+      }
+    }, 10 * 1000);
+    Assert.assertTrue(result);
+
+    // Stop client
+    client.disconnect();
+    rclient.disconnect();
+
+    // Stop controller
+    controller.syncStop();
+
+    server.stop();
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestHelixAlertMessenger.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestHelixAlertMessenger.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestHelixAlertMessenger.java
new file mode 100644
index 0000000..587e6f8
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestHelixAlertMessenger.java
@@ -0,0 +1,110 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.Attributes;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.monitoring.riemann.HelixAlertMessenger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestHelixAlertMessenger extends ZkUnitTestBase {
+
+  @Test
+  public void testBasic() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addCluster(clusterName);
+
+    HelixAlertMessenger messenger = new HelixAlertMessenger(ZK_ADDR);
+
+    // Send a valid alert
+    String alertNameStr = String.format("(%s.%%.node1)(latency95)>(1000)", clusterName);
+    messenger.onAlert(alertNameStr);
+
+    // Send an invalid alert
+    String inValidAlertNameStr = "IGNORABLE: invalid alert";
+    messenger.onAlert(inValidAlertNameStr);
+
+    // Check only 1 alert controller message is sent
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    List<Message> messages = accessor.getChildValues(keyBuilder.controllerMessages());
+
+    Assert.assertEquals(messages.size(), 1);
+    Message message = messages.get(0);
+    Assert.assertEquals(message.getMsgType(), MessageType.ALERT.toString());
+    Assert.assertEquals(message.getAttribute(Attributes.ALERT_NAME), alertNameStr);
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testThrottle() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addCluster(clusterName);
+
+    HelixAlertMessenger messenger = new HelixAlertMessenger(ZK_ADDR);
+
+    long startT = System.currentTimeMillis();
+    String alertNameStr = String.format("(%s.%%.node1)(latency95)>(1000)", clusterName);
+    for (int i = 0; i < 10; i++) {
+      messenger.onAlert(alertNameStr);
+    }
+    long seconds = (System.currentTimeMillis() - startT) / 1000 + 1;
+
+    // Check no more than 1 alert per second
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    List<Message> messages = accessor.getChildValues(keyBuilder.controllerMessages());
+
+    Assert.assertTrue(messages.size() <= seconds, "Should not receive more than " + seconds
+        + " messages");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestRiemannAgent.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestRiemannAgent.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestRiemannAgent.java
new file mode 100644
index 0000000..11e411c
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestRiemannAgent.java
@@ -0,0 +1,117 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.monitoring.MonitoringTestHelper;
+import org.apache.helix.monitoring.riemann.RiemannAgent;
+import org.apache.helix.monitoring.riemann.RiemannMonitoringServer;
+import org.apache.helix.monitoring.riemann.RiemannConfigs.Builder;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestRiemannAgent extends ZkUnitTestBase { // Checks the live-instance count of a monitoring cluster while a RiemannAgent runs and after the Riemann server stops.
+  @Test
+  public void testStartAndStop() throws Exception { // expects exactly one live instance after the agent starts and none after the server is stopped
+    final int NUM_PARTICIPANTS = 0; // the cluster is set up with no nodes; auto-join is enabled further down
+    final int NUM_PARTITIONS = 4;
+    final int NUM_REPLICAS = 1;
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up monitoring cluster
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "MonitoringService", // resource name prefix
+        1, // resources
+        NUM_PARTITIONS, // partitions per resource
+        NUM_PARTICIPANTS, // number of nodes
+        NUM_REPLICAS, // replicas
+        "OnlineOffline", // pick a built-in state model
+        RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
+        true); // do rebalance
+
+    // Enable auto-join
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "" + true); // the value written is the string "true"
+
+    // start controller
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // Start monitoring server
+    int port = MonitoringTestHelper.availableTcpPort(); // presumably a currently free local port; confirm in MonitoringTestHelper
+    RiemannMonitoringServer server = MonitoringTestHelper.startRiemannServer(port);
+
+    // Start Riemann agent
+    RiemannAgent agent = new RiemannAgent(ZK_ADDR, clusterName, port); // same port the server was started on
+    agent.start();
+
+    // Check live-instance
+    final HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
+    Assert.assertNotNull(liveInstances);
+    Assert.assertEquals(liveInstances.size(), 1); // NUM_PARTICIPANTS is 0, so this one live instance is presumably the agent; TODO confirm
+
+    // Stop monitoring server
+    server.stop(); // NOTE(review): never reached if the assertions above fail, which would leave the server running
+
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances()); // re-read on every call; shadows the outer liveInstances
+        return liveInstances != null && liveInstances.size() == 0;
+      }
+    }, 15 * 1000); // second argument is presumably a timeout in milliseconds; confirm against TestHelper.verify
+    Assert.assertTrue(result, "RiemannAgent should be disconnected if RiemannServer is stopped");
+
+    agent.shutdown(); // NOTE(review): cleanup is not in a finally block, so it is skipped when an assertion above fails
+    controller.syncStop();
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestRiemannClientWrapper.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestRiemannClientWrapper.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestRiemannClientWrapper.java
new file mode 100644
index 0000000..24af69e
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestRiemannClientWrapper.java
@@ -0,0 +1,134 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.helix.monitoring.MonitoringEvent;
+import org.apache.helix.monitoring.MonitoringTestHelper;
+import org.apache.helix.monitoring.riemann.RawRiemannClient.State;
+import org.apache.helix.monitoring.riemann.RiemannConfigs;
+import org.apache.helix.monitoring.riemann.RiemannMonitoringServer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.aphyr.riemann.Proto.Event;
+import com.aphyr.riemann.client.RiemannClient;
+
+public class TestRiemannClientWrapper { // Exercises RawRiemannClient against a RiemannMonitoringServer that is stopped and started again mid-test.
+
+  @Test
+  public void testBasic() throws Exception { // send -> query back -> stop server -> send fails -> restart server -> send and query back again
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    int port = MonitoringTestHelper.availableTcpPort(); // presumably a currently free local port; confirm in MonitoringTestHelper
+
+    MonitoringConfig monitoringConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
+    monitoringConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(port));
+
+    RiemannConfigs.Builder builder = new RiemannConfigs.Builder().addConfig(monitoringConfig);
+    RiemannConfigs config = builder.build();
+    System.out.println("configDir: " + config.getConfigDir());
+    RiemannMonitoringServer server = new RiemannMonitoringServer(config);
+    server.start();
+    Assert.assertTrue(server.isStarted());
+
+    // create client: rawRclient is the client under test
+    final RawRiemannClient rawRclient = new RawRiemannClient("localhost", port);
+    rawRclient.connect();
+
+    final RiemannClient rclient = RiemannClient.tcp("localhost", port); // second client, used in this test only to query events back
+    rclient.connect();
+
+    MonitoringEvent event = new MonitoringEvent().tag("test").ttl(3);
+    boolean ret;
+    ret = rawRclient.send(event);
+    Assert.assertTrue(ret);
+
+    // wait until we can query it from riemann server
+    ret = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        List<Event> events = rclient.query("tagged \"test\"");
+
+        return events != null && events.size() == 1 && events.get(0).getTagsCount() == 1 // exactly one event carrying exactly one tag
+            && events.get(0).getTags(0).equals("test");
+      }
+    }, 5 * 1000); // second argument is presumably a timeout in milliseconds; confirm against TestHelper.verify
+    Assert.assertTrue(ret);
+
+    server.stop();
+
+    // wait until heartbeat detects server is down
+    ret = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        return rawRclient.getState() == State.RECONNECTING;
+      }
+    }, 20 * 1000);
+    Assert.assertTrue(ret);
+
+    ret = rawRclient.send(event);
+    Assert.assertFalse(ret); // send is expected to report failure while the client state is RECONNECTING
+
+    server.start();
+    // wait until heartbeat detects server is up again
+    ret = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        return rawRclient.getState() == State.CONNECTED;
+      }
+    }, 20 * 1000);
+    Assert.assertTrue(ret);
+
+    MonitoringEvent event2 = new MonitoringEvent().tag("test2").ttl(3); // a different tag, so the query below cannot match the first event
+    ret = rawRclient.send(event2);
+    Assert.assertTrue(ret);
+
+    // wait until we can query it from riemann server
+    ret = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        List<Event> events = rclient.query("tagged \"test2\"");
+
+        return events != null && events.size() == 1 && events.get(0).getTagsCount() == 1
+            && events.get(0).getTags(0).equals("test2");
+      }
+    }, 5 * 1000);
+    Assert.assertTrue(ret);
+
+    // clean up (NOTE(review): rclient was connected above but is never disconnected; none of this runs if an assertion fails)
+    rawRclient.disconnect();
+    server.stop();
+
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestRiemannMonitoringServer.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestRiemannMonitoringServer.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestRiemannMonitoringServer.java
new file mode 100644
index 0000000..37234e0
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/riemann/TestRiemannMonitoringServer.java
@@ -0,0 +1,82 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.helix.monitoring.MonitoringTestHelper;
+import org.apache.helix.monitoring.riemann.RiemannConfigs;
+import org.apache.helix.monitoring.riemann.RiemannMonitoringServer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.aphyr.riemann.client.RiemannClient;
+
+public class TestRiemannMonitoringServer { // Checks that RiemannMonitoringServer serves a client after start() and no longer does after stop().
+
+  @Test
+  public void testBasic() throws IOException {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    int port = MonitoringTestHelper.availableTcpPort(); // presumably a currently free local port; confirm in MonitoringTestHelper
+    MonitoringConfig monitoringConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
+    monitoringConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(port));
+
+    RiemannConfigs.Builder builder = new RiemannConfigs.Builder().addConfig(monitoringConfig);
+    RiemannMonitoringServer server = new RiemannMonitoringServer(builder.build());
+
+    // Check server starts
+    server.start();
+    Assert.assertTrue(server.isStarted());
+
+    RiemannClient rclient = null;
+    try {
+      rclient = RiemannClient.tcp("localhost", port);
+      rclient.connect();
+      rclient.event().sendWithAck(); // an IOException from connecting or sending is treated as "server not up"
+    } catch (IOException e) {
+      Assert.fail("Riemann server should start on port: " + port); // NOTE(review): the caught IOException is not attached to the failure
+    }
+
+    // Check server stops
+    Assert.assertNotNull(rclient);
+    rclient.disconnect();
+    server.stop();
+    Assert.assertFalse(server.isStarted());
+
+    try {
+      rclient = RiemannClient.tcp("localhost", port); // fresh client against the same port, now that the server is stopped
+      rclient.connect();
+      rclient.event().sendWithAck();
+      Assert.fail("Riemann server should be stopped on port: " + port); // reached only if connect and send both succeeded (NOTE(review): this client is then never disconnected)
+    } catch (IOException e) {
+      // ok: an IOException is the expected outcome once the server has been stopped
+    }
+
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+}


Mime
View raw message