ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From swa...@apache.org
Subject ambari git commit: Revert "AMBARI-9185. Add Kafka metric sink implementation to enable sink to AMS. Build failure."
Date Fri, 16 Jan 2015 23:26:19 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk d7ca3036e -> f1d354cca


Revert "AMBARI-9185. Add Kafka metric sink implementation to enable sink to AMS. Build failure."

This reverts commit 8d27ac2b77abdeeb61fd8d9bc6b51ce113fcd900.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f1d354cc
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f1d354cc
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f1d354cc

Branch: refs/heads/trunk
Commit: f1d354ccae8f95a63735628c133183a87249d1b2
Parents: d7ca303
Author: Siddharth Wagle <swagle@hortonworks.com>
Authored: Fri Jan 16 15:25:57 2015 -0800
Committer: Siddharth Wagle <swagle@hortonworks.com>
Committed: Fri Jan 16 15:25:57 2015 -0800

----------------------------------------------------------------------
 ambari-metrics/ambari-metrics-assembly/pom.xml  |  47 +-
 .../src/main/assembly/sink-windows.xml          |   8 -
 .../src/main/assembly/sink.xml                  |   8 -
 .../src/main/package/rpm/sink/postinstall.sh    |  11 +-
 .../ambari-metrics-kafka-sink/pom.xml           | 163 -------
 .../src/main/assemblies/empty.xml               |  21 -
 .../src/main/assemblies/jar-with-common.xml     |  34 --
 .../kafka/KafkaTimelineMetricsReporter.java     | 448 -------------------
 .../KafkaTimelineMetricsReporterMBean.java      |  25 --
 .../metrics2/sink/kafka/ScheduledReporter.java  | 218 ---------
 .../kafka/KafkaTimelineMetricsReporterTest.java | 109 -----
 .../sink/kafka/ScheduledReporterTest.java       | 105 -----
 ambari-metrics/pom.xml                          |   7 +-
 .../0.8.1.2.2/configuration/kafka-broker.xml    |  31 +-
 .../KAFKA/0.8.1.2.2/configuration/kafka-env.xml |   6 -
 .../KAFKA/0.8.1.2.2/package/scripts/kafka.py    |   3 -
 .../KAFKA/0.8.1.2.2/package/scripts/params.py   |  27 --
 17 files changed, 12 insertions(+), 1259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/pom.xml b/ambari-metrics/ambari-metrics-assembly/pom.xml
index 60ba30b..1e1ca8c 100644
--- a/ambari-metrics/ambari-metrics-assembly/pom.xml
+++ b/ambari-metrics/ambari-metrics-assembly/pom.xml
@@ -36,7 +36,6 @@
     <hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir>
     <storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir>
     <flume-sink.dir>${project.basedir}/../ambari-metrics-flume-sink</flume-sink.dir>
-    <kafka-sink.dir>${project.basedir}/../ambari-metrics-kafka-sink</kafka-sink.dir>
     <python.ver>python &gt;= 2.6</python.ver>
     <deb.python.ver>python (&gt;= 2.6)</deb.python.ver>
     <deb.architecture>amd64</deb.architecture>
@@ -44,7 +43,6 @@
     <hadoop.sink.jar>ambari-metrics-hadoop-sink-with-common-${project.version}.jar</hadoop.sink.jar>
     <storm.sink.jar>ambari-metrics-storm-sink-with-common-${project.version}.jar</storm.sink.jar>
     <flume.sink.jar>ambari-metrics-flume-sink-with-common-${project.version}.jar</flume.sink.jar>
-    <kafka.sink.jar>ambari-metrics-kafka-sink-with-common-${project.version}.jar</kafka.sink.jar>
   </properties>
 
   <build>
@@ -449,6 +447,7 @@
                           <location>${hadoop-sink.dir}/target/ambari-metrics-hadoop-sink-with-common-${project.version}.jar</location>
                         </source>
                       </sources>
+
                     </mapping>
                     <mapping>
                       <directory>/usr/lib/flume/lib</directory>
@@ -466,22 +465,6 @@
                         </source>
                       </sources>
                     </mapping>
-                    <mapping>
-                      <directory>/usr/lib/ambari-metrics-kafka-sink</directory>
-                      <sources>
-                        <source>
-                          <location>${kafka-sink.dir}/target/${kafka.sink.jar}</location>
-                        </source>
-                      </sources>
-                    </mapping>
-                     <mapping>
-                      <directory>/usr/lib/ambari-metrics-kafka-sink/lib</directory>
-                      <sources>
-                        <source>
-                          <location>${kafka-sink.dir}/target/lib</location>
-                        </source>
-                      </sources>
-                    </mapping>
                   </mappings>
                 </configuration>
 
@@ -612,7 +595,6 @@
                     <path>/var/log/ambari-metrics-collector</path>
                     <path>/var/lib/ambari-metrics-collector</path>
                     <path>/usr/lib/ambari-metrics-hadoop-sink</path>
-                    <path>/usr/lib/ambari-metrics-kafka-sink</path>
                     <path>/usr/lib/flume/lib</path>
                     <path>/usr/lib/storm/lib</path>
                   </paths>
@@ -789,28 +771,6 @@
                   </mapper>
                 </data>
 
-                <!-- kafka sink -->
-
-                <data>
-                  <src>${kafka-sink.dir}/target/${kafka.sink.jar}</src>
-                  <type>file</type>
-                  <mapper>
-                    <type>perm</type>
-                    <filemode>644</filemode>
-                    <dirmode>755</dirmode>
-                    <prefix>/usr/lib/ambari-metrics-kafka-sink</prefix>
-                  </mapper>
-                </data>
-                <data>
-                  <src>${kafka-sink.dir}/target/lib</src>
-                  <type>file</type>
-                  <mapper>
-                    <type>perm</type>
-                    <filemode>644</filemode>
-                    <dirmode>755</dirmode>
-                    <prefix>/usr/lib/ambari-metrics-kafka-sink/lib</prefix>
-                  </mapper>
-                </data>
               </dataSet>
             </configuration>
           </plugin>
