helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject [1/2] [HELIX-319] refactor MonitoringClient to accommodate distributed monitoring server, rb=18455
Date Thu, 27 Feb 2014 00:52:51 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-monitoring b182690f8 -> 3505beadb


http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
index 8b7f839..588bc35 100644
--- a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
@@ -19,204 +19,170 @@ package org.apache.helix.monitoring;
  * under the License.
  */
 
-import java.net.InetAddress;
 import java.util.Date;
 import java.util.List;
-import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
+import org.apache.helix.MonitoringTestHelper;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.Leader;
 import org.apache.helix.model.MonitoringConfig;
-import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.tools.ClusterStateVerifier;
 import org.junit.Assert;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Maps;
+import com.aphyr.riemann.Proto.Event;
+import com.aphyr.riemann.client.RiemannClient;
 
 public class TestClientServerMonitoring extends ZkUnitTestBase {
   @Test
   public void testMonitoring() throws Exception {
-    final int NUM_PARTICIPANTS = 4;
+    final int NUM_PARTICIPANTS = 0;
     final int NUM_PARTITIONS = 8;
-    final int NUM_REPLICAS = 2;
+    final int NUM_REPLICAS = 1;
 
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
     String clusterName = className + "_" + methodName;
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    // Set up cluster
+    // Set up monitoring cluster
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
-        "TestDB", // resource name prefix
+        "MonitoringService", // resource name prefix
         1, // resources
         NUM_PARTITIONS, // partitions per resource
         NUM_PARTICIPANTS, // number of nodes
         NUM_REPLICAS, // replicas
-        "MasterSlave", // pick a built-in state model
+        "OnlineOffline", // pick a built-in state model
         RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
         true); // do rebalance
 
-    // start participants
-    MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
-    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
-      participants[i] =
-          new MockParticipantManager(ZK_ADDR, clusterName, "localhost_" + (12918 + i));
-      participants[i].syncStart();
-    }
-    HelixDataAccessor accessor = participants[0].getHelixDataAccessor();
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-
-    // set a custom monitoring config
-    MonitoringConfig monitoringConfig = new MonitoringConfig("sampleMonitoringConfig");
-    monitoringConfig.setConfig(getMonitoringConfigString());
-    accessor.setProperty(keyBuilder.monitoringConfig("sampleMonitoringConfig"), monitoringConfig);
-
-    // start controller
+    // Enable auto-join
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "" + true);
+
+    // Start controller
     ClusterControllerManager controller =
         new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
-    controller.registerMonitoringServer(new RiemannMonitoringServer(InetAddress.getLocalHost()
-        .getHostName()));
     controller.syncStart();
 
-    // make sure the leader has registered and is showing the server port
-    Leader leader = accessor.getProperty(keyBuilder.controllerLeader());
-    Assert.assertNotNull(leader);
-    Assert.assertNotEquals(leader.getMonitoringPort(), -1);
-    Assert.assertNotNull(leader.getMonitoringHost());
+    // Start monitoring server
+    int port = MonitoringTestHelper.availableTcpPort();
+    MonitoringConfig monitoringConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
+    monitoringConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(port));
+
+    RiemannConfigs.Builder builder = new RiemannConfigs.Builder().addConfig(monitoringConfig);
+    RiemannMonitoringServer server = new RiemannMonitoringServer(builder.build());
+    server.start();
+
+    // Start Riemann agent
+    RiemannAgent agent = new RiemannAgent(ZK_ADDR, clusterName, port);
+    agent.start();
+
+    // Check live-instance
+    final HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
+    Assert.assertNotNull(liveInstances);
+    Assert.assertEquals(liveInstances.size(), 1);
+
+    // Check external-view
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Connect monitoring client
+    final RiemannMonitoringClient client =
+        new RiemannMonitoringClient(ZK_ADDR, ClusterId.from(clusterName),
+            ResourceId.from("MonitoringService0"), 1);
+    client.connect();
 
-    // run the spectator
-    spectate(clusterName, "TestDB0", NUM_PARTITIONS);
+    final RiemannClient rclient = RiemannClient.tcp("localhost", port);
+    rclient.connect();
 
-    // stop participants
-    for (MockParticipantManager participant : participants) {
-      participant.syncStop();
-    }
+    // Test MonitoringEvent#send()
+    MonitoringEvent event = new MonitoringEvent().tag("test").ttl(5);
+    client.send(ResourceId.from("TestDB"), event, false);
 
