ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adorosz...@apache.org
Subject [05/50] [abbrv] ambari git commit: AMBARI-20758 Aggregate local metrics for minute aggregation time window (dsen)
Date Tue, 23 May 2017 09:53:03 GMT
AMBARI-20758 Aggregate local metrics for minute aggregation time window (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/041d353b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/041d353b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/041d353b

Branch: refs/heads/ambari-rest-api-explorer
Commit: 041d353b0d75b20b0322097e13a1701226e6fc97
Parents: 772be78
Author: Dmytro Sen <dsen@apache.org>
Authored: Wed May 17 19:38:29 2017 +0300
Committer: Dmytro Sen <dsen@apache.org>
Committed: Wed May 17 19:38:29 2017 +0300

----------------------------------------------------------------------
 .../logfeeder/metrics/LogFeederAMSClient.java   |  12 +-
 ambari-metrics/ambari-metrics-assembly/pom.xml  |  20 +++
 .../src/main/assembly/monitor-windows.xml       |   7 +
 .../src/main/assembly/monitor.xml               |   9 +-
 .../timeline/AbstractTimelineMetricsSink.java   |  24 ++-
 .../sink/timeline/AggregationResult.java        |  60 +++++++
 .../metrics2/sink/timeline/MetricAggregate.java | 110 ++++++++++++
 .../sink/timeline/MetricClusterAggregate.java   |  73 ++++++++
 .../sink/timeline/MetricHostAggregate.java      |  81 +++++++++
 .../metrics2/sink/timeline/TimelineMetric.java  |   6 +-
 .../TimelineMetricWithAggregatedValues.java     |  65 +++++++
 .../AbstractTimelineMetricSinkTest.java         |  10 ++
 .../availability/MetricCollectorHATest.java     |  10 ++
 .../cache/HandleConnectExceptionTest.java       |  10 ++
 .../sink/flume/FlumeTimelineMetricsSink.java    |  16 ++
 .../timeline/HadoopTimelineMetricsSink.java     |  20 ++-
 .../conf/unix/log4j.properties                  |  31 ++++
 .../conf/windows/log4j.properties               |  29 +++
 .../ambari-metrics-host-aggregator/pom.xml      | 120 +++++++++++++
 .../AbstractMetricPublisherThread.java          | 134 ++++++++++++++
 .../aggregator/AggregatedMetricsPublisher.java  | 101 +++++++++++
 .../host/aggregator/AggregatorApplication.java  | 180 +++++++++++++++++++
 .../host/aggregator/AggregatorWebService.java   |  56 ++++++
 .../host/aggregator/RawMetricsPublisher.java    |  60 +++++++
 .../host/aggregator/TimelineMetricsHolder.java  |  98 ++++++++++
 .../conf/unix/ambari-metrics-monitor            |   2 +-
 .../src/main/python/core/aggregator.py          | 110 ++++++++++++
 .../src/main/python/core/config_reader.py       |  35 +++-
 .../src/main/python/core/controller.py          |  28 +++
 .../src/main/python/core/emitter.py             |   8 +-
 .../src/main/python/core/stop_handler.py        |   3 +-
 .../src/main/python/main.py                     |   6 +-
 .../kafka/KafkaTimelineMetricsReporter.java     |  17 ++
 .../storm/StormTimelineMetricsReporter.java     |  14 ++
 .../sink/storm/StormTimelineMetricsSink.java    |  14 ++
 .../storm/StormTimelineMetricsReporter.java     |  16 ++
 .../sink/storm/StormTimelineMetricsSink.java    |  16 ++
 .../timeline/HBaseTimelineMetricStore.java      |  29 ++-
 .../metrics/timeline/PhoenixHBaseAccessor.java  |   4 +-
 .../timeline/TimelineMetricConfiguration.java   |   2 +
 .../metrics/timeline/TimelineMetricStore.java   |   2 +
 .../timeline/TimelineMetricsAggregatorSink.java |   4 +-
 .../timeline/aggregators/MetricAggregate.java   | 110 ------------
 .../aggregators/MetricClusterAggregate.java     |  73 --------
 .../aggregators/MetricHostAggregate.java        |  81 ---------
 .../TimelineMetricAppAggregator.java            |   1 +
 .../TimelineMetricClusterAggregator.java        |   2 +
 .../TimelineMetricClusterAggregatorSecond.java  |   1 +
 .../TimelineMetricHostAggregator.java           |   1 +
 .../aggregators/TimelineMetricReadHelper.java   |   2 +
 .../webapp/TimelineWebServices.java             |  31 ++++
 .../timeline/ITPhoenixHBaseAccessor.java        |   4 +-
 .../metrics/timeline/MetricTestHelper.java      |   2 +-
 .../timeline/PhoenixHBaseAccessorTest.java      |   4 +-
 .../timeline/TestMetricHostAggregate.java       |   8 +-
 .../timeline/TestTimelineMetricStore.java       |   6 +
 .../TimelineMetricsAggregatorMemorySink.java    |   4 +-
 .../aggregators/ITClusterAggregator.java        |   4 +-
 .../aggregators/ITMetricAggregator.java         |  13 +-
 ...melineMetricClusterAggregatorSecondTest.java |   1 +
 ambari-metrics/pom.xml                          |   1 +
 .../system/impl/AmbariMetricSinkImpl.java       |  10 ++
 .../1.6.1.2.2.0/package/scripts/params.py       |   2 +
 .../hadoop-metrics2-accumulo.properties.j2      |   3 +
 .../0.1.0/configuration/ams-env.xml             |   8 +
 .../0.1.0/configuration/ams-site.xml            |  11 ++
 .../AMBARI_METRICS/0.1.0/metainfo.xml           |   3 +
 .../AMBARI_METRICS/0.1.0/package/scripts/ams.py |  30 ++++
 .../0.1.0/package/scripts/params.py             |   5 +
 .../hadoop-metrics2-hbase.properties.j2         |   3 +
 .../package/templates/metric_monitor.ini.j2     |   7 +
 .../FLUME/1.4.0.2.0/package/scripts/params.py   |   3 +
 .../templates/flume-metrics2.properties.j2      |   2 +
 .../0.96.0.2.0/package/scripts/params_linux.py  |   3 +
 ...-metrics2-hbase.properties-GANGLIA-MASTER.j2 |   2 +
 ...doop-metrics2-hbase.properties-GANGLIA-RS.j2 |   2 +
 .../hadoop-metrics2.properties.xml              |   2 +
 .../0.12.0.2.0/package/scripts/params_linux.py  |   2 +
 .../hadoop-metrics2-hivemetastore.properties.j2 |   2 +
 .../hadoop-metrics2-hiveserver2.properties.j2   |   2 +
 .../templates/hadoop-metrics2-llapdaemon.j2     |   2 +
 .../hadoop-metrics2-llaptaskscheduler.j2        |   2 +
 .../2.1.0.3.0/package/scripts/params_linux.py   |   3 +
 .../hadoop-metrics2-hivemetastore.properties.j2 |   2 +
 .../hadoop-metrics2-hiveserver2.properties.j2   |   2 +
 .../templates/hadoop-metrics2-llapdaemon.j2     |   2 +
 .../hadoop-metrics2-llaptaskscheduler.j2        |   2 +
 .../KAFKA/0.8.1/configuration/kafka-broker.xml  |  11 ++
 .../KAFKA/0.8.1/package/scripts/params.py       |   3 +
 .../STORM/0.9.1/package/scripts/params_linux.py |   2 +
 .../0.9.1/package/templates/config.yaml.j2      |   2 +
 .../templates/storm-metrics2.properties.j2      |   2 +
 .../2.0.6/hooks/before-START/scripts/params.py  |   3 +
 .../templates/hadoop-metrics2.properties.j2     |   2 +
 .../hadoop-metrics2.properties.xml              |   2 +
 .../3.0/hooks/before-START/scripts/params.py    |   2 +
 .../templates/hadoop-metrics2.properties.j2     |   2 +
 .../system/impl/TestAmbariMetricsSinkImpl.java  |  10 ++
 .../2.0/hooks/before-START/scripts/params.py    |   2 +
 99 files changed, 1854 insertions(+), 307 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
index 2d1bf40..39526a5 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
@@ -89,6 +89,16 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
   }
 
   @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return false; // always reported as disabled for the LogFeeder client
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return 0; // placeholder value; this class reports host in-memory aggregation as disabled
+  }
+
+  @Override
   protected boolean emitMetrics(TimelineMetrics metrics) {
     return super.emitMetrics(metrics);
   }
