ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From swa...@apache.org
Subject [22/22] git commit: AMBARI-5707. Metrics system prototype implementation. (swagle)
Date Mon, 22 Sep 2014 18:02:32 GMT
AMBARI-5707. Metrics system prototype implementation. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/865d187e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/865d187e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/865d187e

Branch: refs/heads/branch-metrics-dev
Commit: 865d187e33ea028fcf34541b2f634f887db5aa7c
Parents: 802df76
Author: Siddharth Wagle <swagle@hortonworks.com>
Authored: Mon Sep 22 11:01:05 2014 -0700
Committer: Siddharth Wagle <swagle@hortonworks.com>
Committed: Mon Sep 22 11:01:05 2014 -0700

----------------------------------------------------------------------
 .../ambari-metrics-hadoop-sink/pom.xml          |  132 +
 .../src/main/assemblies/empty.xml               |   21 +
 .../src/main/assemblies/sink.xml                |   34 +
 .../timeline/AbstractTimelineMetricsSink.java   |  101 +
 .../metrics2/sink/timeline/TimelineMetric.java  |  170 +
 .../metrics2/sink/timeline/TimelineMetrics.java |  102 +
 .../sink/timeline/TimelineMetricsCache.java     |  128 +
 .../sink/timeline/TimelineMetricsSink.java      |  211 ++
 .../pom.xml                                     |  248 ++
 .../src/main/assemblies/ats.xml                 |   34 +
 .../src/main/assemblies/empty.xml               |   21 +
 .../main/conf/hbase-site-metrics-service.xml    |   72 +
 .../org/apache/hadoop/yarn/conf/YarnConfig.java |   26 +
 .../ApplicationHistoryClientService.java        |  211 ++
 .../ApplicationHistoryManager.java              |   28 +
 .../ApplicationHistoryManagerImpl.java          |  243 ++
 .../ApplicationHistoryReader.java               |  117 +
 .../ApplicationHistoryServer.java               |  190 +
 .../ApplicationHistoryStore.java                |   37 +
 .../ApplicationHistoryWriter.java               |  112 +
 .../FileSystemApplicationHistoryStore.java      |  784 +++++
 .../MemoryApplicationHistoryStore.java          |  274 ++
 .../NullApplicationHistoryStore.java            |  127 +
 .../timeline/AbstractTimelineAggregator.java    |  294 ++
 .../timeline/HBaseTimelineMetricStore.java      |  185 +
 .../metrics/timeline/PhoenixHBaseAccessor.java  |  522 +++
 .../metrics/timeline/PhoenixTransactSQL.java    |  398 +++
 .../TimelineMetricAggregatorHourly.java         |  142 +
 .../TimelineMetricAggregatorMinute.java         |  142 +
 .../TimelineMetricClusterAggregator.java        |  259 ++
 .../TimelineMetricClusterAggregatorHourly.java  |   48 +
 .../metrics/timeline/TimelineMetricStore.java   |   70 +
 .../records/ApplicationAttemptFinishData.java   |   95 +
 .../records/ApplicationAttemptHistoryData.java  |  171 +
 .../records/ApplicationAttemptStartData.java    |   82 +
 .../records/ApplicationFinishData.java          |   94 +
 .../records/ApplicationHistoryData.java         |  213 ++
 .../records/ApplicationStartData.java           |  106 +
 .../records/ContainerFinishData.java            |   90 +
 .../records/ContainerHistoryData.java           |  182 +
 .../records/ContainerStartData.java             |   92 +
 .../pb/ApplicationAttemptFinishDataPBImpl.java  |  239 ++
 .../pb/ApplicationAttemptStartDataPBImpl.java   |  208 ++
 .../impl/pb/ApplicationFinishDataPBImpl.java    |  226 ++
 .../impl/pb/ApplicationStartDataPBImpl.java     |  229 ++
 .../impl/pb/ContainerFinishDataPBImpl.java      |  204 ++
 .../impl/pb/ContainerStartDataPBImpl.java       |  258 ++
 .../timeline/EntityIdentifier.java              |  100 +
 .../timeline/GenericObjectMapper.java           |  135 +
 .../timeline/LeveldbTimelineStore.java          | 1473 ++++++++
 .../timeline/MemoryTimelineStore.java           |  360 ++
 .../timeline/NameValuePair.java                 |   59 +
 .../timeline/TimelineReader.java                |  155 +
 .../timeline/TimelineStore.java                 |   29 +
 .../timeline/TimelineWriter.java                |   46 +
 .../timeline/package-info.java                  |   20 +
 .../webapp/AHSController.java                   |   55 +
 .../webapp/AHSLogsPage.java                     |   55 +
 .../webapp/AHSView.java                         |   90 +
 .../webapp/AHSWebApp.java                       |   63 +
 .../webapp/AHSWebServices.java                  |  162 +
 .../webapp/AppAttemptPage.java                  |   69 +
 .../webapp/AppPage.java                         |   71 +
 .../webapp/ContainerPage.java                   |   41 +
 .../webapp/JAXBContextResolver.java             |   64 +
 .../webapp/NavBlock.java                        |   51 +
 .../webapp/TimelineWebServices.java             |  504 +++
 .../yarn/util/timeline/TimelineUtilsExt.java    |   39 +
 .../ApplicationHistoryStoreTestUtils.java       |   84 +
 .../TestApplicationHistoryClientService.java    |  206 ++
 .../TestApplicationHistoryManagerImpl.java      |   74 +
 .../TestApplicationHistoryServer.java           |   77 +
 .../TestFileSystemApplicationHistoryStore.java  |  233 ++
 .../TestMemoryApplicationHistoryStore.java      |  204 ++
 .../timeline/TestPhoenixTransactSQL.java        |   69 +
 .../timeline/TestTimelineMetricStore.java       |   81 +
 .../timeline/TestGenericObjectMapper.java       |  102 +
 .../timeline/TestLeveldbTimelineStore.java      |  253 ++
 .../timeline/TestMemoryTimelineStore.java       |   83 +
 .../timeline/TimelineStoreTestUtils.java        |  789 +++++
 .../webapp/TestAHSWebApp.java                   |  182 +
 .../webapp/TestAHSWebServices.java              |  303 ++
 .../webapp/TestTimelineWebServices.java         |  391 +++
 .../ambari-metrics-host-monitoring/pom.xml      |  152 +
 .../src/main/python/__init__.py                 |   21 +
 .../src/main/python/core/__init__.py            |   34 +
 .../main/python/core/application_metric_map.py  |  127 +
 .../src/main/python/core/config_reader.py       |  127 +
 .../src/main/python/core/controller.py          |  126 +
 .../src/main/python/core/emitter.py             |   90 +
 .../src/main/python/core/event_definition.py    |   85 +
 .../src/main/python/core/host_info.py           |  187 +
 .../src/main/python/core/metric_collector.py    |   91 +
 .../src/main/python/main.py                     |   49 +
 .../src/main/python/psutil/LICENSE              |   27 +
 .../src/main/python/psutil/MANIFEST.in          |   14 +
 .../src/main/python/psutil/Makefile             |   77 +
 .../src/main/python/psutil/README               |  270 ++
 .../src/main/python/psutil/build.out            |  137 +
 .../src/main/python/psutil/build.py             |   57 +
 .../psutil/__init__.py                          | 1987 +++++++++++
 .../lib.macosx-10.8-intel-2.7/psutil/_common.py |  258 ++
 .../lib.macosx-10.8-intel-2.7/psutil/_compat.py |  433 +++
 .../lib.macosx-10.8-intel-2.7/psutil/_psbsd.py  |  389 +++
 .../psutil/_pslinux.py                          | 1225 +++++++
 .../lib.macosx-10.8-intel-2.7/psutil/_psosx.py  |  341 ++
 .../psutil/_psposix.py                          |  157 +
 .../psutil/_pssunos.py                          |  533 +++
 .../psutil/_pswindows.py                        |  485 +++
 .../src/main/python/psutil/docs/Makefile        |  177 +
 .../src/main/python/psutil/docs/README          |   15 +
 .../python/psutil/docs/_static/copybutton.js    |   57 +
 .../main/python/psutil/docs/_static/sidebar.js  |  161 +
 .../python/psutil/docs/_template/globaltoc.html |   12 +
 .../psutil/docs/_template/indexcontent.html     |    4 +
 .../psutil/docs/_template/indexsidebar.html     |   16 +
 .../main/python/psutil/docs/_template/page.html |   66 +
 .../_themes/pydoctheme/static/pydoctheme.css    |  187 +
 .../psutil/docs/_themes/pydoctheme/theme.conf   |   23 +
 .../src/main/python/psutil/docs/conf.py         |  253 ++
 .../src/main/python/psutil/docs/index.rst       | 1247 +++++++
 .../src/main/python/psutil/docs/make.bat        |  242 ++
 .../main/python/psutil/examples/disk_usage.py   |   63 +
 .../src/main/python/psutil/examples/free.py     |   42 +
 .../src/main/python/psutil/examples/iotop.py    |  178 +
 .../src/main/python/psutil/examples/killall.py  |   32 +
 .../src/main/python/psutil/examples/meminfo.py  |   69 +
 .../src/main/python/psutil/examples/netstat.py  |   65 +
 .../src/main/python/psutil/examples/nettop.py   |  165 +
 .../src/main/python/psutil/examples/pmap.py     |   58 +
 .../python/psutil/examples/process_detail.py    |  162 +
 .../src/main/python/psutil/examples/top.py      |  232 ++
 .../src/main/python/psutil/examples/who.py      |   34 +
 .../src/main/python/psutil/make.bat             |  176 +
 .../src/main/python/psutil/psutil/__init__.py   | 1987 +++++++++++
 .../src/main/python/psutil/psutil/_common.py    |  258 ++
 .../src/main/python/psutil/psutil/_compat.py    |  433 +++
 .../src/main/python/psutil/psutil/_psbsd.py     |  389 +++
 .../src/main/python/psutil/psutil/_pslinux.py   | 1225 +++++++
 .../src/main/python/psutil/psutil/_psosx.py     |  341 ++
 .../src/main/python/psutil/psutil/_psposix.py   |  157 +
 .../src/main/python/psutil/psutil/_pssunos.py   |  533 +++
 .../src/main/python/psutil/psutil/_psutil_bsd.c | 2212 ++++++++++++
 .../src/main/python/psutil/psutil/_psutil_bsd.h |   51 +
 .../main/python/psutil/psutil/_psutil_common.c  |   37 +
 .../main/python/psutil/psutil/_psutil_common.h  |   10 +
 .../main/python/psutil/psutil/_psutil_linux.c   |  510 +++
 .../main/python/psutil/psutil/_psutil_linux.h   |   20 +
 .../src/main/python/psutil/psutil/_psutil_osx.c | 1881 ++++++++++
 .../src/main/python/psutil/psutil/_psutil_osx.h |   41 +
 .../main/python/psutil/psutil/_psutil_posix.c   |  128 +
 .../main/python/psutil/psutil/_psutil_posix.h   |   10 +
 .../main/python/psutil/psutil/_psutil_sunos.c   | 1290 +++++++
 .../main/python/psutil/psutil/_psutil_sunos.h   |   27 +
 .../main/python/psutil/psutil/_psutil_windows.c | 3241 ++++++++++++++++++
 .../main/python/psutil/psutil/_psutil_windows.h |   70 +
 .../src/main/python/psutil/psutil/_pswindows.py |  485 +++
 .../psutil/psutil/arch/bsd/process_info.c       |  285 ++
 .../psutil/psutil/arch/bsd/process_info.h       |   15 +
 .../psutil/psutil/arch/osx/process_info.c       |  293 ++
 .../psutil/psutil/arch/osx/process_info.h       |   16 +
 .../python/psutil/psutil/arch/windows/glpi.h    |   41 +
 .../psutil/psutil/arch/windows/ntextapi.h       |  287 ++
 .../psutil/arch/windows/process_handles.c       |  336 ++
 .../psutil/arch/windows/process_handles.h       |   10 +
 .../psutil/psutil/arch/windows/process_info.c   |  443 +++
 .../psutil/psutil/arch/windows/process_info.h   |   17 +
 .../psutil/psutil/arch/windows/security.c       |  238 ++
 .../psutil/psutil/arch/windows/security.h       |   17 +
 .../src/main/python/psutil/setup.py             |  198 ++
 ambari-metrics/pom.xml                          |  149 +
 pom.xml                                         |    7 +
 172 files changed, 43647 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml b/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
