ambari-commits mailing list archives

From: avija...@apache.org
Subject: ambari git commit: AMBARI-17080 Support Storm 1.0 in Ambari Metrics for Storm (dsen)
Date: Tue, 05 Jul 2016 21:40:06 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk 25ed05ce8 -> 57fccc039


AMBARI-17080 Support Storm 1.0 in Ambari Metrics for Storm (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/57fccc03
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/57fccc03
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/57fccc03

Branch: refs/heads/trunk
Commit: 57fccc039c00b301836cf74c15016734fedd64ca
Parents: 25ed05c
Author: Aravindan Vijayan <avijayan@hortonworks.com>
Authored: Tue Jul 5 14:38:55 2016 -0700
Committer: Aravindan Vijayan <avijayan@hortonworks.com>
Committed: Tue Jul 5 14:38:55 2016 -0700

----------------------------------------------------------------------
 ambari-metrics/ambari-metrics-assembly/pom.xml  |  27 +-
 .../src/main/assembly/sink-windows.xml          |   8 +
 .../src/main/assembly/sink.xml                  |   8 +
 ambari-metrics/ambari-metrics-common/pom.xml    |  58 ++++-
 .../ambari-metrics-storm-sink-legacy/pom.xml    | 207 +++++++++++++++
 .../src/main/assemblies/empty.xml               |  21 ++
 .../storm/StormTimelineMetricsReporter.java     | 193 ++++++++++++++
 .../sink/storm/StormTimelineMetricsSink.java    | 250 +++++++++++++++++++
 .../storm/StormTimelineMetricsSinkTest.java     | 112 +++++++++
 .../ambari-metrics-storm-sink/pom.xml           |   3 +-
 .../src/main/conf/storm-metrics2.properties.j2  |  22 --
 .../hadoop/metrics2/sink/storm/NumberUtil.java  |  38 +++
 .../storm/StormTimelineMetricsReporter.java     | 194 ++++++++------
 .../sink/storm/StormTimelineMetricsSink.java    |  12 +-
 .../storm/StormTimelineMetricsSinkTest.java     |   2 +-
 ambari-metrics/pom.xml                          |   6 +
 .../0.1.0/configuration/storm-site.xml          |  18 ++
 .../STORM/0.9.1/package/scripts/params_linux.py |   3 +-
 .../STORM/0.9.1/package/scripts/storm.py        |   9 +-
 .../STORM/0.9.1/package/scripts/ui_server.py    |  10 +-
 .../templates/storm-metrics2.properties.j2      |   1 +
 .../services/STORM/configuration/storm-site.xml |   4 +
 .../stacks/2.1/STORM/test_storm_nimbus.py       |   4 +-
 .../stacks/2.1/STORM/test_storm_ui_server.py    |   8 +-
 24 files changed, 1103 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-metrics/ambari-metrics-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/pom.xml b/ambari-metrics/ambari-metrics-assembly/pom.xml
index 941c3aa..174f65f 100644
--- a/ambari-metrics/ambari-metrics-assembly/pom.xml
+++ b/ambari-metrics/ambari-metrics-assembly/pom.xml
@@ -38,6 +38,7 @@
     <grafana.dir>${project.basedir}/../ambari-metrics-grafana</grafana.dir>
     <hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir>
     <storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir>
+    <storm-sink-legacy.dir>${project.basedir}/../ambari-metrics-storm-sink-legacy</storm-sink-legacy.dir>
     <flume-sink.dir>${project.basedir}/../ambari-metrics-flume-sink</flume-sink.dir>
     <kafka-sink.dir>${project.basedir}/../ambari-metrics-kafka-sink</kafka-sink.dir>
     <python.ver>python &gt;= 2.6</python.ver>
@@ -51,6 +52,7 @@
     <deb.dependency.list>${deb.python.ver},python-dev,gcc</deb.dependency.list>
     <hadoop.sink.jar>ambari-metrics-hadoop-sink-with-common-${project.version}.jar</hadoop.sink.jar>
     <storm.sink.jar>ambari-metrics-storm-sink-with-common-${project.version}.jar</storm.sink.jar>
+    <storm.sink.legacy.jar>ambari-metrics-storm-sink-legacy-with-common-${project.version}.jar</storm.sink.legacy.jar>
     <flume.sink.jar>ambari-metrics-flume-sink-with-common-${project.version}.jar</flume.sink.jar>
     <kafka.sink.jar>ambari-metrics-kafka-sink-with-common-${project.version}.jar</kafka.sink.jar>
   </properties>
@@ -391,6 +393,14 @@
                       </sources>
                     </mapping>
                     <mapping>
+                      <directory>/usr/lib/storm/lib</directory>
+                      <sources>
+                        <source>
+                          <location>${storm-sink-legacy.dir}/target/ambari-metrics-storm-sink-legacy-with-common-${project.version}.jar</location>
+                        </source>
+                      </sources>
+                    </mapping>
+                    <mapping>
                       <directory>/usr/lib/ambari-metrics-kafka-sink</directory>
                       <sources>
                         <source>
@@ -934,7 +944,7 @@
                   </mapper>
                 </data>
 
-                <!-- storm sink -->
+                <!-- storm sinks -->
 
                 <data>
                   <src>${storm-sink.dir}/target/${storm.sink.jar}</src>
@@ -946,6 +956,16 @@
                     <prefix>/usr/lib/storm/lib</prefix>
                   </mapper>
                 </data>
+                <data>
+                  <src>${storm-sink-legacy.dir}/target/${storm.sink.legacy.jar}</src>
+                  <type>file</type>
+                  <mapper>
+                    <type>perm</type>
+                    <filemode>644</filemode>
+                    <dirmode>755</dirmode>
+                    <prefix>/usr/lib/storm/lib</prefix>
+                  </mapper>
+                </data>
 
                 <!-- kafka sink -->
 
@@ -1253,6 +1273,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-metrics-storm-sink-legacy</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ambari</groupId>
       <artifactId>ambari-metrics-kafka-sink</artifactId>
       <version>${project.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml
index e82d2d4..14b49b2 100644
--- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml
+++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml
@@ -39,6 +39,10 @@
       <outputDirectory>hadoop-sink/conf</outputDirectory>
     </fileSet>
     <fileSet>
+      <directory>${storm-sink-legacy.dir}/src/main/conf</directory>
+      <outputDirectory>hadoop-sink/conf</outputDirectory>
+    </fileSet>
+    <fileSet>
       <directory>${kafka-sink.dir}/target/lib</directory>
       <outputDirectory>hadoop-sink/lib</outputDirectory>
     </fileSet>
@@ -58,6 +62,10 @@
       <outputDirectory>hadoop-sink</outputDirectory>
     </file>
     <file>
+      <source>${storm-sink-legacy.dir}/target/ambari-metrics-storm-sink-legacy-with-common-${project.version}.jar</source>
+      <outputDirectory>hadoop-sink</outputDirectory>
+    </file>
+    <file>
       <source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source>
       <outputDirectory>hadoop-sink</outputDirectory>
     </file>

http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml
index 4a3b7c5..f548808 100644
--- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml
+++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml
@@ -39,6 +39,10 @@
       <outputDirectory>hadoop-sink/conf</outputDirectory>
     </fileSet>
     <fileSet>
+      <directory>${storm-sink-legacy.dir}/src/main/conf</directory>
+      <outputDirectory>hadoop-sink/conf</outputDirectory>
+    </fileSet>
+    <fileSet>
       <directory>${kafka-sink.dir}/target/lib</directory>
       <outputDirectory>hadoop-sink/lib</outputDirectory>
     </fileSet>
@@ -58,6 +62,10 @@
       <outputDirectory>hadoop-sink</outputDirectory>
     </file>
     <file>
+      <source>${storm-sink-legacy.dir}/target/ambari-metrics-storm-sink-legacy-with-common-${project.version}.jar</source>
+      <outputDirectory>hadoop-sink</outputDirectory>
+    </file>
+    <file>
       <source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source>
       <outputDirectory>hadoop-sink</outputDirectory>
     </file>

http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-metrics/ambari-metrics-common/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml
index 3e11e7f..0e66a6d 100644
--- a/ambari-metrics/ambari-metrics-common/pom.xml
+++ b/ambari-metrics/ambari-metrics-common/pom.xml
@@ -53,6 +53,62 @@
           <controlDir>${project.basedir}/../src/main/package/deb/control</controlDir>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.3</version>
+        <executions>
+          <!-- Run shade goal on package phase -->
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <minimizeJar>true</minimizeJar>
+              <createDependencyReducedPom>false</createDependencyReducedPom>
+              <relocations>
+                <relocation>
+                  <pattern>com.google</pattern>
+                  <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.google</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.commons.io</pattern>
+                  <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.io</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.curator</pattern>
+                  <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.curator</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.jute</pattern>
+                  <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jute</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.zookeeper</pattern>
+                  <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.zookeeper</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.slf4j</pattern>
+                  <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.slf4j</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.log4j</pattern>
+                  <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.log4j</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>jline</pattern>
+                  <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jline</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.jboss</pattern>
+                  <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jboss</shadedPattern>
+                </relocation>
+              </relocations>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 
@@ -62,7 +118,6 @@
       <artifactId>commons-logging</artifactId>
       <version>1.1.1</version>
     </dependency>
-    <!-- TODO: Need to add these as shaded dependencies -->
     <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
@@ -83,7 +138,6 @@
       <artifactId>curator-framework</artifactId>
       <version>2.7.1</version>
     </dependency>
-    <!--  END TODO -->
     <dependency>
       <groupId>org.codehaus.jackson</groupId>
       <artifactId>jackson-xc</artifactId>

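A note on the shade configuration above: each relocation rewrites the listed package into the org.apache.hadoop.metrics2.sink.relocated.* namespace inside the shaded common jar, so the sink's private copies cannot collide with whatever versions Storm itself puts on the classpath. A minimal sketch of what that means for a consumer (not part of this commit; the demo class is hypothetical, and it assumes minimizeJar did not strip the relocated ZooKeeper class):

public class RelocationCheck {
  public static void main(String[] args) throws Exception {
    // With the shaded jar on the classpath, this resolves to the sink's private copy...
    Class<?> shaded = Class.forName(
        "org.apache.hadoop.metrics2.sink.relocated.zookeeper.ZooKeeper");
    System.out.println("Shaded copy present: " + shaded.getName());
    // ...while plain org.apache.zookeeper.ZooKeeper resolves (or fails to)
    // purely according to what the host process itself bundles.
  }
}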
http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-metrics/ambari-metrics-storm-sink-legacy/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/pom.xml b/ambari-metrics/ambari-metrics-storm-sink-legacy/pom.xml
new file mode 100644
index 0000000..4fc4d17
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/pom.xml
@@ -0,0 +1,207 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>ambari-metrics</artifactId>
+    <groupId>org.apache.ambari</groupId>
+    <version>2.0.0.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>ambari-metrics-storm-sink-legacy</artifactId>
+  <version>2.0.0.0-SNAPSHOT</version>
+  <name>Ambari Metrics Storm Sink (Legacy)</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <!--<storm.version>0.9.3.2.2.1.0-2340</storm.version>-->
+    <storm.version>0.10.0.2.3.0.0-2557</storm.version>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.0</version>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.8</version>
+        <executions>
+          <execution>
+            <id>parse-version</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>parse-version</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>regex-property</id>
+            <goals>
+              <goal>regex-property</goal>
+            </goals>
+            <configuration>
+              <name>ambariVersion</name>
+              <value>${project.version}</value>
+              <regex>^([0-9]+)\.([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex>
+              <replacement>$1.$2.$3.$4</replacement>
+              <failIfNoMatch>false</failIfNoMatch>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>com.github.goldin</groupId>
+        <artifactId>copy-maven-plugin</artifactId>
+        <version>0.2.5</version>
+        <executions>
+          <execution>
+            <id>create-archive</id>
+            <phase>none</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.2</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <outputFile>${project.build.directory}/${project.artifactId}-with-common-${project.version}.jar</outputFile>
+          <minimizeJar>false</minimizeJar>
+          <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
+          <artifactSet>
+            <includes>
+              <include>org.apache.ambari:ambari-metrics-common</include>
+              <include>org.codehaus.jackson:jackson-mapper-asl</include>
+              <include>org.codehaus.jackson:jackson-core-asl</include>
+              <include>org.codehaus.jackson:jackson-xc</include>
+              <include>org.apache.hadoop:hadoop-annotations</include>
+              <include>commons-logging:commons-logging</include>
+              <include>org.apache.commons:commons-lang3</include>
+              <include>commons-codec:commons-codec</include>
+            </includes>
+          </artifactSet>
+          <relocations>
+            <relocation>
+              <pattern>org.apache.commons.logging</pattern>
+              <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.logging</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>org.apache.hadoop.classification</pattern>
+              <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.hadoop.classification</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>org.codehaus.jackson</pattern>
+              <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jackson</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>org.apache.commons.lang3</pattern>
+              <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.lang3</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>org.apache.commons.codec</pattern>
+              <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.codec</shadedPattern>
+            </relocation>
+          </relocations>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.vafer</groupId>
+        <artifactId>jdeb</artifactId>
+        <version>1.0.1</version>
+        <executions>
+          <execution>
+            <!--Stub execution on direct plugin call - workaround for ambari deb build process-->
+            <id>stub-execution</id>
+            <phase>none</phase>
+            <goals>
+              <goal>jdeb</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <skip>true</skip>
+          <attach>false</attach>
+          <submodules>false</submodules>
+          <controlDir>${project.basedir}/../src/main/package/deb/control</controlDir>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>3.3.2</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <version>1.8</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>${storm.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-metrics-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>1.9.13</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+      <version>4.10</version>
+    </dependency>
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <version>3.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-easymock</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

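The regex-property execution in the pom above derives ambariVersion from ${project.version} by keeping only the first four numeric components. A standalone sketch of the same regex and replacement (not from the commit, shown for illustration):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AmbariVersionRegexDemo {
  public static void main(String[] args) {
    // Same pattern as the pom's <regex> element.
    Pattern p = Pattern.compile("^([0-9]+)\\.([0-9]+)\\.([0-9]+)\\.([0-9]+)(\\.|-).*");
    Matcher m = p.matcher("2.0.0.0-SNAPSHOT");
    if (m.matches()) {
      // Prints "2.0.0.0", matching the $1.$2.$3.$4 replacement in the pom.
      System.out.println(m.group(1) + "." + m.group(2) + "."
          + m.group(3) + "." + m.group(4));
    }
  }
}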
http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/assemblies/empty.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/assemblies/empty.xml b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/assemblies/empty.xml
new file mode 100644
index 0000000..35738b1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/assemblies/empty.xml
@@ -0,0 +1,21 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<assembly>
+    <id>empty</id>
+    <formats/>
+</assembly>

http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
new file mode 100644
index 0000000..73381d9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.storm;
+
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.SupervisorSummary;
+import backtype.storm.generated.TopologySummary;
+import backtype.storm.metric.IClusterReporter;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.commons.lang3.Validate;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
+    implements IClusterReporter {
+
+  public static final String METRICS_COLLECTOR_CATEGORY = "metrics_collector";
+  public static final String APP_ID = "appId";
+
+  private String hostname;
+  private String collectorUri;
+  private String port;
+  private String collectors;
+  private String zkQuorum;
+  private String protocol;
+  private NimbusClient nimbusClient;
+  private String applicationId;
+  private int timeoutSeconds;
+
+  public StormTimelineMetricsReporter() {
+
+  }
+
+  @Override
+  protected String getCollectorUri(String host) {
+    return constructTimelineMetricUri(protocol, host, port);
+  }
+
+  @Override
+  protected String getCollectorProtocol() {
+    return protocol;
+  }
+
+  @Override
+  protected int getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
+  @Override
+  protected String getZookeeperQuorum() {
+    return zkQuorum;
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return collectors;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostname;
+  }
+
+  @Override
+  public void prepare(Map conf) {
+    LOG.info("Preparing Storm Metrics Reporter");
+    try {
+      try {
+        hostname = InetAddress.getLocalHost().getHostName();
+        // If not an FQDN, fall back to the canonical (DNS) name
+        if ((hostname == null) || (!hostname.contains("."))) {
+          hostname = InetAddress.getLocalHost().getCanonicalHostName();
+        }
+      } catch (UnknownHostException e) {
+        LOG.error("Could not identify hostname.");
+        throw new RuntimeException("Could not identify hostname.", e);
+      }
+      Validate.notNull(conf.get(METRICS_COLLECTOR_CATEGORY), METRICS_COLLECTOR_CATEGORY + " can not be null");
+      Map cf = (Map) conf.get(METRICS_COLLECTOR_CATEGORY);
+      Map stormConf = Utils.readStormConfig();
+      this.nimbusClient = NimbusClient.getConfiguredClient(stormConf);
+
+      collectors = cf.get(COLLECTOR_PROPERTY).toString();
+      protocol = cf.get(COLLECTOR_PROTOCOL) != null ? cf.get(COLLECTOR_PROTOCOL).toString() : "http";
+      port = cf.get(COLLECTOR_PORT) != null ? cf.get(COLLECTOR_PORT).toString() : "6188";
+      zkQuorum = cf.get(ZOOKEEPER_QUORUM) != null ? cf.get(ZOOKEEPER_QUORUM).toString() : null;
+
+      timeoutSeconds = cf.get(METRICS_POST_TIMEOUT_SECONDS) != null ?
+          Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) :
+          DEFAULT_POST_TIMEOUT_SECONDS;
+      applicationId = cf.get(APP_ID).toString();
+
+      collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port);
+      if (protocol.contains("https")) {
+        String trustStorePath = cf.get(SSL_KEYSTORE_PATH_PROPERTY).toString().trim();
+        String trustStoreType = cf.get(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim();
+        String trustStorePwd = cf.get(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim();
+        loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
+      }
+    } catch (Exception e) {
+      LOG.warn("Could not initialize metrics collector, please specify " +
+          "protocol, host, port under $STORM_HOME/conf/config.yaml ", e);
+    }
+    // Initialize the collector write strategy
+    super.init();
+  }
+
+  @Override
+  public void reportMetrics() throws Exception {
+    List<TimelineMetric> totalMetrics = new ArrayList<TimelineMetric>(7);
+    ClusterSummary cs = this.nimbusClient.getClient().getClusterInfo();
+    long currentTimeMillis = System.currentTimeMillis();
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+        applicationId, "Supervisors", String.valueOf(cs.get_supervisors_size())));
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+        applicationId, "Topologies", String.valueOf(cs.get_topologies_size())));
+
+    List<SupervisorSummary> sups = cs.get_supervisors();
+    int totalSlots = 0;
+    int usedSlots = 0;
+    for (SupervisorSummary ssum : sups) {
+      totalSlots += ssum.get_num_workers();
+      usedSlots += ssum.get_num_used_workers();
+    }
+    int freeSlots = totalSlots - usedSlots;
+
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+        applicationId, "Total Slots", String.valueOf(totalSlots)));
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+        applicationId, "Used Slots", String.valueOf(usedSlots)));
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+        applicationId, "Free Slots", String.valueOf(freeSlots)));
+
+    List<TopologySummary> topos = cs.get_topologies();
+    int totalExecutors = 0;
+    int totalTasks = 0;
+    for (TopologySummary topo : topos) {
+      totalExecutors += topo.get_num_executors();
+      totalTasks += topo.get_num_tasks();
+    }
+
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+        applicationId, "Total Executors", String.valueOf(totalExecutors)));
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+        applicationId, "Total Tasks", String.valueOf(totalTasks)));
+
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    timelineMetrics.setMetrics(totalMetrics);
+
+    try {
+      emitMetrics(timelineMetrics);
+    } catch (UnableToConnectException e) {
+      LOG.warn("Unable to connect to Metrics Collector " + e.getConnectUrl() + ". " + e.getMessage());
+    }
+
+  }
+
+  private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, String attributeValue) {
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName(attributeName);
+    timelineMetric.setHostName(hostname);
+    timelineMetric.setAppId(component);
+    timelineMetric.setStartTime(currentTimeMillis);
+    timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
+    return timelineMetric;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
new file mode 100644
index 0000000..9266e5f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.storm;
+
+import backtype.storm.metric.api.IMetricsConsumer;
+import backtype.storm.task.IErrorReporter;
+import backtype.storm.task.TopologyContext;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS;
+import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
+
+public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
+  public static final int SYSTEM_BOLT_TASK_ID = -1;
+
+  private String collectorUri;
+  private TimelineMetricsCache metricsCache;
+  private String hostname;
+  private int timeoutSeconds;
+  private String collectors;
+  private String zkQuorum;
+  private String protocol;
+  private String port;
+  private String topologyName;
+
+  @Override
+  protected String getCollectorUri(String host) {
+    return constructTimelineMetricUri(protocol, host, port);
+  }
+
+  @Override
+  protected String getCollectorProtocol() {
+    return protocol;
+  }
+
+  @Override
+  protected int getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
+  @Override
+  protected String getZookeeperQuorum() {
+    return zkQuorum;
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return collectors;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostname;
+  }
+
+  @Override
+  public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
+    LOG.info("Preparing Storm Metrics Sink");
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();
+      // If not an FQDN, fall back to the canonical (DNS) name
+      if ((hostname == null) || (!hostname.contains("."))) {
+        hostname = InetAddress.getLocalHost().getCanonicalHostName();
+      }
+    } catch (UnknownHostException e) {
+      LOG.error("Could not identify hostname.");
+      throw new RuntimeException("Could not identify hostname.", e);
+    }
+    Configuration configuration = new Configuration("/storm-metrics2.properties");
+    timeoutSeconds = Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS,
+        String.valueOf(DEFAULT_POST_TIMEOUT_SECONDS)));
+    int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
+        String.valueOf(MAX_RECS_PER_NAME_DEFAULT)));
+    int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
+        String.valueOf(MAX_EVICTION_TIME_MILLIS)));
+    metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
+    collectors = configuration.getProperty(COLLECTOR_PROPERTY);
+    zkQuorum = configuration.getProperty("zookeeper.quorum");
+    protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
+    port = configuration.getProperty(COLLECTOR_PORT, "6188");
+
+    // Initialize the collector write strategy
+    super.init();
+
+    if (protocol.contains("https")) {
+      String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
+      String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
+      String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
+      loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
+    }
+    this.topologyName = removeNonce(topologyContext.getStormId());
+  }
+
+  @Override
+  public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+    List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+
+    for (DataPoint dataPoint : dataPoints) {
+      LOG.debug(dataPoint.name + " = " + dataPoint.value);
+      List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
+
+      for (DataPoint populatedDataPoint : populatedDataPoints) {
+        String metricName;
+        if (taskInfo.srcTaskId == SYSTEM_BOLT_TASK_ID) {
+          metricName = createMetricNameForSystemBolt(taskInfo, populatedDataPoint.name);
+        } else {
+          metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcTaskId, populatedDataPoint.name);
+        }
+
+        LOG.debug("populated datapoint: " + metricName + " = " + populatedDataPoint.value);
+
+        TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000, taskInfo.srcWorkerHost,
+            metricName, Double.valueOf(populatedDataPoint.value.toString()));
+
+        // Put intermediate values into the cache until it is time to send
+        metricsCache.putTimelineMetric(timelineMetric);
+
+        TimelineMetric cachedMetric = metricsCache.getTimelineMetric(timelineMetric.getMetricName());
+
+        if (cachedMetric != null) {
+          metricList.add(cachedMetric);
+        }
+      }
+    }
+
+    if (!metricList.isEmpty()) {
+      TimelineMetrics timelineMetrics = new TimelineMetrics();
+      timelineMetrics.setMetrics(metricList);
+
+      try {
+        emitMetrics(timelineMetrics);
+      } catch (UnableToConnectException uce) {
+        LOG.warn("Unable to send metrics to collector by address:" + uce.getConnectUrl());
+      }
+    }
+  }
+
+  @Override
+  public void cleanup() {
+    LOG.info("Stopping Storm Metrics Sink");
+  }
+
+  private String removeNonce(String topologyId) {
+    return topologyId.substring(0, topologyId.substring(0, topologyId.lastIndexOf("-")).lastIndexOf("-"));
+  }
+
+  private List<DataPoint> populateDataPoints(DataPoint dataPoint) {
+    List<DataPoint> dataPoints = new ArrayList<>();
+
+    if (dataPoint.value == null) {
+      LOG.warn("Data point with name " + dataPoint.name + " is null. Discarding." + dataPoint.name);
+    } else if (dataPoint.value instanceof Map) {
+      Map<String, Object> dataMap = (Map<String, Object>) dataPoint.value;
+
+      for (Map.Entry<String, Object> entry : dataMap.entrySet()) {
+        Double value = convertValueToDouble(entry.getKey(), entry.getValue());
+        if (value != null) {
+          dataPoints.add(new DataPoint(dataPoint.name + "." + entry.getKey(), value));
+        }
+      }
+    } else {
+      Double value = convertValueToDouble(dataPoint.name, dataPoint.value);
+      if (value != null) {
+        dataPoints.add(new DataPoint(dataPoint.name, value));
+      }
+    }
+
+    return dataPoints;
+  }
+
+  private Double convertValueToDouble(String metricName, Object value) {
+    if (value instanceof Number) {
+      return ((Number) value).doubleValue();
+    } else if (value instanceof String) {
+      try {
+        return Double.parseDouble((String) value);
+      } catch (NumberFormatException e) {
+        LOG.warn("Data point with name " + metricName + " doesn't have number format value " +
+            value + ". Discarding.");
+      }
+
+      return null;
+    } else {
+      LOG.warn("Data point with name " + metricName + " has value " + value +
+          " which is not supported. Discarding.");
+
+      return null;
+    }
+  }
+
+  private TimelineMetric createTimelineMetric(long currentTimeMillis, String hostName,
+                                              String attributeName, Double attributeValue) {
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName(attributeName);
+    timelineMetric.setHostName(hostName);
+    timelineMetric.setAppId(topologyName);
+    timelineMetric.setStartTime(currentTimeMillis);
+    timelineMetric.setType(ClassUtils.getShortCanonicalName(
+        attributeValue, "Number"));
+    timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue);
+    return timelineMetric;
+  }
+
+  private String createMetricName(String componentId, int taskId, String attributeName) {
+    String metricName = componentId + "." + taskId + "." + attributeName;
+    // '_' is treated as a special (separator) character downstream, so it is replaced with '-'
+    return metricName.replace('_', '-');
+  }
+
+  private String createMetricNameForSystemBolt(TaskInfo taskInfo, String attributeName) {
+    String metricName = taskInfo.srcComponentId + "." + taskInfo.srcWorkerHost + "." +
+        taskInfo.srcWorkerPort + "." + attributeName;
+    // '_' is treated as a special (separator) character downstream, so it is replaced with '-'
+    return metricName.replace('_', '-');
+  }
+
+  public void setMetricsCache(TimelineMetricsCache metricsCache) {
+    this.metricsCache = metricsCache;
+  }
+
+}
\ No newline at end of file

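For reference, a quick sketch (not part of the commit) of the two private naming helpers above, using a hypothetical topology id of the usual <name>-<counter>-<timestamp> form:

public class MetricNameDemo {
  public static void main(String[] args) {
    // Same expression as removeNonce: strip the trailing counter and timestamp.
    String topologyId = "wordcount-3-1467750000";
    String appId = topologyId.substring(0,
        topologyId.substring(0, topologyId.lastIndexOf("-")).lastIndexOf("-"));
    System.out.println(appId); // wordcount

    // Same shape as createMetricName: component.taskId.attribute, with '_'
    // replaced by '-' before the name reaches the collector.
    String metricName = ("split_bolt" + "." + 42 + "." + "execute_count").replace('_', '-');
    System.out.println(metricName); // split-bolt.42.execute-count
  }
}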
http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
new file mode 100644
index 0000000..4ea7396
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.storm;
+
+import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.SYSTEM_BOLT_TASK_ID;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import backtype.storm.metric.api.IMetricsConsumer;
+
+public class StormTimelineMetricsSinkTest {
+  @Test
+  public void testNonNumericMetricMetricExclusion() throws InterruptedException, IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    replay(timelineMetricsCache);
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint("key1", "value1")));
+    verify(timelineMetricsCache);
+  }
+
+  @Test
+  @Ignore // TODO: Fix for failover
+  public void testNumericMetricMetricSubmission() throws InterruptedException, IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1"))
+        .andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    replay(timelineMetricsCache);
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42)));
+    verify(timelineMetricsCache);
+  }
+
+  @Test
+  @Ignore // TODO: Fix for failover
+  public void testNumericMetricFromSystemBoltMetricSubmission() throws InterruptedException, IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    expect(timelineMetricsCache.getTimelineMetric("testComponent.localhost.1234.key1"))
+        .andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    replay(timelineMetricsCache);
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", SYSTEM_BOLT_TASK_ID, 20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42)));
+    verify(timelineMetricsCache);
+  }
+
+  @Test
+  @Ignore // TODO: Fix for failover
+  public void testMapMetricMetricSubmission() throws InterruptedException, IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field1"))
+        .andReturn(new TimelineMetric()).once();
+    expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field2"))
+        .andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    replay(timelineMetricsCache);
+
+    Map<String, Object> valueMap = new HashMap<>();
+    valueMap.put("field1", 53);
+    valueMap.put("field2", 64.12);
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint("key1", valueMap)));
+    verify(timelineMetricsCache);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-metrics/ambari-metrics-storm-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/pom.xml b/ambari-metrics/ambari-metrics-storm-sink/pom.xml
index ee4d2c3..612ad63 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/pom.xml
+++ b/ambari-metrics/ambari-metrics-storm-sink/pom.xml
@@ -31,8 +31,7 @@ limitations under the License.
   <packaging>jar</packaging>
 
   <properties>
-    <!--<storm.version>0.9.3.2.2.1.0-2340</storm.version>-->
-    <storm.version>0.10.0.2.3.0.0-2557</storm.version>
+    <storm.version>1.1.0-SNAPSHOT</storm.version>
   </properties>
 
   <build>

http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-metrics/ambari-metrics-storm-sink/src/main/conf/storm-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/conf/storm-metrics2.properties.j2 b/ambari-metrics/ambari-metrics-storm-sink/src/main/conf/storm-metrics2.properties.j2
deleted file mode 100644
index 4553224..0000000
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/conf/storm-metrics2.properties.j2
+++ /dev/null
@@ -1,22 +0,0 @@
-{#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#}
-
-collector={{metric_collector_host}}
-port=6188
-maxRowCacheSize=10000
-sendInterval=59000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/NumberUtil.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/NumberUtil.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/NumberUtil.java
new file mode 100644
index 0000000..f41cf0e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/NumberUtil.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.storm;
+
+public class NumberUtil {
+  private NumberUtil() {
+  }
+
+  public static Double convertValueToDouble(Object value) {
+    if (value instanceof Number) {
+      return ((Number) value).doubleValue();
+    } else if (value instanceof String) {
+      // Deliberately lets NumberFormatException propagate for malformed strings,
+      // unlike the legacy sink's convertValueToDouble, which swallows it.
+      return Double.parseDouble((String) value);
+    } else {
+      return null;
+    }
+  }
+}

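A brief usage sketch for NumberUtil above (not from the commit; the demo class is illustrative): numbers and numeric strings convert, unsupported types come back as null, and a malformed string propagates NumberFormatException to the caller.

import org.apache.hadoop.metrics2.sink.storm.NumberUtil;

public class NumberUtilDemo {
  public static void main(String[] args) {
    System.out.println(NumberUtil.convertValueToDouble(42));           // 42.0
    System.out.println(NumberUtil.convertValueToDouble("64.12"));      // 64.12
    System.out.println(NumberUtil.convertValueToDouble(new Object())); // null
    // A malformed string is NOT swallowed here; the exception reaches the caller:
    // NumberUtil.convertValueToDouble("not-a-number"); // NumberFormatException
  }
}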
http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 8f98563..9082e70 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -18,37 +18,33 @@
 
 package org.apache.hadoop.metrics2.sink.storm;
 
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.SupervisorSummary;
-import backtype.storm.generated.TopologySummary;
-import backtype.storm.metric.IClusterReporter;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-import org.apache.commons.lang3.Validate;
+import org.apache.commons.lang3.ClassUtils;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
+import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
+import org.apache.storm.metric.api.DataPoint;
+import org.apache.storm.metric.api.IClusterMetricsConsumer;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
 public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
-  implements IClusterReporter {
+    implements IClusterMetricsConsumer {
 
-  public static final String METRICS_COLLECTOR_CATEGORY = "metrics_collector";
-  public static final String APP_ID = "appId";
+  public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId";
+  public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "nimbus";
 
   private String hostname;
-  private String collectorUri;
   private String port;
   private String collectors;
   private String zkQuorum;
   private String protocol;
-  private NimbusClient nimbusClient;
   private String applicationId;
   private int timeoutSeconds;
 
@@ -87,7 +83,7 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
   }
 
   @Override
-  public void prepare(Map conf) {
+  public void prepare(Object registrationArgument) {
     LOG.info("Preparing Storm Metrics Reporter");
     try {
       try {
@@ -100,74 +96,62 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
         LOG.error("Could not identify hostname.");
         throw new RuntimeException("Could not identify hostname.", e);
       }
-      Validate.notNull(conf.get(METRICS_COLLECTOR_CATEGORY), METRICS_COLLECTOR_CATEGORY + " can not be null");
-      Map cf = (Map) conf.get(METRICS_COLLECTOR_CATEGORY);
-      Map stormConf = Utils.readStormConfig();
-      this.nimbusClient = NimbusClient.getConfiguredClient(stormConf);
-
-      collectors = cf.get(COLLECTOR_PROPERTY).toString();
-      protocol = cf.get(COLLECTOR_PROTOCOL) != null ? cf.get(COLLECTOR_PROTOCOL).toString() : "http";
-      port = cf.get(COLLECTOR_PORT) != null ? cf.get(COLLECTOR_PORT).toString() : "6188";
-      zkQuorum = cf.get(ZOOKEEPER_QUORUM) != null ? cf.get(ZOOKEEPER_QUORUM).toString() : null;
-
-      timeoutSeconds = cf.get(METRICS_POST_TIMEOUT_SECONDS) != null ?
-        Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) :
-        DEFAULT_POST_TIMEOUT_SECONDS;
-      applicationId = cf.get(APP_ID).toString();
-
-      collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port);
+
+      Configuration configuration = new Configuration("/storm-metrics2.properties");
+      collectors = configuration.getProperty(COLLECTOR_PROPERTY);
+      protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
+      port = configuration.getProperty(COLLECTOR_PORT, "6188");
+      zkQuorum = configuration.getProperty(ZOOKEEPER_QUORUM);
+
+      timeoutSeconds = configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS) != null ?
+          Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS)) :
+          DEFAULT_POST_TIMEOUT_SECONDS;
+      applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
+
       if (protocol.contains("https")) {
-        String trustStorePath = cf.get(SSL_KEYSTORE_PATH_PROPERTY).toString().trim();
-        String trustStoreType = cf.get(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim();
-        String trustStorePwd = cf.get(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim();
+        String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
+        String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
+        String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
         loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
       }
     } catch (Exception e) {
       LOG.warn("Could not initialize metrics collector, please specify " +
-        "protocol, host, port under $STORM_HOME/conf/config.yaml ", e);
+          "protocol, host, port, appId, zkQuorum under $STORM_HOME/conf/storm-metrics2.properties ", e);
     }
     // Initialize the collector write strategy
     super.init();
   }
 
   @Override
-  public void reportMetrics() throws Exception {
-    List<TimelineMetric> totalMetrics = new ArrayList<TimelineMetric>(7);
-    ClusterSummary cs = this.nimbusClient.getClient().getClusterInfo();
-    long currentTimeMillis = System.currentTimeMillis();
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-      applicationId, "Supervisors", String.valueOf(cs.get_supervisors_size())));
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-      applicationId, "Topologies", String.valueOf(cs.get_topologies_size())));
-
-    List<SupervisorSummary> sups = cs.get_supervisors();
-    int totalSlots = 0;
-    int usedSlots = 0;
-    for (SupervisorSummary ssum : sups) {
-      totalSlots += ssum.get_num_workers();
-      usedSlots += ssum.get_num_used_workers();
-    }
-    int freeSlots = totalSlots - usedSlots;
-
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-      applicationId, "Total Slots", String.valueOf(totalSlots)));
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-      applicationId, "Used Slots", String.valueOf(usedSlots)));
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-      applicationId, "Free Slots", String.valueOf(freeSlots)));
-
-    List<TopologySummary> topos = cs.get_topologies();
-    int totalExecutors = 0;
-    int totalTasks = 0;
-    for (TopologySummary topo : topos) {
-      totalExecutors += topo.get_num_executors();
-      totalTasks += topo.get_num_tasks();
+  public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) {
+    long timestamp = clusterInfo.getTimestamp();
+    List<TimelineMetric> totalMetrics = new ArrayList<>();
+
+    for (DataPoint dataPoint : dataPoints) {
+      LOG.debug(dataPoint.getName() + " = " + dataPoint.getValue());
+      List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
+
+      for (DataPoint populatedDataPoint : populatedDataPoints) {
+        LOG.debug("Populated datapoint: " + dataPoint.getName() + " = " + dataPoint.getValue());
+
+        try {
+          StormAmbariMappedMetric mappedMetric = StormAmbariMappedMetric
+              .valueOf(populatedDataPoint.getName());
+          TimelineMetric timelineMetric = createTimelineMetric(timestamp * 1000, applicationId,
+              mappedMetric.getAmbariMetricName(),
+              Double.valueOf(populatedDataPoint.getValue().toString()));
+
+          totalMetrics.add(timelineMetric);
+        } catch (IllegalArgumentException e) {
+          // this metric is not tracked by Ambari; skip it
+          LOG.debug("Metric not tracked by Ambari, skipping: " + populatedDataPoint.getName());
+        }
+      }
     }
 
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-      applicationId, "Total Executors", String.valueOf(totalExecutors)));
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-      applicationId, "Total Tasks", String.valueOf(totalTasks)));
+    if (totalMetrics.isEmpty()) {
+      return;
+    }
 
     TimelineMetrics timelineMetrics = new TimelineMetrics();
     timelineMetrics.setMetrics(totalMetrics);
@@ -177,17 +161,87 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
     } catch (UnableToConnectException e) {
       LOG.warn("Unable to connect to Metrics Collector " + e.getConnectUrl() + ". " + e.getMessage());
     }
+  }
+
+  @Override
+  public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) {
+    // Ambari is not interested in per-supervisor metrics
+  }
+
+  @Override
+  public void cleanup() {
+    LOG.info("Stopping Storm Metrics Reporter");
+  }
+
+  private List<DataPoint> populateDataPoints(DataPoint dataPoint) {
+    List<DataPoint> dataPoints = new ArrayList<>();
+
+    if (dataPoint.getValue() == null) {
+      LOG.warn("Data point with name " + dataPoint.getName() + " is null. Discarding." + dataPoint
+          .getName());
+    } else if (dataPoint.getValue() instanceof Map) {
+      Map<String, Object> dataMap = (Map<String, Object>) dataPoint.getValue();
+
+      for (Map.Entry<String, Object> entry : dataMap.entrySet()) {
+        Double value = convertValueToDouble(entry.getKey(), entry.getValue());
+        if (value != null) {
+          dataPoints.add(new DataPoint(dataPoint.getName() + "." + entry.getKey(), value));
+        }
+      }
+    } else {
+      Double value = convertValueToDouble(dataPoint.getName(), dataPoint.getValue());
+      if (value != null) {
+        dataPoints.add(new DataPoint(dataPoint.getName(), value));
+      }
+    }
+
+    return dataPoints;
+  }
 
+  private Double convertValueToDouble(String metricName, Object value) {
+    try {
+      Double converted = NumberUtil.convertValueToDouble(value);
+      if (converted == null) {
+        LOG.warn("Data point with name " + metricName + " has value " + value +
+            " which is not supported. Discarding.");
+      }
+      return converted;
+    } catch (NumberFormatException e) {
+      LOG.warn("Data point with name " + metricName + " doesn't have number format value " +
+          value + ". Discarding.");
+      return null;
+    }
   }
 
-  private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, String attributeValue) {
+  private TimelineMetric createTimelineMetric(long currentTimeMillis, String component,
+                                              String attributeName, Double attributeValue) {
     TimelineMetric timelineMetric = new TimelineMetric();
     timelineMetric.setMetricName(attributeName);
     timelineMetric.setHostName(hostname);
     timelineMetric.setAppId(component);
     timelineMetric.setStartTime(currentTimeMillis);
-    timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
+    timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number"));
+    timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue);
     return timelineMetric;
   }
 
+  enum StormAmbariMappedMetric {
+    supervisors("Supervisors"),
+    topologies("Topologies"),
+    slotsTotal("Total Slots"),
+    slotsUsed("Used Slots"),
+    slotsFree("Free Slots"),
+    executorsTotal("Total Executors"),
+    tasksTotal("Total Tasks");
+
+    private String ambariMetricName;
+
+    StormAmbariMappedMetric(String ambariMetricName) {
+      this.ambariMetricName = ambariMetricName;
+    }
+
+    public String getAmbariMetricName() {
+      return ambariMetricName;
+    }
+  }
 }

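Context for the reporter rewrite above: the new handleDataPoints path first flattens any Map-valued data point into dotted names ("parent.child") and then keeps only names that match the StormAmbariMappedMetric enum, dropping everything else via the IllegalArgumentException thrown by Enum.valueOf. A minimal, self-contained sketch of that flatten-and-filter step (plain Java, no Storm classes on the classpath; class and variable names here are illustrative, not part of the patch):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class FlattenSketch {
      // mirrors the enum in the patch: Storm metric name -> Ambari display name
      enum StormAmbariMappedMetric {
        supervisors("Supervisors"), topologies("Topologies"),
        slotsTotal("Total Slots"), slotsUsed("Used Slots"), slotsFree("Free Slots"),
        executorsTotal("Total Executors"), tasksTotal("Total Tasks");

        final String ambariMetricName;
        StormAmbariMappedMetric(String name) { this.ambariMetricName = name; }
      }

      // flatten one (possibly Map-valued) data point into name -> double entries;
      // null values are dropped, as in the patch
      static Map<String, Double> flatten(String name, Object value) {
        Map<String, Double> out = new LinkedHashMap<>();
        if (value instanceof Map) {
          for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
            out.put(name + "." + e.getKey(), Double.valueOf(e.getValue().toString()));
          }
        } else if (value != null) {
          out.put(name, Double.valueOf(value.toString()));
        }
        return out;
      }

      public static void main(String[] args) {
        Map<String, Double> flat = new LinkedHashMap<>();
        flat.putAll(flatten("slotsTotal", 8));    // scalar: name matches the enum
        flat.putAll(flatten("uptimeSecs", 120));  // scalar: not in the enum
        for (Map.Entry<String, Double> e : flat.entrySet()) {
          try {
            StormAmbariMappedMetric m = StormAmbariMappedMetric.valueOf(e.getKey());
            System.out.println(m.ambariMetricName + " = " + e.getValue());
          } catch (IllegalArgumentException ignored) {
            // name not listed in the enum: skipped, exactly as the reporter does
          }
        }
      }
    }

Note that dotted names produced by the Map branch can never match the single-word enum constants, so nested metrics are effectively filtered out as well.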
http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 0ef09d6..d04bc15 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.metrics2.sink.storm;
 
-import backtype.storm.metric.api.IMetricsConsumer;
-import backtype.storm.task.IErrorReporter;
-import backtype.storm.task.TopologyContext;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.task.IErrorReporter;
+import org.apache.storm.task.TopologyContext;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -97,11 +97,11 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     }
     Configuration configuration = new Configuration("/storm-metrics2.properties");
     timeoutSeconds = Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS,
-      String.valueOf(DEFAULT_POST_TIMEOUT_SECONDS)));
+        String.valueOf(DEFAULT_POST_TIMEOUT_SECONDS)));
     int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
         String.valueOf(MAX_RECS_PER_NAME_DEFAULT)));
     int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
-      String.valueOf(MAX_EVICTION_TIME_MILLIS)));
+        String.valueOf(MAX_EVICTION_TIME_MILLIS)));
     metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
     collectors = configuration.getProperty(COLLECTOR_PROPERTY);
     zkQuorum = configuration.getProperty("zookeeper.quorum");
@@ -218,7 +218,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   }
 
   private TimelineMetric createTimelineMetric(long currentTimeMillis, String hostName,
-      String attributeName, Double attributeValue) {
+                                              String attributeName, Double attributeValue) {
     TimelineMetric timelineMetric = new TimelineMetric();
     timelineMetric.setMetricName(attributeName);
     timelineMetric.setHostName(hostName);

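The sink changes above are mostly mechanical: Storm 1.0 relocated its public API from backtype.storm.* to org.apache.storm.*, so the imports move while the sink logic stays intact. For orientation, a minimal no-op skeleton of the Storm 1.0 topology-level consumer interface the sink implements (assuming a storm-core 1.x dependency; the class name is illustrative):

    import java.util.Collection;
    import java.util.Map;

    import org.apache.storm.metric.api.IMetricsConsumer;
    import org.apache.storm.task.IErrorReporter;
    import org.apache.storm.task.TopologyContext;

    public class NoOpMetricsConsumer implements IMetricsConsumer {
      @Override
      public void prepare(Map stormConf, Object registrationArgument,
                          TopologyContext context, IErrorReporter errorReporter) {
        // configuration goes here; the Ambari sink reads /storm-metrics2.properties
      }

      @Override
      public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        // per-task topology metrics arrive here
      }

      @Override
      public void cleanup() {
        // release resources on shutdown
      }
    }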
http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index 4ea7396..3e9ed34 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import backtype.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.metric.api.IMetricsConsumer;
 
 public class StormTimelineMetricsSinkTest {
   @Test

http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml
index 726a823..6ab8c60 100644
--- a/ambari-metrics/pom.xml
+++ b/ambari-metrics/pom.xml
@@ -26,6 +26,7 @@
     <module>ambari-metrics-flume-sink</module>
     <module>ambari-metrics-kafka-sink</module>
     <module>ambari-metrics-storm-sink</module>
+    <module>ambari-metrics-storm-sink-legacy</module>
     <module>ambari-metrics-timelineservice</module>
     <module>ambari-metrics-host-monitoring</module>
     <module>ambari-metrics-grafana</module>
@@ -77,6 +78,11 @@
       <name>hdp</name>
       <url>http://repo.hortonworks.com/content/groups/public/</url>
     </repository>
+    <repository>
+      <id>apache-snapshots</id>
+      <name>snapshots</name>
+      <url>https://repository.apache.org/content/repositories/snapshots</url>
+    </repository>
   </repositories>
   <dependencyManagement>
     <dependencies>

http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml
index 0d029e8..61b4233 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml
@@ -26,4 +26,22 @@
     <description>Topology metrics reporter.</description>
     <on-ambari-upgrade add="true"/>
   </property>
+  <property>
+    <name>storm.cluster.metrics.consumer.register</name>
+    <value>[{"class": "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter"}]</value>
+    <description>Registers the Ambari Metrics reporter as a Storm cluster metrics consumer on Nimbus.</description>
+    <value-attributes>
+      <overridable>false</overridable>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>topology.metrics.consumer.register</name>
+    <value>[{"class": "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink", "parallelism.hint": 1}]</value>
+    <description>Registers the Ambari Metrics sink as a Storm topology metrics consumer.</description>
+    <value-attributes>
+      <overridable>false</overridable>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
 </configuration>

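The new storm.cluster.metrics.consumer.register property makes Nimbus instantiate the reporter as a Storm 1.0 cluster metrics consumer, which is why the reporter above now implements IClusterMetricsConsumer instead of the old IClusterReporter. A minimal sketch of a consumer registered this way (assuming storm-core 1.x; only methods visible in the patch are used, and the class name is illustrative):

    import java.util.Collection;

    import org.apache.storm.metric.api.DataPoint;
    import org.apache.storm.metric.api.IClusterMetricsConsumer;

    public class LoggingClusterConsumer implements IClusterMetricsConsumer {
      @Override
      public void prepare(Object registrationArgument) {
        // runs once when Nimbus creates the consumer
      }

      @Override
      public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) {
        // cluster-wide metrics arrive here
        for (DataPoint dp : dataPoints) {
          System.out.println(clusterInfo.getTimestamp() + " " + dp.getName() + " = " + dp.getValue());
        }
      }

      @Override
      public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) {
        // per-supervisor metrics; the Ambari reporter ignores these
      }

      @Override
      public void cleanup() { }
    }

The reporter multiplies the ClusterInfo timestamp by 1000 before building TimelineMetric objects, which is consistent with that timestamp being in seconds.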
http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
index abe5084..8ef851f 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
@@ -189,7 +189,8 @@ if has_metric_collector:
   pass
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
-metric_collector_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink*.jar"
+metric_collector_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-with-common-*.jar"
+metric_collector_legacy_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar"
 
 # Collector hosts
 metric_collector_hosts = ""

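Both jar paths above are shell globs, expanded by the ln -s command at deploy time rather than stored as literal file names. A small sketch of the equivalent glob resolution in Java, for readers checking which assembly would be linked (the directory is the stock Storm lib location assumed by these scripts):

    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class FindSinkJar {
      public static void main(String[] args) throws IOException {
        Path libDir = Paths.get("/usr/lib/storm/lib");
        // same pattern as metric_collector_sink_jar above
        try (DirectoryStream<Path> jars =
                 Files.newDirectoryStream(libDir, "ambari-metrics-storm-sink-with-common-*.jar")) {
          for (Path jar : jars) {
            System.out.println("would link: " + jar);
          }
        }
      }
    }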
http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/storm.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/storm.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/storm.py
index e04dbe3..db80e3a 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/storm.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/storm.py
@@ -111,9 +111,14 @@ def storm(name=None):
     # On old HDP 2.1 versions, this symlink may also exist and break EU to newer versions
     Link("/usr/lib/storm/lib/ambari-metrics-storm-sink.jar", action="delete")
 
-    Execute(format("{sudo} ln -s {metric_collector_sink_jar} {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
+    if Script.get_stack_name() == "HDP" and Script.is_stack_greater_or_equal("2.5"):
+      sink_jar = params.metric_collector_sink_jar
+    else:
+      sink_jar = params.metric_collector_legacy_sink_jar
+
+    Execute(format("{sudo} ln -s {sink_jar} {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
             not_if=format("ls {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
-            only_if=format("ls {metric_collector_sink_jar}")
+            only_if=format("ls {sink_jar}")
     )
 
   if params.storm_logs_supported:

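The gate above links the Storm 1.0 sink on HDP 2.5 and later and the legacy (backtype.storm) sink on everything older. The real decision lives in the service scripts via Script.is_stack_greater_or_equal("2.5"); the following Java sketch mirrors only that decision logic with a hand-rolled dotted-version comparison (all names illustrative):

    public class SinkSelector {
      // true when dotted version a >= b, comparing component by component
      static boolean atLeast(String a, String b) {
        String[] xs = a.split("\\."), ys = b.split("\\.");
        for (int i = 0; i < Math.max(xs.length, ys.length); i++) {
          int x = i < xs.length ? Integer.parseInt(xs[i]) : 0;
          int y = i < ys.length ? Integer.parseInt(ys[i]) : 0;
          if (x != y) return x > y;
        }
        return true; // versions are equal
      }

      public static void main(String[] args) {
        String stackVersion = "2.5"; // assumed input
        String jarGlob = atLeast(stackVersion, "2.5")
            ? "ambari-metrics-storm-sink-with-common-*.jar"          // Storm 1.0 sink
            : "ambari-metrics-storm-sink-legacy-with-common-*.jar";  // backtype.storm sink
        System.out.println("link target glob: " + jarGlob);
      }
    }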
http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/ui_server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/ui_server.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/ui_server.py
index 6551067..ef2c536 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/ui_server.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/ui_server.py
@@ -85,6 +85,7 @@ class UiServerDefault(UiServer):
       stack_select.select("storm-client", params.version)
 
   def link_metrics_sink_jar(self):
+    import params
     # Add storm metrics reporter JAR to storm-ui-server classpath.
     # Remove symlinks. They can be present if you are upgrading from HDP < 2.2 to HDP >= 2.2
     Link(format("{storm_lib_dir}/ambari-metrics-storm-sink.jar"),
@@ -92,9 +93,14 @@ class UiServerDefault(UiServer):
     # On old HDP 2.1 versions, this symlink may also exist and break EU to newer versions
     Link("/usr/lib/storm/lib/ambari-metrics-storm-sink.jar", action="delete")
 
-    Execute(format("{sudo} ln -s {metric_collector_sink_jar} {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
+    if Script.get_stack_name() == "HDP" and Script.is_stack_greater_or_equal("2.5"):
+      sink_jar = params.metric_collector_sink_jar
+    else:
+      sink_jar = params.metric_collector_legacy_sink_jar
+
+    Execute(format("{sudo} ln -s {sink_jar} {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
             not_if=format("ls {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
-            only_if=format("ls {metric_collector_sink_jar}")
+            only_if=format("ls {sink_jar}")
             )
 
   def start(self, env, upgrade_type=None):

http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2 b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
index ea8b1d0..1f0875f 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
@@ -22,6 +22,7 @@ port={{metric_collector_port}}
 zookeeper.quorum={{zookeeper_quorum}}
 maxRowCacheSize=10000
 sendInterval={{metrics_report_interval}}000
+clusterReporterAppId=nimbus
 
 # HTTPS properties
 truststore.path = {{metric_truststore_path}}

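The template above is rendered to storm-metrics2.properties, the file both the sink and the rewritten reporter now read (the reporter previously pulled the same settings out of the metrics_collector section of its yaml configuration). A sketch of reading it with plain java.util.Properties, using only keys visible in the template plus the defaults from the reporter; the patch itself goes through Ambari's Configuration("/storm-metrics2.properties") wrapper:

    import java.io.InputStream;
    import java.util.Properties;

    public class Metrics2Config {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        try (InputStream in =
                 Metrics2Config.class.getResourceAsStream("/storm-metrics2.properties")) {
          if (in == null) {
            throw new IllegalStateException("storm-metrics2.properties not on classpath");
          }
          props.load(in);
        }
        String port = props.getProperty("port", "6188");  // reporter default
        String zkQuorum = props.getProperty("zookeeper.quorum");
        String appId = props.getProperty("clusterReporterAppId", "nimbus");
        System.out.println(port + " " + zkQuorum + " " + appId);
      }
    }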
http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-server/src/main/resources/stacks/HDP/2.5/services/STORM/configuration/storm-site.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.5/services/STORM/configuration/storm-site.xml b/ambari-server/src/main/resources/stacks/HDP/2.5/services/STORM/configuration/storm-site.xml
index f3bbce8..902fdc9 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.5/services/STORM/configuration/storm-site.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.5/services/STORM/configuration/storm-site.xml
@@ -54,4 +54,8 @@
     </value-attributes>
     <on-ambari-upgrade add="true"/>
   </property>
+  <property>
+    <name>metrics.reporter.register</name>
+    <on-ambari-upgrade delete="true"/>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py
index 2fcb7e2..04ed17b 100644
--- a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py
+++ b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py
@@ -86,9 +86,9 @@ class TestStormNimbus(TestStormBase):
     self.assertResourceCalled('Link', '/usr/lib/storm/lib/ambari-metrics-storm-sink.jar',
                               action = ['delete'],
                               )
-    self.assertResourceCalled('Execute', 'ambari-sudo.sh ln -s /usr/lib/storm/lib/ambari-metrics-storm-sink*.jar /usr/lib/storm/lib//ambari-metrics-storm-sink.jar',
+    self.assertResourceCalled('Execute', 'ambari-sudo.sh ln -s /usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar /usr/lib/storm/lib//ambari-metrics-storm-sink.jar',
                               not_if = 'ls /usr/lib/storm/lib//ambari-metrics-storm-sink.jar',
-                              only_if = 'ls /usr/lib/storm/lib/ambari-metrics-storm-sink*.jar',
+                              only_if = 'ls /usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar',
                               )
     self.assertResourceCalled('Execute', 'source /etc/storm/conf/storm-env.sh ; export PATH=$JAVA_HOME/bin:$PATH ; storm nimbus > /var/log/storm/nimbus.out 2>&1 &\n echo $! > /var/run/storm/nimbus.pid',
         path = ['/usr/bin'],

http://git-wip-us.apache.org/repos/asf/ambari/blob/57fccc03/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_ui_server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_ui_server.py b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_ui_server.py
index f7a2686..689eedc 100644
--- a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_ui_server.py
+++ b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_ui_server.py
@@ -57,10 +57,10 @@ class TestStormUiServer(TestStormBase):
                               action=['delete'],
                               )
 
-    self.assertResourceCalled('Execute', 'ambari-sudo.sh ln -s /usr/lib/storm/lib/ambari-metrics-storm-sink*.jar '
+    self.assertResourceCalled('Execute', 'ambari-sudo.sh ln -s /usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar '
                                          '/usr/lib/storm/lib//ambari-metrics-storm-sink.jar',
                               not_if=format("ls /usr/lib/storm/lib//ambari-metrics-storm-sink.jar"),
-                              only_if=format("ls /usr/lib/storm/lib/ambari-metrics-storm-sink*.jar")
+                              only_if=format("ls /usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar")
                               )
     self.assertResourceCalled('Execute', 'source /etc/storm/conf/storm-env.sh ; export PATH=$JAVA_HOME/bin:$PATH ; storm ui > /var/log/storm/ui.out 2>&1 &\n echo $! > /var/run/storm/ui.pid',
         path = ['/usr/bin'],
@@ -127,10 +127,10 @@ class TestStormUiServer(TestStormBase):
                               action=['delete'],
                               )
 
-    self.assertResourceCalled('Execute', 'ambari-sudo.sh ln -s /usr/lib/storm/lib/ambari-metrics-storm-sink*.jar '
+    self.assertResourceCalled('Execute', 'ambari-sudo.sh ln -s /usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar '
                                          '/usr/lib/storm/lib//ambari-metrics-storm-sink.jar',
                               not_if=format("ls /usr/lib/storm/lib//ambari-metrics-storm-sink.jar"),
-                              only_if=format("ls /usr/lib/storm/lib/ambari-metrics-storm-sink*.jar")
+                              only_if=format("ls /usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar")
                               )
     self.assertResourceCalled('Execute', 'source /etc/storm/conf/storm-env.sh ; export PATH=$JAVA_HOME/bin:$PATH ; storm ui > /var/log/storm/ui.out 2>&1 &\n echo $! > /var/run/storm/ui.pid',
         path = ['/usr/bin'],