@@ -103,4 +113,4 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
     return collectorPort;
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/pom.xml b/ambari-metrics/ambari-metrics-assembly/pom.xml
index a4b87de..6b81de5 100644
--- a/ambari-metrics/ambari-metrics-assembly/pom.xml
+++ b/ambari-metrics/ambari-metrics-assembly/pom.xml
@@ -35,6 +35,7 @@
   <properties>
     <collector.dir>${project.basedir}/../ambari-metrics-timelineservice</collector.dir>
     <monitor.dir>${project.basedir}/../ambari-metrics-host-monitoring</monitor.dir>
+    <aggregator.dir>${project.basedir}/../ambari-metrics-host-aggregator</aggregator.dir>
     <grafana.dir>${project.basedir}/../ambari-metrics-grafana</grafana.dir>
     <hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir>
     <storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir>
@@ -599,6 +600,19 @@
                       </sources>
                     </mapping>
                     <mapping>
+                      <directory>/var/lib/ambari-metrics-monitor/lib</directory>
+                      <sources>
+                        <source>
+                          <location>
+                            ${aggregator.dir}/target/
+                          </location>
+                          <includes>
+                            <include>ambari-metrics-host-aggregator-${project.version}.jar</include>
+                          </includes>
+                        </source>
+                      </sources>
+                    </mapping>
+                    <mapping>
                       <directory>/etc/ambari-metrics-monitor/conf</directory>
                       <configuration>true</configuration>
                     </mapping>
@@ -744,6 +758,7 @@
                     <path>/var/run/ambari-metrics-grafana</path>
                     <path>/var/log/ambari-metrics-grafana</path>
                     <path>/var/lib/ambari-metrics-collector</path>
+                    <path>/var/lib/ambari-metrics-monitor/lib</path>
                     <path>/var/lib/ambari-metrics-grafana</path>
                     <path>/usr/lib/ambari-metrics-hadoop-sink</path>
                     <path>/usr/lib/ambari-metrics-kafka-sink</path>
@@ -1331,6 +1346,11 @@
       <type>pom</type>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-metrics-host-aggregator</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
index ab309a1..d015d31 100644
--- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
+++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
@@ -64,6 +64,13 @@
       </includes>
     </fileSet>
     <fileSet>
+      <directory>${aggregator.dir}/conf/windows</directory>
+      <outputDirectory>conf</outputDirectory>
+      <includes>
+        <include>log4j.properties</include>
+      </includes>
+    </fileSet>
+    <fileSet>
       <directory>${monitor.dir}/conf/windows</directory>
       <outputDirectory>/</outputDirectory>
       <includes>

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml
index 99a41c3..448fe62 100644
--- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml
+++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml
@@ -46,6 +46,13 @@
       </includes>
     </fileSet>
     <fileSet>
+      <directory>${aggregator.dir}/conf/unix</directory>
+      <outputDirectory>conf</outputDirectory>
+      <includes>
+        <include>log4j.properties</include>
+      </includes>
+    </fileSet>
+    <fileSet>
       <directory>${monitor.dir}/conf/unix</directory>
       <outputDirectory>bin</outputDirectory>
       <includes>
@@ -68,4 +75,4 @@
 
 
 