@@ -1103,11 +1063,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.ambari</groupId>
-      <artifactId>ambari-metrics-kafka-sink</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.ambari</groupId>
       <artifactId>ambari-metrics-host-monitoring</artifactId>
       <version>${project.version}</version>
       <type>pom</type>

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml
index 38a8093..0a36fac 100644
--- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml
+++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml
@@ -37,10 +37,6 @@
       <directory>${storm-sink.dir}/src/main/conf</directory>
       <outputDirectory>hadoop-sink/conf</outputDirectory>
     </fileSet>
-    <fileSet>
-      <directory>${kafka-sink.dir}/target/lib</directory>
-      <outputDirectory>hadoop-sink/lib</outputDirectory>
-    </fileSet>
   </fileSets>
 
   <files>
@@ -57,10 +53,6 @@
       <outputDirectory>hadoop-sink</outputDirectory>
     </file>
     <file>
-      <source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source>
-      <outputDirectory>hadoop-sink</outputDirectory>
-    </file>
-    <file>
       <source>${basedir}/src/main/package/msi/sink.wxs</source>
       <outputDirectory>../../</outputDirectory>
       <filtered>true</filtered>

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml
index 4a3b7c5..2426904 100644
--- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml
+++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml
@@ -38,10 +38,6 @@
       <directory>${storm-sink.dir}/src/main/conf</directory>
       <outputDirectory>hadoop-sink/conf</outputDirectory>
     </fileSet>
-    <fileSet>
-      <directory>${kafka-sink.dir}/target/lib</directory>
-      <outputDirectory>hadoop-sink/lib</outputDirectory>
-    </fileSet>
   </fileSets>
 
   <files>
@@ -57,10 +53,6 @@
       <source>${storm-sink.dir}/target/ambari-metrics-storm-sink-with-common-${project.version}.jar</source>
       <outputDirectory>hadoop-sink</outputDirectory>
     </file>
-    <file>
-      <source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source>
-      <outputDirectory>hadoop-sink</outputDirectory>
-    </file>
   </files>
 
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh b/ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh
index e87b9f0..1955680 100644
--- a/ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh
+++ b/ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh
@@ -15,20 +15,15 @@
 # limitations under the License
 
 HADOOP_LINK_NAME="/usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar"
-HADOOP_SINK_JAR="/usr/lib/ambari-metrics-hadoop-sink/${hadoop.sink.jar}"
-
 FLUME_LINK_NAME="/usr/lib/flume/lib/ambari-metrics-flume-sink.jar"
+HADOOP_SINK_JAR="/usr/lib/ambari-metrics-hadoop-sink/${hadoop.sink.jar}"
 FLUME_SINK_JAR="/usr/lib/flume/lib/${flume.sink.jar}"
-
-KAFKA_LINK_NAME="/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar"
-KAFKA_SINK_JAR="/usr/lib/ambari-metrics-kafka-sink/${kafka.sink.jar}"
-
 #link for storm jar not required with current loading
 #STORM_SINK_JAR="/usr/lib/storm/lib/${storm.sink.jar}"
 #STORM_LINK_NAME="/usr/lib/storm/lib/ambari-metrics-storm-sink.jar"
 
