Return-Path: X-Original-To: apmail-storm-commits-archive@minotaur.apache.org Delivered-To: apmail-storm-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0B2A0186E2 for ; Wed, 28 Oct 2015 14:56:41 +0000 (UTC) Received: (qmail 65045 invoked by uid 500); 28 Oct 2015 14:56:38 -0000 Delivered-To: apmail-storm-commits-archive@storm.apache.org Received: (qmail 65020 invoked by uid 500); 28 Oct 2015 14:56:38 -0000 Mailing-List: contact commits-help@storm.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@storm.apache.org Delivered-To: mailing list commits@storm.apache.org Received: (qmail 64982 invoked by uid 99); 28 Oct 2015 14:56:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Oct 2015 14:56:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C3246E00D8; Wed, 28 Oct 2015 14:56:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kishorvpatil@apache.org To: commits@storm.apache.org Date: Wed, 28 Oct 2015 14:56:38 -0000 Message-Id: In-Reply-To: <4ea85c81762043f7a7020cae051cb43d@git.apache.org> References: <4ea85c81762043f7a7020cae051cb43d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/8] storm git commit: Added in an in-order test case. Added in an in-order test case. 
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7e0b08ef Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7e0b08ef Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7e0b08ef Branch: refs/heads/master Commit: 7e0b08ef2201db91cb97fec979cd98faf10dbd31 Parents: 41b35ea Author: Robert (Bobby) Evans Authored: Thu Sep 24 13:45:36 2015 -0500 Committer: Robert (Bobby) Evans Committed: Mon Oct 26 12:43:51 2015 -0500 ---------------------------------------------------------------------- .../storm/starter/FastWordCountTopology.java | 12 +- .../jvm/storm/starter/InOrderDeliveryTest.java | 175 +++++++++++++++++++ 2 files changed, 185 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/7e0b08ef/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java index c50c994..dab9405 100644 --- a/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java +++ b/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java @@ -127,13 +127,21 @@ public class FastWordCountTopology { TopologyInfo info = client.getTopologyInfo(id); int uptime = info.get_uptime_secs(); long acked = 0; + long failed = 0; double weightedAvgTotal = 0.0; for (ExecutorSummary exec: info.get_executors()) { if ("spout".equals(exec.get_component_id())) { SpoutStats stats = exec.get_stats().get_specific().get_spout(); + Map<String, Long> failedMap = stats.get_failed().get(":all-time"); Map<String, Long> ackedMap = stats.get_acked().get(":all-time"); Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time"); for (String key: ackedMap.keySet()) { + if (failedMap != null) { + Long tmp = failedMap.get(key); + 
if (tmp != null) { + failed += tmp; + } + } long ackVal = ackedMap.get(key); double latVal = avgLatMap.get(key) * ackVal; acked += ackVal; @@ -142,7 +150,7 @@ public class FastWordCountTopology { } } double avgLatency = weightedAvgTotal/acked; - System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime)); + System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed)); } public static void kill(Nimbus.Client client, String name) throws Exception { @@ -169,7 +177,7 @@ public class FastWordCountTopology { } conf.setNumWorkers(1); - StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); + StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); Map clusterConf = Utils.readStormConfig(); clusterConf.putAll(Utils.readCommandLineOpts()); http://git-wip-us.apache.org/repos/asf/storm/blob/7e0b08ef/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java b/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java new file mode 100644 index 0000000..5df0688 --- /dev/null +++ b/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package storm.starter; + +import backtype.storm.Config; +import backtype.storm.StormSubmitter; +import backtype.storm.generated.*; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.BasicOutputCollector; +import backtype.storm.topology.FailedException; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.topology.base.BaseBasicBolt; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import backtype.storm.utils.NimbusClient; +import backtype.storm.utils.Utils; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +public class InOrderDeliveryTest { + public static class InOrderSpout extends BaseRichSpout { + SpoutOutputCollector _collector; + int _base = 0; + int _i = 0; + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + _collector = collector; + _base = context.getThisTaskIndex(); + } + + @Override + public void nextTuple() { + Values v = new Values(_base, _i); + _collector.emit(v, "ACK"); + _i++; + } + + @Override + public void ack(Object id) { + //Ignored + } + + @Override + public void fail(Object id) { + //Ignored + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("c1", "c2")); + } + } + + public static 
class Check extends BaseBasicBolt { + Map<Integer, Integer> expected = new HashMap<Integer, Integer>(); + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + Integer c1 = tuple.getInteger(0); + Integer c2 = tuple.getInteger(1); + Integer exp = expected.get(c1); + if (exp == null) exp = 0; + if (c2.intValue() != exp.intValue()) { + System.out.println(c1+" "+c2+" != "+exp); + throw new FailedException(c1+" "+c2+" != "+exp); + } + exp = c2 + 1; + expected.put(c1, exp); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + //Empty + } + } + + public static void printMetrics(Nimbus.Client client, String name) throws Exception { + ClusterSummary summary = client.getClusterInfo(); + String id = null; + for (TopologySummary ts: summary.get_topologies()) { + if (name.equals(ts.get_name())) { + id = ts.get_id(); + } + } + if (id == null) { + throw new Exception("Could not find a topology named "+name); + } + TopologyInfo info = client.getTopologyInfo(id); + int uptime = info.get_uptime_secs(); + long acked = 0; + long failed = 0; + double weightedAvgTotal = 0.0; + for (ExecutorSummary exec: info.get_executors()) { + if ("spout".equals(exec.get_component_id())) { + SpoutStats stats = exec.get_stats().get_specific().get_spout(); + Map<String, Long> failedMap = stats.get_failed().get(":all-time"); + Map<String, Long> ackedMap = stats.get_acked().get(":all-time"); + Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time"); + for (String key: ackedMap.keySet()) { + if (failedMap != null) { + Long tmp = failedMap.get(key); + if (tmp != null) { + failed += tmp; + } + } + long ackVal = ackedMap.get(key); + double latVal = avgLatMap.get(key) * ackVal; + acked += ackVal; + weightedAvgTotal += latVal; + } + } + } + double avgLatency = weightedAvgTotal/acked; + System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed)); + } + + public static void kill(Nimbus.Client client, String name) throws Exception 
{ + KillOptions opts = new KillOptions(); + opts.set_wait_secs(0); + client.killTopologyWithOpts(name, opts); + } + + public static void main(String[] args) throws Exception { + + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("spout", new InOrderSpout(), 8); + builder.setBolt("count", new Check(), 8).fieldsGrouping("spout", new Fields("c1")); + + Config conf = new Config(); + conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class); + + String name = "in-order-test"; + if (args != null && args.length > 0) { + name = args[0]; + } + + conf.setNumWorkers(1); + StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); + + Map clusterConf = Utils.readStormConfig(); + clusterConf.putAll(Utils.readCommandLineOpts()); + Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient(); + + //Print metrics every 30 seconds, 50 times (25 minutes in total) + for (int i = 0; i < 50; i++) { + Thread.sleep(30 * 1000); + printMetrics(client, name); + } + kill(client, name); + } +}