-</assembly>
\ No newline at end of file
+</assembly>

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 2c6fae2..a8dc571 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -78,6 +78,8 @@ public abstract class AbstractTimelineMetricsSink {
   public static final String SSL_KEYSTORE_PATH_PROPERTY = "truststore.path";
   public static final String SSL_KEYSTORE_TYPE_PROPERTY = "truststore.type";
   public static final String SSL_KEYSTORE_PASSWORD_PROPERTY = "truststore.password";
+  public static final String HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = "host_in_memory_aggregation";
+  public static final String HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = "host_in_memory_aggregation_port";
   public static final String COLLECTOR_LIVE_NODES_PATH = "/ws/v1/timeline/metrics/livenodes";
   public static final String INSTANCE_ID_PROPERTY = "instanceId";
   public static final String SET_INSTANCE_ID_PROPERTY = "set.instanceId";
@@ -241,8 +243,14 @@ public abstract class AbstractTimelineMetricsSink {
   }
 
   protected boolean emitMetrics(TimelineMetrics metrics) {
-    String collectorHost = getCurrentCollectorHost();
-    String connectUrl = getCollectorUri(collectorHost);
+    String connectUrl;
+    if (isHostInMemoryAggregationEnabled()) {
+      connectUrl = constructTimelineMetricUri("http", "localhost", String.valueOf(getHostInMemoryAggregationPort()));
+    } else {
+      String collectorHost  = getCurrentCollectorHost();
+      connectUrl = getCollectorUri(collectorHost);
+    }
+
     String jsonData = null;
     LOG.debug("EmitMetrics connectUrl = "  + connectUrl);
     try {
@@ -562,4 +570,16 @@ public abstract class AbstractTimelineMetricsSink {
    * @return String "host1"
    */
   abstract protected String getHostname();
+
+  /**
+   * Check if host in-memory aggregation is enabled; when it is, emitMetrics posts to localhost instead of a collector host.
+   * @return true to send metrics to localhost on the host in-memory aggregation port, false to use the collector URI
+   */
+  abstract protected boolean isHostInMemoryAggregationEnabled();
+
+  /**
+   * In-memory aggregation port; emitMetrics uses it as the port of the localhost URI when aggregation is enabled.
+   * @return the port number to use for the host in-memory aggregation endpoint
+   */
+  abstract protected int getHostInMemoryAggregationPort();
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java
new file mode 100644
index 0000000..c903e3d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Set;
+
+@XmlRootElement(name="AggregationResult")
+public class AggregationResult { // JAXB-annotated holder: a set of per-metric aggregates plus one timestamp
+    protected Set<TimelineMetricWithAggregatedValues> result; // the aggregated metrics carried by this result
+    protected Long timeInMilis; // presumably epoch milliseconds — TODO confirm; the "Milis" spelling is part of the accessor names
+
+    @Override
+    public String toString() { // renders both fields, relying on their own toString()
+        return "AggregationResult{" +
+                "result=" + result +
+                ", timeInMilis=" + timeInMilis +
+                '}';
+    }
+
+    public AggregationResult() { // no-arg constructor; leaves both fields null
+    }
+
+    public AggregationResult(Set<TimelineMetricWithAggregatedValues> result, Long timeInMilis) {
+        this.result = result; // stored by reference, not copied
+        this.timeInMilis = timeInMilis;
+    }
+    @XmlElement
+    public Set<TimelineMetricWithAggregatedValues> getResult() {
+        return result; // returns the internal set itself, may be null
+    }
+
+    public void setResult(Set<TimelineMetricWithAggregatedValues> result) {
+        this.result = result;
+    }
+    @XmlElement
+    public Long getTimeInMilis() {
+        return timeInMilis; // may be null if never set
+    }
+
+    public void setTimeInMilis(Long timeInMilis) {
+        this.timeInMilis = timeInMilis;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
new file mode 100644
index 0000000..84cba0e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.annotate.JsonSubTypes;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+* Base holder for a metric's sum, deviation, max and min; MetricClusterAggregate and MetricHostAggregate are its declared JSON subtypes.
+*/
+@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
+  @JsonSubTypes.Type(value = MetricHostAggregate.class)})
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class MetricAggregate {
+  private static final ObjectMapper mapper = new ObjectMapper(); // shared by toJSON()
+
+  protected Double sum = 0.0;
+  protected Double deviation; // never computed in this class; stays null unless set via the constructor or setDeviation()
+  protected Double max = Double.MIN_VALUE; // NOTE(review): Double.MIN_VALUE is the smallest POSITIVE double, so updateMax() never accepts values <= 0 from this seed — confirm intended
+  protected Double min = Double.MAX_VALUE; // seed: any smaller value replaces it in updateMin()
+
+  public MetricAggregate() { // keeps the field initialisers above
+  }
+
+  MetricAggregate(Double sum, Double deviation, Double max, // package-private; the subclasses call it via super(...)
+                  Double min) {
+    this.sum = sum;
+    this.deviation = deviation;
+    this.max = max;
+    this.min = min;
+  }
+
+  public void updateSum(Double sum) { // adds to the running sum (does not replace it)
+    this.sum += sum; // unboxes both operands: a null argument or null field throws NullPointerException
+  }
+
+  public void updateMax(Double max) { // keeps the larger of the current and the supplied value
+    if (max > this.max) {
+      this.max = max;
+    }
+  }
+
+  public void updateMin(Double min) { // keeps the smaller of the current and the supplied value
+    if (min < this.min) {
+      this.min = min;
+    }
+  }
+
+  @JsonProperty("sum")
+  public Double getSum() {
+    return sum;
+  }
+
+  @JsonProperty("deviation")
+  public Double getDeviation() {
+    return deviation; // may be null, see field declaration
+  }
+
+  @JsonProperty("max")
+  public Double getMax() {
+    return max;
+  }
+
+  @JsonProperty("min")
+  public Double getMin() {
+    return min;
+  }
+
+  public void setSum(Double sum) { // replaces the sum, unlike updateSum()
+    this.sum = sum;
+  }
+
+  public void setDeviation(Double deviation) {
+    this.deviation = deviation;
+  }
+
+  public void setMax(Double max) { // unconditional overwrite, unlike updateMax()
+    this.max = max;
+  }
+
+  public void setMin(Double min) { // unconditional overwrite, unlike updateMin()
+    this.min = min;
+  }
+
+  public String toJSON() throws IOException { // serialises this object with the shared Jackson mapper
+    return mapper.writeValueAsString(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
new file mode 100644
index 0000000..7ef2c1d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+* Aggregate that adds a host count (numberOfHosts) to the sum/deviation/max/min inherited from MetricAggregate.
+*/
+public class MetricClusterAggregate extends MetricAggregate {
+  private int numberOfHosts; // defaults to 0 until set or updated
+
+  @JsonCreator
+  public MetricClusterAggregate() { // keeps the superclass field defaults
+  }
+
+  public MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
+                         Double max, Double min) {
+    super(sum, deviation, max, min); // note the superclass order: sum, deviation, max, min
+    this.numberOfHosts = numberOfHosts;
+  }
+
+  @JsonProperty("numberOfHosts")
+  public int getNumberOfHosts() {
+    return numberOfHosts;
+  }
+
+  public void updateNumberOfHosts(int count) { // adds to the count (does not replace it)
+    this.numberOfHosts += count;
+  }
+
+  public void setNumberOfHosts(int numberOfHosts) { // replaces the count
+    this.numberOfHosts = numberOfHosts;
+  }
+
+  /**
+   * Merge another cluster aggregate into this one: update max and min, add its sum and its host count (deviation is not merged).
+   */
+  public void updateAggregates(MetricClusterAggregate hostAggregate) {
+    updateMax(hostAggregate.getMax());
+    updateMin(hostAggregate.getMin());
+    updateSum(hostAggregate.getSum());
+    updateNumberOfHosts(hostAggregate.getNumberOfHosts());
+  }
+
+  @Override
+  public String toString() {
+    return "MetricAggregate{" + // NOTE(review): label reads "MetricAggregate" although this class is MetricClusterAggregate
+      "sum=" + sum +
+      ", numberOfHosts=" + numberOfHosts +
+      ", deviation=" + deviation +
+      ", max=" + max +
+      ", min=" + min +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
new file mode 100644
index 0000000..e190913
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Represents a collection of minute based aggregation of values for
+ * resolution greater than a minute.
+ */
+public class MetricHostAggregate extends MetricAggregate {
+
+  private long numberOfSamples = 0;
+
+  @JsonCreator
+  public MetricHostAggregate() {
+    super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE); // argument order: sum, deviation, max seed, min seed
+  }
+
+  public MetricHostAggregate(Double sum, int numberOfSamples, // NOTE(review): int parameter although the field and setter use long
+                             Double deviation,
+                             Double max, Double min) {
+    super(sum, deviation, max, min);
+    this.numberOfSamples = numberOfSamples;
+  }
+
+  @JsonProperty("numberOfSamples")
+  public long getNumberOfSamples() {
+    return numberOfSamples == 0 ? 1 : numberOfSamples; // reports 1 instead of 0, presumably so callers can divide by it — TODO confirm
+  }
+
+  public void updateNumberOfSamples(long count) { // adds to the count (does not replace it)
+    this.numberOfSamples += count;
+  }
+
+  public void setNumberOfSamples(long numberOfSamples) { // replaces the count
+    this.numberOfSamples = numberOfSamples;
+  }
+
+  public double calculateAverage() {
+    return sum / numberOfSamples; // NOTE(review): uses the raw field, not getNumberOfSamples(); with 0 samples this is a floating-point division by zero (NaN or Infinity, no exception)
+  }
+
+  /**
+   * Merge another host aggregate into this one: update max and min, add its sum and its sample count (deviation is not merged).
+   */
+  public void updateAggregates(MetricHostAggregate hostAggregate) {
+    updateMax(hostAggregate.getMax());
+    updateMin(hostAggregate.getMin());
+    updateSum(hostAggregate.getSum());
+    updateNumberOfSamples(hostAggregate.getNumberOfSamples()); // the getter returns 1 for an empty aggregate, so merging one still adds a sample
+  }
+
+  @Override
+  public String toString() {
+    return "MetricHostAggregate{" +
+      "sum=" + sum +
+      ", numberOfSamples=" + numberOfSamples + // raw field, so this can print 0 where the getter would report 1
+      ", deviation=" + deviation +
+      ", max=" + max +
+      ", min=" + min +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
index 44c9d4a..edace52 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -45,7 +45,7 @@ public class TimelineMetric implements Comparable<TimelineMetric> {
   private String type;
   private String units;
   private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
-  private Map<String, String> metadata = new HashMap<>();
+  private HashMap<String, String> metadata = new HashMap<>();
 
   // default
   public TimelineMetric() {
@@ -151,11 +151,11 @@ public class TimelineMetric implements Comparable<TimelineMetric> {
   }
 
   @XmlElement(name = "metadata")
-  public Map<String,String> getMetadata () {
+  public HashMap<String,String> getMetadata () {
     return metadata;
   }
 
-  public void setMetadata (Map<String,String> metadata) {
+  public void setMetadata (HashMap<String,String> metadata) {
     this.metadata = metadata;
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java
new file mode 100644
index 0000000..626ac5f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+
+@XmlRootElement(name = "TimelineMetricWithAggregatedValues")
+@XmlAccessorType(XmlAccessType.NONE) // nothing is bound implicitly; only the two @XmlElement getters below are
+public class TimelineMetricWithAggregatedValues { // pairs one TimelineMetric with one MetricHostAggregate
+    private TimelineMetric timelineMetric;
+    private MetricHostAggregate metricAggregate;
+
+    public TimelineMetricWithAggregatedValues() { // no-arg constructor; leaves both fields null
+    }
+
+    public TimelineMetricWithAggregatedValues(TimelineMetric metric, MetricHostAggregate metricAggregate) {
+        timelineMetric = metric; // both arguments are stored by reference, not copied
+        this.metricAggregate = metricAggregate;
+    }
+
+    @XmlElement
+    public MetricHostAggregate getMetricAggregate() {
+        return metricAggregate; // may be null if never set
+    }
+    @XmlElement
+    public TimelineMetric getTimelineMetric() {
+        return timelineMetric; // may be null if never set
+    }
+
+    public void setTimelineMetric(TimelineMetric timelineMetric) {
+        this.timelineMetric = timelineMetric;
+    }
+
+    public void setMetricAggregate(MetricHostAggregate metricAggregate) {
+        this.metricAggregate = metricAggregate;
+    }
+
+    @Override
+    public String toString() { // renders both fields, relying on their own toString()
+        return "TimelineMetricWithAggregatedValues{" +
+                "timelineMetric=" + timelineMetric +
+                ", metricAggregate=" + metricAggregate +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
index 9b0cdbe..ce2cf79 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
@@ -90,6 +90,16 @@ public class AbstractTimelineMetricSinkTest {
     }
 
     @Override
+    protected boolean isHostInMemoryAggregationEnabled() {
+      return true; // this test sink always reports host in-memory aggregation as enabled
+    }
+
+    @Override
+    protected int getHostInMemoryAggregationPort() {
+      return 61888; // fixed aggregation port reported by this test sink
+    }
+
+    @Override
     public boolean emitMetrics(TimelineMetrics metrics) {
       super.init();
       return super.emitMetrics(metrics);

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
index a393a96..f0174d5 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
@@ -192,5 +192,15 @@ public class MetricCollectorHATest {
     protected String getHostname() {
       return "h1";
     }
+
+    @Override
+    protected boolean isHostInMemoryAggregationEnabled() {
+      return true; // this test sink always reports host in-memory aggregation as enabled
+    }
+
+    @Override
+    protected int getHostInMemoryAggregationPort() {
+      return 61888; // fixed aggregation port reported by this test sink
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
index 32fe32e..4eb75eb 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
@@ -125,6 +125,16 @@ public class HandleConnectExceptionTest {
     }
 
     @Override
+    protected boolean isHostInMemoryAggregationEnabled() {
+      return false; // this test sink always reports host in-memory aggregation as disabled
+    }
+
+    @Override
+    protected int getHostInMemoryAggregationPort() {
+      return 61888; // fixed aggregation port reported by this test sink
+    }
+
+    @Override
     public boolean emitMetrics(TimelineMetrics metrics) {
       super.init();
       return super.emitMetrics(metrics);

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index 904c916..6277907 100644
--- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -63,6 +63,9 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   private int timeoutSeconds = 10;
   private boolean setInstanceId;
   private String instanceId;
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
+
 
   @Override
   public void start() {
@@ -110,6 +113,9 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     port = configuration.getProperty(COLLECTOR_PORT, "6188");
     setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
     instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, "");
+
+    // Parse the configured values. Boolean.getBoolean() would look up a JVM system property
+    // instead, and a missing port key would make Integer.valueOf(null) throw.
+    hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+    hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
     // Initialize the collector write strategy
     super.init();
 
@@ -162,6 +168,16 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     return hostname;
   }
 
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled; // field is assigned from the sink configuration
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort; // field is assigned from the sink configuration
+  }
+
   public void setPollFrequency(long pollFrequency) {
     this.pollFrequency = pollFrequency;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index 11e16c2..c235c7c 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -75,6 +75,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
       return t;
     }
   });
+  private int hostInMemoryAggregationPort;
+  private boolean hostInMemoryAggregationEnabled;
 
   @Override
   public void init(SubsetConfiguration conf) {
@@ -107,7 +109,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
     protocol = conf.getString(COLLECTOR_PROTOCOL, "http");
     collectorHosts = parseHostsStringArrayIntoCollection(conf.getStringArray(COLLECTOR_HOSTS_PROPERTY));
     port = conf.getString(COLLECTOR_PORT, "6188");
-
+    // Use the default-taking overloads: getBoolean(key)/getInt(key) throw
+    // NoSuchElementException when the property is absent from the sink configuration.
+    hostInMemoryAggregationEnabled = conf.getBoolean(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false);
+    hostInMemoryAggregationPort = conf.getInt(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888);
     if (collectorHosts.isEmpty()) {
       LOG.error("No Metric collector configured.");
     } else {
@@ -249,6 +252,16 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
   }
 
   @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled; // field is assigned from conf when the sink is configured
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort; // field is assigned from conf when the sink is configured
+  }
+
+  @Override
   public void putMetrics(MetricsRecord record) {
     try {
       String recordName = record.name();
@@ -308,9 +321,10 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
 
       int sbBaseLen = sb.length();
       List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
-      Map<String, String> metadata = null;
+      HashMap<String, String> metadata = null;
       if (skipAggregation) {
-        metadata = Collections.singletonMap("skipAggregation", "true");
+        metadata = new HashMap<>();
+        metadata.put("skipAggregation", "true");
       }
       long startTime = record.timestamp();
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties b/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties
new file mode 100644
index 0000000..d7ceedd
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Define some default values that can be overridden by system properties
+# Root logger option
+log4j.rootLogger=INFO,file
+
+# Direct log messages to a log file
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=/var/log/ambari-metrics-monitor/ambari-metrics-aggregator.log
+log4j.appender.file.MaxFileSize=80MB
+log4j.appender.file.MaxBackupIndex=60
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n
+
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties b/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties
new file mode 100644
index 0000000..d9aabab
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Define some default values that can be overridden by system properties
+# Root logger option
+log4j.rootLogger=INFO,file
+
+# Direct log messages to a log file
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=\\var\\log\\ambari-metrics-monitor\\ambari-metrics-aggregator.log
+log4j.appender.file.MaxFileSize=80MB
+log4j.appender.file.MaxBackupIndex=60
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/pom.xml b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
new file mode 100644
index 0000000..c2c7897
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>ambari-metrics</artifactId>
+        <groupId>org.apache.ambari</groupId>
+        <version>2.0.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>ambari-metrics-host-aggregator</artifactId>
+    <packaging>jar</packaging>
+
+    <name>ambari-metrics-host-aggregator</name>
+    <url>http://maven.apache.org</url>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>3.8.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>14.0.1</version>
+        </dependency>
+        <dependency>
+              <groupId>org.apache.ambari</groupId>
+              <artifactId>ambari-metrics-common</artifactId>
+              <version>2.0.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.servlet</groupId>
+            <artifactId>servlet-api</artifactId>
+            <version>2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-json</artifactId>
+            <version>1.11</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-server</artifactId>
+            <version>1.11</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.xml.bind</groupId>
+            <artifactId>jaxb-api</artifactId>
+            <version>2.2.2</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-core</artifactId>
+            <version>1.11</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>2.7.1.2.3.4.0-3347</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>1.6</version>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                </configuration>
+
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
new file mode 100644
index 0000000..b1f60fa
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Map;
+
+/**
+ * Abstract class that runs a thread that publishes metrics data to AMS collector in specified intervals.
+ */
+public abstract class AbstractMetricPublisherThread extends Thread {
+    /** Pause between two publishing rounds, in seconds. */
+    protected int publishIntervalInSeconds;
+    /** URL the JSON payload is POSTed to. */
+    protected String publishURL;
+    /** JSON mapper configured with the JAXB annotation introspector. */
+    protected ObjectMapper objectMapper;
+    private Log LOG;
+    /** Metrics holder made available to subclasses. */
+    protected TimelineMetricsHolder timelineMetricsHolder;
+
+    /**
+     * Stores the publishing parameters and prepares the JSON mapper.
+     *
+     * @param timelineMetricsHolder    holder made available to subclasses
+     * @param publishURL               URL the JSON payload is POSTed to
+     * @param publishIntervalInSeconds pause between two publishing rounds, in seconds
+     */
+    public AbstractMetricPublisherThread(TimelineMetricsHolder timelineMetricsHolder, String publishURL, int publishIntervalInSeconds) {
+        LOG = LogFactory.getLog(this.getClass());
+        this.publishURL = publishURL;
+        this.publishIntervalInSeconds = publishIntervalInSeconds;
+        this.timelineMetricsHolder = timelineMetricsHolder;
+        objectMapper = new ObjectMapper();
+        AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+        objectMapper.setAnnotationIntrospector(introspector);
+        // NOTE(review): the value returned by withSerializationInclusion() is discarded here,
+        // so NON_NULL may not actually be applied to this mapper - confirm before relying on it.
+        objectMapper.getSerializationConfig()
+                .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+    }
+
+    /**
+     * Publishes metrics to collector in specified intervals until the thread is interrupted.
+     * A failed publishing round is logged and does not end the loop.
+     */
+    @Override
+    public void run() {
+        while (!isInterrupted()) {
+            try {
+                // 1000L forces long arithmetic so a large interval cannot overflow int.
+                sleep(this.publishIntervalInSeconds * 1000L);
+            } catch (InterruptedException e) {
+                // sleep() clears the interrupt status when it throws. Restore it and leave the
+                // loop; otherwise stopPublisher() could never terminate this thread.
+                Thread.currentThread().interrupt();
+                break;
+            }
+            try {
+                processAndPublishMetrics(getMetricsFromCache());
+            } catch (Exception e) {
+                LOG.error("Couldn't process and send metrics : ",e);
+            }
+        }
+    }
+
+    /**
+     * Processes and sends metrics to collector. Does nothing when the given map is empty.
+     *
+     * @param metricsFromCache metrics to process, as returned by {@link #getMetricsFromCache()}
+     * @throws Exception if publishing the processed metrics fails
+     */
+    protected void processAndPublishMetrics(Map<Long, TimelineMetrics> metricsFromCache) throws Exception {
+        if (metricsFromCache.isEmpty()) return;
+
+        LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size()));
+        publishMetricsJson(processMetrics(metricsFromCache));
+    }
+
+    /**
+     * Returns metrics map. Source is based on implementation.
+     *
+     * @return the metrics to process in the current publishing round
+     */
+    protected abstract Map<Long,TimelineMetrics> getMetricsFromCache();
+
+    /**
+     * Processes given metrics (aggregates or merges them) and converts them into a json string
+     * that will be sent to the collector.
+     *
+     * @param metricValues metrics to process
+     * @return the json payload to publish
+     */
+    protected abstract String processMetrics(Map<Long, TimelineMetrics> metricValues);
+
+    /**
+     * POSTs the given json to {@link #publishURL} with 5 second connect and read timeouts.
+     *
+     * @param jsonData request body; when {@code null} the request is sent without a body
+     * @throws Exception if the URL is unknown, the request fails, or the response code is not 200
+     */
+    protected void publishMetricsJson(String jsonData) throws Exception {
+        int timeout = 5 * 1000;
+        HttpURLConnection connection = null;
+        if (this.publishURL == null) {
+            throw new IOException("Unknown URL. Unable to connect to metrics collector.");
+        }
+        LOG.info("Collector URL : " + publishURL);
+        connection = (HttpURLConnection) new URL(this.publishURL).openConnection();
+
+        connection.setRequestMethod("POST");
+        connection.setRequestProperty("Content-Type", "application/json");
+        connection.setRequestProperty("Connection", "Keep-Alive");
+        connection.setConnectTimeout(timeout);
+        connection.setReadTimeout(timeout);
+        connection.setDoOutput(true);
+
+        if (jsonData != null) {
+            try (OutputStream os = connection.getOutputStream()) {
+                os.write(jsonData.getBytes("UTF-8"));
+            }
+        }
+        int responseCode = connection.getResponseCode();
+        if (responseCode != 200) {
+            throw new Exception("responseCode is " + responseCode);
+        }
+        LOG.info("Successfully sent metrics.");
+    }
+
+    /**
+     * Interrupts the thread, which ends the publishing loop in {@link #run()}.
+     */
+    protected void stopPublisher() {
+        this.interrupt();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
new file mode 100644
index 0000000..0540ec9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Thread that aggregates and publishes metrics to collector on specified interval.
+ */
+public class AggregatedMetricsPublisher extends AbstractMetricPublisherThread {
+
+    private Log LOG;
+
+    public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) {
+        super(timelineMetricsHolder, collectorURL, interval);
+        LOG = LogFactory.getLog(this.getClass());
+    }
+
+    /**
+     * get metrics map form @TimelineMetricsHolder
+     * @return
+     */
+    @Override
+    protected Map<Long, TimelineMetrics> getMetricsFromCache() {
+        return timelineMetricsHolder.extractMetricsForAggregationPublishing();
+    }
+
+    /**
+     * Aggregates given metrics and converts them into json string that will be send to collector
+     * @param metricForAggregationValues
+     * @return
+     */
+    @Override
+    protected String processMetrics(Map<Long, TimelineMetrics> metricForAggregationValues) {
+        HashMap<String, TimelineMetrics> nameToMetricMap = new HashMap<>();
+        for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) {
+            for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
+                if (!nameToMetricMap.containsKey(timelineMetric.getMetricName())) {
+                    nameToMetricMap.put(timelineMetric.getMetricName(), new TimelineMetrics());
+                }
+                nameToMetricMap.get(timelineMetric.getMetricName()).addOrMergeTimelineMetric(timelineMetric);
+            }
+        }
+        Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>();
+        for (TimelineMetrics metrics : nameToMetricMap.values()) {
+            double sum = 0;
+            double max = Integer.MIN_VALUE;
+            double min = Integer.MAX_VALUE;
+            int count = 0;
+            for (TimelineMetric metric : metrics.getMetrics()) {
+                for (Double value : metric.getMetricValues().values()) {
+                    sum+=value;
+                    max = Math.max(max, value);
+                    min = Math.min(min, value);
+                    count++;
+                }
+            }
+            TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
+            tmpMetric.setMetricValues(new TreeMap<Long, Double>());
+            metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
+        }
+        String json = null;
+        try {
+            json = objectMapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis()));
+            LOG.debug(json);
+        } catch (Exception e) {
+            LOG.error("Failed to convert result into json", e);
+        }
+
+        return json;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
new file mode 100644
index 0000000..c6b703b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import com.sun.jersey.api.container.httpserver.HttpServerFactory;
+import com.sun.jersey.api.core.PackagesResourceConfig;
+import com.sun.jersey.api.core.ResourceConfig;
+import com.sun.net.httpserver.HttpServer;
+
+import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Web application that accepts metrics over HTTP and runs two publisher threads
+ * (raw and aggregated) which process the received metrics and submit the results
+ * to the metrics collector.
+ */
+public class AggregatorApplication
+{
+    private static final int STOP_SECONDS_DELAY = 0;
+    private static final int JOIN_SECONDS_TIMEOUT = 2;
+    private static final String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics";
+    private static final String AGGREGATED_POST_PREFIX = "/aggregated";
+    private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
+    private static final String DEFAULT_COLLECTOR_WEBAPP_ADDRESS = "0.0.0.0:6188";
+    private static final String DEFAULT_COLLECTOR_PORT = "6188";
+    // Name the logger after the class itself (not the literal string "AggregatorApplication.class")
+    // so that it takes part in the regular package-based logger hierarchy.
+    private static final Log LOG = LogFactory.getLog(AggregatorApplication.class);
+    private final int webApplicationPort;
+    private final int rawPublishingInterval;
+    private final int aggregationInterval;
+    private Configuration configuration;
+    private String [] collectorHosts;
+    private AggregatedMetricsPublisher aggregatePublisher;
+    private RawMetricsPublisher rawPublisher;
+    private TimelineMetricsHolder timelineMetricsHolder;
+    private HttpServer httpServer;
+
+    /**
+     * Loads ams-site.xml from the classpath, reads the intervals and the web port from it,
+     * initializes the shared metrics holder and creates (but does not start) the HTTP server.
+     * The JVM is terminated with exit code 1 if the HTTP server cannot be created.
+     *
+     * @param collectorHosts comma separated collector host names; the publishers started by
+     *                       this class post to the first entry only
+     */
+    public AggregatorApplication(String collectorHosts) {
+        initConfiguration();
+        this.collectorHosts = collectorHosts.split(",");
+        this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300);
+        this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60);
+        this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888);
+        this.timelineMetricsHolder = TimelineMetricsHolder.getInstance(rawPublishingInterval, aggregationInterval);
+        try {
+            this.httpServer = createHttpServer();
+        } catch (IOException e) {
+            LOG.error("Exception while starting HTTP server. Exiting", e);
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Locates ams-site.xml on the classpath and loads it into {@link #configuration}.
+     *
+     * @throws IllegalStateException if ams-site.xml is not present on the classpath
+     */
+    private void initConfiguration() {
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        if (classLoader == null) {
+            classLoader = getClass().getClassLoader();
+        }
+
+        URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
+        if (amsResUrl == null) {
+            throw new IllegalStateException("Unable to initialize the metrics " +
+                    "subsystem. No ams-site present in the classpath.");
+        }
+        // Only report the resource once we know it was actually found.
+        LOG.info("Found metric service configuration: " + amsResUrl);
+        configuration = new Configuration(true);
+        try {
+            configuration.addResource(amsResUrl.toURI().toURL());
+        } catch (Exception e) {
+            LOG.error("Couldn't init configuration. ", e);
+            System.exit(1);
+        }
+    }
+
+    /**
+     * @return the canonical name of the local host, or "localhost" if it cannot be resolved
+     */
+    private String getHostName() {
+        String hostName = "localhost";
+        try {
+            hostName = InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (UnknownHostException e) {
+            LOG.error(e);
+        }
+        return hostName;
+    }
+
+    /**
+     * @return the base URI (local host name and configured port) the embedded web server listens on
+     */
+    private URI getURI() {
+        URI uri = UriBuilder.fromUri("http://" + getHostName() + "/").port(this.webApplicationPort).build();
+        LOG.info(String.format("Web server at %s", uri));
+        return uri;
+    }
+
+    /**
+     * Creates the HTTP server exposing the JAX-RS resources found in this package,
+     * with Jersey POJO (JSON) mapping switched on.
+     */
+    private HttpServer createHttpServer() throws IOException {
+        ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator");
+        HashMap<String, Object> params = new HashMap<>();
+        params.put("com.sun.jersey.api.json.POJOMappingFeature", "true");
+        resourceConfig.setPropertiesAndFeatures(params);
+        return HttpServerFactory.create(getURI(), resourceConfig);
+    }
+
+    private void startWebServer() {
+        LOG.info("Starting web server.");
+        this.httpServer.start();
+    }
+
+    /**
+     * Starts the thread that publishes aggregated metrics to the "/aggregated" endpoint
+     * of the first configured collector host.
+     */
+    private void startAggregatePublisherThread() {
+        LOG.info("Starting aggregated metrics publisher.");
+        String collectorURL = buildBasicCollectorURL(collectorHosts[0]) + AGGREGATED_POST_PREFIX;
+        aggregatePublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, collectorURL, aggregationInterval);
+        aggregatePublisher.start();
+    }
+
+    /**
+     * Starts the thread that publishes raw metrics to the first configured collector host.
+     */
+    private void startRawPublisherThread() {
+        LOG.info("Starting raw metrics publisher.");
+        String collectorURL = buildBasicCollectorURL(collectorHosts[0]);
+        rawPublisher = new RawMetricsPublisher(timelineMetricsHolder, collectorURL, rawPublishingInterval);
+        rawPublisher.start();
+    }
+
+    /**
+     * Asks both publishers to stop, stops the web server and waits up to
+     * {@link #JOIN_SECONDS_TIMEOUT} seconds for each publisher thread to finish.
+     */
+    private void stop() {
+        aggregatePublisher.stopPublisher();
+        rawPublisher.stopPublisher();
+        httpServer.stop(STOP_SECONDS_DELAY);
+        LOG.info("Stopped web server.");
+        try {
+            LOG.info("Waiting for threads to join.");
+            aggregatePublisher.join(JOIN_SECONDS_TIMEOUT * 1000);
+            rawPublisher.join(JOIN_SECONDS_TIMEOUT * 1000);
+            LOG.info("Gracefully stopped Aggregator Application.");
+        } catch (InterruptedException e) {
+            LOG.error("Received exception during stop : ", e);
+            // Restore the interrupt status so code further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Builds the collector POST URL for the given host. The port is taken from
+     * "timeline.metrics.service.webapp.address" and the protocol is derived from
+     * "timeline.metrics.service.http.policy".
+     *
+     * @param host collector host name
+     * @return URL of the collector metrics endpoint on that host
+     */
+    private String buildBasicCollectorURL(String host) {
+        String address = configuration.get("timeline.metrics.service.webapp.address", DEFAULT_COLLECTOR_WEBAPP_ADDRESS);
+        // Take whatever follows the last ':' as the port; fall back to the default port instead of
+        // failing with ArrayIndexOutOfBoundsException when the address carries no port at all.
+        int separator = address.lastIndexOf(':');
+        String port = (separator >= 0 && separator < address.length() - 1)
+                ? address.substring(separator + 1).trim()
+                : DEFAULT_COLLECTOR_PORT;
+        String protocol = configuration.get("timeline.metrics.service.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
+        return String.format(BASE_POST_URL, protocol, host, port);
+    }
+
+    /**
+     * Entry point. Expects exactly one argument: the comma separated list of collector hosts.
+     * Starts both publisher threads and the web server, then registers a shutdown hook
+     * that stops them again.
+     */
+    public static void main( String[] args ) throws Exception {
+        LOG.info("Starting aggregator application");
+        if (args.length != 1) {
+            throw new IllegalArgumentException("This jar should be run with 1 argument - collector hosts separated with comma");
+        }
+
+        final AggregatorApplication app = new AggregatorApplication(args[0]);
+        app.startAggregatePublisherThread();
+        app.startRawPublisherThread();
+        app.startWebServer();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                LOG.info("Stopping aggregator application");
+                app.stop();
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
new file mode 100644
index 0000000..f96d0ed
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+
+import com.sun.jersey.spi.resource.Singleton;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+
+/**
+ * JAX-RS resource rooted at "/ws/v1/timeline". It answers GET requests on "/metrics"
+ * with an empty 200 response and hands every posted {@link TimelineMetrics} payload
+ * over to the shared {@link TimelineMetricsHolder}.
+ */
+@Path("/ws/v1/timeline")
+@Singleton
+public class AggregatorWebService {
+    TimelineMetricsHolder metricsHolder = TimelineMetricsHolder.getInstance();
+
+    /**
+     * Replies with an empty "200 OK"; carries no payload.
+     *
+     * @return empty OK response
+     */
+    @GET
+    @Path("/metrics")
+    @Produces("text/json")
+    public Response helloWorld() throws IOException {
+        return Response.status(Response.Status.OK).build();
+    }
+
+    /**
+     * Stores the posted metrics twice: once for aggregated publishing and once for raw publishing.
+     *
+     * @param metrics metrics deserialized from the JSON request body
+     * @return empty OK response
+     */
+    @POST
+    @Path("/metrics")
+    @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+    @Produces(MediaType.TEXT_PLAIN)
+    public Response postMetrics(TimelineMetrics metrics) {
+        metricsHolder.putMetricsForAggregationPublishing(metrics);
+        metricsHolder.putMetricsForRawPublishing(metrics);
+        return Response.status(Response.Status.OK).build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
new file mode 100644
index 0000000..f317ed9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.util.Map;
+
+/**
+ * Publisher that takes the raw metrics out of the holder, merges them into a single
+ * {@link TimelineMetrics} object and serializes that object to JSON.
+ */
+public class RawMetricsPublisher extends AbstractMetricPublisherThread {
+    private final Log LOG = LogFactory.getLog(getClass());
+
+    public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) {
+        super(timelineMetricsHolder, collectorURL, interval);
+    }
+
+    /**
+     * @return the raw metrics extracted from the holder
+     */
+    @Override
+    protected Map<Long, TimelineMetrics> getMetricsFromCache() {
+        return timelineMetricsHolder.extractMetricsForRawPublishing();
+    }
+
+    /**
+     * Merges every metric of every entry into one {@link TimelineMetrics} and converts it to JSON.
+     *
+     * @param metricValues metrics as returned by {@link #getMetricsFromCache()}
+     * @return the JSON string, or {@code null} if serialization failed
+     */
+    @Override
+    protected String processMetrics(Map<Long, TimelineMetrics> metricValues) {
+        // Collapse all cached batches into a single container.
+        TimelineMetrics merged = new TimelineMetrics();
+        for (TimelineMetrics batch : metricValues.values()) {
+            for (TimelineMetric metric : batch.getMetrics()) {
+                merged.addOrMergeTimelineMetric(metric);
+            }
+        }
+
+        // Serialize the merged container; a failure is logged and reported as null.
+        String payload = null;
+        try {
+            payload = objectMapper.writeValueAsString(merged);
+            LOG.debug(payload);
+        } catch (Exception e) {
+            LOG.error("Failed to convert result into json", e);
+        }
+        return payload;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
new file mode 100644
index 0000000..b355c97
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Singleton class with 2 guava caches for raw and aggregated metrics storing
+ */
+public class TimelineMetricsHolder {
+    private static final int DEFAULT_RAW_CACHE_EXPIRE_TIME = 60;
+    private static final int DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME = 300;
+    private Cache<Long, TimelineMetrics> aggregationMetricsCache;
+    private Cache<Long, TimelineMetrics> rawMetricsCache;
+    private static TimelineMetricsHolder instance = null;
+    //to ensure no metric values are expired
+    private static int EXPIRE_DELAY = 30;
+    ReadWriteLock aggregationCacheLock = new ReentrantReadWriteLock();
+    ReadWriteLock rawCacheLock = new ReentrantReadWriteLock();
+
+    private TimelineMetricsHolder(int rawCacheExpireTime, int aggregationCacheExpireTime) {
+        this.rawMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(rawCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
+        this.aggregationMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(aggregationCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
+    }
+
+    public static TimelineMetricsHolder getInstance(int rawCacheExpireTime, int aggregationCacheExpireTime) {
+        if (instance == null) {
+            instance = new TimelineMetricsHolder(rawCacheExpireTime, aggregationCacheExpireTime);
+        }
+        return instance;
+    }
+
+    /**
+     * Uses default expiration time for caches initialization if they are not initialized yet.
+     * @return
+     */
+    public static TimelineMetricsHolder getInstance() {
+        return getInstance(DEFAULT_RAW_CACHE_EXPIRE_TIME, DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME);
+    }
+
+    public void putMetricsForAggregationPublishing(TimelineMetrics timelineMetrics) {
+        aggregationCacheLock.writeLock().lock();
+        aggregationMetricsCache.put(System.currentTimeMillis(), timelineMetrics);
+        aggregationCacheLock.writeLock().unlock();
+    }
+
+    public Map<Long, TimelineMetrics> extractMetricsForAggregationPublishing() {
+        return extractMetricsFromCacheWithLock(aggregationMetricsCache, aggregationCacheLock);
+    }
+
+    public void putMetricsForRawPublishing(TimelineMetrics metrics) {
+        rawCacheLock.writeLock().lock();
+        rawMetricsCache.put(System.currentTimeMillis(), metrics);
+        rawCacheLock.writeLock().unlock();
+    }
+
+    public Map<Long, TimelineMetrics> extractMetricsForRawPublishing() {
+        return extractMetricsFromCacheWithLock(rawMetricsCache, rawCacheLock);
+    }
+
+    /**
+     * Returns values from cache and clears the cache
+     * @param cache
+     * @param lock
+     * @return
+     */
+    private Map<Long, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<Long, TimelineMetrics> cache, ReadWriteLock lock) {
+        lock.writeLock().lock();
+        Map<Long, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap());
+        cache.invalidateAll();
+        lock.writeLock().unlock();
+        return metricsMap;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
index 967e133..9bbb271 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
+++ b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
@@ -24,7 +24,7 @@ METRIC_MONITOR_PY_SCRIPT=${RESOURCE_MONITORING_DIR}/main.py
 PIDFILE=/var/run/ambari-metrics-monitor/ambari-metrics-monitor.pid
 OUTFILE=/var/log/ambari-metrics-monitor/ambari-metrics-monitor.out
 
-STOP_TIMEOUT=5
+STOP_TIMEOUT=10
 
 OK=0
 NOTOK=1

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
new file mode 100644
index 0000000..2249e53
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
@@ -0,0 +1,110 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import threading
+import subprocess
+import logging
+import urllib2
+
+logger = logging.getLogger()
+class Aggregator(threading.Thread):
+  def __init__(self, config, stop_handler):
+    threading.Thread.__init__(self)
+    self._config = config
+    self._stop_handler = stop_handler
+    self._aggregator_process = None
+    self._sleep_interval = config.get_collector_sleep_interval()
+    self.stopped = False
+
+  def run(self):
+    java_home = self._config.get_java_home()
+    collector_hosts = self._config.get_metrics_collector_hosts_as_string()
+    jvm_agrs = self._config.get_aggregator_jvm_agrs()
+    config_dir = self._config.get_config_dir()
+    class_name = "org.apache.hadoop.metrics2.host.aggregator.AggregatorApplication"
+    ams_log_file = "ambari-metrics-aggregator.log"
+    additional_classpath = ':{0}'.format(config_dir)
+    ams_log_dir = self._config.ams_monitor_log_dir()
+    logger.info('Starting Aggregator thread.')
+    cmd = "{0}/bin/java {1} -Dams.log.dir={2} -Dams.log.file={3} -cp /var/lib/ambari-metrics-monitor/lib/*{4} {5} {6}"\
+      .format(java_home, jvm_agrs, ams_log_dir, ams_log_file, additional_classpath, class_name, collector_hosts)
+
+    logger.info("Executing : {0}".format(cmd))
+
+    self._aggregator_process = subprocess.Popen([cmd], stdout = None, stderr = None, shell = True)
+    while not self.stopped:
+      if 0 == self._stop_handler.wait(self._sleep_interval):
+        break
+    pass
+    self.stop()
+
+  def stop(self):
+    self.stopped = True
+    if self._aggregator_process :
+      logger.info('Stopping Aggregator thread.')
+      self._aggregator_process.terminate()
+
+class AggregatorWatchdog(threading.Thread):
+  SLEEP_TIME = 30
+  CONNECTION_TIMEOUT = 5
+  AMS_AGGREGATOR_METRICS_CHECK_URL = "/ws/v1/timeline/metrics/"
+  def __init__(self, config, stop_handler):
+    threading.Thread.__init__(self)
+    self._config = config
+    self._stop_handler = stop_handler
+    self.URL = 'http://localhost:' + self._config.get_inmemory_aggregation_port() + self.AMS_AGGREGATOR_METRICS_CHECK_URL
+    self._is_ok = threading.Event()
+    self.set_is_ok(True)
+    self.stopped = False
+
+  def run(self):
+    logger.info('Starting Aggregator Watchdog thread.')
+    while not self.stopped:
+      if 0 == self._stop_handler.wait(self.SLEEP_TIME):
+        break
+      try:
+        conn = urllib2.urlopen(self.URL, timeout=self.CONNECTION_TIMEOUT)
+        self.set_is_ok(True)
+      except (KeyboardInterrupt, SystemExit):
+        raise
+      except Exception, e:
+        self.set_is_ok(False)
+        continue
+      if conn.code != 200:
+        self.set_is_ok(False)
+        continue
+      conn.close()
+
+  def is_ok(self):
+    return self._is_ok.is_set()
+
+  def set_is_ok(self, value):
+    if value == False and self.is_ok() != value:
+      logger.warning("Watcher couldn't connect to aggregator.")
+      self._is_ok.clear()
+    else:
+      self._is_ok.set()
+
+
+  def stop(self):
+    logger.info('Stopping watcher thread.')
+    self.stopped = True
+
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
index 2670e76..d1429ed 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
@@ -30,6 +30,8 @@ from ambari_commons.os_family_impl import OsFamilyImpl
 # Abstraction for OS-dependent configuration defaults
 #
 class ConfigDefaults(object):
+  def get_config_dir(self):
+    pass
   def get_config_file_path(self):
     pass
   def get_metric_file_path(self):
@@ -40,11 +42,14 @@ class ConfigDefaults(object):
 @OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
 class ConfigDefaultsWindows(ConfigDefaults):
   def __init__(self):
+    self._CONFIG_DIR = "conf"
     self._CONFIG_FILE_PATH = "conf\\metric_monitor.ini"
     self._METRIC_FILE_PATH = "conf\\metric_groups.conf"
     self._METRIC_FILE_PATH = "conf\\ca.pem"
     pass
 
+  def get_config_dir(self):
+    return self._CONFIG_DIR
   def get_config_file_path(self):
     return self._CONFIG_FILE_PATH
   def get_metric_file_path(self):
@@ -55,11 +60,13 @@ class ConfigDefaultsWindows(ConfigDefaults):
 @OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
 class ConfigDefaultsLinux(ConfigDefaults):
   def __init__(self):
+    self._CONFIG_DIR = "/etc/ambari-metrics-monitor/conf/"
     self._CONFIG_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_monitor.ini"
     self._METRIC_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_groups.conf"
     self._CA_CERTS_FILE_PATH = "/etc/ambari-metrics-monitor/conf/ca.pem"
     pass
-
+  def get_config_dir(self):
+    return self._CONFIG_DIR
   def get_config_file_path(self):
     return self._CONFIG_FILE_PATH
   def get_metric_file_path(self):
@@ -71,6 +78,7 @@ configDefaults = ConfigDefaults()
 
 config = ConfigParser.RawConfigParser()
 
+CONFIG_DIR = configDefaults.get_config_dir()
 CONFIG_FILE_PATH = configDefaults.get_config_file_path()
 METRIC_FILE_PATH = configDefaults.get_metric_file_path()
 CA_CERTS_FILE_PATH = configDefaults.get_ca_certs_file_path()
@@ -191,6 +199,8 @@ class Configuration:
         # No hostname script identified in the ambari agent conf
         pass
     pass
+  def get_config_dir(self):
+    return CONFIG_DIR
 
   def getConfig(self):
     return self.config
@@ -214,10 +224,14 @@ class Configuration:
   def get_hostname_config(self):
     return self.get("default", "hostname", None)
 
-  def get_metrics_collector_hosts(self):
+  def get_metrics_collector_hosts_as_list(self):
     hosts = self.get("default", "metrics_servers", "localhost")
     return hosts.split(",")
 
+  def get_metrics_collector_hosts_as_string(self):
+    hosts = self.get("default", "metrics_servers", "localhost")
+    return hosts
+
   def get_failover_strategy(self):
     return self.get("collector", "failover_strategy", ROUND_ROBIN_FAILOVER_STRATEGY)
 
@@ -239,6 +253,23 @@ class Configuration:
   def is_server_https_enabled(self):
     return "true" == str(self.get("collector", "https_enabled")).lower()
 
+  def get_java_home(self):
+    return self.get("aggregation", "java_home")
+
+  def is_inmemory_aggregation_enabled(self):
+    return "true" == str(self.get("aggregation", "host_in_memory_aggregation")).lower()
+
+  def get_inmemory_aggregation_port(self):
+    return self.get("aggregation", "host_in_memory_aggregation_port")
+
+  def get_aggregator_jvm_agrs(self):
+    hosts = self.get("aggregation", "jvm_arguments", "-Xmx256m -Xms128m -XX:PermSize=68m")
+    return hosts
+
+  def ams_monitor_log_dir(self):
+    hosts = self.get("aggregation", "ams_monitor_log_dir", "/var/log/ambari-metrics-monitor")
+    return hosts
+
   def is_set_instanceid(self):
     return "true" == str(self.get("default", "set.instanceId", 'false')).lower()
 


Mime
View raw message