-    // stop controller
-    controller.syncStop();
-  }
+    // Check monitoring server has received the event with tag="test"
+    result = TestHelper.verify(new TestHelper.Verifier() {
 
-  private String getMonitoringConfigString() {
-    StringBuilder sb =
-        new StringBuilder()
-            .append("(defn parse-int\r\n")
-            .append(
-                "  \"Convert a string to an integer\"\r\n  [instr]\r\n  (Integer/parseInt
instr))\r\n\r\n")
-            .append("(defn parse-double\r\n  \"Convert a string into a double\"\r\n  [instr]\r\n")
-            .append("  (Double/parseDouble instr))\r\n\r\n(defn check-failure-rate\r\n")
-            .append(
-                "  \"Check if the event should trigger an alarm based on failure rate\"\r\n
 [e]\r\n")
-            .append(
-                "  (let [writeCount (parse-int (:writeCount e)) failedCount (parse-int (:failedCount
e))]\r\n")
-            .append(
-                "    (if (> writeCount 0)\r\n      (let [ratio (double (/ failedCount
writeCount))]\r\n")
-            .append("        (if (> ratio 0.1) ; Report if the failure count exceeds 10%\r\n")
-            .append(
-                "          (prn (:host e) \"has an unacceptable failure rate of\" ratio))))))\r\n\r\n")
-            .append(
-                "(defn check-95th-latency\r\n  \"Check if the 95th percentile latency is
within expectations\"\r\n")
-            .append("  [e]\r\n  (let [latency (parse-double (:latency95 e))]\r\n")
-            .append(
-                "    (if (> latency 1.0) ; Report if the 95th percentile latency exceeds
1.0s\r\n")
-            .append(
-                "      (prn (:host e) \"has an unacceptable 95th percentile latency of\"
latency))))\r\n\r\n")
-            .append("(streams\r\n  (where\r\n    (service #\".*LatencyReport.*\")")
-            .append(
-                " ; Only process services containing LatencyReport\r\n    check-failure-rate\r\n")
-            .append("    check-95th-latency))");
-    return sb.toString();
-  }
+      @Override
+      public boolean verify() throws Exception {
+        List<Event> events = rclient.query("tagged \"test\"");
+        return (events.size() == 1) && (events.get(0).getTagsCount() == 1)
+            && (events.get(0).getTags(0).equals("test"));
+      }
+    }, 5 * 1000);
+    Assert.assertTrue(result);
 
-  private void spectate(final String clusterName, final String resourceName, final int numPartitions)
-      throws Exception {
-    final Random random = new Random();
-    final ClusterId clusterId = ClusterId.from(clusterName);
-    final ResourceId resourceId = ResourceId.from(resourceName);
+    // Test MonitoringEvent#sendAndFlush()
+    MonitoringEvent event2 = new MonitoringEvent().tag("test2").ttl(5);
+    client.sendAndFlush(ResourceId.from("TestDB2"), event2);
 
-    // Connect to Helix
-    final HelixManager manager =
-        HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.SPECTATOR,
ZK_ADDR);
-    manager.connect();
+    // Check monitoring server has received the event with tag="test2"
+    result = TestHelper.verify(new TestHelper.Verifier() {
 
-    // Attach a monitoring client to this connection
-    final MonitoringClient client =
-        new RiemannMonitoringClient(clusterId, manager.getHelixDataAccessor());
-    client.connect();
+      @Override
+      public boolean verify() throws Exception {
+        List<Event> events = rclient.query("tagged \"test2\"");
+        return (events.size() == 1) && (events.get(0).getTagsCount() == 1)
+            && (events.get(0).getTags(0).equals("test2"));
+      }
+    }, 5 * 1000);
+    Assert.assertTrue(result);
 
-    // Start spectating
-    final RoutingTableProvider routingTableProvider = new RoutingTableProvider();
-    manager.addExternalViewChangeListener(routingTableProvider);
+    // Test MonitoringEvent#every()
+    client.every(ResourceId.from("TestDB3"), 1, 0, TimeUnit.SECONDS, new Runnable() {
 
-    // Send some metrics
-    client.every(5, 0, TimeUnit.SECONDS, new Runnable() {
       @Override
       public void run() {
-        Map<ParticipantId, Integer> writeCounts = Maps.newHashMap();
-        Map<ParticipantId, Integer> failedCounts = Maps.newHashMap();
-        Map<ParticipantId, Double> latency95Map = Maps.newHashMap();
-        for (int i = 0; i < numPartitions; i++) {
-          // Figure out who hosts what
-          PartitionId partitionId = PartitionId.from(resourceId, i + "");
-          List<InstanceConfig> instances =
-              routingTableProvider.getInstances(resourceName, partitionId.stringify(), "MASTER");
-          if (instances.size() < 1) {
-            continue;
-          }
-
-          // Normally you would get these attributes by using a CallTracker
-          ParticipantId participantId = instances.get(0).getParticipantId();
-          int writeCount = random.nextInt(1000) + 10;
-          if (!writeCounts.containsKey(participantId)) {
-            writeCounts.put(participantId, writeCount);
-          } else {
-            writeCounts.put(participantId, writeCounts.get(participantId) + writeCount);
-          }
-          int failedCount = i != 0 ? 0 : writeCount / 2; // bad write count from p0 master
-          if (!failedCounts.containsKey(participantId)) {
-            failedCounts.put(participantId, failedCount);
-          } else {
-            failedCounts.put(participantId, failedCounts.get(participantId) + failedCount);
-          }
-          double latency = (i != 1) ? 0.001 : 5.000; // bad 95th latency from p1 master
-          latency95Map.put(participantId, latency);
-        }
-
-        // Send everything grouped by participant
-        for (ParticipantId participantId : writeCounts.keySet()) {
-          Map<String, String> attributes = Maps.newHashMap();
-          attributes.put("writeCount", writeCounts.get(participantId) + "");
-          attributes.put("failedCount", failedCounts.get(participantId) + "");
-          attributes.put("latency95", latency95Map.get(participantId) + "");
-
-          // Send an event with a ttl long enough to span the send interval
-          MonitoringEvent e =
-              new MonitoringEvent().cluster(clusterId).resource(resourceId)
-                  .participant(participantId).name("LatencyReport").attributes(attributes)
-                  .eventState("update").ttl(10.0f);
-          client.send(e, false);
-        }
+        MonitoringEvent event3 =
+            new MonitoringEvent().tag("test3").resource(ResourceId.from("db" + System.currentTimeMillis())).ttl(5);
+        client.send(ResourceId.from("TestDB3"), event3, false);
       }
     });
-    Thread.sleep(60000);
+
+    // Check monitoring server has received at least 2 event2 with tag="test3"
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        List<Event> events = rclient.query("tagged \"test3\"");
+        return (events.size() > 2) && (events.get(0).getTagsCount() == 1)
+            && (events.get(0).getTags(0).equals("test3"));
+      }
+    }, 10 * 1000);
+    Assert.assertTrue(result);
+
+    // Stop client
     client.disconnect();
