storm-commits mailing list archives

From bo...@apache.org
Subject [01/18] storm git commit: STORM-2702: Part 1. Move files as needed
Date Thu, 07 Sep 2017 19:14:42 GMT
Repository: storm
Updated Branches:
  refs/heads/master 3ee9a899c -> 32389d75f


STORM-2702: Part 1.  Move files as needed


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0d10b8af
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0d10b8af
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0d10b8af

Branch: refs/heads/master
Commit: 0d10b8afe7e282d04b67f1a0c1c90db801842b14
Parents: b87dcc0
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Mon Aug 21 13:29:59 2017 -0500
Committer: Robert (Bobby) Evans <evans@yahoo-inc.com>
Committed: Mon Aug 21 13:29:59 2017 -0500

----------------------------------------------------------------------
 examples/storm-loadgen/README.md                | 163 ++++++++
 examples/storm-loadgen/pom.xml                  | 122 ++++++
 .../loadgen/HttpForwardingMetricsConsumer.java  |  85 +++++
 .../loadgen/HttpForwardingMetricsServer.java    | 118 ++++++
 .../storm/loadgen/ThroughputVsLatency.java      | 377 +++++++++++++++++++
 examples/storm-starter/pom.xml                  |   5 -
 .../storm/starter/ThroughputVsLatency.java      | 377 -------------------
 pom.xml                                         |   2 +-
 storm-client-misc/pom.xml                       |  63 ----
 .../metric/HttpForwardingMetricsConsumer.java   |  85 -----
 .../metric/HttpForwardingMetricsServer.java     | 118 ------
 11 files changed, 866 insertions(+), 649 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0d10b8af/examples/storm-loadgen/README.md
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/README.md b/examples/storm-loadgen/README.md
new file mode 100644
index 0000000..8414768
--- /dev/null
+++ b/examples/storm-loadgen/README.md
@@ -0,0 +1,163 @@
+# Storm Load Generation Tools
+
+A set of tools to place an artificial load on a storm cluster so it can be compared against a different storm cluster.  This is particularly helpful when making changes to the data path in storm, to see what impact, if any, the changes have.  It is also useful for end users who want to compare different hardware setups to see what the trade-offs are, although actually running your real topologies will always be more accurate.
+
+## Methodology
+The idea behind all of these tools is to measure the trade-offs between latency, throughput, and cost when processing data using Apache Storm.
+
+When processing data you typically know a few things.  First, you know roughly how much data you are going to be processing.  This is typically a range of values that changes throughout the day.  You also have an idea of how quickly the data needs to be processed.  Often this is measured as the latency to process data at some percentile or set of percentiles.  This is because in most use cases the value of the data declines over time, so being able to react to the data quickly is more valuable.  You probably also have a budget for how much you are willing to spend to process this data.  There are always trade-offs between how quickly you can process some data and how efficiently you can process it, both in terms of resource usage (cost) and latency.  These tools are designed to help you explore that space.
+
+## Tools
+### CaptureLoad 
+
+`CaptureLoad` will look at the topologies on a running cluster and store the structure of, and metrics about, each of these topologies in a format that can be used later to reproduce a similar load on the cluster.
+
+#### Usage
+```
+storm jar storm-loadgen.jar org.apache.storm.loadgen.CaptureLoad [options] [topologyName]*
+```
+|Option| Description|
+|-----|-----|
+|-a,--anonymize | Strip out any possibly identifiable information|
+| -h,--help | Print a help message |
+| -o,--output-dir &lt;file> | Where to write the captured files (defaults to ./loadgen/)|
+
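+For example, to capture two running topologies anonymously into a custom directory (the topology names here are placeholders):
+
+```
+storm jar storm-loadgen.jar org.apache.storm.loadgen.CaptureLoad -a -o ./my_captures my-topo-1 my-topo-2
+```
+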
+#### Limitations
+This is still a work in progress.  It does not currently capture CPU or memory usage of a topology.  Resource requests (used by RAS when scheduling) within the topology are also not captured yet, nor is the user that actually ran the topology.
+
+### GenLoad
+
+`GenLoad` will take the files produced by `CaptureLoad` and replay them in a simulated way on a cluster.  It also offers many ways to capture metrics about those simulated topologies, so you can compare different software versions or different hardware setups.  You can also adjust the topology before submitting it to change its size or throughput.
+
+#### Usage
+```
+storm jar storm-loadgen.jar org.apache.storm.loadgen.GenLoad [options] [capture_file]*
+```
+
+|Option| Description|
+|-----|-----|
+| --debug | Print debug information about the adjusted topology before submitting it. |
+|-h,--help | Print a help message |
+| --local-or-shuffle | Replace shuffle grouping with local or shuffle grouping. |
+| --parallel &lt;MULTIPLIER> | How much to scale the topology up or down in parallelism.  The new parallelism will round up to the next whole number (defaults to 1.0, i.e. no scaling).  The total throughput of the topology will not be scaled. |
+| -r,--report-interval &lt;INTERVAL_SECS> | The time between metrics reports.  Will be rounded up to the next 10 second boundary (default 30). |
+| --reporter &lt;TYPE:FILE?OPTIONS>  | Provide the config for a reporter to run.  See the Reporters section below for more information. |
+| -t,--test-time &lt;MINS> | How long to run the test, in minutes (defaults to 5). |
+| --throughput &lt;MULTIPLIER> | How much to scale the topology up or down in throughput (defaults to 1.0, i.e. no scaling). |
+| -w,--report-window &lt;INTERVAL_SECS> | The length of the rolling window covered by each report.  Will be rounded up to the next report interval boundary (default 30). |
+
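+For example, to replay a captured load for 10 minutes at double its original throughput, reporting metrics to a CSV file (paths and file names here are placeholders):
+
+```
+storm jar storm-loadgen.jar org.apache.storm.loadgen.GenLoad -t 10 --throughput 2.0 --reporter csv:./gen_load.csv ./loadgen/my-topo.yaml
+```
+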
+### ThroughputVsLatency
+This tool is similar to `GenLoad` in most ways, except that instead of simulating a captured load it runs a fixed word count topology.
+
+#### Usage
+```
+storm jar storm-loadgen.jar org.apache.storm.loadgen.ThroughputVsLatency [options]
+```
+
+|Option| Description|
+|-----|-----|
+| --counters &lt;NUM> | Number of counter bolts to use (defaults to 1). |
+| -h,--help | Print a help message. |
+| --name &lt;TOPO_NAME> | Name of the topology to run (defaults to wc-test). |
+| -r,--report-interval &lt;INTERVAL_SECS> | The time between metrics reports.  Will be rounded up to the next 10 second boundary (default 30). |
+| --rate &lt;SENTENCES/SEC> | How many sentences per second to run (defaults to 500). |
+| --reporter &lt;TYPE:FILE?OPTIONS>  | Provide the config for a reporter to run.  See the Reporters section below for more information. |
+| --splitters &lt;NUM> | Number of splitter bolts to use (defaults to 1). |
+| --spouts &lt;NUM> | Number of spouts to use (defaults to 1). |
+| -t,--test-time &lt;MINS> | How long to run the test, in minutes (defaults to 5). |
+| -w,--report-window &lt;INTERVAL_SECS> | The length of the rolling window covered by each report.  Will be rounded up to the next report interval boundary. |
+
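+For example, to run the word count topology for 10 minutes at 10000 sentences per second with 4 splitter bolts:
+
+```
+storm jar storm-loadgen.jar org.apache.storm.loadgen.ThroughputVsLatency --rate 10000 --splitters 4 -t 10
+```
+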
+# Reporters
+Reporters provide a way to store various statistics about a running topology. There are currently a few supported reporters:
+
+ * legacy - reports values the way `ThroughputVsLatency` has done in the past
+ * TSV - tab separated values
+ * CSV - comma separated values
+
+All of these types can have their data written out to a file.  To do this, add a path after the type.  For example `legacy:./legacy_data` or `tsv:my_run.tsv`. By default the file will be overwritten unless an option is given to append instead. Options are in a URL-like format, with a `?` separating the type:path from the options, and the options themselves separated by `&`.  To append to the file you can do something like `csv:./my_run.csv?append` or `csv:./my_run.csv?append=true`.
+
+Not all options are supported by all reporters.
+
+|Reporter Option| Description | Supported Reporters|
+|---------------|-------------|--------------------|
+|time | Set the time unit that you want latency and CPU reported in.  This can be anything from nanoseconds up to seconds.  Most common names for these units are supported.| legacy, csv, tsv|
+|columns | A comma separated list of columns to output (see below for the metrics supported).  Defaults to "start_time", "end_time", "completion_rate", "mean", "99%ile", "99.9%ile", "cores", "mem", "failed" | csv, tsv |
+|extraColumns | Like columns but ones that should be added to the defaults instead of replacing them (this is mostly for convenience) | csv, tsv |
+|meta | An arbitrary string that will appear as a "meta" column at the end.  This helps when appending to files to keep different runs separated | csv, tsv|
+
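+For example, to append CSV results with a label identifying this run (the exact token accepted by `time` is an assumption; see the unit names above):
+
+```
+csv:./my_run.csv?append=true&time=milliseconds&meta=run-42
+```
+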
+There are a lot of different metrics supported:
+
+|Metrics Name| Description|
+|------------|------------|
+|99%ile| 99th percentile completion latency. |
+|99.9%ile| 99.9th percentile completion latency. |
+|median| Median completion latency. |
+|mean| Mean completion latency. |
+|min| Minimum completion latency. |
+|max| Maximum completion latency. |
+|stddev| Standard Deviation of completion latency. |
+|user_cpu| User space CPU time.|
+|sys_cpu| System space CPU time. |
+|gc_cpu| Amount of CPU time spent in GC as reported by the JVM. |
+|cores| The number of CPU cores used. `(user_cpu + sys_cpu) / time_window`|
+|uptime| The amount of time the oldest topology has been up for. |
+|acked| The number of tuples fully acked as reported by Storm's metrics. |
+|rate| The rate of tuples fully acked as reported by Storm's metrics. |
+|completed| The number of tuples fully acked as reported by the latency histogram metrics. |
+|completion_rate| The rate of tuples fully acked as reported by the latency histogram metrics. |
+|mem| The amount of memory used by the topology in MB, as reported by the JVM. |
+|failed| The number of failed tuples as reported by Storm's metrics. |
+|start_time| The starting time of the metrics window, measured from when the first topology was launched. |
+|end_time| The ending time of the metrics window, measured from when the first topology was launched. |
+|time_window| The length in seconds of the time window. |
+|ids| The topology ids that are being tracked. |
+
+# Captured Load File Format
+The file format used with `CaptureLoad` and `GenLoad` is based on the flux file format, but with some extensions and omissions.
+
+At the top level the supported configuration keys are:
+
+| Config | Description |
+|--------|-------------|
+| name | The name of the topology.  If not given, the base name of the file will be used. |
+| config | A map of String to Object configs to use when submitting the topology. |
+| spouts | A list of spouts for the topology. |
+| bolts | A list of bolts in the topology. |
+| streams | A list of streams connecting different components in the topology. |
+
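+A minimal sketch of a captured load file using these keys (the YAML layout is an assumption based on the flux format this is derived from):
+
+```
+name: "my-topo"
+config:
+  topology.workers: 4
+spouts:
+  # ... see "Spouts and Bolts" below
+bolts:
+  # ...
+streams:
+  # ... see "(Input) Streams" below
+```
+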
+## Spouts and Bolts
+
+Spouts and bolts have the same format.
+
+| Config | Description |
+|--------|-------------|
+| id | The id of the bolt or spout.  This should be unique within the topology. |
+| parallelism | How many instances of this component should be a part of the topology. |
+| streams | The streams that are produced by this bolt or spout. |
+
+### Output Streams
+
+This is not a part of flux.  It defines the output of a bolt or spout.
+
+| Config | Description |
+|--------|-------------|
+| streamId | The ID of the stream being output.  The default is "default" |
+| rate | This is a map describing the rate at which messages are output on this stream. |
+
+The rate has at least a `mean` value.  If you want the rate to vary a bit over time you can also include a standard deviation with `stddev`, and a `min` and `max` value.  The actual rates selected will follow a Gaussian distribution within those bounds.
+
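+A minimal sketch of a spout entry with a varying output rate (the values and YAML nesting are illustrative assumptions):
+
+```
+spouts:
+  - id: "sentence-spout"
+    parallelism: 2
+    streams:
+      - streamId: "default"
+        rate:
+          mean: 500.0
+          stddev: 50.0
+          min: 400.0
+          max: 600.0
+```
+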
+## (Input) Streams
+
+The streams that connect components together have the following form.
+
+| Config | Description |
+|--------|-------------|
+| from | The component id the stream is coming from. |
+| to | The component id the stream is going to. |
+| grouping | A map that defines the grouping used. |
+| grouping.type | The type of grouping: one of `SHUFFLE`, `FIELDS`, `ALL`, `GLOBAL`, `LOCAL_OR_SHUFFLE`, `NONE`, or `PARTIAL_KEY`.  Defaults to `SHUFFLE`. |
+| grouping.streamId | The id of the stream (default is "default"). |
+| execTime | A distribution of the amount of time in milliseconds that execution of this component takes (execute latency). |
+| processTime | A distribution of the amount of time in milliseconds that processing of a tuple takes (process latency). |
+
+The `execTime` and `processTime` values follow the same pattern as the `OutputStream` `rate`.  A `mean` value is required, but `stddev`, `min`, and `max` may also be given.
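+
+A minimal sketch of a stream entry tying this together (the values and the nesting of the `grouping` map are illustrative assumptions):
+
+```
+streams:
+  - from: "split"
+    to: "count"
+    grouping:
+      type: FIELDS
+      streamId: "default"
+    execTime:
+      mean: 1.5
+      stddev: 0.5
+    processTime:
+      mean: 0.8
+```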

http://git-wip-us.apache.org/repos/asf/storm/blob/0d10b8af/examples/storm-loadgen/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/pom.xml b/examples/storm-loadgen/pom.xml
new file mode 100644
index 0000000..f75e575
--- /dev/null
+++ b/examples/storm-loadgen/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>storm</artifactId>
+    <groupId>org.apache.storm</groupId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <artifactId>storm-loadgen</artifactId>
+  <packaging>jar</packaging>
+  <name>storm-loadgen</name>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>org.hdrhistogram</groupId>
+      <artifactId>HdrHistogram</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-client</artifactId>
+      <version>${project.version}</version>
+      <!--
+        Use "provided" scope to keep storm out of the jar-with-dependencies,
+        while still letting IntelliJ load the project properly for development.
+      -->
+      <scope>${provided.scope}</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-metrics</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <createDependencyReducedPom>true</createDependencyReducedPom>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.sf</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.dsa</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+                <exclude>META-INF/*.rsa</exclude>
+                <exclude>META-INF/*.EC</exclude>
+                <exclude>META-INF/*.ec</exclude>
+                <exclude>META-INF/MSFTSIG.SF</exclude>
+                <exclude>META-INF/MSFTSIG.RSA</exclude>
+              </excludes>
+            </filter>
+          </filters>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                </transformer>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <!--Note - the version would be inherited-->
+        <configuration>
+          <maxAllowedViolations>0</maxAllowedViolations>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/0d10b8af/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
new file mode 100644
index 0000000..aa4579c
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.misc.metric;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.net.URL;
+import java.net.HttpURLConnection;
+
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.storm.serialization.KryoValuesSerializer;
+
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.task.IErrorReporter;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * Listens for all metrics and POSTs them serialized to a configured URL.
+ *
+ * To use, add this to your topology's configuration:
+ *
+ * ```java
+ *   conf.registerMetricsConsumer(org.apache.storm.misc.metric.HttpForwardingMetricsConsumer.class, "http://example.com:8080/metrics/my-topology/", 1);
+ * ```
+ *
+ * The body of the post is data serialized using {@link org.apache.storm.serialization.KryoValuesSerializer}, with the data passed in
+ * as a list of `[TaskInfo, Collection<DataPoint>]`.  More things may be appended to the end of the list in the future.
+ *
+ * The values can be deserialized using the org.apache.storm.serialization.KryoValuesDeserializer, and a 
+ * correct config + classpath.
+ *
+ * @see org.apache.storm.serialization.KryoValuesSerializer
+ */
+public class HttpForwardingMetricsConsumer implements IMetricsConsumer {
+    private transient URL _url; 
+    private transient IErrorReporter _errorReporter;
+    private transient KryoValuesSerializer _serializer;
+
+    @Override
+    public void prepare(Map<String, Object> topoConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { 
+        try {
+            _url = new URL((String)registrationArgument);
+            _errorReporter = errorReporter;
+            _serializer = new KryoValuesSerializer(topoConf);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+        try {
+            HttpURLConnection con = (HttpURLConnection)_url.openConnection();
+            con.setRequestMethod("POST");
+            con.setDoOutput(true);
+            Output out = new Output(con.getOutputStream());
+            _serializer.serializeInto(Arrays.asList(taskInfo, dataPoints), out);
+            out.flush();
+            out.close();
+            //The request is not actually sent unless we read the response code
+            int response = con.getResponseCode();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void cleanup() { }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0d10b8af/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
new file mode 100644
index 0000000..ef2769a
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.misc.metric;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.List;
+import java.net.ServerSocket;
+import java.net.InetAddress;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.ServletException;
+
+import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
+import org.apache.storm.metric.api.IMetricsConsumer.DataPoint;
+
+import com.esotericsoftware.kryo.io.Input;
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.utils.Utils;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+/**
+ * A server that can listen for metrics from the HttpForwardingMetricsConsumer.
+ */
+public abstract class HttpForwardingMetricsServer {
+    private Map _conf;
+    private Server _server = null;
+    private int _port = -1;
+    private String _url = null;
+
+    ThreadLocal<KryoValuesDeserializer> _des = new ThreadLocal<KryoValuesDeserializer>() {
+        @Override
+        protected KryoValuesDeserializer initialValue() {
+            return new KryoValuesDeserializer(_conf);
+        }
+    };
+
+    private class MetricsCollectionServlet extends HttpServlet
+    {
+        protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
+        {
+            Input in = new Input(request.getInputStream());
+            List<Object> metrics = _des.get().deserializeFrom(in);
+            handle((TaskInfo)metrics.get(0), (Collection<DataPoint>)metrics.get(1));
+            response.setStatus(HttpServletResponse.SC_OK);
+        }
+    }
+
+    public HttpForwardingMetricsServer(Map<String, Object> conf) {
+        _conf = Utils.readStormConfig();
+        if (conf != null) {
+            _conf.putAll(conf);
+        }
+    }
+
+    //This needs to be thread safe
+    public abstract void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
+
+    public void serve(Integer port) {
+        try {
+            if (_server != null) throw new RuntimeException("The server is already running");
+    
+            if (port == null || port <= 0) {
+                ServerSocket s = new ServerSocket(0);
+                port = s.getLocalPort();
+                s.close();
+            }
+            _server = new Server(port);
+            _port = port;
+            _url = "http://"+InetAddress.getLocalHost().getHostName()+":"+_port+"/";
+ 
+            ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+            context.setContextPath("/");
+            _server.setHandler(context);
+ 
+            context.addServlet(new ServletHolder(new MetricsCollectionServlet()),"/*");
+
+            _server.start();
+         } catch (RuntimeException e) {
+             throw e;
+         } catch (Exception e) {
+             throw new RuntimeException(e);
+         }
+    }
+
+    public void serve() {
+        serve(null);
+    }
+
+    public int getPort() {
+        return _port;
+    }
+
+    public String getUrl() {
+        return _url;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0d10b8af/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
new file mode 100644
index 0000000..96c13c5
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.HdrHistogram.Histogram;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.ExecutorSummary;
+import org.apache.storm.generated.KillOptions;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.SpoutStats;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.metric.api.IMetricsConsumer.DataPoint;
+import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
+import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
+import org.apache.storm.misc.metric.HttpForwardingMetricsServer;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.NimbusClient;
+
+/**
+ * WordCount but the spout goes at a predefined rate and we collect
+ * proper latency statistics.
+ */
+public class ThroughputVsLatency {
+    private static class SentWithTime {
+        public final String sentence;
+        public final long time;
+
+        SentWithTime(String sentence, long time) {
+            this.sentence = sentence;
+            this.time = time;
+        }
+    }
+
+    public static class FastRandomSentenceSpout extends BaseRichSpout {
+        static final String[] SENTENCES = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
+                "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
+
+        SpoutOutputCollector _collector;
+        long _periodNano;
+        long _emitAmount;
+        Random _rand;
+        long _nextEmitTime;
+        long _emitsLeft;
+        HistogramMetric _histo;
+
+        public FastRandomSentenceSpout(long ratePerSecond) {
+            if (ratePerSecond > 0) {
+                _periodNano = Math.max(1, 1000000000/ratePerSecond);
+                _emitAmount = Math.max(1, (long)((ratePerSecond / 1000000000.0) * _periodNano));
+            } else {
+                _periodNano = Long.MAX_VALUE - 1;
+                _emitAmount = 1;
+            }
+        }
+
+        @Override
+        public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+            _collector = collector;
+            _rand = ThreadLocalRandom.current();
+            _nextEmitTime = System.nanoTime();
+            _emitsLeft = _emitAmount;
+            _histo = new HistogramMetric(3600000000000L, 3);
+            context.registerMetric("comp-lat-histo", _histo, 10); //Update every 10 seconds, so we are not too far behind
+        }
+
+        @Override
+        public void nextTuple() {
+            if (_emitsLeft <= 0 && _nextEmitTime <= System.nanoTime()) {
+                _emitsLeft = _emitAmount;
+                _nextEmitTime = _nextEmitTime + _periodNano;
+            }
+
+            if (_emitsLeft > 0) {
+                String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
+                _collector.emit(new Values(sentence), new SentWithTime(sentence, _nextEmitTime - _periodNano));
+                _emitsLeft--;
+            }
+        }
+
+        @Override
+        public void ack(Object id) {
+            long end = System.nanoTime();
+            SentWithTime st = (SentWithTime)id;
+            _histo.recordValue(end-st.time);
+        }
+
+        @Override
+        public void fail(Object id) {
+            SentWithTime st = (SentWithTime)id;
+            _collector.emit(new Values(st.sentence), id);
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("sentence"));
+        }
+    }
+
+    public static class SplitSentence extends BaseBasicBolt {
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word: sentence.split("\\s+")) {
+                collector.emit(new Values(word, 1));
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
+
+    public static class WordCount extends BaseBasicBolt {
+        Map<String, Integer> counts = new HashMap<String, Integer>();
+
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String word = tuple.getString(0);
+            Integer count = counts.get(word);
+            if (count == null)
+                count = 0;
+            count++;
+            counts.put(word, count);
+            collector.emit(new Values(word, count));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
+
+    private static class MemMeasure {
+        private long _mem = 0;
+        private long _time = 0;
+
+        public synchronized void update(long mem) {
+            _mem = mem;
+            _time = System.currentTimeMillis();
+        }
+
+        public synchronized long get() {
+            return isExpired() ? 0l : _mem;
+        }
+
+        public synchronized boolean isExpired() {
+            return (System.currentTimeMillis() - _time) >= 20000;
+        }
+    }
+
+    private static final Histogram _histo = new Histogram(3600000000000L, 3);
+    private static final AtomicLong _systemCPU = new AtomicLong(0);
+    private static final AtomicLong _userCPU = new AtomicLong(0);
+    private static final AtomicLong _gcCount = new AtomicLong(0);
+    private static final AtomicLong _gcMs = new AtomicLong(0);
+    private static final ConcurrentHashMap<String, MemMeasure> _memoryBytes = new ConcurrentHashMap<String, MemMeasure>();
+
+    private static long readMemory() {
+        long total = 0;
+        for (MemMeasure mem: _memoryBytes.values()) {
+            total += mem.get();
+        }
+        return total;
+    }
+
+    private static long _prev_acked = 0;
+    private static long _prev_uptime = 0;
+
+    public static void printMetrics(Nimbus.Iface client, String name) throws Exception {
+        ClusterSummary summary = client.getClusterInfo();
+        String id = null;
+        for (TopologySummary ts: summary.get_topologies()) {
+            if (name.equals(ts.get_name())) {
+                id = ts.get_id();
+            }
+        }
+        if (id == null) {
+            throw new Exception("Could not find a topology named "+name);
+        }
+        TopologyInfo info = client.getTopologyInfo(id);
+        int uptime = info.get_uptime_secs();
+        long acked = 0;
+        long failed = 0;
+        for (ExecutorSummary exec: info.get_executors()) {
+            if ("spout".equals(exec.get_component_id()) && exec.get_stats() != null && exec.get_stats().get_specific() != null) {
+                SpoutStats stats = exec.get_stats().get_specific().get_spout();
+                Map<String, Long> failedMap = stats.get_failed().get(":all-time");
+                Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
+                if (ackedMap != null) {
+                    for (String key: ackedMap.keySet()) {
+                        if (failedMap != null) {
+                            Long tmp = failedMap.get(key);
+                            if (tmp != null) {
+                                failed += tmp;
+                            }
+                        }
+                        long ackVal = ackedMap.get(key);
+                        acked += ackVal;
+                    }
+                }
+            }
+        }
+        long ackedThisTime = acked - _prev_acked;
+        long thisTime = uptime - _prev_uptime;
+        long nnpct, nnnpct, min, max;
+        double mean, stddev;
+        synchronized(_histo) {
+            nnpct = _histo.getValueAtPercentile(99.0);
+            nnnpct = _histo.getValueAtPercentile(99.9);
+            min = _histo.getMinValue();
+            max = _histo.getMaxValue();
+            mean = _histo.getMean();
+            stddev = _histo.getStdDeviation();
+            _histo.reset();
+        }
+        long user = _userCPU.getAndSet(0);
+        long sys = _systemCPU.getAndSet(0);
+        long gc = _gcMs.getAndSet(0);
+        double memMB = readMemory() / (1024.0 * 1024.0);
+        System.out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d " +
+                "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f " +
+                "stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d mem: %,10.2f\n",
+                uptime, ackedThisTime, (((double)ackedThisTime)/thisTime), failed, nnpct, nnnpct,
+                min, max, mean, stddev, user, sys, gc, memMB);
+        _prev_uptime = uptime;
+        _prev_acked = acked;
+    }
+
+    public static void kill(Nimbus.Iface client, String name) throws Exception {
+        KillOptions opts = new KillOptions();
+        opts.set_wait_secs(0);
+        client.killTopologyWithOpts(name, opts);
+    }
+
+    public static void main(String[] args) throws Exception {
+        long ratePerSecond = 500;
+        if (args != null && args.length > 0) {
+            ratePerSecond = Long.valueOf(args[0]);
+        }
+
+        int parallelism = 4;
+        if (args != null && args.length > 1) {
+            parallelism = Integer.valueOf(args[1]);
+        }
+
+        int numMins = 5;
+        if (args != null && args.length > 2) {
+            numMins = Integer.valueOf(args[2]);
+        }
+
+        String name = "wc-test";
+        if (args != null && args.length > 3) {
+            name = args[3];
+        }
+
+        Config conf = new Config();
+        HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) {
+            @Override
+            public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+                String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
+                for (DataPoint dp: dataPoints) {
+                    if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) {
+                        synchronized(_histo) {
+                            _histo.add((Histogram)dp.value);
+                        }
+                    } else if ("CPU".equals(dp.name) && dp.value instanceof Map) {
+                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                        Object sys = m.get("sys-ms");
+                        if (sys instanceof Number) {
+                            _systemCPU.getAndAdd(((Number)sys).longValue());
+                        }
+                        Object user = m.get("user-ms");
+                        if (user instanceof Number) {
+                            _userCPU.getAndAdd(((Number)user).longValue());
+                        }
+                    } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) {
+                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                        Object count = m.get("count");
+                        if (count instanceof Number) {
+                            _gcCount.getAndAdd(((Number)count).longValue());
+                        }
+                        Object time = m.get("timeMs");
+                        if (time instanceof Number) {
+                            _gcMs.getAndAdd(((Number)time).longValue());
+                        }
+                    } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) {
+                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                        Object val = m.get("usedBytes");
+                        if (val instanceof Number) {
+                            MemMeasure mm = _memoryBytes.get(worker);
+                            if (mm == null) {
+                                mm = new MemMeasure();
+                                MemMeasure tmp = _memoryBytes.putIfAbsent(worker, mm);
+                                mm = tmp == null ? mm : tmp; 
+                            }
+                            mm.update(((Number)val).longValue());
+                        }
+                    }
+                }
+            }
+        };
+
+        metricServer.serve();
+        String url = metricServer.getUrl();
+
+        NimbusClient client = NimbusClient.getConfiguredClient(conf);
+        conf.setNumWorkers(parallelism);
+        conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
+        conf.registerMetricsConsumer(org.apache.storm.misc.metric.HttpForwardingMetricsConsumer.class, url, 1);
+        Map<String, String> workerMetrics = new HashMap<String, String>();
+        if (!NimbusClient.isLocalOverride()) {
+            //sigar uses JNI and does not work in local mode
+            workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
+        }
+        conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
+        conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
+        conf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS,
+                "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
+        conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        int numEach = 4 * parallelism;
+        builder.setSpout("spout", new FastRandomSentenceSpout(ratePerSecond/numEach), numEach);
+
+        builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word"));
+
+        try {
+            StormSubmitter.submitTopology(name, conf, builder.createTopology());
+
+            for (int i = 0; i < numMins * 2; i++) {
+                Thread.sleep(30 * 1000);
+                printMetrics(client.getClient(), name);
+            }
+        } finally {
+            kill(client.getClient(), name);
+            System.exit(0);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0d10b8af/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 2ef18f2..e95958d 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -90,11 +90,6 @@
     </dependency>
       <dependency>
           <groupId>org.apache.storm</groupId>
-          <artifactId>storm-client-misc</artifactId>
-          <version>${project.version}</version>
-      </dependency>
-      <dependency>
-          <groupId>org.apache.storm</groupId>
           <artifactId>storm-client</artifactId>
           <version>${project.version}</version>
           <type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/storm/blob/0d10b8af/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
deleted file mode 100644
index 96c13c5..0000000
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.starter;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.HdrHistogram.Histogram;
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.ClusterSummary;
-import org.apache.storm.generated.ExecutorSummary;
-import org.apache.storm.generated.KillOptions;
-import org.apache.storm.generated.Nimbus;
-import org.apache.storm.generated.SpoutStats;
-import org.apache.storm.generated.TopologyInfo;
-import org.apache.storm.generated.TopologySummary;
-import org.apache.storm.metric.api.IMetricsConsumer.DataPoint;
-import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
-import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
-import org.apache.storm.misc.metric.HttpForwardingMetricsServer;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.NimbusClient;
-
-/**
- * WordCount but the spout goes at a predefined rate and we collect
- * proper latency statistics.
- */
-public class ThroughputVsLatency {
-    private static class SentWithTime {
-        public final String sentence;
-        public final long time;
-
-        SentWithTime(String sentence, long time) {
-            this.sentence = sentence;
-            this.time = time;
-        }
-    }
-
-    public static class FastRandomSentenceSpout extends BaseRichSpout {
-        static final String[] SENTENCES = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
-                "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
-
-        SpoutOutputCollector _collector;
-        long _periodNano;
-        long _emitAmount;
-        Random _rand;
-        long _nextEmitTime;
-        long _emitsLeft;
-        HistogramMetric _histo;
-
-        public FastRandomSentenceSpout(long ratePerSecond) {
-            if (ratePerSecond > 0) {
-                _periodNano = Math.max(1, 1000000000/ratePerSecond);
-                _emitAmount = Math.max(1, (long)((ratePerSecond / 1000000000.0) * _periodNano));
-            } else {
-                _periodNano = Long.MAX_VALUE - 1;
-                _emitAmount = 1;
-            }
-        }
-
-        @Override
-        public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
-            _collector = collector;
-            _rand = ThreadLocalRandom.current();
-            _nextEmitTime = System.nanoTime();
-            _emitsLeft = _emitAmount;
-            _histo = new HistogramMetric(3600000000000L, 3);
-            context.registerMetric("comp-lat-histo", _histo, 10); //Update every 10 seconds, so we are not too far behind
-        }
-
-        @Override
-        public void nextTuple() {
-            if (_emitsLeft <= 0 && _nextEmitTime <= System.nanoTime()) {
-                _emitsLeft = _emitAmount;
-                _nextEmitTime = _nextEmitTime + _periodNano;
-            }
-
-            if (_emitsLeft > 0) {
-                String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
-                _collector.emit(new Values(sentence), new SentWithTime(sentence, _nextEmitTime - _periodNano));
-                _emitsLeft--;
-            }
-        }
-
-        @Override
-        public void ack(Object id) {
-            long end = System.nanoTime();
-            SentWithTime st = (SentWithTime)id;
-            _histo.recordValue(end-st.time);
-        }
-
-        @Override
-        public void fail(Object id) {
-            SentWithTime st = (SentWithTime)id;
-            _collector.emit(new Values(st.sentence), id);
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("sentence"));
-        }
-    }
-
-    public static class SplitSentence extends BaseBasicBolt {
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String sentence = tuple.getString(0);
-            for (String word: sentence.split("\\s+")) {
-                collector.emit(new Values(word, 1));
-            }
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word", "count"));
-        }
-    }
-
-    public static class WordCount extends BaseBasicBolt {
-        Map<String, Integer> counts = new HashMap<String, Integer>();
-
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String word = tuple.getString(0);
-            Integer count = counts.get(word);
-            if (count == null)
-                count = 0;
-            count++;
-            counts.put(word, count);
-            collector.emit(new Values(word, count));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word", "count"));
-        }
-    }
-
-    private static class MemMeasure {
-        private long _mem = 0;
-        private long _time = 0;
-
-        public synchronized void update(long mem) {
-            _mem = mem;
-            _time = System.currentTimeMillis();
-        }
-
-        public synchronized long get() {
-            return isExpired() ? 0l : _mem;
-        }
-
-        public synchronized boolean isExpired() {
-            return (System.currentTimeMillis() - _time) >= 20000;
-        }
-    }
-
-    private static final Histogram _histo = new Histogram(3600000000000L, 3);
-    private static final AtomicLong _systemCPU = new AtomicLong(0);
-    private static final AtomicLong _userCPU = new AtomicLong(0);
-    private static final AtomicLong _gcCount = new AtomicLong(0);
-    private static final AtomicLong _gcMs = new AtomicLong(0);
-    private static final ConcurrentHashMap<String, MemMeasure> _memoryBytes = new ConcurrentHashMap<String, MemMeasure>();
-
-    private static long readMemory() {
-        long total = 0;
-        for (MemMeasure mem: _memoryBytes.values()) {
-            total += mem.get();
-        }
-        return total;
-    }
-
-    private static long _prev_acked = 0;
-    private static long _prev_uptime = 0;
-
-    public static void printMetrics(Nimbus.Iface client, String name) throws Exception {
-        ClusterSummary summary = client.getClusterInfo();
-        String id = null;
-        for (TopologySummary ts: summary.get_topologies()) {
-            if (name.equals(ts.get_name())) {
-                id = ts.get_id();
-            }
-        }
-        if (id == null) {
-            throw new Exception("Could not find a topology named "+name);
-        }
-        TopologyInfo info = client.getTopologyInfo(id);
-        int uptime = info.get_uptime_secs();
-        long acked = 0;
-        long failed = 0;
-        for (ExecutorSummary exec: info.get_executors()) {
-            if ("spout".equals(exec.get_component_id()) && exec.get_stats() != null && exec.get_stats().get_specific() != null) {
-                SpoutStats stats = exec.get_stats().get_specific().get_spout();
-                Map<String, Long> failedMap = stats.get_failed().get(":all-time");
-                Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
-                if (ackedMap != null) {
-                    for (String key: ackedMap.keySet()) {
-                        if (failedMap != null) {
-                            Long tmp = failedMap.get(key);
-                            if (tmp != null) {
-                                failed += tmp;
-                            }
-                        }
-                        long ackVal = ackedMap.get(key);
-                        acked += ackVal;
-                    }
-                }
-            }
-        }
-        long ackedThisTime = acked - _prev_acked;
-        long thisTime = uptime - _prev_uptime;
-        long nnpct, nnnpct, min, max;
-        double mean, stddev;
-        synchronized(_histo) {
-            nnpct = _histo.getValueAtPercentile(99.0);
-            nnnpct = _histo.getValueAtPercentile(99.9);
-            min = _histo.getMinValue();
-            max = _histo.getMaxValue();
-            mean = _histo.getMean();
-            stddev = _histo.getStdDeviation();
-            _histo.reset();
-        }
-        long user = _userCPU.getAndSet(0);
-        long sys = _systemCPU.getAndSet(0);
-        long gc = _gcMs.getAndSet(0);
-        double memMB = readMemory() / (1024.0 * 1024.0);
-        System.out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d " +
-                "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f " +
-                "stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d mem: %,10.2f\n",
-                uptime, ackedThisTime, (((double)ackedThisTime)/thisTime), failed, nnpct, nnnpct,
-                min, max, mean, stddev, user, sys, gc, memMB);
-        _prev_uptime = uptime;
-        _prev_acked = acked;
-    }
-
-    public static void kill(Nimbus.Iface client, String name) throws Exception {
-        KillOptions opts = new KillOptions();
-        opts.set_wait_secs(0);
-        client.killTopologyWithOpts(name, opts);
-    }
-
-    public static void main(String[] args) throws Exception {
-        long ratePerSecond = 500;
-        if (args != null && args.length > 0) {
-            ratePerSecond = Long.valueOf(args[0]);
-        }
-
-        int parallelism = 4;
-        if (args != null && args.length > 1) {
-            parallelism = Integer.valueOf(args[1]);
-        }
-
-        int numMins = 5;
-        if (args != null && args.length > 2) {
-            numMins = Integer.valueOf(args[2]);
-        }
-
-        String name = "wc-test";
-        if (args != null && args.length > 3) {
-            name = args[3];
-        }
-
-        Config conf = new Config();
-        HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) {
-            @Override
-            public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
-                String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
-                for (DataPoint dp: dataPoints) {
-                    if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) {
-                        synchronized(_histo) {
-                            _histo.add((Histogram)dp.value);
-                        }
-                    } else if ("CPU".equals(dp.name) && dp.value instanceof Map) {
-                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
-                        Object sys = m.get("sys-ms");
-                        if (sys instanceof Number) {
-                            _systemCPU.getAndAdd(((Number)sys).longValue());
-                        }
-                        Object user = m.get("user-ms");
-                        if (user instanceof Number) {
-                            _userCPU.getAndAdd(((Number)user).longValue());
-                        }
-                    } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) {
-                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
-                        Object count = m.get("count");
-                        if (count instanceof Number) {
-                            _gcCount.getAndAdd(((Number)count).longValue());
-                        }
-                        Object time = m.get("timeMs");
-                        if (time instanceof Number) {
-                            _gcMs.getAndAdd(((Number)time).longValue());
-                        }
-                    } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) {
-                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
-                        Object val = m.get("usedBytes");
-                        if (val instanceof Number) {
-                            MemMeasure mm = _memoryBytes.get(worker);
-                            if (mm == null) {
-                                mm = new MemMeasure();
-                                MemMeasure tmp = _memoryBytes.putIfAbsent(worker, mm);
-                                mm = tmp == null ? mm : tmp; 
-                            }
-                            mm.update(((Number)val).longValue());
-                        }
-                    }
-                }
-            }
-        };
-
-        metricServer.serve();
-        String url = metricServer.getUrl();
-
-        NimbusClient client = NimbusClient.getConfiguredClient(conf);
-        conf.setNumWorkers(parallelism);
-        conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
-        conf.registerMetricsConsumer(org.apache.storm.misc.metric.HttpForwardingMetricsConsumer.class, url, 1);
-        Map<String, String> workerMetrics = new HashMap<String, String>();
-        if (!NimbusClient.isLocalOverride()) {
-            //sigar uses JNI and does not work in local mode
-            workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
-        }
-        conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
-        conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
-        conf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS,
-                "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
-        conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");
-
-        TopologyBuilder builder = new TopologyBuilder();
-
-        int numEach = 4 * parallelism;
-        builder.setSpout("spout", new FastRandomSentenceSpout(ratePerSecond/numEach), numEach);
-
-        builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout");
-        builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word"));
-
-        try {
-            StormSubmitter.submitTopology(name, conf, builder.createTopology());
-
-            for (int i = 0; i < numMins * 2; i++) {
-                Thread.sleep(30 * 1000);
-                printMetrics(client.getClient(), name);
-            }
-        } finally {
-            kill(client.getClient(), name);
-            System.exit(0);
-        }
-    }
-}
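
For context, the `main` method above takes up to four optional positional arguments, falling back to defaults when one is absent: sentences emitted per second (500), a parallelism factor (4; used as the worker count, with each component getting 4x this many executors), minutes to run (5), and the topology name (`wc-test`). A typical run (the jar name here is illustrative) looks like `storm jar storm-starter-topologies.jar org.apache.storm.starter.ThroughputVsLatency 1000 8 10 my-test`.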

http://git-wip-us.apache.org/repos/asf/storm/blob/0d10b8af/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 006387b..02a2ba6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -326,7 +326,6 @@
         <module>storm-buildtools/maven-shade-clojure-transformer</module>
         <module>storm-buildtools/storm-maven-plugins</module>
         <module>storm-client</module>
-        <module>storm-client-misc</module>
         <module>storm-server</module>
         <module>storm-core</module>
         <module>storm-webapp</module>
@@ -362,6 +361,7 @@
 
         <!-- examples -->
         <module>examples/storm-starter</module>
+        <module>examples/storm-loadgen</module>
         <module>examples/storm-mongodb-examples</module>
         <module>examples/storm-redis-examples</module>
         <module>examples/storm-opentsdb-examples</module>

http://git-wip-us.apache.org/repos/asf/storm/blob/0d10b8af/storm-client-misc/pom.xml
----------------------------------------------------------------------
diff --git a/storm-client-misc/pom.xml b/storm-client-misc/pom.xml
deleted file mode 100644
index 276e2c8..0000000
--- a/storm-client-misc/pom.xml
+++ /dev/null
@@ -1,63 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~ http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>2.0.0-SNAPSHOT</version>
-        <relativePath>..</relativePath>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>storm-client-misc</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-client</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.eclipse.jetty</groupId>
-            <artifactId>jetty-server</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.eclipse.jetty</groupId>
-            <artifactId>jetty-servlet</artifactId>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-checkstyle-plugin</artifactId>
-                <!-- Note: the version is inherited from the parent pom -->
-                <configuration>
-                    <maxAllowedViolations>39</maxAllowedViolations>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/0d10b8af/storm-client-misc/src/main/java/org/apache/storm/misc/metric/HttpForwardingMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/storm-client-misc/src/main/java/org/apache/storm/misc/metric/HttpForwardingMetricsConsumer.java b/storm-client-misc/src/main/java/org/apache/storm/misc/metric/HttpForwardingMetricsConsumer.java
deleted file mode 100644
index aa4579c..0000000
--- a/storm-client-misc/src/main/java/org/apache/storm/misc/metric/HttpForwardingMetricsConsumer.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.misc.metric;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.net.URL;
-import java.net.HttpURLConnection;
-
-import com.esotericsoftware.kryo.io.Output;
-import org.apache.storm.serialization.KryoValuesSerializer;
-
-import org.apache.storm.metric.api.IMetricsConsumer;
-import org.apache.storm.task.IErrorReporter;
-import org.apache.storm.task.TopologyContext;
-
-/**
- * Listens for all metrics and POSTs them, serialized, to a configured URL.
- *
- * To use, add this to your topology's configuration:
- *
- * ```java
- *   conf.registerMetricsConsumer(org.apache.storm.misc.metric.HttpForwardingMetricsConsumer.class, "http://example.com:8080/metrics/my-topology/", 1);
- * ```
- *
- * The body of the POST is serialized using {@link org.apache.storm.serialization.KryoValuesSerializer}, with the data passed in
- * as a list of `[TaskInfo, Collection<DataPoint>]`.  More things may be appended to the end of the list in the future.
- *
- * The values can be deserialized using {@link org.apache.storm.serialization.KryoValuesDeserializer}, given a
- * matching config and classpath.
- *
- * @see org.apache.storm.serialization.KryoValuesSerializer
- */
-public class HttpForwardingMetricsConsumer implements IMetricsConsumer {
-    private transient URL _url; 
-    private transient IErrorReporter _errorReporter;
-    private transient KryoValuesSerializer _serializer;
-
-    @Override
-    public void prepare(Map<String, Object> topoConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { 
-        try {
-            _url = new URL((String)registrationArgument);
-            _errorReporter = errorReporter;
-            _serializer = new KryoValuesSerializer(topoConf);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
-        try {
-            HttpURLConnection con = (HttpURLConnection)_url.openConnection();
-            con.setRequestMethod("POST");
-            con.setDoOutput(true);
-            Output out = new Output(con.getOutputStream());
-            _serializer.serializeInto(Arrays.asList(taskInfo, dataPoints), out);
-            out.flush();
-            out.close();
-            //The request is not actually sent until the response code is read
-            int response = con.getResponseCode();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void cleanup() { }
-}
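
To make the wire format described in the javadoc above concrete, here is a minimal receiving-side sketch. The class name `MetricsPayloadReader` is hypothetical; it assumes you already have a Storm config map and the raw InputStream of one POST body:

```java
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import com.esotericsoftware.kryo.io.Input;
import org.apache.storm.metric.api.IMetricsConsumer.DataPoint;
import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
import org.apache.storm.serialization.KryoValuesDeserializer;

public class MetricsPayloadReader {
    /** Decode one POST body produced by HttpForwardingMetricsConsumer. */
    public static void read(Map<String, Object> conf, InputStream body) {
        KryoValuesDeserializer des = new KryoValuesDeserializer(conf);
        List<Object> payload = des.deserializeFrom(new Input(body));
        // The payload is [TaskInfo, Collection<DataPoint>]; any entries
        // appended after these two can be ignored.
        TaskInfo taskInfo = (TaskInfo) payload.get(0);
        @SuppressWarnings("unchecked")
        Collection<DataPoint> dataPoints = (Collection<DataPoint>) payload.get(1);
        System.out.println(taskInfo.srcComponentId + ":" + taskInfo.srcTaskId + " -> " + dataPoints);
    }
}
```

The HttpForwardingMetricsServer below does exactly this internally, so subclassing it is normally the easier path.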

http://git-wip-us.apache.org/repos/asf/storm/blob/0d10b8af/storm-client-misc/src/main/java/org/apache/storm/misc/metric/HttpForwardingMetricsServer.java
----------------------------------------------------------------------
diff --git a/storm-client-misc/src/main/java/org/apache/storm/misc/metric/HttpForwardingMetricsServer.java b/storm-client-misc/src/main/java/org/apache/storm/misc/metric/HttpForwardingMetricsServer.java
deleted file mode 100644
index ef2769a..0000000
--- a/storm-client-misc/src/main/java/org/apache/storm/misc/metric/HttpForwardingMetricsServer.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.misc.metric;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.List;
-import java.net.ServerSocket;
-import java.net.InetAddress;
-
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.servlet.ServletException;
-
-import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
-import org.apache.storm.metric.api.IMetricsConsumer.DataPoint;
-
-import com.esotericsoftware.kryo.io.Input;
-import org.apache.storm.serialization.KryoValuesDeserializer;
-import org.apache.storm.utils.Utils;
-
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-
-/**
- * A server that can listen for metrics from the HttpForwardingMetricsConsumer.
- */
-public abstract class HttpForwardingMetricsServer {
-    private Map _conf;
-    private Server _server = null;
-    private int _port = -1;
-    private String _url = null;
-
-    ThreadLocal<KryoValuesDeserializer> _des = new ThreadLocal<KryoValuesDeserializer>() {
-        @Override
-        protected KryoValuesDeserializer initialValue() {
-            return new KryoValuesDeserializer(_conf);
-        }
-    };
-
-    private class MetricsCollectionServlet extends HttpServlet {
-        protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
-            Input in = new Input(request.getInputStream());
-            List<Object> metrics = _des.get().deserializeFrom(in);
-            handle((TaskInfo)metrics.get(0), (Collection<DataPoint>)metrics.get(1));
-            response.setStatus(HttpServletResponse.SC_OK);
-        }
-    }
-
-    public HttpForwardingMetricsServer(Map<String, Object> conf) {
-        _conf = Utils.readStormConfig();
-        if (conf != null) {
-            _conf.putAll(conf);
-        }
-    }
-
-    //This needs to be thread safe
-    public abstract void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
-
-    public void serve(Integer port) {
-        try {
-            if (_server != null) throw new RuntimeException("The server is already running");
-    
-            if (port == null || port <= 0) {
-                ServerSocket s = new ServerSocket(0);
-                port = s.getLocalPort();
-                s.close();
-            }
-            _server = new Server(port);
-            _port = port;
-            _url = "http://"+InetAddress.getLocalHost().getHostName()+":"+_port+"/";
- 
-            ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
-            context.setContextPath("/");
-            _server.setHandler(context);
- 
-            context.addServlet(new ServletHolder(new MetricsCollectionServlet()),"/*");
-
-            _server.start();
-         } catch (RuntimeException e) {
-             throw e;
-         } catch (Exception e) {
-             throw new RuntimeException(e);
-         }
-    }
-
-    public void serve() {
-        serve(null);
-    }
-
-    public int getPort() {
-        return _port;
-    }
-
-    public String getUrl() {
-        return _url;
-    }
-}
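
As a usage sketch (the handler body and topology wiring here are illustrative, using the pre-move package names shown above), a minimal subclass of the server can be started on an ephemeral port and handed to the forwarding consumer:

```java
import java.util.Collection;

import org.apache.storm.Config;
import org.apache.storm.metric.api.IMetricsConsumer.DataPoint;
import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
import org.apache.storm.misc.metric.HttpForwardingMetricsConsumer;
import org.apache.storm.misc.metric.HttpForwardingMetricsServer;

public class MetricsServerSketch {
    public static void main(String[] args) {
        // A null config falls back to Utils.readStormConfig() inside the server.
        HttpForwardingMetricsServer server = new HttpForwardingMetricsServer(null) {
            @Override
            public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
                // Must be thread safe: several workers may POST at once.
                System.out.println(taskInfo.srcComponentId + ":" + taskInfo.srcTaskId + " " + dataPoints);
            }
        };
        server.serve(); // no port given, so a free ephemeral port is chosen

        Config conf = new Config();
        // Every worker's forwarding consumer will POST to the URL the server picked.
        conf.registerMetricsConsumer(HttpForwardingMetricsConsumer.class, server.getUrl(), 1);
        // ... build and submit a topology with this conf (see ThroughputVsLatency above) ...
    }
}
```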