new file mode 100644
index 0000000..310a53f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
@@ -0,0 +1,132 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>ambari-metrics</artifactId>
+    <groupId>org.apache.ambari</groupId>
+    <version>1.3.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>ambari-metrics-hadoop-sink</artifactId>
+  <packaging>jar</packaging>
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptors>
+            <descriptor>src/main/assemblies/sink.xml</descriptor>
+          </descriptors>
+          <tarLongFileMode>gnu</tarLongFileMode>
+        </configuration>
+        <executions>
+          <execution>
+            <id>build-tarball</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.0</version>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.8</version>
+        <executions>
+          <execution>
+            <id>parse-version</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>parse-version</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>regex-property</id>
+            <goals>
+              <goal>regex-property</goal>
+            </goals>
+            <configuration>
+              <name>ambariVersion</name>
+              <value>${project.version}</value>
+              <regex>^([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex>
+              <replacement>$1.$2.$3</replacement>
+              <failIfNoMatch>false</failIfNoMatch>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>2.4.0</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-httpclient</groupId>
+      <artifactId>commons-httpclient</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-collections</groupId>
+      <artifactId>commons-collections</artifactId>
+      <version>3.2.1</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-configuration</groupId>
+      <artifactId>commons-configuration</artifactId>
+      <version>1.6</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-sink/src/main/assemblies/empty.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/assemblies/empty.xml b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/assemblies/empty.xml
new file mode 100644
index 0000000..35738b1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/assemblies/empty.xml
@@ -0,0 +1,21 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<assembly>
+    <id>empty</id>
+    <formats/>
+</assembly>

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-sink/src/main/assemblies/sink.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/assemblies/sink.xml b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/assemblies/sink.xml
new file mode 100644
index 0000000..21a6b36
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/assemblies/sink.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<assembly>
+  <!--This 'dist' id is not appended to the produced bundle because we do this:
+    http://maven.apache.org/plugins/maven-assembly-plugin/faq.html#required-classifiers
+  -->
+  <id>dist</id>
+  <formats>
+    <format>dir</format>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <files>
+    <file>
+      <source>${project.build.directory}/${artifact.artifactId}-${artifact.version}.jar</source>
+      <outputDirectory>ambari-metrics-${project.version}/lib/ambari-metrics</outputDirectory>
+    </file>
+  </files>
+</assembly>

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
new file mode 100644
index 0000000..2c42274
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.util.Servers;
+import org.apache.hadoop.net.DNS;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+
+public abstract class AbstractTimelineMetricsSink implements MetricsSink {
+
+  public final Log LOG = LogFactory.getLog(this.getClass());
+
+  private SubsetConfiguration conf;
+  private String hostName = "UNKNOWN.example.com";
+  private String serviceName = "";
+  private final String COLLECTOR_HOST_PROPERTY = "collector";
+  private final int DEFAULT_PORT = 8188;
+
+  private List<? extends SocketAddress> metricsServers;
+  private String collectorUri;
+
+  @Override
+  public void init(SubsetConfiguration conf) {
+    this.conf = conf;
+    LOG.info("Initializing Timeline metrics sink.");
+
+    // Take the hostname from the DNS class.
+    if (conf.getString("slave.host.name") != null) {
+      hostName = conf.getString("slave.host.name");
+    } else {
+      try {
+        hostName = DNS.getDefaultHost(
+          conf.getString("dfs.datanode.dns.interface", "default"),
+          conf.getString("dfs.datanode.dns.nameserver", "default"));
+      } catch (UnknownHostException uhe) {
+        LOG.error(uhe);
+        hostName = "UNKNOWN.example.com";
+      }
+    }
+
+    serviceName = getFirstConfigPrefix(conf);
+
+    // Load collector configs
+    metricsServers = Servers.parse(conf.getString(COLLECTOR_HOST_PROPERTY),
+      DEFAULT_PORT);
+
+    if (metricsServers == null || metricsServers.isEmpty()) {
+      LOG.error("No Metric collector configured.");
+    } else {
+      collectorUri = "http://" + conf.getString(COLLECTOR_HOST_PROPERTY).trim()
+        + "/ws/v1/timeline/metrics";
+    }
+  }
+
+  protected String getHostName() {
+    return hostName;
+  }
+
+  protected String getServiceName() {
+    return serviceName;
+  }
+
+  private String getFirstConfigPrefix(SubsetConfiguration conf) {
+    while (conf.getParent() instanceof SubsetConfiguration) {
+      conf = (SubsetConfiguration) conf.getParent();
+    }
+    return conf.getPrefix();
+  }
+
+  protected SocketAddress getServerSocketAddress() {
+    if (metricsServers != null && !metricsServers.isEmpty()) {
+      return metricsServers.get(0);
+    }
+    return null;
+  }
+
+  protected String getCollectorUri() {
+    return collectorUri;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
new file mode 100644
index 0000000..1b35d92
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Map;
+import java.util.TreeMap;
+
+@XmlRootElement(name = "metric")
+@XmlAccessorType(XmlAccessType.NONE)
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineMetric implements Comparable<TimelineMetric> {
+
+  private String metricName;
+  private String appId;
+  private String instanceId;
+  private String hostName;
+  private long timestamp;
+  private long startTime;
+  private String type;
+  private Map<Long, Double> metricValues = new TreeMap<Long, Double>();
+
+  @XmlElement(name = "metricname")
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public void setMetricName(String metricName) {
+    this.metricName = metricName;
+  }
+
+  @XmlElement(name = "appid")
+  public String getAppId() {
+    return appId;
+  }
+
+  public void setAppId(String appId) {
+    this.appId = appId;
+  }
+
+  @XmlElement(name = "instanceid")
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
+
+  @XmlElement(name = "hostname")
+  public String getHostName() {
+    return hostName;
+  }
+
+  public void setHostName(String hostName) {
+    this.hostName = hostName;
+  }
+
+  @XmlElement(name = "timestamp")
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  @XmlElement(name = "starttime")
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  @XmlElement(name = "type")
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  @XmlElement(name = "metrics")
+  public Map<Long, Double> getMetricValues() {
+    return metricValues;
+  }
+
+  public void setMetricValues(Map<Long, Double> metricValues) {
+    this.metricValues = metricValues;
+  }
+
+  public void addMetricValues(Map<Long, Double> metricValues) {
+    this.metricValues.putAll(metricValues);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TimelineMetric metric = (TimelineMetric) o;
+
+    if (!metricName.equals(metric.metricName)) return false;
+    if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
+      return false;
+    if (!appId.equals(metric.appId)) return false;
+    if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+      return false;
+    if (timestamp != metric.timestamp) return false;
+    if (startTime != metric.startTime) return false;
+
+    return true;
+  }
+
+  public boolean equalsExceptTime(TimelineMetric metric) {
+    if (!metricName.equals(metric.metricName)) return false;
+    if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
+      return false;
+    if (!appId.equals(metric.appId)) return false;
+    if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = metricName.hashCode();
+    result = 31 * result + appId.hashCode();
+    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+    result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+    return result;
+  }
+
+  @Override
+  public int compareTo(TimelineMetric other) {
+    if (timestamp > other.timestamp) {
+      return -1;
+    } else if (timestamp < other.timestamp) {
+      return 1;
+    } else {
+      return metricName.compareTo(other.metricName);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
new file mode 100644
index 0000000..a6c925a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Container for a list of {@link TimelineMetric} objects, bound to the
+ * "metrics" root element.
+ */
+@XmlRootElement(name = "metrics")
+@XmlAccessorType(XmlAccessType.NONE)
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineMetrics {
+
+  private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
+
+  public TimelineMetrics() {}
+
+  /**
+   * @return the backing list itself (not a copy)
+   */
+  @XmlElement(name = "metrics")
+  public List<TimelineMetric> getMetrics() {
+    return allMetrics;
+  }
+
+  /**
+   * Replaces the backing list with the given one; the list is not copied.
+   *
+   * @param allMetrics the list to use from now on
+   */
+  public void setMetrics(List<TimelineMetric> allMetrics) {
+    this.allMetrics = allMetrics;
+  }
+
+  /**
+   * Compares two metrics by name, host name and app id. A null host name or
+   * app id on {@code metric1} skips that comparison.
+   *
+   * @return true when the name matches and every non-skipped field matches
+   */
+  private boolean isEqualTimelineMetrics(TimelineMetric metric1,
+                                         TimelineMetric metric2) {
+
+    if (!metric1.getMetricName().equals(metric2.getMetricName())) {
+      return false;
+    }
+
+    // Return on the first mismatch so that a later comparison cannot
+    // overwrite (and thereby hide) an earlier failed one.
+    if (metric1.getHostName() != null
+        && !metric1.getHostName().equals(metric2.getHostName())) {
+      return false;
+    }
+
+    if (metric1.getAppId() != null
+        && !metric1.getAppId().equals(metric2.getAppId())) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Adds the metric to the list, or merges it into the first existing entry
+   * for which {@link TimelineMetric#equalsExceptTime(TimelineMetric)} is
+   * true. On merge the values are added to the existing entry, which keeps
+   * the smaller of the two timestamps and the smaller of the two start times.
+   *
+   * @param metric {@link TimelineMetric}
+   */
+  public void addOrMergeTimelineMetric(TimelineMetric metric) {
+    TimelineMetric metricToMerge = null;
+
+    for (TimelineMetric timelineMetric : allMetrics) {
+      if (timelineMetric.equalsExceptTime(metric)) {
+        metricToMerge = timelineMetric;
+        break;
+      }
+    }
+
+    if (metricToMerge == null) {
+      allMetrics.add(metric);
+      return;
+    }
+
+    metricToMerge.addMetricValues(metric.getMetricValues());
+    if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
+      metricToMerge.setTimestamp(metric.getTimestamp());
+    }
+    if (metricToMerge.getStartTime() > metric.getStartTime()) {
+      metricToMerge.setStartTime(metric.getStartTime());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsCache.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsCache.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsCache.java
new file mode 100644
index 0000000..36aaec2
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsCache.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Per-metric-name cache that accumulates metric values and hands a metric
+ * back only once the time span it covers has reached the configured
+ * eviction time.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TimelineMetricsCache {
+
+  private final TimelineMetricHolder timelineMetricCache = new TimelineMetricHolder();
+  // Log under this class's own name, not TimelineMetric's.
+  private static final Log LOG = LogFactory.getLog(TimelineMetricsCache.class);
+  static final int MAX_RECS_PER_NAME_DEFAULT = 10000;
+  static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min
+  private final int maxRecsPerName;
+  private final int maxEvictionTimeInMillis;
+
+  /**
+   * @param maxRecsPerName upper bound on the number of cache entries before
+   *                       the eldest entry is dropped
+   * @param maxEvictionTimeInMillis minimum time span an entry must cover
+   *                                before it is returned by
+   *                                {@link #getTimelineMetric(String)}
+   */
+  TimelineMetricsCache(int maxRecsPerName, int maxEvictionTimeInMillis) {
+    this.maxRecsPerName = maxRecsPerName;
+    this.maxEvictionTimeInMillis = maxEvictionTimeInMillis;
+  }
+
+  /**
+   * Pairs a cached metric with the time span between the oldest start time
+   * seen for it and the most recent later start time.
+   */
+  class TimelineMetricWrapper {
+    private long timeDiff = -1;
+    private long oldestTimestamp = -1;
+    private TimelineMetric timelineMetric;
+
+    TimelineMetricWrapper(TimelineMetric timelineMetric) {
+      this.timelineMetric = timelineMetric;
+      this.oldestTimestamp = timelineMetric.getStartTime();
+    }
+
+    /**
+     * A timestamp later than the oldest one updates the covered span;
+     * anything else becomes the new oldest timestamp.
+     */
+    private void updateTimeDiff(long timestamp) {
+      if (oldestTimestamp != -1 && timestamp > oldestTimestamp) {
+        timeDiff = timestamp - oldestTimestamp;
+      } else {
+        oldestTimestamp = timestamp;
+      }
+    }
+
+    /** Adds the given metric's values to the cached metric. */
+    public void putMetric(TimelineMetric metric) {
+      this.timelineMetric.addMetricValues(metric.getMetricValues());
+      updateTimeDiff(metric.getStartTime());
+    }
+
+    public long getTimeDiff() {
+      return timeDiff;
+    }
+
+    public TimelineMetric getTimelineMetric() {
+      return timelineMetric;
+    }
+  }
+
+  // TODO: Change to ConcurrentHashMap with weighted eviction
+  class TimelineMetricHolder extends LinkedHashMap<String, TimelineMetricWrapper> {
+    private static final long serialVersionUID = 1L;
+    // Ensures the overflow warning is logged only once.
+    private boolean gotOverflow = false;
+
+    @Override
+    protected boolean removeEldestEntry(Map.Entry<String, TimelineMetricWrapper> eldest) {
+      boolean overflow = size() > maxRecsPerName;
+      if (overflow && !gotOverflow) {
+        LOG.warn("Metrics cache overflow at "+ size() +" for "+ eldest);
+        gotOverflow = true;
+      }
+      return overflow;
+    }
+
+    /**
+     * Removes and returns the metric cached under the given name, but only
+     * when its covered time span has reached the eviction time.
+     *
+     * @return the cached metric, or null when there is no entry or the
+     *         entry is not yet due
+     */
+    public TimelineMetric evict(String metricName) {
+      TimelineMetricWrapper metricWrapper = this.get(metricName);
+
+      if (metricWrapper == null
+        || metricWrapper.getTimeDiff() < maxEvictionTimeInMillis) {
+        return null;
+      }
+
+      TimelineMetric timelineMetric = metricWrapper.getTimelineMetric();
+      this.remove(metricName);
+
+      return timelineMetric;
+    }
+
+    /**
+     * Creates a new entry for the name, or merges the metric's values into
+     * the existing entry.
+     */
+    public void put(String metricName, TimelineMetric timelineMetric) {
+
+      TimelineMetricWrapper metric = this.get(metricName);
+      if (metric == null) {
+        this.put(metricName, new TimelineMetricWrapper(timelineMetric));
+      } else {
+        metric.putMetric(timelineMetric);
+      }
+    }
+  }
+
+  /**
+   * @return the accumulated metric for the name when it is due (it is then
+   *         removed from the cache), otherwise null
+   */
+  public TimelineMetric getTimelineMetric(String metricName) {
+    // evict() already returns null for an unknown name.
+    return timelineMetricCache.evict(metricName);
+  }
+
+  /** Caches the metric under its metric name. */
+  public void putTimelineMetric(TimelineMetric timelineMetric) {
+    timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsSink.java
new file mode 100644
index 0000000..a843428
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsSink.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.StringRequestEntity;
+import org.apache.commons.lang.ClassUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.impl.MsInfo;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsCache;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Metrics sink that converts each {@link MetricsRecord} into
+ * {@link TimelineMetric} objects, buffers them in a
+ * {@link TimelineMetricsCache} and POSTs the due ones as JSON to the
+ * collector URI.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TimelineMetricsSink extends AbstractTimelineMetricsSink {
+  private static final ObjectMapper mapper;
+  // Context name -> tag names to include in the metric prefix.
+  // A null set means "use every tag" (see appendPrefix).
+  private Map<String, Set<String>> useTagsMap = new HashMap<String, Set<String>>();
+  private static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix.";
+  private static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize";
+  private static final String METRICS_SEND_INTERVAL = "sendInterval";
+  protected HttpClient httpClient = new HttpClient();
+  private TimelineMetricsCache metricsCache;
+
+  static {
+    mapper = new ObjectMapper();
+    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+    mapper.setAnnotationIntrospector(introspector);
+    mapper.getSerializationConfig()
+      .setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+  }
+
+  /**
+   * Creates the metrics cache from the "maxRowCacheSize" and "sendInterval"
+   * settings and reads every "tagsForPrefix.&lt;context&gt;" property into
+   * the tag map.
+   *
+   * @param conf the sink configuration
+   */
+  @Override
+  public void init(SubsetConfiguration conf) {
+    super.init(conf);
+
+    int maxRowCacheSize = conf.getInt(MAX_METRIC_ROW_CACHE_SIZE,
+      TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT);
+    int metricsSendInterval = conf.getInt(METRICS_SEND_INTERVAL,
+      TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS); // ~ 1 min
+    metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
+
+    conf.setListDelimiter(',');
+    Iterator<String> it = (Iterator<String>) conf.getKeys();
+    while (it.hasNext()) {
+      String propertyName = it.next();
+      if (propertyName != null && propertyName.startsWith(TAGS_FOR_PREFIX_PROPERTY_PREFIX)) {
+        String contextName = propertyName.substring(TAGS_FOR_PREFIX_PROPERTY_PREFIX.length());
+        String[] tags = conf.getStringArray(propertyName);
+        boolean useAllTags = false;
+        Set<String> set = null;
+        if (tags.length > 0) {
+          set = new HashSet<String>();
+          for (String tag : tags) {
+            tag = tag.trim();
+            useAllTags |= tag.equals("*");
+            if (tag.length() > 0) {
+              set.add(tag);
+            }
+          }
+          // A "*" anywhere in the list selects all tags.
+          if (useAllTags) {
+            set = null;
+          }
+        }
+        useTagsMap.put(contextName, set);
+      }
+    }
+  }
+
+  /**
+   * Caches every metric of the record under the name
+   * "&lt;context&gt;.&lt;record&gt;[.tag=value...].&lt;metric&gt;" and
+   * emits those metrics the cache hands back.
+   *
+   * @param record the record to process
+   * @throws MetricsException when emitting fails with an IOException
+   */
+  @Override
+  public void putMetrics(MetricsRecord record) {
+    try {
+      String recordName = record.name();
+      String contextName = record.context();
+
+      StringBuilder sb = new StringBuilder();
+      sb.append(contextName);
+      sb.append('.');
+      sb.append(recordName);
+
+      appendPrefix(record, sb);
+      sb.append(".");
+      // Length of the shared prefix; the builder is reset to it per metric.
+      int sbBaseLen = sb.length();
+
+      Collection<AbstractMetric> metrics =
+        (Collection<AbstractMetric>) record.metrics();
+
+      List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+
+      for (AbstractMetric metric : metrics) {
+        sb.append(metric.name());
+        String name = sb.toString();
+        TimelineMetric timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(name);
+        timelineMetric.setHostName(getHostName());
+        timelineMetric.setAppId(getServiceName());
+        timelineMetric.setStartTime(record.timestamp());
+        timelineMetric.setType(ClassUtils.getShortCanonicalName(
+          metric.value(), "Number"));
+        timelineMetric.getMetricValues().put(record.timestamp(),
+          metric.value().doubleValue());
+        // Put intermediate values into the cache until it is time to send
+        metricsCache.putTimelineMetric(timelineMetric);
+
+        // Retrieve all values from cache if it is time to send
+        TimelineMetric cachedMetric = metricsCache.getTimelineMetric(name);
+
+        if (cachedMetric != null) {
+          metricList.add(cachedMetric);
+        }
+
+        sb.setLength(sbBaseLen);
+      }
+
+      TimelineMetrics timelineMetrics = new TimelineMetrics();
+      timelineMetrics.setMetrics(metricList);
+
+      if (!metricList.isEmpty()) {
+        emitMetrics(timelineMetrics);
+      }
+
+    } catch (IOException io) {
+      throw new MetricsException("Failed to putMetrics", io);
+    }
+  }
+
+  /**
+   * Serializes the metrics to JSON and POSTs them to the collector URI.
+   * Nothing is sent when no server socket address is available.
+   *
+   * @param metrics the metrics to send
+   * @throws IOException when serialization or the HTTP call fails
+   */
+  private void emitMetrics(TimelineMetrics metrics) throws IOException {
+    String jsonData = mapper.writeValueAsString(metrics);
+
+    SocketAddress socketAddress = getServerSocketAddress();
+
+    if (socketAddress != null) {
+      StringRequestEntity requestEntity = new StringRequestEntity(
+        jsonData, "application/json", "UTF-8");
+
+      PostMethod postMethod = new PostMethod(getCollectorUri());
+      try {
+        postMethod.setRequestEntity(requestEntity);
+        int statusCode = httpClient.executeMethod(postMethod);
+        if (statusCode != 200) {
+          LOG.info("Unable to POST metrics to collector, " + getCollectorUri());
+        }
+      } finally {
+        // Always hand the connection back, also when executeMethod throws.
+        postMethod.releaseConnection();
+      }
+    }
+  }
+
+  /**
+   * Appends ".name=value" for each selected tag of the record. Tags are
+   * only appended for contexts present in the tag map.
+   *
+   * @param record the record whose tags are read
+   * @param sb the builder to append to
+   */
+  // Taken as is from Ganglia30 implementation
+  @InterfaceAudience.Private
+  public void appendPrefix(MetricsRecord record, StringBuilder sb) {
+    String contextName = record.context();
+    Collection<MetricsTag> tags = record.tags();
+    if (useTagsMap.containsKey(contextName)) {
+      Set<String> useTags = useTagsMap.get(contextName);
+      for (MetricsTag t : tags) {
+        if (useTags == null || useTags.contains(t.name())) {
+
+          // the context is always skipped here because it is always added
+
+          // the hostname is always skipped to avoid case-mismatches
+          // from different DNSes.
+
+          if (t.info() != MsInfo.Context && t.info() != MsInfo.Hostname && t.value() != null) {
+            sb.append('.').append(t.name()).append('=').append(t.value());
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void flush() {
+    // TODO: Buffering implementation
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
new file mode 100644
index 0000000..e1f11c9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
@@ -0,0 +1,248 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                      http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>ambari-metrics</artifactId>
+    <groupId>org.apache.ambari</groupId>
+    <version>1.3.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>ambari-metrics-hadoop-timelineservice</artifactId>
+  <version>1.3.0-SNAPSHOT</version>
+  <name>ambari-metrics-hadoop-timelineservice</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <!-- Needed for generating FindBugs warnings using parent pom -->
+    <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
+    <protobuf.version>2.5.0</protobuf.version>
+    <hadoop.version>2.4.0</hadoop.version>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptors>
+            <descriptor>src/main/assemblies/ats.xml</descriptor>
+          </descriptors>
+          <tarLongFileMode>gnu</tarLongFileMode>
+        </configuration>
+        <executions>
+          <execution>
+            <id>build-tarball</id>
+            <phase>none</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <artifactId>ambari-metrics-hadoop-sink</artifactId>
+      <groupId>org.apache.ambari</groupId>
+      <version>1.3.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>javax.servlet</groupId>
+      <artifactId>servlet-api</artifactId>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.8.5</version>
+      <scope>test</scope>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.inject.extensions</groupId>
+      <artifactId>guice-servlet</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey.jersey-test-framework</groupId>
+      <artifactId>jersey-test-framework-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-json</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey.contribs</groupId>
+      <artifactId>jersey-guice</artifactId>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <version>${hadoop.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>javax.xml.bind</groupId>
+      <artifactId>jaxb-api</artifactId>
+      <version>2.2.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jettison</groupId>
+      <artifactId>jettison</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-common</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>com.sun.jersey.jersey-test-framework</groupId>
+      <artifactId>jersey-test-framework-grizzly2</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-collections</groupId>
+      <artifactId>commons-collections</artifactId>
+      <version>3.2.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-all</artifactId>
+      <version>1.8</version>
+    </dependency>
+
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/assemblies/ats.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/assemblies/ats.xml b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/assemblies/ats.xml
new file mode 100644
index 0000000..21a6b36
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/assemblies/ats.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<assembly>
+  <!--This 'dist' id is not appended to the produced bundle because we do this:
+    http://maven.apache.org/plugins/maven-assembly-plugin/faq.html#required-classifiers
+  -->
+  <id>dist</id>
+  <formats>
+    <format>dir</format>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <files>
+    <file>
+      <source>${project.build.directory}/${artifact.artifactId}-${artifact.version}.jar</source>
+      <outputDirectory>ambari-metrics-${project.version}/lib/ambari-metrics</outputDirectory>
+    </file>
+  </files>
+</assembly>

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/assemblies/empty.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/assemblies/empty.xml b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/assemblies/empty.xml
new file mode 100644
index 0000000..35738b1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/assemblies/empty.xml
@@ -0,0 +1,21 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<assembly>
+    <id>empty</id>
+    <formats/>
+</assembly>

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/conf/hbase-site-metrics-service.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/conf/hbase-site-metrics-service.xml b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/conf/hbase-site-metrics-service.xml
new file mode 100644
index 0000000..4c85581
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/conf/hbase-site-metrics-service.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+  <property>
+    <name>hbase.rootdir</name>
+    <value>file:///grid/0/hbase</value>
+  </property>
+  <property>
+    <name>hbase.tmp.dir</name>
+    <value>/grid/0/hbase-tmp</value>
+  </property>
+  <property>
+    <name>hbase.cluster.distributed</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>hbase.master.wait.on.regionservers.mintostart</name>
+    <value>1</value>
+  </property>
+  <property>
+    <name>hbase.zookeeper.quorum</name>
+    <value>localhost</value>
+  </property>
+  <property>
+    <name>phoenix.query.spoolThresholdBytes</name>
+    <value>12582912</value>
+  </property>
+  <property>
+      <name>hbase.zookeeper.property.dataDir</name>
+      <value>/grid/0/zookeeper</value>
+  </property>
+  <property>
+      <name>hbase.client.scanner.caching</name>
+      <value>10000</value>
+  </property>
+  <property>
+    <name>hfile.block.cache.size</name>
+    <value>0.6</value>
+  </property>
+  <property>
+    <name>hbase.regionserver.global.memstore.upperLimit</name>
+    <value>0.2</value>
+  </property>
+  <property>
+    <name>hbase.regionserver.global.memstore.lowerLimit</name>
+    <value>0.1</value>
+  </property>
+  <property>
+    <name>phoenix.groupby.maxCacheSize</name>
+    <value>307200000</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/conf/YarnConfig.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/conf/YarnConfig.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/conf/YarnConfig.java
new file mode 100644
index 0000000..fe5a553
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/conf/YarnConfig.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.conf;
+
+/**
+ * Adds configuration key names for the timeline metrics service on top of
+ * the keys inherited from {@link YarnConfiguration}.
+ */
+public class YarnConfig extends YarnConfiguration {
+  /** Common prefix of the metrics keys: the timeline service prefix plus "metrics.". */
+  public static final String TIMELINE_METRICS_SERVICE_PREFIX =
+        TIMELINE_SERVICE_PREFIX + "metrics.";
+
+  /** Key name built from the metrics prefix plus "aggregator.checkpoint.dir". */
+  public static final String TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR =
+        TIMELINE_METRICS_SERVICE_PREFIX + "aggregator.checkpoint.dir";
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java
new file mode 100644
index 0000000..e15198b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+
+public class ApplicationHistoryClientService extends AbstractService {
+  private static final Log LOG = LogFactory
+    .getLog(ApplicationHistoryClientService.class);
+  // Source of the application, attempt and container reports served by the handler.
+  private ApplicationHistoryManager history;
+  // Protocol implementation that serviceStart() registers with the RPC server.
+  private ApplicationHistoryProtocol protocolHandler;
+  // RPC server; stays null until serviceStart() has created it.
+  private Server server;
+  // Value returned by conf.updateConnectAddr() in serviceStart(); null before that.
+  private InetSocketAddress bindAddress;
+
+  /**
+   * Creates the service together with its protocol handler.
+   *
+   * @param history manager the protocol handler queries for reports
+   */
+  public ApplicationHistoryClientService(ApplicationHistoryManager history) {
+    super("ApplicationHistoryClientService");
+    protocolHandler = new ApplicationHSClientProtocolHandler();
+    this.history = history;
+  }
+
+  /**
+   * Creates and starts the RPC server for {@link ApplicationHistoryProtocol}
+   * and records the address obtained from the started server.
+   *
+   * @throws Exception if the server cannot be created or started
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    Configuration conf = getConfig();
+    YarnRPC rpc = YarnRPC.create(conf);
+    InetSocketAddress address =
+        conf.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT);
+
+    // NOTE(review): the null argument is presumably the secret manager slot of
+    // YarnRPC.getServer - confirm. The last argument is the handler count.
+    server =
+        rpc.getServer(ApplicationHistoryProtocol.class, protocolHandler,
+          address, conf, null, conf.getInt(
+            YarnConfiguration.TIMELINE_SERVICE_HANDLER_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_THREAD_COUNT));
+
+    server.start();
+    // Use the listener address reported by the running server rather than
+    // the configured one.
+    this.bindAddress =
+        conf.updateConnectAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
+          server.getListenerAddress());
+    LOG.info("Instantiated ApplicationHistoryClientService at "
+        + this.bindAddress);
+
+    super.serviceStart();
+  }
+
+  /**
+   * Stops the RPC server when one has been created, then stops the service.
+   *
+   * @throws Exception if the parent service fails to stop
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    Server rpcServer = server;
+    if (rpcServer != null) {
+      rpcServer.stop();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * @return the protocol handler created in the constructor
+   */
+  @Private
+  public ApplicationHistoryProtocol getClientHandler() {
+    return protocolHandler;
+  }
+
+  /**
+   * @return the address recorded by serviceStart(), or null if the service
+   *         has not been started
+   */
+  @Private
+  public InetSocketAddress getBindAddress() {
+    return bindAddress;
+  }
+
+  /**
+   * {@link ApplicationHistoryProtocol} implementation that answers every
+   * query from the enclosing service's {@code history} manager. The three
+   * delegation token operations are not implemented and return null.
+   */
+  private class ApplicationHSClientProtocolHandler implements
+      ApplicationHistoryProtocol {
+
+    /** Not implemented: always returns null. */
+    @Override
+    public CancelDelegationTokenResponse cancelDelegationToken(
+        CancelDelegationTokenRequest request) throws YarnException, IOException {
+      // TODO implement delegation token cancellation
+      return null;
+    }
+
+    /**
+     * Looks up the report of a single application attempt.
+     *
+     * @throws ApplicationAttemptNotFoundException if the history manager
+     *         fails with an IOException; the IOException is kept as the cause
+     */
+    @Override
+    public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+        GetApplicationAttemptReportRequest request) throws YarnException,
+        IOException {
+      try {
+        return GetApplicationAttemptReportResponse.newInstance(history
+          .getApplicationAttempt(request.getApplicationAttemptId()));
+      } catch (IOException e) {
+        // Keep the original exception as the cause so its stack trace is
+        // not lost.
+        throw new ApplicationAttemptNotFoundException(e.getMessage(), e);
+      }
+    }
+
+    /** Returns the reports of all attempts of the requested application. */
+    @Override
+    public GetApplicationAttemptsResponse getApplicationAttempts(
+        GetApplicationAttemptsRequest request) throws YarnException,
+        IOException {
+      return GetApplicationAttemptsResponse
+        .newInstance(new ArrayList<ApplicationAttemptReport>(history
+          .getApplicationAttempts(request.getApplicationId()).values()));
+    }
+
+    /**
+     * Looks up the report of a single application.
+     *
+     * @throws ApplicationNotFoundException if the history manager fails with
+     *         an IOException; the IOException is kept as the cause
+     */
+    @Override
+    public GetApplicationReportResponse getApplicationReport(
+        GetApplicationReportRequest request) throws YarnException, IOException {
+      try {
+        ApplicationId applicationId = request.getApplicationId();
+        return GetApplicationReportResponse.newInstance(history
+          .getApplication(applicationId));
+      } catch (IOException e) {
+        throw new ApplicationNotFoundException(e.getMessage(), e);
+      }
+    }
+
+    /** Returns the reports of every application known to the history manager. */
+    @Override
+    public GetApplicationsResponse getApplications(
+        GetApplicationsRequest request) throws YarnException, IOException {
+      return GetApplicationsResponse.newInstance(
+        new ArrayList<ApplicationReport>(
+          history.getAllApplications().values()));
+    }
+
+    /**
+     * Looks up the report of a single container.
+     *
+     * @throws ContainerNotFoundException if the history manager fails with an
+     *         IOException; the IOException is kept as the cause
+     */
+    @Override
+    public GetContainerReportResponse getContainerReport(
+        GetContainerReportRequest request) throws YarnException, IOException {
+      try {
+        return GetContainerReportResponse.newInstance(history
+          .getContainer(request.getContainerId()));
+      } catch (IOException e) {
+        throw new ContainerNotFoundException(e.getMessage(), e);
+      }
+    }
+
+    /** Returns the reports of all containers of the requested attempt. */
+    @Override
+    public GetContainersResponse getContainers(GetContainersRequest request)
+        throws YarnException, IOException {
+      return GetContainersResponse.newInstance(
+        new ArrayList<ContainerReport>(
+          history.getContainers(request.getApplicationAttemptId()).values()));
+    }
+
+    /** Not implemented: always returns null. */
+    @Override
+    public GetDelegationTokenResponse getDelegationToken(
+        GetDelegationTokenRequest request) throws YarnException, IOException {
+      // TODO implement delegation token retrieval
+      return null;
+    }
+
+    /** Not implemented: always returns null. */
+    @Override
+    public RenewDelegationTokenResponse renewDelegationToken(
+        RenewDelegationTokenRequest request) throws YarnException, IOException {
+      // TODO implement delegation token renewal
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManager.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManager.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManager.java
new file mode 100644
index 0000000..db25d29
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManager.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.server.api.ApplicationContext;
+
+/**
+ * Marker type for the application history manager. It declares no members
+ * of its own; everything comes from {@link ApplicationContext}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ApplicationHistoryManager extends ApplicationContext {
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerImpl.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerImpl.java
new file mode 100644
index 0000000..b56a595
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerImpl.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class ApplicationHistoryManagerImpl extends AbstractService implements
+    ApplicationHistoryManager {
+  private static final Log LOG = LogFactory
+    .getLog(ApplicationHistoryManagerImpl.class);
+  // Placeholder for the tracking URL and host when an application has no attempt.
+  private static final String UNAVAILABLE = "N/A";
+
+  // Backing store; created in serviceInit(), so null before initialization.
+  private ApplicationHistoryStore historyStore;
+  // HTTP scheme prefix plus web app address; used to build aggregated log URLs.
+  private String serverHttpAddress;
+
+  /** Creates the manager, using this class's name as the service name. */
+  public ApplicationHistoryManagerImpl() {
+    super(ApplicationHistoryManagerImpl.class.getName());
+  }
+
+  /**
+   * Creates and initializes the history store and computes the HTTP address
+   * later used when building container log URLs.
+   *
+   * @param conf configuration handed to the store and to the parent service
+   * @throws Exception if the store or the parent service fails to initialize
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    LOG.info("ApplicationHistory Init");
+    ApplicationHistoryStore store = createApplicationHistoryStore(conf);
+    // Publish the store in the field before initializing it.
+    historyStore = store;
+    store.init(conf);
+    String schemePrefix = WebAppUtils.getHttpSchemePrefix(conf);
+    serverHttpAddress =
+        schemePrefix + WebAppUtils.getAHSWebAppURLWithoutScheme(conf);
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Starts the history store and then the service itself.
+   *
+   * @throws Exception if the store or the parent service fails to start
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    LOG.info("Starting ApplicationHistory");
+    historyStore.start();
+    super.serviceStart();
+  }
+
+  /**
+   * Stops the history store, if one was created, and then the service itself.
+   *
+   * @throws Exception if the store or the parent service fails to stop
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    LOG.info("Stopping ApplicationHistory");
+    // historyStore is still null when serviceInit() failed before creating
+    // it; guard so that stopping in that state does not raise an NPE.
+    if (historyStore != null) {
+      historyStore.stop();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Instantiates the store class named by
+   * {@code YarnConfiguration.APPLICATION_HISTORY_STORE}, falling back to
+   * {@link FileSystemApplicationHistoryStore} when the key is not set.
+   *
+   * @param conf configuration to read the class from and pass to the instance
+   * @return a new store instance
+   */
+  protected ApplicationHistoryStore createApplicationHistoryStore(
+      Configuration conf) {
+    Class<? extends ApplicationHistoryStore> storeClass =
+        conf.getClass(YarnConfiguration.APPLICATION_HISTORY_STORE,
+          FileSystemApplicationHistoryStore.class,
+          ApplicationHistoryStore.class);
+    return ReflectionUtils.newInstance(storeClass, conf);
+  }
+
+  /**
+   * Returns the report of the application master container of an attempt.
+   *
+   * @param appAttemptId attempt whose AM container is requested
+   * @return the container report, with a log URL built for the owning user
+   * @throws IOException if the store cannot be read
+   */
+  @Override
+  public ContainerReport getAMContainer(ApplicationAttemptId appAttemptId)
+      throws IOException {
+    ApplicationReport owner = getApplication(appAttemptId.getApplicationId());
+    ContainerHistoryData amContainer = historyStore.getAMContainer(appAttemptId);
+    String user = null;
+    if (owner != null) {
+      user = owner.getUser();
+    }
+    return convertToContainerReport(amContainer, user);
+  }
+
+  /**
+   * Returns a report for every application found in the store.
+   *
+   * @return a new map from application id to its report
+   * @throws IOException if the store cannot be read
+   */
+  @Override
+  public Map<ApplicationId, ApplicationReport> getAllApplications()
+      throws IOException {
+    Map<ApplicationId, ApplicationReport> reports =
+        new HashMap<ApplicationId, ApplicationReport>();
+    for (Entry<ApplicationId, ApplicationHistoryData> stored
+        : historyStore.getAllApplications().entrySet()) {
+      ApplicationReport report = convertToApplicationReport(stored.getValue());
+      reports.put(stored.getKey(), report);
+    }
+    return reports;
+  }
+
+  /**
+   * Returns the report of a single application.
+   *
+   * @param appId application to look up
+   * @return the report converted from the stored history data
+   * @throws IOException if the store cannot be read
+   */
+  @Override
+  public ApplicationReport getApplication(ApplicationId appId)
+      throws IOException {
+    ApplicationHistoryData stored = historyStore.getApplication(appId);
+    return convertToApplicationReport(stored);
+  }
+
+  /**
+   * Builds an {@link ApplicationReport} from stored history data. The
+   * current attempt id, tracking URL, host and RPC port come from the
+   * attempt with the highest attempt id; when the application has no
+   * attempt they stay at null, "N/A", "N/A" and -1 respectively.
+   *
+   * @param appHistory stored application data; may be null when the store
+   *        has nothing for the application
+   * @return the report, or null when {@code appHistory} is null
+   * @throws IOException if the attempts cannot be read from the store
+   */
+  private ApplicationReport convertToApplicationReport(
+      ApplicationHistoryData appHistory) throws IOException {
+    // The callers of getApplication() in this class already treat a null
+    // report as "unknown application"; return null instead of failing with
+    // an NPE when the store has no data.
+    if (appHistory == null) {
+      return null;
+    }
+
+    ApplicationAttemptId currentApplicationAttemptId = null;
+    String trackingUrl = UNAVAILABLE;
+    String host = UNAVAILABLE;
+    int rpcPort = -1;
+
+    ApplicationAttemptHistoryData lastAttempt =
+        getLastAttempt(appHistory.getApplicationId());
+    if (lastAttempt != null) {
+      currentApplicationAttemptId = lastAttempt.getApplicationAttemptId();
+      trackingUrl = lastAttempt.getTrackingURL();
+      host = lastAttempt.getHost();
+      rpcPort = lastAttempt.getRPCPort();
+    }
+    return ApplicationReport.newInstance(appHistory.getApplicationId(),
+      currentApplicationAttemptId, appHistory.getUser(), appHistory.getQueue(),
+      appHistory.getApplicationName(), host, rpcPort, null,
+      appHistory.getYarnApplicationState(), appHistory.getDiagnosticsInfo(),
+      trackingUrl, appHistory.getStartTime(), appHistory.getFinishTime(),
+      appHistory.getFinalApplicationStatus(), null, "", 100,
+      appHistory.getApplicationType(), null);
+  }
+
+  /**
+   * Finds the stored attempt with the highest attempt id.
+   *
+   * @param appId application whose attempts are scanned
+   * @return the result of looking up the highest attempt id in the attempts
+   *         map; the lookup uses a null key when the map is empty
+   * @throws IOException if the store cannot be read
+   */
+  private ApplicationAttemptHistoryData getLastAttempt(ApplicationId appId)
+      throws IOException {
+    Map<ApplicationAttemptId, ApplicationAttemptHistoryData> attempts =
+        historyStore.getApplicationAttempts(appId);
+    ApplicationAttemptId latest = null;
+    for (ApplicationAttemptId candidate : attempts.keySet()) {
+      if (latest == null || latest.getAttemptId() < candidate.getAttemptId()) {
+        latest = candidate;
+      }
+    }
+    return attempts.get(latest);
+  }
+
+  /**
+   * Copies the fields of a stored attempt into an
+   * {@link ApplicationAttemptReport}.
+   *
+   * @param appAttemptHistory stored attempt data
+   * @return the equivalent report
+   */
+  private ApplicationAttemptReport convertToApplicationAttemptReport(
+      ApplicationAttemptHistoryData appAttemptHistory) {
+    ApplicationAttemptId attemptId =
+        appAttemptHistory.getApplicationAttemptId();
+    String host = appAttemptHistory.getHost();
+    int rpcPort = appAttemptHistory.getRPCPort();
+    String trackingUrl = appAttemptHistory.getTrackingURL();
+    String diagnostics = appAttemptHistory.getDiagnosticsInfo();
+    return ApplicationAttemptReport.newInstance(attemptId, host, rpcPort,
+      trackingUrl, diagnostics,
+      appAttemptHistory.getYarnApplicationAttemptState(),
+      appAttemptHistory.getMasterContainerId());
+  }
+
+  /**
+   * Returns the report of a single application attempt.
+   *
+   * @param appAttemptId attempt to look up
+   * @return the report converted from the stored attempt data
+   * @throws IOException if the store cannot be read
+   */
+  @Override
+  public ApplicationAttemptReport getApplicationAttempt(
+      ApplicationAttemptId appAttemptId) throws IOException {
+    ApplicationAttemptHistoryData stored =
+        historyStore.getApplicationAttempt(appAttemptId);
+    return convertToApplicationAttemptReport(stored);
+  }
+
+  /**
+   * Returns a report for every stored attempt of an application.
+   *
+   * @param appId application whose attempts are requested
+   * @return a new map from attempt id to its report
+   * @throws IOException if the store cannot be read
+   */
+  @Override
+  public Map<ApplicationAttemptId, ApplicationAttemptReport>
+      getApplicationAttempts(ApplicationId appId) throws IOException {
+    Map<ApplicationAttemptId, ApplicationAttemptReport> reports =
+        new HashMap<ApplicationAttemptId, ApplicationAttemptReport>();
+    for (Entry<ApplicationAttemptId, ApplicationAttemptHistoryData> stored
+        : historyStore.getApplicationAttempts(appId).entrySet()) {
+      ApplicationAttemptReport report =
+          convertToApplicationAttemptReport(stored.getValue());
+      reports.put(stored.getKey(), report);
+    }
+    return reports;
+  }
+
+  /**
+   * Returns the report of a single container.
+   *
+   * @param containerId container to look up
+   * @return the container report, with a log URL built for the owning user
+   * @throws IOException if the store cannot be read
+   */
+  @Override
+  public ContainerReport getContainer(ContainerId containerId)
+      throws IOException {
+    ApplicationId appId =
+        containerId.getApplicationAttemptId().getApplicationId();
+    ApplicationReport owner = getApplication(appId);
+    ContainerHistoryData stored = historyStore.getContainer(containerId);
+    String user = null;
+    if (owner != null) {
+      user = owner.getUser();
+    }
+    return convertToContainerReport(stored, user);
+  }
+
+  /**
+   * Copies the fields of a stored container into a {@link ContainerReport}
+   * and attaches an aggregated log URL rooted at this server's HTTP address.
+   *
+   * @param containerHistory stored container data
+   * @param user user name passed on to the log URL builder; may be null
+   * @return the equivalent report
+   */
+  private ContainerReport convertToContainerReport(
+      ContainerHistoryData containerHistory, String user) {
+    String nodeText = containerHistory.getAssignedNode().toString();
+    // The container id string is passed twice to the URL builder.
+    String containerText = containerHistory.getContainerId().toString();
+    String entityText = containerHistory.getContainerId().toString();
+    String logUrl = WebAppUtils.getAggregatedLogURL(serverHttpAddress,
+        nodeText, containerText, entityText, user);
+
+    return ContainerReport.newInstance(
+      containerHistory.getContainerId(),
+      containerHistory.getAllocatedResource(),
+      containerHistory.getAssignedNode(),
+      containerHistory.getPriority(),
+      containerHistory.getStartTime(),
+      containerHistory.getFinishTime(),
+      containerHistory.getDiagnosticsInfo(),
+      logUrl,
+      containerHistory.getContainerExitStatus(),
+      containerHistory.getContainerState());
+  }
+
+  /**
+   * Returns a report for every stored container of an application attempt.
+   *
+   * @param appAttemptId attempt whose containers are requested
+   * @return a new map from container id to its report
+   * @throws IOException if the store cannot be read
+   */
+  @Override
+  public Map<ContainerId, ContainerReport> getContainers(
+      ApplicationAttemptId appAttemptId) throws IOException {
+    ApplicationReport owner = getApplication(appAttemptId.getApplicationId());
+    Map<ContainerId, ContainerHistoryData> stored =
+        historyStore.getContainers(appAttemptId);
+    // Every container of the attempt shares the same owning user.
+    String user = (owner != null) ? owner.getUser() : null;
+    Map<ContainerId, ContainerReport> reports =
+        new HashMap<ContainerId, ContainerReport>();
+    for (Entry<ContainerId, ContainerHistoryData> item : stored.entrySet()) {
+      reports.put(item.getKey(),
+        convertToContainerReport(item.getValue(), user));
+    }
+    return reports;
+  }
+
+  /**
+   * @return the backing history store; null until serviceInit() has run
+   */
+  @Private
+  @VisibleForTesting
+  public ApplicationHistoryStore getHistoryStore() {
+    return historyStore;
+  }
+}


Mime
View raw message