-JARS=(${HADOOP_SINK_JAR} ${FLUME_SINK_JAR} ${KAFKA_SINK_JAR})
-LINKS=(${HADOOP_LINK_NAME} ${FLUME_LINK_NAME} ${KAFKA_LINK_NAME})
+JARS=(${HADOOP_SINK_JAR} ${FLUME_SINK_JAR})
+LINKS=(${HADOOP_LINK_NAME} ${FLUME_LINK_NAME})
 
 for index in ${!LINKS[*]}
 do

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-kafka-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/pom.xml b/ambari-metrics/ambari-metrics-kafka-sink/pom.xml
deleted file mode 100644
index 55c6f07..0000000
--- a/ambari-metrics/ambari-metrics-kafka-sink/pom.xml
+++ /dev/null
@@ -1,163 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
-                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<parent>
-		<artifactId>ambari-metrics</artifactId>
-		<groupId>org.apache.ambari</groupId>
-		<version>0.1.0-SNAPSHOT</version>
-	</parent>
-	<modelVersion>4.0.0</modelVersion>
-	<artifactId>ambari-metrics-kafka-sink</artifactId>
-	<version>0.1.0-SNAPSHOT</version>
-	<packaging>jar</packaging>
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-dependency-plugin</artifactId>
-				<version>2.9</version>
-				<executions>
-					<execution>
-						<id>copy-dependencies</id>
-						<phase>package</phase>
-						<goals>
-							<goal>copy-dependencies</goal>
-						</goals>
-						<configuration>
-							<includeArtifactIds>commons-codec,commons-collections,commons-httpclient,commons-lang,commons-logging,guava,jackson-core-asl,jackson-mapper-asl,jackson-xc,hadoop-common</includeArtifactIds>
-							<outputDirectory>${project.build.directory}/lib</outputDirectory>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<artifactId>maven-assembly-plugin</artifactId>
-				<executions>
-					<execution>
-						<configuration>
-							<descriptors>
-								<descriptor>src/main/assemblies/jar-with-common.xml</descriptor>
-							</descriptors>
-							<attach>false</attach>
-							<tarLongFileMode>gnu</tarLongFileMode>
-							<appendAssemblyId>false</appendAssemblyId>
-							<finalName>${project.artifactId}-with-common-${project.version}</finalName>
-						</configuration>
-						<id>build-jar</id>
-						<phase>package</phase>
-						<goals>
-							<goal>single</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<version>3.0</version>
-			</plugin>
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>build-helper-maven-plugin</artifactId>
-				<version>1.8</version>
-				<executions>
-					<execution>
-						<id>parse-version</id>
-						<phase>validate</phase>
-						<goals>
-							<goal>parse-version</goal>
-						</goals>
-					</execution>
-					<execution>
-						<id>regex-property</id>
-						<goals>
-							<goal>regex-property</goal>
-						</goals>
-						<configuration>
-							<name>ambariVersion</name>
-							<value>${project.version}</value>
-							<regex>^([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex>
-							<replacement>$1.$2.$3</replacement>
-							<failIfNoMatch>false</failIfNoMatch>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<groupId>com.github.goldin</groupId>
-				<artifactId>copy-maven-plugin</artifactId>
-				<version>0.2.5</version>
-				<executions>
-					<execution>
-						<id>create-archive</id>
-						<phase>none</phase>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.ambari</groupId>
-			<artifactId>ambari-metrics-common</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.kafka</groupId>
-			<artifactId>kafka_2.10</artifactId>
-			<version>0.8.1.1</version>
-		</dependency>
-		<dependency>
-			<groupId>com.yammer.metrics</groupId>
-			<artifactId>metrics-core</artifactId>
-			<version>2.2.0</version>
-		</dependency>
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-			<scope>test</scope>
-			<version>4.10</version>
-		</dependency>
-		<dependency>
-			<groupId>org.easymock</groupId>
-			<artifactId>easymock</artifactId>
-			<version>3.2</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.powermock</groupId>
-			<artifactId>powermock-api-easymock</artifactId>
-			<version>1.4.9</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.powermock</groupId>
-			<artifactId>powermock-module-junit4</artifactId>
-			<version>1.4.9</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.mockito</groupId>
-			<artifactId>mockito-all</artifactId>
-			<version>1.9.5</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/empty.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/empty.xml b/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/empty.xml
deleted file mode 100644
index 17ff68a..0000000
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/empty.xml
+++ /dev/null
@@ -1,21 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<assembly>
-    <id>empty</id>
-    <formats/>
-</assembly>

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/jar-with-common.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/jar-with-common.xml b/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/jar-with-common.xml
deleted file mode 100644
index 4fa8585..0000000
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/jar-with-common.xml
+++ /dev/null
@@ -1,34 +0,0 @@
-<?xml version="1.0"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-<assembly>
-  <id>jar-with-common</id>
-  <formats>
-    <format>jar</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-  <dependencySets>
-    <dependencySet>
-      <outputDirectory>/</outputDirectory>
-      <unpack>true</unpack>
-      <includes>
-        <include>org.apache.ambari:ambari-metrics-common</include>
-        <include>org.apache.ambari:ambari-metrics-kafka-sink</include>
-      </includes>
-    </dependencySet>
-  </dependencySets>
-</assembly>

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
deleted file mode 100644
index 762b5f2..0000000
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.kafka;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import kafka.metrics.KafkaMetricsConfig;
-import kafka.metrics.KafkaMetricsReporter;
-import kafka.utils.VerifiableProperties;
-
-import org.apache.commons.lang.ClassUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.apache.hadoop.metrics2.util.Servers;
-
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Metered;
-import com.yammer.metrics.core.Metric;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricProcessor;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.stats.Snapshot;
-
-public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink implements KafkaMetricsReporter,
-    KafkaTimelineMetricsReporterMBean {
-
-  private final static Log LOG = LogFactory.getLog(KafkaTimelineMetricsReporter.class);
-
-  private static final String TIMELINE_METRICS_SEND_INTERVAL_PROPERTY = "kafka.timeline.metrics.sendInterval";
-  private static final String TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY = "kafka.timeline.metrics.maxRowCacheSize";
-  private static final String TIMELINE_HOST_PROPERTY = "kafka.timeline.metrics.host";
-  private static final String TIMELINE_PORT_PROPERTY = "kafka.timeline.metrics.port";
-  private static final String TIMELINE_REPORTER_ENABLED_PROPERTY = "kafka.timeline.metrics.reporter.enabled";
-  private static final String TIMELINE_DEFAULT_HOST = "localhost";
-  private static final String TIMELINE_DEFAULT_PORT = "8188";
-
-  private boolean initialized = false;
-  private boolean running = false;
-  private Object lock = new Object();
-  private String collectorUri;
-  private String hostname;
-  private SocketAddress socketAddress;
-  private TimelineScheduledReporter reporter;
-  private TimelineMetricsCache metricsCache;
-
-  @Override
-  protected SocketAddress getServerSocketAddress() {
-    return socketAddress;
-  }
-
-  @Override
-  protected String getCollectorUri() {
-    return collectorUri;
-  }
-
-  public void setMetricsCache(TimelineMetricsCache metricsCache) {
-    this.metricsCache = metricsCache;
-  }
-
-  public void init(VerifiableProperties props) {
-    synchronized (lock) {
-      if (!initialized) {
-        LOG.info("Initializing Kafka Timeline Metrics Sink");
-        try {
-          hostname = InetAddress.getLocalHost().getHostName();
-        } catch (UnknownHostException e) {
-          LOG.error("Could not identify hostname.");
-          throw new RuntimeException("Could not identify hostname.", e);
-        }
-        KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props);
-        int metricsSendInterval = Integer.parseInt(props.getString(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY,
-            String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
-        int maxRowCacheSize = Integer.parseInt(props.getString(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY,
-            String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
-        String metricCollectorHost = props.getString(TIMELINE_HOST_PROPERTY, TIMELINE_DEFAULT_HOST);
-        String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT);
-        setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
-        collectorUri = "http://" + metricCollectorHost + ":" + metricCollectorPort + "/ws/v1/timeline/metrics";
-        List<InetSocketAddress> socketAddresses = Servers.parse(metricCollectorHost,
-            Integer.parseInt(metricCollectorPort));
-        if (socketAddresses != null && !socketAddresses.isEmpty()) {
-          socketAddress = socketAddresses.get(0);
-        }
-        initializeReporter();
-        if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) {
-          startReporter(metricsConfig.pollingIntervalSecs());
-        }
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("CollectorUri = " + collectorUri);
-          LOG.trace("SocketAddress = " + socketAddress);
-          LOG.trace("MetricsSendInterval = " + metricsSendInterval);
-          LOG.trace("MaxRowCacheSize = " + maxRowCacheSize);
-        }
-      }
-    }
-  }
-
-  public String getMBeanName() {
-    return "kafka:type=org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter";
-  }
-
-  public synchronized void startReporter(long period) {
-    synchronized (lock) {
-      if (initialized && !running) {
-        reporter.start(period, TimeUnit.SECONDS);
-        running = true;
-        LOG.info(String.format("Started Kafka Timeline metrics reporter with polling period %d seconds", period));
-      }
-    }
-  }
-
-  public synchronized void stopReporter() {
-    synchronized (lock) {
-      if (initialized && running) {
-        reporter.stop();
-        running = false;
-        LOG.info("Stopped Kafka Timeline metrics reporter");
-        initializeReporter();
-      }
-    }
-  }
-
-  private void initializeReporter() {
-    reporter = new TimelineScheduledReporter(Metrics.defaultRegistry(), "timeline-scheduled-reporter",
-        TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
-    initialized = true;
-  }
-
-  interface Context {
-    public List<TimelineMetric> getTimelineMetricList();
-  }
-
-  class TimelineScheduledReporter extends ScheduledReporter implements MetricProcessor<Context> {
-
-    private static final String APP_ID = "kafka_broker";
-    private static final String COUNT_SUFIX = ".count";
-    private static final String ONE_MINUTE_RATE_SUFIX = ".1MinuteRate";
-    private static final String MEAN_SUFIX = ".mean";
-    private static final String MEAN_RATE_SUFIX = ".meanRate";
-    private static final String FIVE_MINUTE_RATE_SUFIX = ".5MinuteRate";
-    private static final String FIFTEEN_MINUTE_RATE_SUFIX = ".15MinuteRate";
-    private static final String MIN_SUFIX = ".min";
-    private static final String MAX_SUFIX = ".max";
-    private static final String MEDIAN_SUFIX = ".median";
-    private static final String STD_DEV_SUFIX = "stddev";
-    private static final String SEVENTY_FIFTH_PERCENTILE_SUFIX = ".75percentile";
-    private static final String NINETY_FIFTH_PERCENTILE_SUFIX = ".95percentile";
-    private static final String NINETY_EIGHTH_PERCENTILE_SUFIX = ".98percentile";
-    private static final String NINETY_NINTH_PERCENTILE_SUFIX = ".99percentile";
-    private static final String NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX = ".999percentile";
-
-    protected TimelineScheduledReporter(MetricsRegistry registry, String name, TimeUnit rateUnit, TimeUnit durationUnit) {
-      super(registry, name, rateUnit, durationUnit);
-    }
-
-    @Override
-    public void report(Set<Entry<MetricName, Metric>> metrics) {
-      final List<TimelineMetric> metricsList = new ArrayList<TimelineMetric>();
-      try {
-        for (Entry<MetricName, Metric> entry : metrics) {
-          final MetricName metricName = entry.getKey();
-          final Metric metric = entry.getValue();
-          Context context = new Context() {
-
-            public List<TimelineMetric> getTimelineMetricList() {
-              return metricsList;
-            }
-
-          };
-          metric.processWith(this, metricName, context);
-        }
-      } catch (Throwable t) {
-        LOG.error("Exception processing Kafka metric", t);
-      }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Metrics List size: " + metricsList.size());
-        LOG.trace("Metics Set size: " + metrics.size());
-      }
-      if (!metricsList.isEmpty()) {
-        TimelineMetrics timelineMetrics = new TimelineMetrics();
-        timelineMetrics.setMetrics(metricsList);
-        try {
-          emitMetrics(timelineMetrics);
-        } catch (IOException e) {
-          LOG.error("Unexpected error", e);
-        } catch (Throwable t) {
-          LOG.error("Exception emitting metrics", t);
-        }
-      }
-    }
-
-    private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName,
-        Number attributeValue) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Creating timeline metric: " + attributeName + " = " + attributeValue + " time = "
-            + currentTimeMillis + " app_id = " + component);
-      }
-      TimelineMetric timelineMetric = new TimelineMetric();
-      timelineMetric.setMetricName(attributeName);
-      timelineMetric.setHostName(hostname);
-      timelineMetric.setAppId(component);
-      timelineMetric.setStartTime(currentTimeMillis);
-      timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number"));
-      timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue.doubleValue());
-      return timelineMetric;
-    }
-
-    @Override
-    public void processMeter(MetricName name, Metered meter, Context context) throws Exception {
-      final long currentTimeMillis = System.currentTimeMillis();
-      final String sanitizedName = sanitizeName(name);
-      final String meterCountName = sanitizedName + COUNT_SUFIX;
-      final TimelineMetric countMetric = createTimelineMetric(currentTimeMillis, APP_ID, meterCountName, meter.count());
-
-      final String meterOneMinuteRateName = sanitizedName + ONE_MINUTE_RATE_SUFIX;
-      final TimelineMetric oneMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          meterOneMinuteRateName, meter.oneMinuteRate());
-
-      final String meterMeanRateName = sanitizedName + MEAN_RATE_SUFIX;
-      final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, meterMeanRateName,
-          meter.meanRate());
-
-      final String meterFiveMinuteRateName = sanitizedName + FIVE_MINUTE_RATE_SUFIX;
-      final TimelineMetric fiveMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          meterFiveMinuteRateName, meter.fiveMinuteRate());
-
-      final String meterFifteenMinuteRateName = sanitizedName + FIFTEEN_MINUTE_RATE_SUFIX;
-      final TimelineMetric fifteenMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          meterFifteenMinuteRateName, meter.fifteenMinuteRate());
-
-      metricsCache.putTimelineMetric(countMetric);
-      metricsCache.putTimelineMetric(oneMinuteRateMetric);
-      metricsCache.putTimelineMetric(meanMetric);
-      metricsCache.putTimelineMetric(fiveMinuteRateMetric);
-      metricsCache.putTimelineMetric(fifteenMinuteRateMetric);
-
-      String[] metricNames = new String[] { meterCountName, meterOneMinuteRateName, meterMeanRateName,
-          meterFiveMinuteRateName, meterFifteenMinuteRateName };
-      populateMetricsList(context, metricNames);
-    }
-
-    @Override
-    public void processCounter(MetricName name, Counter counter, Context context) throws Exception {
-      final long currentTimeMillis = System.currentTimeMillis();
-      final String sanitizedName = sanitizeName(name);
-      final String metricCountName = sanitizedName + COUNT_SUFIX;
-      final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, metricCountName, counter.count());
-      metricsCache.putTimelineMetric(metric);
-      populateMetricsList(context, metricCountName);
-    }
-
-    @Override
-    public void processHistogram(MetricName name, Histogram histogram, Context context) throws Exception {
-      final long currentTimeMillis = System.currentTimeMillis();
-      final Snapshot snapshot = histogram.getSnapshot();
-      final String sanitizedName = sanitizeName(name);
-
-      final String histogramMinName = sanitizedName + MIN_SUFIX;
-      final TimelineMetric minMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMinName,
-          histogram.min());
-
-      final String histogramMaxName = sanitizedName + MAX_SUFIX;
-      final TimelineMetric maxMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMaxName,
-          histogram.max());
-
-      final String histogramMeanName = sanitizedName + MEAN_SUFIX;
-      final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMeanName,
-          histogram.mean());
-
-      final String histogramMedianName = sanitizedName + MEDIAN_SUFIX;
-      final TimelineMetric medianMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMedianName,
-          snapshot.getMedian());
-
-      final String histogramStdDevName = sanitizedName + STD_DEV_SUFIX;
-      final TimelineMetric stdDevMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramStdDevName,
-          histogram.stdDev());
-
-      final String histogramSeventyFifthPercentileName = sanitizedName + SEVENTY_FIFTH_PERCENTILE_SUFIX;
-      final TimelineMetric seventyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramSeventyFifthPercentileName, snapshot.get75thPercentile());
-
-      final String histogramNinetyFifthPercentileName = sanitizedName + NINETY_FIFTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramNinetyFifthPercentileName, snapshot.get95thPercentile());
-
-      final String histogramNinetyEighthPercentileName = sanitizedName + NINETY_EIGHTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyEighthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramNinetyEighthPercentileName, snapshot.get98thPercentile());
-
-      final String histogramNinetyNinethPercentileName = sanitizedName + NINETY_NINTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyNinthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramNinetyNinethPercentileName, snapshot.get99thPercentile());
-
-      final String histogramNinetyNinePointNinePercentileName = sanitizedName + NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX;
-      final TimelineMetric nintyNinthPointNinePercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramNinetyNinePointNinePercentileName, snapshot.get999thPercentile());
-
-      metricsCache.putTimelineMetric(minMetric);
-      metricsCache.putTimelineMetric(maxMetric);
-      metricsCache.putTimelineMetric(meanMetric);
-      metricsCache.putTimelineMetric(medianMetric);
-      metricsCache.putTimelineMetric(stdDevMetric);
-      metricsCache.putTimelineMetric(seventyFifthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyFifthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyEighthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyNinthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyNinthPointNinePercentileMetric);
-
-      String[] metricNames = new String[] { histogramMaxName, histogramMeanName, histogramMedianName, histogramMinName,
-          histogramNinetyEighthPercentileName, histogramNinetyFifthPercentileName,
-          histogramNinetyNinePointNinePercentileName, histogramNinetyNinethPercentileName,
-          histogramSeventyFifthPercentileName, histogramStdDevName };
-      populateMetricsList(context, metricNames);
-    }
-
-    @Override
-    public void processTimer(MetricName name, Timer timer, Context context) throws Exception {
-      final long currentTimeMillis = System.currentTimeMillis();
-      final Snapshot snapshot = timer.getSnapshot();
-      final String sanitizedName = sanitizeName(name);
-
-      final String timerMinName = sanitizedName + MIN_SUFIX;
-      final TimelineMetric minMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMinName, timer.min());
-
-      final String timerMaxName = sanitizedName + MAX_SUFIX;
-      final TimelineMetric maxMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMaxName, timer.max());
-
-      final String timerMeanName = sanitizedName + MEAN_SUFIX;
-      final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMeanName, timer.mean());
-
-      final String timerMedianName = sanitizedName + MEDIAN_SUFIX;
-      final TimelineMetric medianMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMedianName,
-          snapshot.getMedian());
-
-      final String timerStdDevName = sanitizedName + STD_DEV_SUFIX;
-      final TimelineMetric stdDevMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerStdDevName,
-          timer.stdDev());
-
-      final String timerSeventyFifthPercentileName = sanitizedName + SEVENTY_FIFTH_PERCENTILE_SUFIX;
-      final TimelineMetric seventyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          timerSeventyFifthPercentileName, snapshot.get75thPercentile());
-
-      final String timerNinetyFifthPercentileName = sanitizedName + NINETY_FIFTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          timerNinetyFifthPercentileName, snapshot.get95thPercentile());
-
-      final String timerNinetyEighthPercentileName = sanitizedName + NINETY_EIGHTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyEighthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          timerNinetyEighthPercentileName, snapshot.get98thPercentile());
-
-      final String timerNinetyNinthPercentileName = sanitizedName + NINETY_NINTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyNinthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          timerNinetyNinthPercentileName, snapshot.get99thPercentile());
-
-      final String timerNinetyNinePointNinePercentileName = sanitizedName + NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX;
-      final TimelineMetric nintyNinthPointNinePercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          timerNinetyNinePointNinePercentileName, snapshot.get999thPercentile());
-
-      metricsCache.putTimelineMetric(minMetric);
-      metricsCache.putTimelineMetric(maxMetric);
-      metricsCache.putTimelineMetric(meanMetric);
-      metricsCache.putTimelineMetric(medianMetric);
-      metricsCache.putTimelineMetric(stdDevMetric);
-      metricsCache.putTimelineMetric(seventyFifthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyFifthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyEighthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyNinthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyNinthPointNinePercentileMetric);
-
-      String[] metricNames = new String[] { timerMaxName, timerMeanName, timerMedianName, timerMinName,
-          timerNinetyEighthPercentileName, timerNinetyFifthPercentileName, timerNinetyNinePointNinePercentileName,
-          timerNinetyNinthPercentileName, timerSeventyFifthPercentileName, timerStdDevName };
-      populateMetricsList(context, metricNames);
-    }
-
-    @Override
-    public void processGauge(MetricName name, Gauge<?> gauge, Context context) throws Exception {
-      final long currentTimeMillis = System.currentTimeMillis();
-      final String sanitizedName = sanitizeName(name);
-      final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, sanitizedName,
-          Double.parseDouble(String.valueOf(gauge.value())));
-      metricsCache.putTimelineMetric(metric);
-      populateMetricsList(context, sanitizedName);
-    }
-
-    private void populateMetricsList(Context context, String... metricNames) {
-      for (String metricName : metricNames) {
-        TimelineMetric cachedMetric = metricsCache.getTimelineMetric(metricName);
-        if (cachedMetric != null) {
-          context.getTimelineMetricList().add(cachedMetric);
-        }
-      }
-    }
-
-    protected String sanitizeName(MetricName name) {
-      if (name == null) {
-        return "";
-      }
-      final String qualifiedTypeName = name.getGroup() + "." + name.getType() + "." + name.getName();
-      final String metricName = name.hasScope() ? qualifiedTypeName + '.' + name.getScope() : qualifiedTypeName;
-      final StringBuilder sb = new StringBuilder();
-      for (int i = 0; i < metricName.length(); i++) {
-        final char p = metricName.charAt(i);
-        if (!(p >= 'A' && p <= 'Z') && !(p >= 'a' && p <= 'z') && !(p >= '0' && p <= '9') && (p != '_') && (p != '-')
-            && (p != '.') && (p != '\0')) {
-          sb.append('_');
-        } else {
-          sb.append(p);
-        }
-      }
-      return sb.toString();
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterMBean.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterMBean.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterMBean.java
deleted file mode 100644
index 7f6c5c9..0000000
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterMBean.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.kafka;
-
-import kafka.metrics.KafkaMetricsReporterMBean;
-
-public interface KafkaTimelineMetricsReporterMBean extends KafkaMetricsReporterMBean {
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporter.java
deleted file mode 100644
index f4f8333..0000000
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporter.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.kafka;
-
-import java.io.Closeable;
-import java.util.Locale;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.yammer.metrics.core.Metric;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.reporting.ConsoleReporter;
-import com.yammer.metrics.reporting.CsvReporter;
-
-/**
- * The abstract base class for all scheduled reporters (i.e., reporters which
- * process a registry's metrics periodically).
- *
- * @see ConsoleReporter
- * @see CsvReporter
- * @see Slf4jReporter
- */
-public abstract class ScheduledReporter implements Closeable {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ScheduledReporter.class);
-
-  /**
-   * A simple named thread factory.
-   */
-  @SuppressWarnings("NullableProblems")
-  private static class NamedThreadFactory implements ThreadFactory {
-    private final ThreadGroup group;
-    private final AtomicInteger threadNumber = new AtomicInteger(1);
-    private final String namePrefix;
-
-    private NamedThreadFactory(String name) {
-      final SecurityManager s = System.getSecurityManager();
-      this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
-      this.namePrefix = "metrics-" + name + "-thread-";
-    }
-
-    @Override
-    public Thread newThread(Runnable r) {
-      final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
-      t.setDaemon(true);
-      if (t.getPriority() != Thread.NORM_PRIORITY) {
-        t.setPriority(Thread.NORM_PRIORITY);
-      }
-      return t;
-    }
-  }
-
-  private static final AtomicInteger FACTORY_ID = new AtomicInteger();
-
-  private final MetricsRegistry registry;
-  private final ScheduledExecutorService executor;
-  private final double durationFactor;
-  private final String durationUnit;
-  private final double rateFactor;
-  private final String rateUnit;
-
-  /**
-   * Creates a new {@link ScheduledReporter} instance.
-   *
-   * @param registry
-   *          the {@link com.codahale.metrics.MetricRegistry} containing the
-   *          metrics this reporter will report
-   * @param name
-   *          the reporter's name
-   * @param rateUnit
-   *          a unit of time
-   * @param durationUnit
-   *          a unit of time
-   */
-  protected ScheduledReporter(MetricsRegistry registry, String name, TimeUnit rateUnit, TimeUnit durationUnit) {
-    this(registry, rateUnit, durationUnit, Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(name + '-'
-        + FACTORY_ID.incrementAndGet())));
-  }
-
-  /**
-   * Creates a new {@link ScheduledReporter} instance.
-   *
-   * @param registry
-   *          the {@link com.codahale.metrics.MetricRegistry} containing the
-   *          metrics this reporter will report
-   * @param executor
-   *          the executor to use while scheduling reporting of metrics.
-   */
-  protected ScheduledReporter(MetricsRegistry registry, TimeUnit rateUnit, TimeUnit durationUnit,
-      ScheduledExecutorService executor) {
-    this.registry = registry;
-    this.executor = executor;
-    this.rateFactor = rateUnit.toSeconds(1);
-    this.rateUnit = calculateRateUnit(rateUnit);
-    this.durationFactor = 1.0 / durationUnit.toNanos(1);
-    this.durationUnit = durationUnit.toString().toLowerCase(Locale.US);
-  }
-
-  /**
-   * Starts the reporter polling at the given period.
-   *
-   * @param period
-   *          the amount of time between polls
-   * @param unit
-   *          the unit for {@code period}
-   */
-  public void start(long period, TimeUnit unit) {
-    executor.scheduleAtFixedRate(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          report();
-        } catch (RuntimeException ex) {
-          LOG.error("RuntimeException thrown from {}#report. Exception was suppressed.", ScheduledReporter.this
-              .getClass().getSimpleName(), ex);
-        }
-      }
-    }, period, period, unit);
-  }
-
-  /**
-   * Stops the reporter and shuts down its thread of execution.
-   *
-   * Uses the shutdown pattern from
-   * http://docs.oracle.com/javase/7/docs/api/java
-   * /util/concurrent/ExecutorService.html
-   */
-  public void stop() {
-    executor.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
-        executor.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
-          System.err.println(getClass().getSimpleName() + ": ScheduledExecutorService did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      executor.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  /**
-   * Stops the reporter and shuts down its thread of execution.
-   */
-  @Override
-  public void close() {
-    stop();
-  }
-
-  /**
-   * Report the current values of all metrics in the registry.
-   */
-  public void report() {
-    synchronized (this) {
-      report(registry.allMetrics().entrySet());
-    }
-  }
-
-  /**
-   * Called periodically by the polling thread. Subclasses should report all the
-   * given metrics.
-   *
-   * @param metrics
-   *          all the metrics in the registry
-   */
-  public abstract void report(Set<Entry<MetricName, Metric>> metrics);
-
-  protected String getRateUnit() {
-    return rateUnit;
-  }
-
-  protected String getDurationUnit() {
-    return durationUnit;
-  }
-
-  protected double convertDuration(double duration) {
-    return duration * durationFactor;
-  }
-
-  protected double convertRate(double rate) {
-    return rate * rateFactor;
-  }
-
-  private String calculateRateUnit(TimeUnit unit) {
-    final String s = unit.toString().toLowerCase(Locale.US);
-    return s.substring(0, s.length() - 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
deleted file mode 100644
index 67c61e1..0000000
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.kafka;
-
-import static org.mockito.Mockito.mock;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replay;
-import static org.powermock.api.easymock.PowerMock.verifyAll;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import kafka.utils.VerifiableProperties;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.easymock.EasyMock;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
-import com.yammer.metrics.core.Metric;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.core.Timer;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ Metrics.class, HttpClient.class })
-@PowerMockIgnore("javax.management.*")
-public class KafkaTimelineMetricsReporterTest {
-
-  private final List<Metric> list = new ArrayList<Metric>();
-  private final MetricsRegistry registry = new MetricsRegistry();
-  @SuppressWarnings("rawtypes")
-  private final Gauge gauge = mock(Gauge.class);
-  private final KafkaTimelineMetricsReporter kafkaTimelineMetricsReporter = new KafkaTimelineMetricsReporter();
-  private VerifiableProperties props;
-
-  @Before
-  public void setUp() throws Exception {
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    Gauge g = registry.newGauge(System.class, "gauge", gauge);
-    Counter counter = registry.newCounter(System.class, "counter");
-    Histogram histogram = registry.newHistogram(System.class, "histogram");
-    Meter meter = registry.newMeter(System.class, "meter", "empty", TimeUnit.MILLISECONDS);
-    Timer timer = registry.newTimer(System.class, "timer");
-    list.add(g);
-    list.add(counter);
-    list.add(histogram);
-    list.add(meter);
-    list.add(timer);
-    Properties properties = new Properties();
-    properties.setProperty("kafka.timeline.metrics.sendInterval", "5900");
-    properties.setProperty("kafka.timeline.metrics.maxRowCacheSize", "10000");
-    properties.setProperty("kafka.timeline.metrics.host", "localhost");
-    properties.setProperty("kafka.timeline.metrics.port", "8188");
-    properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true");
-    props = new VerifiableProperties(properties);
-  }
-
-  @Test
-  public void testReporterStartStop() {
-    mockStatic(Metrics.class);
-    EasyMock.expect(Metrics.defaultRegistry()).andReturn(registry).times(2);
-    TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(kafkaTimelineMetricsReporter);
-    kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache);
-    HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
-    kafkaTimelineMetricsReporter.setHttpClient(httpClient);
-    replay(Metrics.class, httpClient, timelineMetricsCache);
-    kafkaTimelineMetricsReporter.init(props);
-    kafkaTimelineMetricsReporter.stopReporter();
-    verifyAll();
-  }
-
-  private TimelineMetricsCache getTimelineMetricsCache(KafkaTimelineMetricsReporter kafkaTimelineMetricsReporter) {
-    TimelineMetricsCache timelineMetricsCache = EasyMock.createNiceMock(TimelineMetricsCache.class);
-    kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache);
-    EasyMock.expect(timelineMetricsCache.getTimelineMetric("key1")).andReturn(new TimelineMetric()).once();
-    timelineMetricsCache.putTimelineMetric(EasyMock.anyObject(TimelineMetric.class));
-    EasyMock.expectLastCall().once();
-    return timelineMetricsCache;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java
deleted file mode 100644
index 41f9126..0000000
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.kafka;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
-import com.yammer.metrics.core.Metric;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.core.Timer;
-
-public class ScheduledReporterTest {
-  private final Gauge gauge = mock(Gauge.class);
-  private final List<Metric> list = new ArrayList<Metric>();
-  private final MetricsRegistry registry = new MetricsRegistry();
-  private final ScheduledReporter reporter = spy(new ScheduledReporter(registry, "example", TimeUnit.SECONDS,
-      TimeUnit.MILLISECONDS) {
-    @Override
-    public void report(Set<Entry<MetricName, Metric>> metrics) {
-      // nothing doing!
-    }
-  });
-
-  @SuppressWarnings("unchecked")
-  @Before
-  public void setUp() throws Exception {
-    Gauge g = registry.newGauge(System.class, "gauge", gauge);
-    Counter counter = registry.newCounter(System.class, "counter");
-    Histogram histogram = registry.newHistogram(System.class, "histogram");
-    Meter meter = registry.newMeter(System.class, "meter", "empty", TimeUnit.MILLISECONDS);
-    Timer timer = registry.newTimer(System.class, "timer");
-    list.add(g);
-    list.add(counter);
-    list.add(histogram);
-    list.add(meter);
-    list.add(timer);
-    reporter.start(200, TimeUnit.MILLISECONDS);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    reporter.stop();
-  }
-
-  @Test
-  public void pollsPeriodically() throws Exception {
-    Thread.sleep(500);
-    verify(reporter, times(2)).report(set(list));
-  }
-
-  private Set<Entry<MetricName, Metric>> set(List<Metric> metrics) {
-    final Map<MetricName, Metric> map = new HashMap<MetricName, Metric>();
-    for (Metric metric : metrics) {
-      String name = null;
-      if (metric instanceof Gauge) {
-        name = "gauge";
-      } else if (metric instanceof Counter) {
-        name = "counter";
-      } else if (metric instanceof Histogram) {
-        name = "histogram";
-      } else if (metric instanceof Meter) {
-        name = "meter";
-      } else if (metric instanceof Timer) {
-        name = "timer";
-      }
-      map.put(new MetricName(System.class, name), metric);
-    }
-    return map.entrySet();
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml
index 22067d9..d99cc82 100644
--- a/ambari-metrics/pom.xml
+++ b/ambari-metrics/pom.xml
@@ -14,7 +14,11 @@
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
---><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0                              http://maven.apache.org/xsd/maven-4.0.0.xsd">
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
   <groupId>org.apache.ambari</groupId>
   <modelVersion>4.0.0</modelVersion>
@@ -25,7 +29,6 @@
     <module>ambari-metrics-common</module>
     <module>ambari-metrics-hadoop-sink</module>
     <module>ambari-metrics-flume-sink</module>
-    <module>ambari-metrics-kafka-sink</module>
     <module>ambari-metrics-storm-sink</module>
     <module>ambari-metrics-timelineservice</module>
     <module>ambari-metrics-host-monitoring</module>

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml
index 9c11007..dc1b6b4 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml
@@ -101,7 +101,7 @@
     <name>num.partitions</name>
     <value>1</value>
     <description>
-        The default number of partitions per topic.
+       	The default number of partitions per topic.
     </description>
   </property>
   <property>
@@ -291,9 +291,9 @@
   </property>
   <property>
     <name>kafka.metrics.reporters</name>
-    <value>{{kafka_metrics_reporters}}</value>
+    <value>kafka.ganglia.KafkaGangliaMetricsReporter</value>
     <description>
-      kafka ganglia metrics reporter and kafka timeline metrics reporter
+      kafka ganglia metrics reporter
     </description>
   </property>
   <property>
@@ -318,29 +318,4 @@
     <value>kafka</value>
     <description>Ganglia group name </description>
   </property>
-  <property>
-    <name>kafka.timeline.metrics.reporter.enabled</name>
-    <value>true</value>
-    <description>Kafka timeline metrics reporter enable</description>
-  </property>
-  <property>
-    <name>kafka.timeline.metrics.host</name>
-    <value>{{metric_collector_host}}</value>
-    <description>Timeline host</description>
-  </property>
-  <property>
-    <name>kafka.timeline.metrics.port</name>
-    <value>{{metric_collector_port}}</value>
-    <description>Timeline port</description>
-  </property>
-  <property>
-    <name>kafka.timeline.metrics.reporter.sendInterval</name>
-    <value>5900</value>
-    <description>Timeline metrics reporter send interval</description>
-  </property>
-    <property>
-    <name>kafka.timeline.metrics.maxRowCacheSize</name>
-    <value>10000</value>
-    <description>Timeline metrics reporter send interval</description>
-  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml
index f322406..7ad4396 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml
@@ -50,12 +50,6 @@
 # The java implementation to use.
 export JAVA_HOME={{java64_home}}
 export PATH=$PATH:$JAVA_HOME/bin
-
-# Add kafka sink to classpath and related depenencies
-if [ -e "/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar" ]; then
-  export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar
-  export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/lib/*
-fi
     </value>
   </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
index 33ee47a..c0231a8 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
@@ -35,9 +35,6 @@ def kafka():
     kafka_server_config = mutable_config_dict(params.config['configurations']['kafka-broker'])
     kafka_server_config['broker.id'] = brokerid
     kafka_server_config['host.name'] = params.hostname
-    kafka_server_config['kafka.timeline.metrics.host'] = params.metric_collector_host
-    kafka_server_config['kafka.timeline.metrics.port'] = params.metric_collector_port
-    kafka_server_config['kafka.metrics.reporters'] = params.kafka_metrics_reporters
     kafka_data_dir = kafka_server_config['log.dirs']
     Directory(filter(None,kafka_data_dir.split(",")),
               owner=params.kafka_user,

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
index 800ccc4..067e537 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
@@ -60,30 +60,3 @@ if (('kafka-log4j' in config['configurations']) and ('content' in config['config
     log4j_props = config['configurations']['kafka-log4j']['content']
 else:
     log4j_props = None
-
-if 'ganglia_server_host' in config['clusterHostInfo'] and \
-    len(config['clusterHostInfo']['ganglia_server_host'])>0:
-  ganglia_installed = True
-  ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0]
-  ganglia_report_interval = 60
-else:
-  ganglia_installed = False
-
-kafka_metrics_reporters=""
-
-if ganglia_installed:
-  kafka_metrics_reporters = "kafka.ganglia.KafkaGangliaMetricsReporter"
-
-ams_collector_hosts = default("/clusterHostInfo/metric_collector_hosts", [])
-has_metric_collector = not len(ams_collector_hosts) == 0
-
-if has_metric_collector:
-  metric_collector_host = ams_collector_hosts[0]
-  metric_collector_port = default("/configurations/ams-site/timeline.metrics.service.webapp.address", "0.0.0.0:8188")
-  if metric_collector_port and metric_collector_port.find(':') != -1:
-    metric_collector_port = metric_collector_port.split(':')[1]
-
-  if not len(kafka_metrics_reporters) == 0:
-      kafka_metrics_reporters = kafka_metrics_reporters + ','
-
-  kafka_metrics_reporters = kafka_metrics_reporters + "org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter"


Mime
View raw message