ambari-commits mailing list archives

From swa...@apache.org
Subject [49/50] [abbrv] ambari git commit: Merge branch 'trunk' into branch-3.0-ams
Date Tue, 26 Sep 2017 22:26:48 GMT
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-common/pom.xml
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-common/pom.xml
index ebd0fc9,cae9734..1a7fef3
--- a/ambari-metrics/ambari-metrics-common/pom.xml
+++ b/ambari-metrics/ambari-metrics-common/pom.xml
@@@ -106,12 -106,12 +106,16 @@@
                  </relocation>
                  <relocation>
                    <pattern>org.jboss</pattern>
 -                  <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jboss</shadedPattern>
 +                  <shadedPattern>org.apache.ambari.metrics.sink.relocated.jboss</shadedPattern>
 +                </relocation>
 +                <relocation>
 +                  <pattern>net.sf.ehcache</pattern>
 +                  <shadedPattern>org.apache.ambari.metrics.sink.relocated.ehcache</shadedPattern>
                  </relocation>
+                 <relocation>
+                   <pattern>org.apache.http</pattern>
+                   <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.apache.http</shadedPattern>
+                 </relocation>
                </relocations>
              </configuration>
            </execution>

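The relocations above rewrite package prefixes inside the shaded ambari-metrics-common jar, so dependent code must reference the relocated names at runtime. A minimal sketch of checking that, assuming the shaded jar is on the classpath (net.sf.ehcache.Cache is Ehcache 2.x's main class; the relocated name follows the pattern/shadedPattern pair above):

    // Sketch: after shading, the ehcache classes resolve only under the
    // relocated prefix declared in the pom above.
    public class RelocationCheck {
      public static void main(String[] args) throws Exception {
        Class<?> relocated =
            Class.forName("org.apache.ambari.metrics.sink.relocated.ehcache.Cache");
        System.out.println("Relocated class loaded: " + relocated.getName());
      }
    }
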
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 73ed3c4,337f640..3c06032
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@@ -245,14 -290,15 +291,19 @@@ public abstract class AbstractTimelineM
  
    protected boolean emitMetrics(TimelineMetrics metrics) {
      String connectUrl;
+     boolean validCollectorHost = true;
+ 
      if (isHostInMemoryAggregationEnabled()) {
 -      connectUrl = constructTimelineMetricUri("http", "localhost", String.valueOf(getHostInMemoryAggregationPort()));
 +      String hostname = "localhost";
 +      if (getHostInMemoryAggregationProtocol().equalsIgnoreCase("https")) {
 +        hostname = getHostname();
 +      }
 +      connectUrl = constructTimelineMetricUri(getHostInMemoryAggregationProtocol(), hostname, String.valueOf(getHostInMemoryAggregationPort()));
      } else {
        String collectorHost  = getCurrentCollectorHost();
+       if (collectorHost == null) {
+         validCollectorHost = false;
+       }
        connectUrl = getCollectorUri(collectorHost);
      }
  

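In the merged emitMetrics above, the protocol and host for host-level in-memory aggregation are chosen at runtime: over https the real hostname replaces localhost so the server certificate can match. A rough illustration of the endpoint shapes, borrowing the "%s://%s:%s/ws/v1/timeline/metrics" template used by AbstractMetricPublisher later in this commit (the exact URI from constructTimelineMetricUri may differ; host1.example.com is hypothetical, 61888 is the default aggregation port):

    String template = "%s://%s:%s/ws/v1/timeline/metrics";
    // https branch: certificate hostname must match, so use the real host.
    String secure = String.format(template, "https", "host1.example.com", "61888");
    // http branch: localhost suffices.
    String plain = String.format(template, "http", "localhost", "61888");
    System.out.println(secure + "\n" + plain);
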
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
index f8ed95f,1e5cc82..c73cbce
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
@@@ -6,9 -6,9 +6,9 @@@
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
++ * <p>
++ * http://www.apache.org/licenses/LICENSE-2.0
++ * <p>
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@@ -17,227 -17,190 +17,227 @@@
   */
  package org.apache.hadoop.metrics2.host.aggregator;
  
--import com.sun.jersey.api.container.httpserver.HttpServerFactory;
--import com.sun.jersey.api.core.PackagesResourceConfig;
--import com.sun.jersey.api.core.ResourceConfig;
--import com.sun.net.httpserver.HttpServer;
--
- import javax.net.ssl.SSLContext;
--import javax.ws.rs.core.UriBuilder;
 -import java.io.IOException;
  import java.net.InetAddress;
  import java.net.URI;
  import java.net.URL;
  import java.net.UnknownHostException;
  import java.util.HashMap;
  
- import com.sun.net.httpserver.HttpsConfigurator;
- import com.sun.net.httpserver.HttpsServer;
++import javax.net.ssl.SSLContext;
++import javax.ws.rs.core.UriBuilder;
++
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.metrics2.sink.timeline.AbstractMetricPublisher;
  import org.apache.hadoop.metrics2.sink.timeline.AggregatedMetricsPublisher;
  import org.apache.hadoop.metrics2.sink.timeline.RawMetricsPublisher;
 +import org.eclipse.jetty.util.ssl.SslContextFactory;
 +
++import com.sun.jersey.api.container.httpserver.HttpServerFactory;
++import com.sun.jersey.api.core.PackagesResourceConfig;
++import com.sun.jersey.api.core.ResourceConfig;
++import com.sun.net.httpserver.HttpServer;
++import com.sun.net.httpserver.HttpsConfigurator;
++import com.sun.net.httpserver.HttpsServer;
+ 
  /**
  * Web application with two publisher threads that process received metrics and submit results to the collector
   */
--public class AggregatorApplication
--{
--    private static final int STOP_SECONDS_DELAY = 0;
--    private static final int JOIN_SECONDS_TIMEOUT = 5;
--    private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
-     private static final String METRICS_SSL_SERVER_CONFIGURATION_FILE = "ssl-server.xml";
--    private Log LOG;
--    private final int webApplicationPort;
--    private final int rawPublishingInterval;
--    private final int aggregationInterval;
-     private final String webServerProtocol;
--    private Configuration configuration;
--    private Thread aggregatePublisherThread;
--    private Thread rawPublisherThread;
--    private TimelineMetricsHolder timelineMetricsHolder;
--    private HttpServer httpServer;
--
--    public AggregatorApplication(String hostname, String collectorHosts) {
--        LOG = LogFactory.getLog(this.getClass());
--        configuration = new Configuration(true);
--        initConfiguration();
--        configuration.set("timeline.metrics.collector.hosts", collectorHosts);
--        configuration.set("timeline.metrics.hostname", hostname);
--        configuration.set("timeline.metrics.zk.quorum", getZkQuorumFromConfiguration());
--        this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300);
--        this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60);
--        this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888);
-         this.webServerProtocol = configuration.get("timeline.metrics.host.inmemory.aggregation.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
--        this.timelineMetricsHolder = TimelineMetricsHolder.getInstance(rawPublishingInterval, aggregationInterval);
--        try {
--            this.httpServer = createHttpServer();
-         } catch (Exception e) {
 -        } catch (IOException e) {
--            LOG.error("Exception while starting HTTP server. Exiting", e);
--            System.exit(1);
--        }
++public class AggregatorApplication {
++  private static final int STOP_SECONDS_DELAY = 0;
++  private static final int JOIN_SECONDS_TIMEOUT = 5;
++  private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
++  private static final String METRICS_SSL_SERVER_CONFIGURATION_FILE = "ssl-server.xml";
++  private Log LOG;
++  private final int webApplicationPort;
++  private final int rawPublishingInterval;
++  private final int aggregationInterval;
++  private final String webServerProtocol;
++  private Configuration configuration;
++  private Thread aggregatePublisherThread;
++  private Thread rawPublisherThread;
++  private TimelineMetricsHolder timelineMetricsHolder;
++  private HttpServer httpServer;
++
++  public AggregatorApplication(String hostname, String collectorHosts) {
++    LOG = LogFactory.getLog(this.getClass());
++    configuration = new Configuration(true);
++    initConfiguration();
++    configuration.set("timeline.metrics.collector.hosts", collectorHosts);
++    configuration.set("timeline.metrics.hostname", hostname);
++    configuration.set("timeline.metrics.zk.quorum", getZkQuorumFromConfiguration());
++    this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300);
++    this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60);
++    this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888);
++    this.webServerProtocol = configuration.get("timeline.metrics.host.inmemory.aggregation.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
++    this.timelineMetricsHolder = TimelineMetricsHolder.getInstance(rawPublishingInterval, aggregationInterval);
++    try {
++      this.httpServer = createHttpServer();
++    } catch (Exception e) {
++      LOG.error("Exception while starting HTTP server. Exiting", e);
++      System.exit(1);
      }
--
--    private String getZkQuorumFromConfiguration() {
--        String zkClientPort = configuration.getTrimmed("cluster.zookeeper.property.clientPort", "2181");
--        String zkServerHosts = configuration.getTrimmed("cluster.zookeeper.quorum", "");
--        return getZkConnectionUrl(zkClientPort, zkServerHosts);
++  }
++
++  private String getZkQuorumFromConfiguration() {
++    String zkClientPort = configuration.getTrimmed("cluster.zookeeper.property.clientPort", "2181");
++    String zkServerHosts = configuration.getTrimmed("cluster.zookeeper.quorum", "");
++    return getZkConnectionUrl(zkClientPort, zkServerHosts);
++  }
++
++  protected void initConfiguration() {
++    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
++    if (classLoader == null) {
++      classLoader = getClass().getClassLoader();
      }
  
--    protected void initConfiguration() {
--        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
--        if (classLoader == null) {
--            classLoader = getClass().getClassLoader();
--        }
--
--        URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
--        LOG.info("Found metric service configuration: " + amsResUrl);
-         URL sslConfUrl = classLoader.getResource(METRICS_SSL_SERVER_CONFIGURATION_FILE);
-         LOG.info("Found metric service configuration: " + sslConfUrl);
--        if (amsResUrl == null) {
-             throw new IllegalStateException(String.format("Unable to initialize the metrics " +
-                     "subsystem. No %s present in the classpath.", METRICS_SITE_CONFIGURATION_FILE));
-         }
-         if (sslConfUrl == null) {
-             throw new IllegalStateException(String.format("Unable to initialize the metrics " +
-                     "subsystem. No %s present in the classpath.", METRICS_SSL_SERVER_CONFIGURATION_FILE));
 -            throw new IllegalStateException("Unable to initialize the metrics " +
 -                    "subsystem. No ams-site present in the classpath.");
--        }
--
--        try {
--            configuration.addResource(amsResUrl.toURI().toURL());
-             configuration.addResource(sslConfUrl.toURI().toURL());
--        } catch (Exception e) {
--            LOG.error("Couldn't init configuration. ", e);
--            System.exit(1);
--        }
++    URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
++    LOG.info("Found metric service configuration: " + amsResUrl);
++    URL sslConfUrl = classLoader.getResource(METRICS_SSL_SERVER_CONFIGURATION_FILE);
++    LOG.info("Found metric service configuration: " + sslConfUrl);
++    if (amsResUrl == null) {
++      throw new IllegalStateException(String.format("Unable to initialize the metrics " +
++        "subsystem. No %s present in the classpath.", METRICS_SITE_CONFIGURATION_FILE));
      }
  
--    protected String getHostName() {
--        String hostName = "localhost";
--        try {
--            hostName = InetAddress.getLocalHost().getCanonicalHostName();
--        } catch (UnknownHostException e) {
--            LOG.error(e);
--        }
--        return hostName;
++    if (sslConfUrl == null) {
++      throw new IllegalStateException(String.format("Unable to initialize the metrics " +
++        "subsystem. No %s present in the classpath.", METRICS_SSL_SERVER_CONFIGURATION_FILE));
      }
  
--    protected URI getURI() {
-         URI uri = UriBuilder.fromUri("/").scheme(this.webServerProtocol).host(getHostName()).port(this.webApplicationPort).build();
 -        URI uri = UriBuilder.fromUri("http://" + getHostName() + "/").port(this.webApplicationPort).build();
--        LOG.info(String.format("Web server at %s", uri));
--        return uri;
++    try {
++      configuration.addResource(amsResUrl.toURI().toURL());
++      configuration.addResource(sslConfUrl.toURI().toURL());
++    } catch (Exception e) {
++      LOG.error("Couldn't init configuration. ", e);
++      System.exit(1);
      }
--
-     protected HttpServer createHttpServer() throws Exception {
 -    protected HttpServer createHttpServer() throws IOException {
--        ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator");
--        HashMap<String, Object> params = new HashMap();
--        params.put("com.sun.jersey.api.json.POJOMappingFeature", "true");
--        resourceConfig.setPropertiesAndFeatures(params);
-         HttpServer server = HttpServerFactory.create(getURI(), resourceConfig);
- 
-         if (webServerProtocol.equalsIgnoreCase("https")) {
-             HttpsServer httpsServer = (HttpsServer) server;
-             SslContextFactory sslContextFactory = new SslContextFactory();
-             String keyStorePath = configuration.get("ssl.server.keystore.location");
-             String keyStorePassword = configuration.get("ssl.server.keystore.password");
-             String keyManagerPassword = configuration.get("ssl.server.keystore.keypassword");
-             String trustStorePath = configuration.get("ssl.server.truststore.location");
-             String trustStorePassword = configuration.get("ssl.server.truststore.password");
- 
-             sslContextFactory.setKeyStorePath(keyStorePath);
-             sslContextFactory.setKeyStorePassword(keyStorePassword);
-             sslContextFactory.setKeyManagerPassword(keyManagerPassword);
-             sslContextFactory.setTrustStorePath(trustStorePath);
-             sslContextFactory.setTrustStorePassword(trustStorePassword);
- 
-             sslContextFactory.start();
-             SSLContext sslContext = sslContextFactory.getSslContext();
-             sslContextFactory.stop();
-             HttpsConfigurator httpsConfigurator = new HttpsConfigurator(sslContext);
-             httpsServer.setHttpsConfigurator(httpsConfigurator);
-             server = httpsServer;
-         }
-         return server;
 -        return HttpServerFactory.create(getURI(), resourceConfig);
++  }
++
++  protected String getHostName() {
++    String hostName = "localhost";
++    try {
++      hostName = InetAddress.getLocalHost().getCanonicalHostName();
++    } catch (UnknownHostException e) {
++      LOG.error(e);
      }
--
--    private void startWebServer() {
--        LOG.info("Starting web server.");
--        this.httpServer.start();
++    return hostName;
++  }
++
++  protected URI getURI() {
++    URI uri = UriBuilder.fromUri("/").scheme(this.webServerProtocol).host(getHostName()).port(this.webApplicationPort).build();
++    LOG.info(String.format("Web server at %s", uri));
++    return uri;
++  }
++
++  protected HttpServer createHttpServer() throws Exception {
++    ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator");
++    HashMap<String, Object> params = new HashMap<>();
++    params.put("com.sun.jersey.api.json.POJOMappingFeature", "true");
++    resourceConfig.setPropertiesAndFeatures(params);
++    HttpServer server = HttpServerFactory.create(getURI(), resourceConfig);
++
++    if (webServerProtocol.equalsIgnoreCase("https")) {
++      HttpsServer httpsServer = (HttpsServer) server;
++      SslContextFactory sslContextFactory = new SslContextFactory();
++      String keyStorePath = configuration.get("ssl.server.keystore.location");
++      String keyStorePassword = configuration.get("ssl.server.keystore.password");
++      String keyManagerPassword = configuration.get("ssl.server.keystore.keypassword");
++      String trustStorePath = configuration.get("ssl.server.truststore.location");
++      String trustStorePassword = configuration.get("ssl.server.truststore.password");
++
++      sslContextFactory.setKeyStorePath(keyStorePath);
++      sslContextFactory.setKeyStorePassword(keyStorePassword);
++      sslContextFactory.setKeyManagerPassword(keyManagerPassword);
++      sslContextFactory.setTrustStorePath(trustStorePath);
++      sslContextFactory.setTrustStorePassword(trustStorePassword);
++
++      sslContextFactory.start();
++      SSLContext sslContext = sslContextFactory.getSslContext();
++      sslContextFactory.stop();
++      HttpsConfigurator httpsConfigurator = new HttpsConfigurator(sslContext);
++      httpsServer.setHttpsConfigurator(httpsConfigurator);
++      server = httpsServer;
++    }
++    return server;
      }
  
--    private void startAggregatePublisherThread() {
--        LOG.info("Starting aggregated metrics publisher.");
--        AbstractMetricPublisher metricPublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, configuration, aggregationInterval);
--        aggregatePublisherThread = new Thread(metricPublisher);
--        aggregatePublisherThread.start();
--    }
++  private void startWebServer() {
++    LOG.info("Starting web server.");
++    this.httpServer.start();
++  }
++
++  private void startAggregatePublisherThread() {
++    LOG.info("Starting aggregated metrics publisher.");
++    AbstractMetricPublisher metricPublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, configuration, aggregationInterval);
++    aggregatePublisherThread = new Thread(metricPublisher);
++    aggregatePublisherThread.start();
++  }
++
++  private void startRawPublisherThread() {
++    LOG.info("Starting raw metrics publisher.");
++    AbstractMetricPublisher metricPublisher = new RawMetricsPublisher(timelineMetricsHolder, configuration, rawPublishingInterval);
++    rawPublisherThread = new Thread(metricPublisher);
++    rawPublisherThread.start();
++  }
++
++
++  private void stop() {
++    LOG.info("Stopping aggregator application");
++    aggregatePublisherThread.interrupt();
++    rawPublisherThread.interrupt();
++    httpServer.stop(STOP_SECONDS_DELAY);
++    LOG.info("Stopped web server.");
++    try {
++      LOG.info("Waiting for threads to join.");
++      aggregatePublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000);
++      rawPublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000);
++      LOG.info("Gracefully stopped Aggregator Application.");
++    } catch (InterruptedException e) {
++      LOG.error("Received exception during stop : ", e);
  
--    private void startRawPublisherThread() {
--        LOG.info("Starting raw metrics publisher.");
--        AbstractMetricPublisher metricPublisher = new RawMetricsPublisher(timelineMetricsHolder, configuration, rawPublishingInterval);
--        rawPublisherThread = aggregatePublisherThread = new Thread(metricPublisher);
--        aggregatePublisherThread.start();
      }
  
--
--
--    private void stop() {
--        LOG.info("Stopping aggregator application");
--        aggregatePublisherThread.interrupt();
--        rawPublisherThread.interrupt();
--        httpServer.stop(STOP_SECONDS_DELAY);
--        LOG.info("Stopped web server.");
--        try {
--            LOG.info("Waiting for threads to join.");
--            aggregatePublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000);
--            rawPublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000);
--            LOG.info("Gracefully stopped Aggregator Application.");
--        } catch (InterruptedException e) {
--            LOG.error("Received exception during stop : ", e);
--
--        }
--
++  }
++
++  private String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
++    StringBuilder sb = new StringBuilder();
++    String[] quorumParts = zkQuorum.split(",");
++    String prefix = "";
++    for (String part : quorumParts) {
++      sb.append(prefix);
++      sb.append(part.trim());
++      if (!part.contains(":")) {
++        sb.append(":");
++        sb.append(zkClientPort);
++      }
++      prefix = ",";
      }
++    return sb.toString();
++  }
  
--    private String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
--        StringBuilder sb = new StringBuilder();
--        String[] quorumParts = zkQuorum.split(",");
--        String prefix = "";
--        for (String part : quorumParts) {
--            sb.append(prefix);
--            sb.append(part.trim());
--            if (!part.contains(":")) {
--                sb.append(":");
--                sb.append(zkClientPort);
--            }
--            prefix = ",";
--        }
--        return sb.toString();
++  public static void main(String[] args) throws Exception {
++    if (args.length != 2) {
++      throw new Exception("This jar should be executed with 2 arguments: 1st - current host name, " +
++        "2nd - collector hosts separated with commas");
      }
  
--    public static void main( String[] args ) throws Exception {
--        if (args.length != 2) {
--            throw new Exception("This jar should be executed with 2 arguments : 1st - current host name, " +
--                    "2nd - collector hosts separated with coma");
--        }
++    final AggregatorApplication app = new AggregatorApplication(args[0], args[1]);
  
--        final AggregatorApplication app = new AggregatorApplication(args[0], args[1]);
++    app.startWebServerAndPublishersThreads();
  
--        app.startWebServerAndPublishersThreads();
++    Runtime.getRuntime().addShutdownHook(new Thread() {
++      public void run() {
++        app.stop();
++      }
++    });
++  }
  
--        Runtime.getRuntime().addShutdownHook(new Thread() {
--            public void run() {
--                app.stop();
--            }
--        });
--    }
--
--    private void startWebServerAndPublishersThreads() {
--        LOG.info("Starting aggregator application");
--        startAggregatePublisherThread();
--        startRawPublisherThread();
--        startWebServer();
--    }
++  private void startWebServerAndPublishersThreads() {
++    LOG.info("Starting aggregator application");
++    startAggregatePublisherThread();
++    startRawPublisherThread();
++    startWebServer();
++  }
  }

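The createHttpServer change above is the heart of the HTTPS support: a Jetty SslContextFactory is used only as a loader to build a JDK SSLContext from the ssl-server.xml properties, and the context is then handed to the com.sun HttpsServer. The same pattern in isolation, a sketch with hypothetical keystore values:

    import java.net.InetSocketAddress;
    import javax.net.ssl.SSLContext;
    import com.sun.net.httpserver.HttpsConfigurator;
    import com.sun.net.httpserver.HttpsServer;
    import org.eclipse.jetty.util.ssl.SslContextFactory;

    public class HttpsServerSslSketch {
      // Mirrors createHttpServer(): start the factory, take its SSLContext,
      // stop the factory; it is only used to load the key stores.
      static SSLContext buildSslContext(String keyStore, String storePwd,
                                        String keyPwd) throws Exception {
        SslContextFactory factory = new SslContextFactory();
        factory.setKeyStorePath(keyStore);        // hypothetical path
        factory.setKeyStorePassword(storePwd);
        factory.setKeyManagerPassword(keyPwd);
        factory.start();
        SSLContext sslContext = factory.getSslContext();
        factory.stop();
        return sslContext;
      }

      public static void main(String[] args) throws Exception {
        SSLContext ctx = buildSslContext("/etc/security/keystore.jks",
            "changeit", "changeit");              // hypothetical values
        HttpsServer server = HttpsServer.create(new InetSocketAddress(61888), 0);
        server.setHttpsConfigurator(new HttpsConfigurator(ctx));
        server.start();                           // 61888: default port above
      }
    }
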
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
index b151209,b151209..a6cbc2d
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
@@@ -6,9 -6,9 +6,9 @@@
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
++ * <p>
++ * http://www.apache.org/licenses/LICENSE-2.0
++ * <p>
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@@ -18,8 -18,8 +18,8 @@@
  package org.apache.hadoop.metrics2.host.aggregator;
  
  
--
  import com.sun.jersey.spi.resource.Singleton;
++
  import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
  
  import javax.ws.rs.Consumes;
@@@ -29,28 -29,28 +29,29 @@@ import javax.ws.rs.Path
  import javax.ws.rs.Produces;
  import javax.ws.rs.core.MediaType;
  import javax.ws.rs.core.Response;
++
  import java.io.IOException;
  
  @Singleton
  @Path("/ws/v1/timeline")
  public class AggregatorWebService {
--    TimelineMetricsHolder metricsHolder = TimelineMetricsHolder.getInstance();
--
--    @GET
--    @Produces("text/json")
--    @Path("/metrics")
--    public Response getOkResponse() throws IOException {
--        return Response.ok().build();
--    }
--
--    @POST
--    @Produces(MediaType.TEXT_PLAIN)
--    @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
--    @Path("/metrics")
--    public Response postMetrics(
--            TimelineMetrics metrics) {
--        metricsHolder.putMetricsForAggregationPublishing(metrics);
--        metricsHolder.putMetricsForRawPublishing(metrics);
--        return Response.ok().build();
--    }
++  TimelineMetricsHolder metricsHolder = TimelineMetricsHolder.getInstance();
++
++  @GET
++  @Produces("text/json")
++  @Path("/metrics")
++  public Response getOkResponse() throws IOException {
++    return Response.ok().build();
++  }
++
++  @POST
++  @Produces(MediaType.TEXT_PLAIN)
++  @Consumes({MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
++  @Path("/metrics")
++  public Response postMetrics(
++    TimelineMetrics metrics) {
++    metricsHolder.putMetricsForAggregationPublishing(metrics);
++    metricsHolder.putMetricsForRawPublishing(metrics);
++    return Response.ok().build();
++  }
  }

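The service above accepts TimelineMetrics as JSON on POST /ws/v1/timeline/metrics and stores them for both publishing paths. A minimal client sketch using plain HttpURLConnection (hypothetical endpoint; the empty metrics payload is illustrative only):

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class PostMetricsSketch {
      public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:61888/ws/v1/timeline/metrics");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        String body = "{\"metrics\":[]}";         // illustrative payload
        try (OutputStream os = conn.getOutputStream()) {
          os.write(body.getBytes("UTF-8"));
        }
        System.out.println("HTTP " + conn.getResponseCode());
      }
    }
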
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
index 03b6542,03b6542..3a8ae41
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
@@@ -6,9 -6,9 +6,9 @@@
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
++ * <p>
++ * http://www.apache.org/licenses/LICENSE-2.0
++ * <p>
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@@ -17,11 -17,11 +17,6 @@@
   */
  package org.apache.hadoop.metrics2.host.aggregator;
  
--import com.google.common.cache.Cache;
--import com.google.common.cache.CacheBuilder;
--import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
--import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
--
  import java.util.List;
  import java.util.Map;
  import java.util.TreeMap;
@@@ -29,80 -29,80 +24,86 @@@ import java.util.concurrent.TimeUnit
  import java.util.concurrent.locks.ReadWriteLock;
  import java.util.concurrent.locks.ReentrantReadWriteLock;
  
++import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
++import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
++
++import com.google.common.cache.Cache;
++import com.google.common.cache.CacheBuilder;
++
  /**
   * Singleton class with two Guava caches for storing raw and aggregated metrics
   */
  public class TimelineMetricsHolder {
--    private static final int DEFAULT_RAW_CACHE_EXPIRE_TIME = 60;
--    private static final int DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME = 300;
--    private Cache<String, TimelineMetrics> aggregationMetricsCache;
--    private Cache<String, TimelineMetrics> rawMetricsCache;
--    private static TimelineMetricsHolder instance = null;
--    //to ensure no metric values are expired
--    private static int EXPIRE_DELAY = 30;
--    ReadWriteLock aggregationCacheLock = new ReentrantReadWriteLock();
--    ReadWriteLock rawCacheLock = new ReentrantReadWriteLock();
++  private static final int DEFAULT_RAW_CACHE_EXPIRE_TIME = 60;
++  private static final int DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME = 300;
++  private Cache<String, TimelineMetrics> aggregationMetricsCache;
++  private Cache<String, TimelineMetrics> rawMetricsCache;
++  private static TimelineMetricsHolder instance = null;
++  //to ensure no metric values are expired
++  private static int EXPIRE_DELAY = 30;
++  ReadWriteLock aggregationCacheLock = new ReentrantReadWriteLock();
++  ReadWriteLock rawCacheLock = new ReentrantReadWriteLock();
  
--    private TimelineMetricsHolder(int rawCacheExpireTime, int aggregationCacheExpireTime) {
--        this.rawMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(rawCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
--        this.aggregationMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(aggregationCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
--    }
++  private TimelineMetricsHolder(int rawCacheExpireTime, int aggregationCacheExpireTime) {
++    this.rawMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(rawCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
++    this.aggregationMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(aggregationCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
++  }
  
--    public static TimelineMetricsHolder getInstance(int rawCacheExpireTime, int aggregationCacheExpireTime) {
--        if (instance == null) {
--            instance = new TimelineMetricsHolder(rawCacheExpireTime, aggregationCacheExpireTime);
--        }
--        return instance;
++  public static TimelineMetricsHolder getInstance(int rawCacheExpireTime, int aggregationCacheExpireTime) {
++    if (instance == null) {
++      instance = new TimelineMetricsHolder(rawCacheExpireTime, aggregationCacheExpireTime);
      }
++    return instance;
++  }
  
--    /**
--     * Uses default expiration time for caches initialization if they are not initialized yet.
--     * @return
--     */
--    public static TimelineMetricsHolder getInstance() {
--        return getInstance(DEFAULT_RAW_CACHE_EXPIRE_TIME, DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME);
--    }
++  /**
++   * Uses the default expiration times for cache initialization if the caches are not initialized yet.
++   * @return the shared TimelineMetricsHolder instance
++   */
++  public static TimelineMetricsHolder getInstance() {
++    return getInstance(DEFAULT_RAW_CACHE_EXPIRE_TIME, DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME);
++  }
  
--    public void putMetricsForAggregationPublishing(TimelineMetrics timelineMetrics) {
--        aggregationCacheLock.writeLock().lock();
--        aggregationMetricsCache.put(calculateCacheKey(timelineMetrics), timelineMetrics);
--        aggregationCacheLock.writeLock().unlock();
--    }
++  public void putMetricsForAggregationPublishing(TimelineMetrics timelineMetrics) {
++    aggregationCacheLock.writeLock().lock();
++    aggregationMetricsCache.put(calculateCacheKey(timelineMetrics), timelineMetrics);
++    aggregationCacheLock.writeLock().unlock();
++  }
  
--    private String calculateCacheKey(TimelineMetrics timelineMetrics) {
--        List<TimelineMetric>  metrics =  timelineMetrics.getMetrics();
--        if (metrics.size() > 0) {
--            return  metrics.get(0).getAppId() + System.currentTimeMillis();
--        }
--        return String.valueOf(System.currentTimeMillis());
++  private String calculateCacheKey(TimelineMetrics timelineMetrics) {
++    List<TimelineMetric> metrics = timelineMetrics.getMetrics();
++    if (metrics.size() > 0) {
++      return metrics.get(0).getAppId() + System.currentTimeMillis();
      }
++    return String.valueOf(System.currentTimeMillis());
++  }
  
--    public Map<String, TimelineMetrics> extractMetricsForAggregationPublishing() {
--        return extractMetricsFromCacheWithLock(aggregationMetricsCache, aggregationCacheLock);
--    }
++  public Map<String, TimelineMetrics> extractMetricsForAggregationPublishing() {
++    return extractMetricsFromCacheWithLock(aggregationMetricsCache, aggregationCacheLock);
++  }
  
--    public void putMetricsForRawPublishing(TimelineMetrics metrics) {
--        rawCacheLock.writeLock().lock();
--        rawMetricsCache.put(calculateCacheKey(metrics), metrics);
--        rawCacheLock.writeLock().unlock();
--    }
++  public void putMetricsForRawPublishing(TimelineMetrics metrics) {
++    rawCacheLock.writeLock().lock();
++    rawMetricsCache.put(calculateCacheKey(metrics), metrics);
++    rawCacheLock.writeLock().unlock();
++  }
  
--    public Map<String, TimelineMetrics> extractMetricsForRawPublishing() {
--        return extractMetricsFromCacheWithLock(rawMetricsCache, rawCacheLock);
--    }
++  public Map<String, TimelineMetrics> extractMetricsForRawPublishing() {
++    return extractMetricsFromCacheWithLock(rawMetricsCache, rawCacheLock);
++  }
  
--    /**
--     * Returns values from cache and clears the cache
--     * @param cache
--     * @param lock
--     * @return
--     */
--    private Map<String, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<String, TimelineMetrics> cache, ReadWriteLock lock) {
--        lock.writeLock().lock();
--        Map<String, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap());
--        cache.invalidateAll();
--        lock.writeLock().unlock();
--        return metricsMap;
--    }
++  /**
++   * Returns the values from the given cache and clears it.
++   * @param cache the cache to drain
++   * @param lock the read-write lock guarding the cache
++   * @return a snapshot of the cache contents taken before invalidation
++   */
++  private Map<String, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<String, TimelineMetrics> cache, ReadWriteLock lock) {
++    lock.writeLock().lock();
++    Map<String, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap());
++    cache.invalidateAll();
++    lock.writeLock().unlock();
++    return metricsMap;
++  }
  
  }

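TimelineMetricsHolder above pairs two expiring Guava caches with read-write locks; extract takes a snapshot and invalidates in one locked step so no metric is published twice. A usage sketch against the singleton API shown (assumes java.util.Map and the metrics classes from this commit are imported):

    // Expiry of 60s raw / 300s aggregation, matching the defaults above.
    TimelineMetricsHolder holder = TimelineMetricsHolder.getInstance(60, 300);
    TimelineMetrics metrics = new TimelineMetrics();
    holder.putMetricsForRawPublishing(metrics);
    holder.putMetricsForAggregationPublishing(metrics);
    // Drains the raw cache: returns the snapshot, then invalidates.
    Map<String, TimelineMetrics> drained = holder.extractMetricsForRawPublishing();
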
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
index 7ce0815,5af115f..8211476
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
@@@ -6,9 -6,9 +6,9 @@@
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
++ * <p>
++ * http://www.apache.org/licenses/LICENSE-2.0
++ * <p>
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@@ -17,153 -17,153 +17,152 @@@
   */
  package org.apache.hadoop.metrics2.sink.timeline;
  
++import java.util.Collection;
++import java.util.Map;
++
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
  
--import java.util.Collection;
--import java.util.Map;
--
  /**
   * Abstract class that runs a thread that publishes metrics data to the AMS collector at specified intervals.
   */
  public abstract class AbstractMetricPublisher extends AbstractTimelineMetricsSink implements Runnable {
--
-     private static final String AMS_SITE_SSL_TRUSTSTORE_PATH_PROPERTY = "ssl.server.truststore.location";
-     private static final String AMS_SITE_SSL_TRUSTSTORE_TYPE_PROPERTY = "ssl.server.truststore.type";
-     private static final String AMS_SITE_SSL_TRUSTSTORE_PASSWORD_PROPERTY = "ssl.server.truststore.password";
 -    private static final String AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY = "ssl.server.truststore.location";
 -    private static final String AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY = "ssl.server.truststore.password";
 -    private static final String AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY = "ssl.server.truststore.type";
--    private static final String AMS_SITE_HTTP_POLICY_PROPERTY = "timeline.metrics.service.http.policy";
--    private static final String AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY = "timeline.metrics.service.webapp.address";
--    private static final String PUBLISHER_COLLECTOR_HOSTS_PROPERTY = "timeline.metrics.collector.hosts";
--    private static final String PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY = "timeline.metrics.zk.quorum";
--    private static final String PUBLISHER_HOSTNAME_PROPERTY = "timeline.metrics.hostname";
--    protected static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics";
--    protected int publishIntervalInSeconds;
--    private Log LOG;
--    protected TimelineMetricsHolder timelineMetricsHolder;
--    protected Configuration configuration;
--    private String collectorProtocol;
--    private String collectorPort;
--    private Collection<String> collectorHosts;
--    private String hostname;
--    private String zkQuorum;
--
--    public AbstractMetricPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int publishIntervalInSeconds) {
--        LOG = LogFactory.getLog(this.getClass());
--        this.configuration = configuration;
--        this.publishIntervalInSeconds = publishIntervalInSeconds;
--        this.timelineMetricsHolder = timelineMetricsHolder;
--        configure();
--    }
--
--    protected void configure() {
--        collectorProtocol = configuration.get(AMS_SITE_HTTP_POLICY_PROPERTY, "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
--        collectorPort = configuration.getTrimmed(AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY, "0.0.0.0:6188").split(":")[1];
--        collectorHosts = parseHostsStringIntoCollection(configuration.getTrimmed(PUBLISHER_COLLECTOR_HOSTS_PROPERTY, ""));
--        zkQuorum = configuration.get(PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY, "");
--        hostname = configuration.get(PUBLISHER_HOSTNAME_PROPERTY, "localhost");
--        collectorHosts = parseHostsStringIntoCollection(configuration.get(PUBLISHER_COLLECTOR_HOSTS_PROPERTY, ""));
--        if (collectorHosts.isEmpty()) {
--            LOG.error("No Metric collector configured.");
--        } else {
--            if (collectorProtocol.contains("https")) {
-                 String trustStorePath = configuration.get(AMS_SITE_SSL_TRUSTSTORE_PATH_PROPERTY).trim();
-                 String trustStoreType = configuration.get(AMS_SITE_SSL_TRUSTSTORE_TYPE_PROPERTY).trim();
-                 String trustStorePwd = configuration.get(AMS_SITE_SSL_TRUSTSTORE_PASSWORD_PROPERTY).trim();
 -                String trustStorePath = configuration.get(AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY).trim();
 -                String trustStoreType = configuration.get(AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY).trim();
 -                String trustStorePwd = configuration.get(AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
--                loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
--            }
--        }
--    }
--
--    /**
--     * Publishes metrics to collector in specified intervals while not interrupted.
--     */
--    @Override
--    public void run() {
--        while (!Thread.currentThread().isInterrupted()) {
--            try {
--                Thread.sleep(this.publishIntervalInSeconds * 1000);
--            } catch (InterruptedException e) {
--                //Ignore
--            }
--            try {
--                processAndPublishMetrics(getMetricsFromCache());
--            } catch (Exception e) {
--                //ignore
--            }
--        }
--    }
--
--    /**
--     * Processes and sends metrics to collector.
--     * @param metricsFromCache
--     * @throws Exception
--     */
--    protected void processAndPublishMetrics(Map<String, TimelineMetrics> metricsFromCache) throws Exception {
--        if (metricsFromCache.size()==0) return;
--
--        LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size()));
--        emitMetricsJson(getCollectorUri(getCurrentCollectorHost()), processMetrics(metricsFromCache));
--    }
--
--    /**
--     * Returns metrics map. Source is based on implementation.
--     * @return
--     */
--    protected abstract Map<String,TimelineMetrics> getMetricsFromCache();
--
--    /**
--     * Processes given metrics (aggregates or merges them) and converts them into json string that will be send to collector
--     * @param metricValues
--     * @return
--     */
--    protected abstract String processMetrics(Map<String, TimelineMetrics> metricValues);
--
--    protected abstract String getPostUrl();
--
--    @Override
--    protected String getCollectorUri(String host) {
--        return String.format(getPostUrl(), getCollectorProtocol(), host, getCollectorPort());
++  private static final String AMS_SITE_SSL_TRUSTSTORE_PATH_PROPERTY = "ssl.server.truststore.location";
++  private static final String AMS_SITE_SSL_TRUSTSTORE_TYPE_PROPERTY = "ssl.server.truststore.type";
++  private static final String AMS_SITE_SSL_TRUSTSTORE_PASSWORD_PROPERTY = "ssl.server.truststore.password";
++  private static final String AMS_SITE_HTTP_POLICY_PROPERTY = "timeline.metrics.service.http.policy";
++  private static final String AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY = "timeline.metrics.service.webapp.address";
++  private static final String PUBLISHER_COLLECTOR_HOSTS_PROPERTY = "timeline.metrics.collector.hosts";
++  private static final String PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY = "timeline.metrics.zk.quorum";
++  private static final String PUBLISHER_HOSTNAME_PROPERTY = "timeline.metrics.hostname";
++  protected static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics";
++  protected int publishIntervalInSeconds;
++  private Log LOG;
++  protected TimelineMetricsHolder timelineMetricsHolder;
++  protected Configuration configuration;
++  private String collectorProtocol;
++  private String collectorPort;
++  private Collection<String> collectorHosts;
++  private String hostname;
++  private String zkQuorum;
++
++  public AbstractMetricPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int publishIntervalInSeconds) {
++    LOG = LogFactory.getLog(this.getClass());
++    this.configuration = configuration;
++    this.publishIntervalInSeconds = publishIntervalInSeconds;
++    this.timelineMetricsHolder = timelineMetricsHolder;
++    configure();
++  }
++
++  protected void configure() {
++    collectorProtocol = configuration.get(AMS_SITE_HTTP_POLICY_PROPERTY, "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
++    collectorPort = configuration.getTrimmed(AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY, "0.0.0.0:6188").split(":")[1];
++    collectorHosts = parseHostsStringIntoCollection(configuration.getTrimmed(PUBLISHER_COLLECTOR_HOSTS_PROPERTY, ""));
++    zkQuorum = configuration.get(PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY, "");
++    hostname = configuration.get(PUBLISHER_HOSTNAME_PROPERTY, "localhost");
++    if (collectorHosts.isEmpty()) {
++      LOG.error("No Metric collector configured.");
++    } else {
++      if (collectorProtocol.contains("https")) {
++        String trustStorePath = configuration.get(AMS_SITE_SSL_TRUSTSTORE_PATH_PROPERTY).trim();
++        String trustStoreType = configuration.get(AMS_SITE_SSL_TRUSTSTORE_TYPE_PROPERTY).trim();
++        String trustStorePwd = configuration.get(AMS_SITE_SSL_TRUSTSTORE_PASSWORD_PROPERTY).trim();
++        loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
++      }
      }
--
--    @Override
--    protected String getCollectorProtocol() {
--        return collectorProtocol;
--    }
--
--    @Override
--    protected String getCollectorPort() {
--        return collectorPort;
--    }
--
--    @Override
--    protected int getTimeoutSeconds() {
--        return DEFAULT_POST_TIMEOUT_SECONDS;
--    }
--
--    @Override
--    protected String getZookeeperQuorum() {
--        return zkQuorum;
--    }
--
--    @Override
--    protected Collection<String> getConfiguredCollectorHosts() {
--        return collectorHosts;
--    }
--
--    @Override
--    protected String getHostname() {
--        return hostname;
--    }
--
--    @Override
--    protected boolean isHostInMemoryAggregationEnabled() {
--        return false;
--    }
--
--    @Override
--    protected int getHostInMemoryAggregationPort() {
--        return 0;
++  }
++
++  /**
++   * Publishes metrics to the collector at the specified interval until interrupted.
++   */
++  @Override
++  public void run() {
++    while (!Thread.currentThread().isInterrupted()) {
++      try {
++        Thread.sleep(this.publishIntervalInSeconds * 1000);
++      } catch (InterruptedException e) {
++        //Ignore
++      }
++      try {
++        processAndPublishMetrics(getMetricsFromCache());
++      } catch (Exception e) {
++        //ignore
++      }
      }
++  }
++
++  /**
++   * Processes and sends metrics to the collector.
++   * @param metricsFromCache metrics drained from the local cache
++   * @throws Exception if publishing fails
++   */
++  protected void processAndPublishMetrics(Map<String, TimelineMetrics> metricsFromCache) throws Exception {
++    if (metricsFromCache.size() == 0) return;
++
++    LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size()));
++    emitMetricsJson(getCollectorUri(getCurrentCollectorHost()), processMetrics(metricsFromCache));
++  }
++
++  /**
++   * Returns the metrics map; the source depends on the implementation.
++   * @return metrics keyed by cache key
++   */
++  protected abstract Map<String, TimelineMetrics> getMetricsFromCache();
++
++  /**
++   * Processes the given metrics (aggregates or merges them) and converts them into a JSON string to be sent to the collector.
++   * @param metricValues metrics to process
++   * @return the JSON payload
++   */
++  protected abstract String processMetrics(Map<String, TimelineMetrics> metricValues);
++
++  protected abstract String getPostUrl();
++
++  @Override
++  protected String getCollectorUri(String host) {
++    return String.format(getPostUrl(), getCollectorProtocol(), host, getCollectorPort());
++  }
++
++  @Override
++  protected String getCollectorProtocol() {
++    return collectorProtocol;
++  }
++
++  @Override
++  protected String getCollectorPort() {
++    return collectorPort;
++  }
++
++  @Override
++  protected int getTimeoutSeconds() {
++    return DEFAULT_POST_TIMEOUT_SECONDS;
++  }
++
++  @Override
++  protected String getZookeeperQuorum() {
++    return zkQuorum;
++  }
++
++  @Override
++  protected Collection<String> getConfiguredCollectorHosts() {
++    return collectorHosts;
++  }
++
++  @Override
++  protected String getHostname() {
++    return hostname;
++  }
++
++  @Override
++  protected boolean isHostInMemoryAggregationEnabled() {
++    return false;
++  }
++
++  @Override
++  protected int getHostInMemoryAggregationPort() {
++    return 0;
++  }
  }

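getCollectorUri above fills the post-URL template with protocol, host, and port, where the port defaults to the 6188 taken from timeline.metrics.service.webapp.address. Concretely (collector1.example.com is hypothetical):

    String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics";
    // -> http://collector1.example.com:6188/ws/v1/timeline/metrics
    String uri = String.format(BASE_POST_URL, "http", "collector1.example.com", "6188");
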
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
index fa0c8fb,c8dffab..f1ed90b
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
@@@ -6,9 -6,9 +6,9 @@@
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
++ * <p>
++ * http://www.apache.org/licenses/LICENSE-2.0
++ * <p>
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@@ -18,91 -18,86 +18,90 @@@
  package org.apache.hadoop.metrics2.sink.timeline;
  
  
--
--import org.apache.commons.logging.Log;
--import org.apache.commons.logging.LogFactory;
--import org.apache.hadoop.conf.Configuration;
--import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
--
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Map;
  import java.util.Set;
  import java.util.TreeMap;
  
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
++
  /**
   * Thread that aggregates and publishes metrics to the collector at the specified interval.
   */
  public class AggregatedMetricsPublisher extends AbstractMetricPublisher {
--    private static String AGGREGATED_POST_PREFIX = "/aggregated";
--    private Log LOG;
++  private static String AGGREGATED_POST_PREFIX = "/aggregated";
++  private Log LOG;
  
--    public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) {
--        super(timelineMetricsHolder, configuration, interval);
--        LOG = LogFactory.getLog(this.getClass());
--    }
++  public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) {
++    super(timelineMetricsHolder, configuration, interval);
++    LOG = LogFactory.getLog(this.getClass());
++  }
  
--    /**
--     * get metrics map form @TimelineMetricsHolder
--     * @return
--     */
--    @Override
--    protected Map<String, TimelineMetrics> getMetricsFromCache() {
--        return timelineMetricsHolder.extractMetricsForAggregationPublishing();
--    }
++  /**
++   * Gets the metrics map from the TimelineMetricsHolder.
++   * @return metrics pending aggregation publishing
++   */
++  @Override
++  protected Map<String, TimelineMetrics> getMetricsFromCache() {
++    return timelineMetricsHolder.extractMetricsForAggregationPublishing();
++  }
  
--    /**
--     * Aggregates given metrics and converts them into json string that will be send to collector
--     * @param metricForAggregationValues
--     * @return
--     */
--    @Override
--    protected String processMetrics(Map<String, TimelineMetrics> metricForAggregationValues) {
--        HashMap<String, TimelineMetrics> nameToMetricMap = new HashMap<>();
--        for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) {
--            for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
--                if (!nameToMetricMap.containsKey(timelineMetric.getMetricName())) {
--                    nameToMetricMap.put(timelineMetric.getMetricName(), new TimelineMetrics());
--                }
--                nameToMetricMap.get(timelineMetric.getMetricName()).addOrMergeTimelineMetric(timelineMetric);
--            }
++  /**
++   * Aggregates the given metrics and converts them into a JSON string to be sent to the collector.
++   * @param metricForAggregationValues metrics to aggregate
++   * @return the JSON payload, or null if serialization failed
++   */
++  @Override
++  protected String processMetrics(Map<String, TimelineMetrics> metricForAggregationValues) {
++    HashMap<String, TimelineMetrics> nameToMetricMap = new HashMap<>();
++    for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) {
++      for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
++        if (!nameToMetricMap.containsKey(timelineMetric.getMetricName())) {
++          nameToMetricMap.put(timelineMetric.getMetricName(), new TimelineMetrics());
          }
--        Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>();
--        for (TimelineMetrics metrics : nameToMetricMap.values()) {
--            double sum = 0;
--            double max = Integer.MIN_VALUE;
--            double min = Integer.MAX_VALUE;
--            int count = 0;
--            for (TimelineMetric metric : metrics.getMetrics()) {
--                for (Double value : metric.getMetricValues().values()) {
--                    sum+=value;
--                    max = Math.max(max, value);
--                    min = Math.min(min, value);
--                    count++;
--                }
--            }
--            TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
--            tmpMetric.setMetricValues(new TreeMap<Long, Double>());
--            metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
--        }
--        String json = null;
--        try {
--            json = mapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis()));
--            LOG.debug(json);
--        } catch (Exception e) {
--            LOG.error("Failed to convert result into json", e);
++        nameToMetricMap.get(timelineMetric.getMetricName()).addOrMergeTimelineMetric(timelineMetric);
++      }
++    }
++    Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>();
++    for (TimelineMetrics metrics : nameToMetricMap.values()) {
++      double sum = 0;
++      double max = Integer.MIN_VALUE;
++      double min = Integer.MAX_VALUE;
++      int count = 0;
++      for (TimelineMetric metric : metrics.getMetrics()) {
++        for (Double value : metric.getMetricValues().values()) {
++          sum += value;
++          max = Math.max(max, value);
++          min = Math.min(min, value);
++          count++;
          }
--
--        return json;
++      }
++      TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
++      tmpMetric.setMetricValues(new TreeMap<Long, Double>());
++      metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
      }
--
--    @Override
--    protected String getPostUrl() {
--        return BASE_POST_URL + AGGREGATED_POST_PREFIX;
++    String json = null;
++    try {
++      json = mapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis()));
++      LOG.debug(json);
++    } catch (Exception e) {
++      LOG.error("Failed to convert result into json", e);
      }
 +
-     @Override
-     protected String getHostInMemoryAggregationProtocol() {
-         return "http";
-     }
++    return json;
++  }
++
++  @Override
++  protected String getPostUrl() {
++    return BASE_POST_URL + AGGREGATED_POST_PREFIX;
++  }
++
++  @Override
++  protected String getHostInMemoryAggregationProtocol() {
++    return "http";
++  }
  }
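
The processMetrics() override above reduces every metric series to a (sum, count, deviation, max, min) aggregate before posting. Below is a minimal, self-contained sketch of that per-metric reduction; the class and variable names are illustrative, not part of the Ambari codebase, and the sample values mirror metricName1/app1 from the test further down.

    import java.util.TreeMap;

    // Standalone sketch of the reduction in processMetrics(): fold every
    // datapoint of a series into sum/max/min/count and drop the series itself.
    public class AggregateSketch {
      public static void main(String[] args) {
        TreeMap<Long, Double> values = new TreeMap<>();
        values.put(1L, 1d);
        values.put(2L, 2d);
        values.put(3L, 3d);

        double sum = 0;
        double max = Double.NEGATIVE_INFINITY;
        double min = Double.POSITIVE_INFINITY;
        int count = 0;
        for (double value : values.values()) {
          sum += value;
          max = Math.max(max, value);
          min = Math.min(min, value);
          count++;
        }
        // Matches AggregatedMetricsPublisherTest for metricName1/app1:
        // sum=6.0, max=3.0, min=1.0, numberOfSamples=3
        System.out.printf("sum=%s max=%s min=%s count=%d%n", sum, max, min, count);
      }
    }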

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
index 2469449,89addb7..74b841b
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
@@@ -6,9 -6,9 +6,9 @@@
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
++ * <p>
++ * http://www.apache.org/licenses/LICENSE-2.0
++ * <p>
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@@ -18,53 -18,48 +18,53 @@@
  package org.apache.hadoop.metrics2.sink.timeline;
  
  
++import java.util.Map;
++
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
  
--import java.util.Map;
--
  public class RawMetricsPublisher extends AbstractMetricPublisher {
--    private final Log LOG;
++  private final Log LOG;
  
--    public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) {
--        super(timelineMetricsHolder, configuration, interval);
--        LOG = LogFactory.getLog(this.getClass());
--    }
++  public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) {
++    super(timelineMetricsHolder, configuration, interval);
++    LOG = LogFactory.getLog(this.getClass());
++  }
  
  
--    @Override
--    protected Map<String, TimelineMetrics> getMetricsFromCache() {
--        return timelineMetricsHolder.extractMetricsForRawPublishing();
--    }
++  @Override
++  protected Map<String, TimelineMetrics> getMetricsFromCache() {
++    return timelineMetricsHolder.extractMetricsForRawPublishing();
++  }
  
--    @Override
--    protected String processMetrics(Map<String, TimelineMetrics> metricValues) {
--        //merge everything in one TimelineMetrics object
--        TimelineMetrics timelineMetrics = new TimelineMetrics();
--        for (TimelineMetrics metrics : metricValues.values()) {
--            for (TimelineMetric timelineMetric : metrics.getMetrics())
--                timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
--        }
--        //map TimelineMetrics to json string
--        String json = null;
--        try {
--            json = mapper.writeValueAsString(timelineMetrics);
--            LOG.debug(json);
--        } catch (Exception e) {
--            LOG.error("Failed to convert result into json", e);
--        }
--        return json;
++  @Override
++  protected String processMetrics(Map<String, TimelineMetrics> metricValues) {
++    // merge everything into one TimelineMetrics object
++    TimelineMetrics timelineMetrics = new TimelineMetrics();
++    for (TimelineMetrics metrics : metricValues.values()) {
++      for (TimelineMetric timelineMetric : metrics.getMetrics())
++        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
      }
--
--    @Override
--    protected String getPostUrl() {
--        return BASE_POST_URL;
++    // map the merged TimelineMetrics to a JSON string
++    String json = null;
++    try {
++      json = mapper.writeValueAsString(timelineMetrics);
++      LOG.debug(json);
++    } catch (Exception e) {
++      LOG.error("Failed to convert result into json", e);
      }
++    return json;
++  }
 +
-     @Override
-     protected String getHostInMemoryAggregationProtocol() {
-         return "http";
-     }
++  @Override
++  protected String getPostUrl() {
++    return BASE_POST_URL;
++  }
++
++  @Override
++  protected String getHostInMemoryAggregationProtocol() {
++    return "http";
++  }
  }
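
RawMetricsPublisher.processMetrics() above does no aggregation at all: it folds every cached TimelineMetrics object into a single one and serializes it. A rough sketch of that merge over plain value maps follows, assuming addOrMergeTimelineMetric() unions datapoints per metric name (which is how the tests below behave); all names here are illustrative.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    // Sketch of the merge step in RawMetricsPublisher.processMetrics(),
    // modelling each TimelineMetric as a name plus a time->value map.
    public class RawMergeSketch {
      public static void main(String[] args) {
        Map<String, TreeMap<Long, Double>> merged = new HashMap<>();

        // Two fragments of the same series arriving from the cache merge...
        addOrMerge(merged, "metricName1", 1L, 1d);
        addOrMerge(merged, "metricName1", 2L, 2d);
        // ...while a different series stays separate.
        addOrMerge(merged, "metricName2", 1L, 4d);

        System.out.println(merged); // {metricName1={1=1.0, 2=2.0}, metricName2={1=4.0}}
      }

      static void addOrMerge(Map<String, TreeMap<Long, Double>> all,
                             String name, long ts, double value) {
        TreeMap<Long, Double> series = all.get(name);
        if (series == null) {
          series = new TreeMap<>();
          all.put(name, series);
        }
        series.put(ts, value);
      }
    }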

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
index 8c17ba1,3413052..cacc98b
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
@@@ -6,9 -6,9 +6,9 @@@
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
++ * <p>
++ * http://www.apache.org/licenses/LICENSE-2.0
++ * <p>
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@@ -17,138 -17,138 +17,138 @@@
   */
  package org.apache.hadoop.metrics2.sink.timeline;
  
--import junit.framework.Assert;
--import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
--import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
--import org.junit.Test;
--
--import org.apache.hadoop.conf.Configuration;
--
  import java.util.Collection;
  import java.util.List;
  import java.util.Map;
  import java.util.TreeMap;
  
--public class AggregatedMetricsPublisherTest {
--
--    @Test
--    public void testProcessMetrics() throws Exception {
--        Configuration configuration = new Configuration();
--        TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance();
--        timelineMetricsHolder.extractMetricsForAggregationPublishing();
--        timelineMetricsHolder.extractMetricsForRawPublishing();
--
--        TreeMap<Long, Double> metric1App1Metrics = new TreeMap<>();
--        metric1App1Metrics.put(1L, 1d);
--        metric1App1Metrics.put(2L, 2d);
--        metric1App1Metrics.put(3L, 3d);
--        timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName1", "app1", metric1App1Metrics));
--
--        TreeMap<Long, Double> metric2App2Metrics = new TreeMap<>();
--        metric2App2Metrics.put(1L, 4d);
--        metric2App2Metrics.put(2L, 5d);
--        metric2App2Metrics.put(3L, 6d);
--        timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName2", "app2", metric2App2Metrics));
--
--        TreeMap<Long, Double> metric3App3Metrics = new TreeMap<>();
--        metric3App3Metrics.put(1L, 7d);
--        metric3App3Metrics.put(2L, 8d);
--        metric3App3Metrics.put(3L, 9d);
--
--        timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName3", "app3", metric3App3Metrics));
--
--
--        AggregatedMetricsPublisher aggregatedMetricsPublisher =
--                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60);
--
--        String aggregatedJson = aggregatedMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForAggregationPublishing());
-         String expectedMetric1App1 = "{\"timelineMetric\":{\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":6.0,\"deviation\":0.0,\"max\":3.0,\"min\":1.0,\"numberOfSamples\":3}}";
-         String expectedMetric2App2 = "{\"timelineMetric\":{\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":15.0,\"deviation\":0.0,\"max\":6.0,\"min\":4.0,\"numberOfSamples\":3}}";
-         String expectedMetric3App3 = "{\"timelineMetric\":{\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":24.0,\"deviation\":0.0,\"max\":9.0,\"min\":7.0,\"numberOfSamples\":3}}";
 -        String expectedMetric1App1 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":6.0,\"deviation\":0.0,\"max\":3.0,\"min\":1.0,\"numberOfSamples\":3}}";
 -        String expectedMetric2App2 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":15.0,\"deviation\":0.0,\"max\":6.0,\"min\":4.0,\"numberOfSamples\":3}}";
 -        String expectedMetric3App3 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":24.0,\"deviation\":0.0,\"max\":9.0,\"min\":7.0,\"numberOfSamples\":3}}";
--        Assert.assertNotNull(aggregatedJson);
--        Assert.assertTrue(aggregatedJson.contains(expectedMetric1App1));
--        Assert.assertTrue(aggregatedJson.contains(expectedMetric3App3));
--        Assert.assertTrue(aggregatedJson.contains(expectedMetric2App2));
--    }
--
--    @Test
--    public void testGetPostUrl() {
--        Configuration configuration = new Configuration();
--        AggregatedMetricsPublisher aggregatedMetricsPublisher =
--                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
--        String actualURL = aggregatedMetricsPublisher.getPostUrl();
--        String expectedURL = "%s://%s:%s/ws/v1/timeline/metrics/aggregated";
--        Assert.assertNotNull(actualURL);
--        Assert.assertEquals(expectedURL, actualURL);
--    }
--
--    @Test
--    public void testGetCollectorUri() {
--        //default configuration
--        Configuration configuration = new Configuration();
--        AbstractMetricPublisher aggregatedMetricsPublisher =
--                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
--        String actualURL = aggregatedMetricsPublisher.getCollectorUri("c6401.ambari.apache.org");
--        String expectedURL = "http://c6401.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated";
--        Assert.assertNotNull(actualURL);
--        Assert.assertEquals(expectedURL, actualURL);
--
--        //https configuration
--        configuration = new Configuration();
--        configuration.set("timeline.metrics.service.http.policy", "HTTPS_ONLY");
--        aggregatedMetricsPublisher =
--                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
--        actualURL = aggregatedMetricsPublisher.getCollectorUri("c6402.ambari.apache.org");
--        expectedURL = "https://c6402.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated";
--        Assert.assertNotNull(actualURL);
--        Assert.assertEquals(expectedURL, actualURL);
--
--        //custom port configuration
--        configuration = new Configuration();
--        configuration.set("timeline.metrics.service.webapp.address", "0.0.0.0:8888");
--        aggregatedMetricsPublisher =
--                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
--        actualURL = aggregatedMetricsPublisher.getCollectorUri("c6403.ambari.apache.org");
--        expectedURL = "http://c6403.ambari.apache.org:8888/ws/v1/timeline/metrics/aggregated";
--        Assert.assertNotNull(actualURL);
--        Assert.assertEquals(expectedURL, actualURL);
--    }
--
--    @Test
--    public void testGetMetricsFromCache() throws InterruptedException {
--        TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(4,4);
--        timelineMetricsHolder.extractMetricsForAggregationPublishing();
--        timelineMetricsHolder.extractMetricsForRawPublishing();
--
--        timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr1"));
--        timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw"));
--        timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr2"));
--
--        Configuration configuration = new Configuration();
--        AggregatedMetricsPublisher aggregatedMetricsPublisher =
--                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
++import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
++import org.junit.Test;
  
--        Map<String, TimelineMetrics> metricsFromCache = aggregatedMetricsPublisher.getMetricsFromCache();
--        Assert.assertNotNull(metricsFromCache);
--        Collection<TimelineMetrics> actualTimelineMetrics = metricsFromCache.values();
--        Assert.assertNotNull(actualTimelineMetrics);
--        Assert.assertEquals(2, actualTimelineMetrics.size());
++import junit.framework.Assert;
  
--        for (TimelineMetrics timelineMetrics : actualTimelineMetrics) {
--            List<TimelineMetric> metrics = timelineMetrics.getMetrics();
--            Assert.assertEquals(1, metrics.size());
--            Assert.assertTrue(metrics.get(0).getAppId().contains("aggr"));
--        }
++public class AggregatedMetricsPublisherTest {
  
++  @Test
++  public void testProcessMetrics() throws Exception {
++    Configuration configuration = new Configuration();
++    TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance();
++    timelineMetricsHolder.extractMetricsForAggregationPublishing();
++    timelineMetricsHolder.extractMetricsForRawPublishing();
++
++    TreeMap<Long, Double> metric1App1Metrics = new TreeMap<>();
++    metric1App1Metrics.put(1L, 1d);
++    metric1App1Metrics.put(2L, 2d);
++    metric1App1Metrics.put(3L, 3d);
++    timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName1", "app1", metric1App1Metrics));
++
++    TreeMap<Long, Double> metric2App2Metrics = new TreeMap<>();
++    metric2App2Metrics.put(1L, 4d);
++    metric2App2Metrics.put(2L, 5d);
++    metric2App2Metrics.put(3L, 6d);
++    timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName2", "app2", metric2App2Metrics));
++
++    TreeMap<Long, Double> metric3App3Metrics = new TreeMap<>();
++    metric3App3Metrics.put(1L, 7d);
++    metric3App3Metrics.put(2L, 8d);
++    metric3App3Metrics.put(3L, 9d);
++
++    timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName3", "app3", metric3App3Metrics));
++
++
++    AggregatedMetricsPublisher aggregatedMetricsPublisher =
++      new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60);
++
++    String aggregatedJson = aggregatedMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForAggregationPublishing());
++    String expectedMetric1App1 = "{\"timelineMetric\":{\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":6.0,\"deviation\":0.0,\"max\":3.0,\"min\":1.0,\"numberOfSamples\":3}}";
++    String expectedMetric2App2 = "{\"timelineMetric\":{\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":15.0,\"deviation\":0.0,\"max\":6.0,\"min\":4.0,\"numberOfSamples\":3}}";
++    String expectedMetric3App3 = "{\"timelineMetric\":{\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":24.0,\"deviation\":0.0,\"max\":9.0,\"min\":7.0,\"numberOfSamples\":3}}";
++    Assert.assertNotNull(aggregatedJson);
++    Assert.assertTrue(aggregatedJson.contains(expectedMetric1App1));
++    Assert.assertTrue(aggregatedJson.contains(expectedMetric3App3));
++    Assert.assertTrue(aggregatedJson.contains(expectedMetric2App2));
++  }
++
++  @Test
++  public void testGetPostUrl() {
++    Configuration configuration = new Configuration();
++    AggregatedMetricsPublisher aggregatedMetricsPublisher =
++      new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
++    String actualURL = aggregatedMetricsPublisher.getPostUrl();
++    String expectedURL = "%s://%s:%s/ws/v1/timeline/metrics/aggregated";
++    Assert.assertNotNull(actualURL);
++    Assert.assertEquals(expectedURL, actualURL);
++  }
++
++  @Test
++  public void testGetCollectorUri() {
++    //default configuration
++    Configuration configuration = new Configuration();
++    AbstractMetricPublisher aggregatedMetricsPublisher =
++      new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
++    String actualURL = aggregatedMetricsPublisher.getCollectorUri("c6401.ambari.apache.org");
++    String expectedURL = "http://c6401.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated";
++    Assert.assertNotNull(actualURL);
++    Assert.assertEquals(expectedURL, actualURL);
++
++    //https configuration
++    configuration = new Configuration();
++    configuration.set("timeline.metrics.service.http.policy", "HTTPS_ONLY");
++    aggregatedMetricsPublisher =
++      new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
++    actualURL = aggregatedMetricsPublisher.getCollectorUri("c6402.ambari.apache.org");
++    expectedURL = "https://c6402.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated";
++    Assert.assertNotNull(actualURL);
++    Assert.assertEquals(expectedURL, actualURL);
++
++    //custom port configuration
++    configuration = new Configuration();
++    configuration.set("timeline.metrics.service.webapp.address", "0.0.0.0:8888");
++    aggregatedMetricsPublisher =
++      new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
++    actualURL = aggregatedMetricsPublisher.getCollectorUri("c6403.ambari.apache.org");
++    expectedURL = "http://c6403.ambari.apache.org:8888/ws/v1/timeline/metrics/aggregated";
++    Assert.assertNotNull(actualURL);
++    Assert.assertEquals(expectedURL, actualURL);
++  }
++
++  @Test
++  public void testGetMetricsFromCache() throws InterruptedException {
++    TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(4, 4);
++    timelineMetricsHolder.extractMetricsForAggregationPublishing();
++    timelineMetricsHolder.extractMetricsForRawPublishing();
++
++    timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr1"));
++    timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw"));
++    timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr2"));
++
++    Configuration configuration = new Configuration();
++    AggregatedMetricsPublisher aggregatedMetricsPublisher =
++      new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
++
++    Map<String, TimelineMetrics> metricsFromCache = aggregatedMetricsPublisher.getMetricsFromCache();
++    Assert.assertNotNull(metricsFromCache);
++    Collection<TimelineMetrics> actualTimelineMetrics = metricsFromCache.values();
++    Assert.assertNotNull(actualTimelineMetrics);
++    Assert.assertEquals(2, actualTimelineMetrics.size());
++
++    for (TimelineMetrics timelineMetrics : actualTimelineMetrics) {
++      List<TimelineMetric> metrics = timelineMetrics.getMetrics();
++      Assert.assertEquals(1, metrics.size());
++      Assert.assertTrue(metrics.get(0).getAppId().contains("aggr"));
      }
  
--    TimelineMetrics getTimelineMetricsForAppId(String metricName, String appId, TreeMap<Long, Double> metricValues) {
--        TimelineMetric timelineMetric = new TimelineMetric();
--        timelineMetric.setMetricName(metricName);
--        timelineMetric.setAppId(appId);
--        timelineMetric.setMetricValues(metricValues);
--        TimelineMetrics timelineMetrics = new TimelineMetrics();
--        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
--        return timelineMetrics;
--    }
++  }
++
++  TimelineMetrics getTimelineMetricsForAppId(String metricName, String appId, TreeMap<Long, Double> metricValues) {
++    TimelineMetric timelineMetric = new TimelineMetric();
++    timelineMetric.setMetricName(metricName);
++    timelineMetric.setAppId(appId);
++    timelineMetric.setMetricValues(metricValues);
++    TimelineMetrics timelineMetrics = new TimelineMetrics();
++    timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
++    return timelineMetrics;
++  }
  }
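
testGetCollectorUri above pins down how the collector URI is derived from configuration: the scheme comes from timeline.metrics.service.http.policy and the port from timeline.metrics.service.webapp.address, defaulting to 6188. The helper below is a hedged reconstruction of that derivation, not the actual AbstractMetricPublisher code; only the property names and expected URLs are taken from the test.

    // Illustrative reconstruction of the URI logic exercised by testGetCollectorUri.
    public class CollectorUriSketch {
      static String collectorUri(String host, String httpPolicy, String webappAddress) {
        String protocol = "HTTPS_ONLY".equals(httpPolicy) ? "https" : "http";
        String port = webappAddress == null
            ? "6188" // default collector port seen in the test
            : webappAddress.substring(webappAddress.lastIndexOf(':') + 1);
        return String.format("%s://%s:%s/ws/v1/timeline/metrics/aggregated",
            protocol, host, port);
      }

      public static void main(String[] args) {
        // http://c6401.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated
        System.out.println(collectorUri("c6401.ambari.apache.org", null, null));
        // https://c6402.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated
        System.out.println(collectorUri("c6402.ambari.apache.org", "HTTPS_ONLY", null));
        // http://c6403.ambari.apache.org:8888/ws/v1/timeline/metrics/aggregated
        System.out.println(collectorUri("c6403.ambari.apache.org", null, "0.0.0.0:8888"));
      }
    }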

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java
index b43a87c,60510d2..252f7d4
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java
@@@ -62,7 -62,7 +62,11 @@@ public class RawMetricsPublisherTest 
                  new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60);
  
          String rawJson = rawMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForRawPublishing());
 -        String expectedResult = "{\"metrics\":[{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{\"1\":1.0,\"2\":2.0,\"3\":3.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{\"1\":4.0,\"2\":5.0,\"3\":6.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{\"1\":7.0,\"2\":8.0,\"3\":9.0}}]}";
 +        String expectedResult = "{\"metrics\":[{\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{\"1\":1.0,\"2\":2.0,\"3\":3.0}},{\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{\"1\":4.0,\"2\":5.0,\"3\":6.0}},{\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{\"1\":7.0,\"2\":8.0,\"3\":9.0}}]}";
          Assert.assertNotNull(rawJson);
          Assert.assertEquals(expectedResult, rawJson);
      }

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
index 371907d,f19434d..df79d69
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
@@@ -105,15 -111,22 +115,24 @@@ class Emitter(threading.Thread)
      if not success:
        logger.info('No valid collectors found...')
        for collector_host in self.active_collector_hosts:
 -        success = self.try_with_collector_host(collector_host, data)
 +        success = self.try_with_collector(self.collector_protocol, collector_host, self.collector_port, data)
 +        if success:
 +          break
        pass
  
 -  def try_with_collector_host(self, collector_host, data):
 +  def try_with_collector(self, collector_protocol, collector_host, collector_port, data):
      headers = {"Content-Type" : "application/json", "Accept" : "*/*"}
 -    connection = self.get_connection(collector_host)
 +    connection = self.get_connection(collector_protocol, collector_host, collector_port)
      logger.debug("message to send: %s" % data)
+ 
+     try:
+       if self.cookie_cached[connection.host]:
+         headers["Cookie"] = self.cookie_cached[connection.host]
+         logger.debug("Cookie: %s" % self.cookie_cached[connection.host])
+     except Exception, e:
+       self.cookie_cached = {}
+     pass
+ 
      retry_count = 0
      while retry_count < self.MAX_RETRY_COUNT:
        response = self.get_response_from_submission(connection, data, headers)
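
The emitter change above adds per-host cookie reuse on top of the existing retry loop: a cached cookie is attached when one exists for the target host, and any lookup failure simply resets the whole cache. The same idea is sketched in Java below to stay in one language with the other sketches in this series; the names are illustrative, not the Python Emitter API.

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the cookie-per-host header logic added to Emitter above.
    public class CookieHeaderSketch {
      private final Map<String, String> cookieCached = new HashMap<>();

      Map<String, String> headersFor(String host) {
        Map<String, String> headers = new HashMap<>();
        headers.put("Content-Type", "application/json");
        headers.put("Accept", "*/*");
        String cookie = cookieCached.get(host); // null-safe, unlike a raw dict lookup
        if (cookie != null) {
          headers.put("Cookie", cookie);
        }
        return headers;
      }

      public static void main(String[] args) {
        CookieHeaderSketch sketch = new CookieHeaderSketch();
        sketch.cookieCached.put("c6401.ambari.apache.org", "hadoop.auth=token");
        System.out.println(sketch.headersFor("c6401.ambari.apache.org")); // includes Cookie
        System.out.println(sketch.headersFor("c6402.ambari.apache.org")); // no Cookie
      }
    }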

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-timelineservice/pom.xml
----------------------------------------------------------------------