-    manager.disconnect();
+    rclient.disconnect();
+
+    // Stop controller
+    controller.syncStop();
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
new file mode 100644
index 0000000..100c28c
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
@@ -0,0 +1,129 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.MonitoringTestHelper;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.monitoring.RiemannConfigs.Builder;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestRiemannAgent extends ZkUnitTestBase {
+  @Test
+  public void testStartAndStop() throws Exception {
+    final int NUM_PARTICIPANTS = 0;
+    final int NUM_PARTITIONS = 4;
+    final int NUM_REPLICAS = 1;
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up monitoring cluster
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "MonitoringService", // resource name prefix
+        1, // resources
+        NUM_PARTITIONS, // partitions per resource
+        NUM_PARTICIPANTS, // number of nodes
+        NUM_REPLICAS, // replicas
+        "OnlineOffline", // pick a built-in state model
+        RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
+        true); // do rebalance
+
+    // Enable auto-join
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "" + true);
+
+    // start controller
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // Start monitoring server
+    int port = MonitoringTestHelper.availableTcpPort();
+    MonitoringConfig monitoringConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
+    monitoringConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(port));
+
+    RiemannConfigs.Builder builder = new Builder().addConfig(monitoringConfig);
+    RiemannMonitoringServer server = new RiemannMonitoringServer(builder.build());
+    server.start();
+
+    // Start Riemann agent
+    RiemannAgent agent = new RiemannAgent(ZK_ADDR, clusterName, port);
+    agent.start();
+
+    // Check live-instance
+    final HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
+    Assert.assertNotNull(liveInstances);
+    Assert.assertEquals(liveInstances.size(), 1);
+
+    // Check external-view
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Stop monitoring server
+    server.stop();
+
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
+        return liveInstances != null && liveInstances.size() == 0;
+      }
+    }, 15 * 1000);
+    Assert.assertTrue(result, "RiemannAgent should be disconnected if RiemannServer is stopped");
+
+    // Stop controller
+    controller.syncStop();
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannMonitoringServer.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannMonitoringServer.java
b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannMonitoringServer.java
new file mode 100644
index 0000000..f7f45f9
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannMonitoringServer.java
@@ -0,0 +1,79 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.helix.MonitoringTestHelper;
+import org.apache.helix.TestHelper;
+import org.apache.helix.model.MonitoringConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.aphyr.riemann.client.RiemannClient;
+import com.google.common.collect.Lists;
+
+public class TestRiemannMonitoringServer {
+
+  @Test
+  public void testBasic() throws IOException {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    int port = MonitoringTestHelper.availableTcpPort();
+    MonitoringConfig monitoringConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
+    monitoringConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(port));
+
+    RiemannConfigs.Builder builder = new RiemannConfigs.Builder().addConfig(monitoringConfig);
+    RiemannMonitoringServer server = new RiemannMonitoringServer(builder.build());
+
+    // Check server starts
+    server.start();
+    Assert.assertTrue(server.isStarted());
+
+    RiemannClient rclient = null;
+    try {
+      rclient = RiemannClient.tcp("localhost", port);
+      rclient.connect();
+    } catch (IOException e) {
+      Assert.fail("Riemann server should start on port: " + port);
+    }
+
+    // Check server stops
+    Assert.assertNotNull(rclient);
+    rclient.disconnect();
+    server.stop();
+    Assert.assertFalse(server.isStarted());
+
+    try {
+      rclient = RiemannClient.tcp("localhost", port);
+      rclient.connect();
+      Assert.fail("Riemann server should be stopped on port: " + port);
+    } catch (IOException e) {
+      // ok
+    }
+
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+}


Mime
View raw message