storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kishorvpa...@apache.org
Subject [2/8] storm git commit: Added in an in-order test case.
Date Wed, 28 Oct 2015 14:56:38 GMT
Added in an in-order test case.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7e0b08ef
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7e0b08ef
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7e0b08ef

Branch: refs/heads/master
Commit: 7e0b08ef2201db91cb97fec979cd98faf10dbd31
Parents: 41b35ea
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Thu Sep 24 13:45:36 2015 -0500
Committer: Robert (Bobby) Evans <evans@yahoo-inc.com>
Committed: Mon Oct 26 12:43:51 2015 -0500

----------------------------------------------------------------------
 .../storm/starter/FastWordCountTopology.java    |  12 +-
 .../jvm/storm/starter/InOrderDeliveryTest.java  | 175 +++++++++++++++++++
 2 files changed, 185 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7e0b08ef/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java
index c50c994..dab9405 100644
--- a/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java
@@ -127,13 +127,21 @@ public class FastWordCountTopology {
     TopologyInfo info = client.getTopologyInfo(id);
     int uptime = info.get_uptime_secs();
     long acked = 0;
+    long failed = 0;
     double weightedAvgTotal = 0.0;
     for (ExecutorSummary exec: info.get_executors()) {
       if ("spout".equals(exec.get_component_id())) {
         SpoutStats stats = exec.get_stats().get_specific().get_spout();
+        Map<String, Long> failedMap = stats.get_failed().get(":all-time");
         Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
         Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time");
         for (String key: ackedMap.keySet()) {
+          if (failedMap != null) {
+              Long tmp = failedMap.get(key);
+              if (tmp != null) {
+                  failed += tmp;
+              }
+          }
           long ackVal = ackedMap.get(key);
           double latVal = avgLatMap.get(key) * ackVal;
           acked += ackVal;
@@ -142,7 +150,7 @@ public class FastWordCountTopology {
       }
     }
     double avgLatency = weightedAvgTotal/acked;
-    System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime));
+    System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed));
   } 
 
   public static void kill(Nimbus.Client client, String name) throws Exception {
@@ -169,7 +177,7 @@ public class FastWordCountTopology {
     }
 
     conf.setNumWorkers(1);
-    StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+    StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
 
     Map clusterConf = Utils.readStormConfig();
     clusterConf.putAll(Utils.readCommandLineOpts());

http://git-wip-us.apache.org/repos/asf/storm/blob/7e0b08ef/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java b/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java
new file mode 100644
index 0000000..5df0688
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.*;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.FailedException;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class InOrderDeliveryTest {
+  public static class InOrderSpout extends BaseRichSpout {
+    SpoutOutputCollector _collector;
+    int _base = 0;
+    int _i = 0;
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+      _collector = collector;
+      _base = context.getThisTaskIndex();
+    }
+
+    @Override
+    public void nextTuple() {
+      Values v = new Values(_base, _i);
+      _collector.emit(v, "ACK");
+      _i++;
+    }
+
+    @Override
+    public void ack(Object id) {
+      //Ignored
+    }
+
+    @Override
+    public void fail(Object id) {
+      //Ignored
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("c1", "c2"));
+    }
+  }
+
+  public static class Check extends BaseBasicBolt {
+    Map<Integer, Integer> expected = new HashMap<Integer, Integer>();
+
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+      Integer c1 = tuple.getInteger(0);
+      Integer c2 = tuple.getInteger(1);
+      Integer exp = expected.get(c1);
+      if (exp == null) exp = 0;
+      if (c2.intValue() != exp.intValue()) {
+          System.out.println(c1+" "+c2+" != "+exp);
+          throw new FailedException(c1+" "+c2+" != "+exp);
+      }
+      exp = c2 + 1;
+      expected.put(c1, exp);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      //Empty
+    }
+  }
+
+  public static void printMetrics(Nimbus.Client client, String name) throws Exception {
+    ClusterSummary summary = client.getClusterInfo();
+    String id = null;
+    for (TopologySummary ts: summary.get_topologies()) {
+      if (name.equals(ts.get_name())) {
+        id = ts.get_id();
+      }
+    }
+    if (id == null) {
+      throw new Exception("Could not find a topology named "+name);
+    }
+    TopologyInfo info = client.getTopologyInfo(id);
+    int uptime = info.get_uptime_secs();
+    long acked = 0;
+    long failed = 0;
+    double weightedAvgTotal = 0.0;
+    for (ExecutorSummary exec: info.get_executors()) {
+      if ("spout".equals(exec.get_component_id())) {
+        SpoutStats stats = exec.get_stats().get_specific().get_spout();
+        Map<String, Long> failedMap = stats.get_failed().get(":all-time");
+        Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
+        Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time");
+        for (String key: ackedMap.keySet()) {
+          if (failedMap != null) {
+              Long tmp = failedMap.get(key);
+              if (tmp != null) {
+                  failed += tmp;
+              }
+          }
+          long ackVal = ackedMap.get(key);
+          double latVal = avgLatMap.get(key) * ackVal;
+          acked += ackVal;
+          weightedAvgTotal += latVal;
+        }
+      }
+    }
+    double avgLatency = weightedAvgTotal/acked;
+    System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed));
+  } 
+
+  public static void kill(Nimbus.Client client, String name) throws Exception {
+    KillOptions opts = new KillOptions();
+    opts.set_wait_secs(0);
+    client.killTopologyWithOpts(name, opts);
+  } 
+
+  public static void main(String[] args) throws Exception {
+
+    TopologyBuilder builder = new TopologyBuilder();
+
+    builder.setSpout("spout", new InOrderSpout(), 8);
+    builder.setBolt("count", new Check(), 8).fieldsGrouping("spout", new Fields("c1"));
+
+    Config conf = new Config();
+    conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class);
+
+    String name = "in-order-test";
+    if (args != null && args.length > 0) {
+        name = args[0];
+    }
+
+    conf.setNumWorkers(1);
+    StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
+
+    Map clusterConf = Utils.readStormConfig();
+    clusterConf.putAll(Utils.readCommandLineOpts());
+    Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
+
+    //Sleep for 50 mins
+    for (int i = 0; i < 50; i++) {
+        Thread.sleep(30 * 1000);
+        printMetrics(client, name);
+    }
+    kill(client, name);
+  }
+}


Mime
View raw message