Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C7AE4200CA7 for ; Tue, 30 May 2017 20:29:30 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C66D8160BB1; Tue, 30 May 2017 18:29:30 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 00ADF160BE4 for ; Tue, 30 May 2017 20:29:27 +0200 (CEST) Received: (qmail 12565 invoked by uid 500); 30 May 2017 18:29:27 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 9525 invoked by uid 99); 30 May 2017 18:29:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 May 2017 18:29:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 71A6AE35ED; Tue, 30 May 2017 18:29:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rlevas@apache.org To: commits@ambari.apache.org Date: Tue, 30 May 2017 18:30:10 -0000 Message-Id: <61fe83dc9524448eac5a00ee55341a91@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [48/50] [abbrv] ambari git commit: AMBARI-20758 Aggregate local metrics for minute aggregation time window (dsen) archived-at: Tue, 30 May 2017 18:29:30 -0000 http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py index c0feed5..e5da9ba 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py @@ -27,6 +27,9 @@ from event_definition import HostMetricCollectEvent, ProcessMetricCollectEvent from metric_collector import MetricsCollector from emitter import Emitter from host_info import HostInfo +from aggregator import Aggregator +from aggregator import AggregatorWatchdog + logger = logging.getLogger() @@ -50,11 +53,15 @@ class Controller(threading.Thread): self.initialize_events_cache() self.emitter = Emitter(self.config, self.application_metric_map, stop_handler) self._t = None + self.aggregator = None + self.aggregator_watchdog = None def run(self): logger.info('Running Controller thread: %s' % threading.currentThread().getName()) self.start_emitter() + if self.config.is_inmemory_aggregation_enabled(): + self.start_aggregator_with_watchdog() # Wake every 5 seconds to push events to the queue while True: @@ -62,6 +69,10 @@ class Controller(threading.Thread): logger.warn('Event Queue full!! Suspending further collections.') else: self.enqueque_events() + # restart aggregator if needed + if self.config.is_inmemory_aggregation_enabled() and not self.aggregator_watchdog.is_ok(): + logger.warning("Aggregator is not available. Restarting aggregator.") + self.start_aggregator_with_watchdog() pass # Wait for the service stop event instead of sleeping blindly if 0 == self._stop_handler.wait(self.sleep_interval): @@ -75,6 +86,12 @@ class Controller(threading.Thread): # The emitter thread should have stopped by now, just ensure it has shut # down properly self.emitter.join(5) + + if self.config.is_inmemory_aggregation_enabled(): + self.aggregator.stop() + self.aggregator_watchdog.stop() + self.aggregator.join(5) + self.aggregator_watchdog.join(5) pass # TODO: Optimize to not use Timer class and use the Queue instead @@ -115,3 +132,14 @@ class Controller(threading.Thread): def start_emitter(self): self.emitter.start() + + # Start aggregator and watcher threads + def start_aggregator_with_watchdog(self): + if self.aggregator: + self.aggregator.stop() + if self.aggregator_watchdog: + self.aggregator.stop() + self.aggregator = Aggregator(self.config, self._stop_handler) + self.aggregator_watchdog = AggregatorWatchdog(self.config, self._stop_handler) + self.aggregator.start() + self.aggregator_watchdog.start() http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py index e2a7f0d..77b8c23 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py @@ -44,10 +44,16 @@ class Emitter(threading.Thread): self._stop_handler = stop_handler self.application_metric_map = application_metric_map self.collector_port = config.get_server_port() - self.all_metrics_collector_hosts = config.get_metrics_collector_hosts() + self.all_metrics_collector_hosts = config.get_metrics_collector_hosts_as_list() self.is_server_https_enabled = config.is_server_https_enabled() self.set_instanceid = config.is_set_instanceid() self.instanceid = config.get_instanceid() + self.is_inmemory_aggregation_enabled = config.is_inmemory_aggregation_enabled() + + if self.is_inmemory_aggregation_enabled: + self.collector_port = config.get_inmemory_aggregation_port() + self.all_metrics_collector_hosts = ['localhost'] + self.is_server_https_enabled = False if self.is_server_https_enabled: self.ca_certs = config.get_ca_certs() http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py index bfb6957..7a9fbec 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py @@ -117,7 +117,8 @@ class StopHandlerLinux(StopHandler): def wait(self, timeout=None): # Stop process when stop event received - if self.stop_event.wait(timeout): + self.stop_event.wait(timeout) + if self.stop_event.isSet(): logger.info("Stop event received") return 0 # Timeout http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py index d218015..53d27f8 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py @@ -21,7 +21,7 @@ limitations under the License. import logging import os import sys - +import signal from ambari_commons.os_utils import remove_file from core.controller import Controller @@ -73,6 +73,10 @@ def server_process_main(stop_handler, scmStatus=None): if scmStatus is not None: scmStatus.reportStarted() + # For some reason this is needed to catch system signals like SIGTERM + # TODO fix if possible + signal.pause() + #The controller thread finishes when the stop event is signaled controller.join() http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java index 211e9cd..76b1c15 100644 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java @@ -72,6 +72,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink private static final String TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PASSWORD_PROPERTY; private static final String TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + INSTANCE_ID_PROPERTY; private static final String TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SET_INSTANCE_ID_PROPERTY; + private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY; + private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY; private static final String TIMELINE_DEFAULT_HOST = "localhost"; private static final String TIMELINE_DEFAULT_PORT = "6188"; private static final String TIMELINE_DEFAULT_PROTOCOL = "http"; @@ -96,6 +98,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink private String[] includedMetricsPrefixes; // Local cache to avoid prefix matching everytime private Set excludedMetrics = new HashSet<>(); + private boolean hostInMemoryAggregationEnabled; + private int hostInMemoryAggregationPort; @Override protected String getCollectorUri(String host) { @@ -132,6 +136,17 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink return hostname; } + + @Override + protected boolean isHostInMemoryAggregationEnabled() { + return hostInMemoryAggregationEnabled; + } + + @Override + protected int getHostInMemoryAggregationPort() { + return hostInMemoryAggregationPort; + } + public void setMetricsCache(TimelineMetricsCache metricsCache) { this.metricsCache = metricsCache; } @@ -169,6 +184,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink instanceId = props.getString(TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY); setInstanceId = props.getBoolean(TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY); + hostInMemoryAggregationEnabled = props.getBoolean(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false); + hostInMemoryAggregationPort = props.getInt(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888); setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval)); if (metricCollectorProtocol.contains("https")) { http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java index 08f0598..24b2c8b 100644 --- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java @@ -55,6 +55,8 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink private NimbusClient nimbusClient; private String applicationId; private int timeoutSeconds; + private boolean hostInMemoryAggregationEnabled; + private int hostInMemoryAggregationPort; public StormTimelineMetricsReporter() { @@ -96,6 +98,16 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink } @Override + protected boolean isHostInMemoryAggregationEnabled() { + return hostInMemoryAggregationEnabled; + } + + @Override + protected int getHostInMemoryAggregationPort() { + return hostInMemoryAggregationPort; + } + + @Override public void prepare(Map conf) { LOG.info("Preparing Storm Metrics Reporter"); try { @@ -130,6 +142,8 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink applicationId = cf.get(APP_ID).toString(); setInstanceId = Boolean.getBoolean(cf.get(SET_INSTANCE_ID_PROPERTY).toString()); instanceId = cf.get(INSTANCE_ID_PROPERTY).toString(); + hostInMemoryAggregationEnabled = Boolean.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY).toString()); + hostInMemoryAggregationPort = Integer.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY).toString()); collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port); if (protocol.contains("https")) { http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java index 20f60e1..c9c0538 100644 --- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java @@ -61,6 +61,8 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem private String applicationId; private boolean setInstanceId; private String instanceId; + private boolean hostInMemoryAggregationEnabled; + private int hostInMemoryAggregationPort; @Override protected String getCollectorUri(String host) { @@ -98,6 +100,16 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem } @Override + protected boolean isHostInMemoryAggregationEnabled() { + return hostInMemoryAggregationEnabled; + } + + @Override + protected int getHostInMemoryAggregationPort() { + return hostInMemoryAggregationPort; + } + + @Override public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) { LOG.info("Preparing Storm Metrics Sink"); try { @@ -126,6 +138,8 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY); setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false")); + hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY)); + hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY)); // Initialize the collector write strategy super.init(); http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java index 14f160b..5b75065 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java @@ -50,6 +50,8 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink private String instanceId; private String applicationId; private int timeoutSeconds; + private boolean hostInMemoryAggregationEnabled; + private int hostInMemoryAggregationPort; public StormTimelineMetricsReporter() { @@ -91,6 +93,16 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink } @Override + protected boolean isHostInMemoryAggregationEnabled() { + return hostInMemoryAggregationEnabled; + } + + @Override + protected int getHostInMemoryAggregationPort() { + return hostInMemoryAggregationPort; + } + + @Override public void prepare(Object registrationArgument) { LOG.info("Preparing Storm Metrics Reporter"); try { @@ -119,6 +131,10 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID); setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY)); instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY); + + hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY)); + hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY)); + if (protocol.contains("https")) { String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim(); String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim(); http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java index 425201c..320e177 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java @@ -70,6 +70,8 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem private String applicationId; private String instanceId; private boolean setInstanceId; + private boolean hostInMemoryAggregationEnabled; + private int hostInMemoryAggregationPort; @Override protected String getCollectorUri(String host) { @@ -107,6 +109,16 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem } @Override + protected boolean isHostInMemoryAggregationEnabled() { + return hostInMemoryAggregationEnabled; + } + + @Override + protected int getHostInMemoryAggregationPort() { + return hostInMemoryAggregationPort; + } + + @Override public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) { LOG.info("Preparing Storm Metrics Sink"); try { @@ -137,6 +149,10 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem port = configuration.getProperty(COLLECTOR_PORT, "6188"); instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY); setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false")); + + hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY)); + hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY)); + // Initialize the collector write strategy super.init(); http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index c242a2f..f984253 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -24,10 +24,13 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; import org.apache.hadoop.service.AbstractService; @@ -41,6 +44,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunction; @@ -62,6 +66,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES; @@ -152,10 +157,14 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin scheduleAggregatorThread(dailyClusterAggregator); // Start the minute host aggregator - TimelineMetricAggregator minuteHostAggregator = - TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute( - hBaseAccessor, metricsConf, haController); - scheduleAggregatorThread(minuteHostAggregator); + if (Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, "true"))) { + LOG.info("timeline.metrics.host.inmemory.aggregation is set to True, disabling host minute aggregation on collector"); + } else { + TimelineMetricAggregator minuteHostAggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute( + hBaseAccessor, metricsConf, haController); + scheduleAggregatorThread(minuteHostAggregator); + } // Start the hourly host aggregator TimelineMetricAggregator hourlyHostAggregator = @@ -390,6 +399,18 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin } @Override + public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException { + Map aggregateMap = new HashMap<>(); + for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) { + aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate()); + } + hBaseAccessor.saveHostAggregateRecords(aggregateMap, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME); + + + return new TimelinePutResponse(); + } + + @Override public Map>> getInstanceHostsMetadata(String instanceId, String appId) throws SQLException, IOException { http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index fb369e8..3b2a119 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; @@ -40,8 +42,6 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java index 0d5042f..023465b 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java @@ -296,6 +296,8 @@ public class TimelineMetricConfiguration { public static final String AMSHBASE_METRICS_WHITESLIST_FILE = "amshbase_metrics_whitelist"; + public static final String TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION = "timeline.metrics.host.inmemory.aggregation"; + private Configuration hbaseConf; private Configuration metricsConf; private Configuration amsEnvConf; http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java index bde09cb..d052d54 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; @@ -80,6 +81,7 @@ public interface TimelineMetricStore { */ Map> getTimelineMetricMetadata(String query) throws SQLException, IOException; + TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException; /** * Returns all hosts that have written metrics with the apps on the host * @return { hostname : [ appIds ] } http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java index 65d54c0..7b03b30 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java @@ -19,10 +19,10 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import java.util.Map; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; /** http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java deleted file mode 100644 index 825ac25..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.codehaus.jackson.annotate.JsonProperty; -import org.codehaus.jackson.annotate.JsonSubTypes; -import org.codehaus.jackson.map.ObjectMapper; - -import java.io.IOException; - -/** -* -*/ -@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class), - @JsonSubTypes.Type(value = MetricHostAggregate.class)}) -@InterfaceAudience.Public -@InterfaceStability.Unstable -public class MetricAggregate { - private static final ObjectMapper mapper = new ObjectMapper(); - - protected Double sum = 0.0; - protected Double deviation; - protected Double max = Double.MIN_VALUE; - protected Double min = Double.MAX_VALUE; - - public MetricAggregate() { - } - - MetricAggregate(Double sum, Double deviation, Double max, - Double min) { - this.sum = sum; - this.deviation = deviation; - this.max = max; - this.min = min; - } - - public void updateSum(Double sum) { - this.sum += sum; - } - - public void updateMax(Double max) { - if (max > this.max) { - this.max = max; - } - } - - public void updateMin(Double min) { - if (min < this.min) { - this.min = min; - } - } - - @JsonProperty("sum") - public Double getSum() { - return sum; - } - - @JsonProperty("deviation") - public Double getDeviation() { - return deviation; - } - - @JsonProperty("max") - public Double getMax() { - return max; - } - - @JsonProperty("min") - public Double getMin() { - return min; - } - - public void setSum(Double sum) { - this.sum = sum; - } - - public void setDeviation(Double deviation) { - this.deviation = deviation; - } - - public void setMax(Double max) { - this.max = max; - } - - public void setMin(Double min) { - this.min = min; - } - - public String toJSON() throws IOException { - return mapper.writeValueAsString(this); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java deleted file mode 100644 index 9c837b6..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - - -import org.codehaus.jackson.annotate.JsonCreator; -import org.codehaus.jackson.annotate.JsonProperty; - -/** -* -*/ -public class MetricClusterAggregate extends MetricAggregate { - private int numberOfHosts; - - @JsonCreator - public MetricClusterAggregate() { - } - - public MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation, - Double max, Double min) { - super(sum, deviation, max, min); - this.numberOfHosts = numberOfHosts; - } - - @JsonProperty("numberOfHosts") - public int getNumberOfHosts() { - return numberOfHosts; - } - - public void updateNumberOfHosts(int count) { - this.numberOfHosts += count; - } - - public void setNumberOfHosts(int numberOfHosts) { - this.numberOfHosts = numberOfHosts; - } - - /** - * Find and update min, max and avg for a minute - */ - public void updateAggregates(MetricClusterAggregate hostAggregate) { - updateMax(hostAggregate.getMax()); - updateMin(hostAggregate.getMin()); - updateSum(hostAggregate.getSum()); - updateNumberOfHosts(hostAggregate.getNumberOfHosts()); - } - - @Override - public String toString() { - return "MetricAggregate{" + - "sum=" + sum + - ", numberOfHosts=" + numberOfHosts + - ", deviation=" + deviation + - ", max=" + max + - ", min=" + min + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java deleted file mode 100644 index 340ec75..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - - -import org.codehaus.jackson.annotate.JsonCreator; -import org.codehaus.jackson.annotate.JsonProperty; - -/** - * Represents a collection of minute based aggregation of values for - * resolution greater than a minute. - */ -public class MetricHostAggregate extends MetricAggregate { - - private long numberOfSamples = 0; - - @JsonCreator - public MetricHostAggregate() { - super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE); - } - - public MetricHostAggregate(Double sum, int numberOfSamples, - Double deviation, - Double max, Double min) { - super(sum, deviation, max, min); - this.numberOfSamples = numberOfSamples; - } - - @JsonProperty("numberOfSamples") - public long getNumberOfSamples() { - return numberOfSamples == 0 ? 1 : numberOfSamples; - } - - public void updateNumberOfSamples(long count) { - this.numberOfSamples += count; - } - - public void setNumberOfSamples(long numberOfSamples) { - this.numberOfSamples = numberOfSamples; - } - - public double getAvg() { - return sum / numberOfSamples; - } - - /** - * Find and update min, max and avg for a minute - */ - public void updateAggregates(MetricHostAggregate hostAggregate) { - updateMax(hostAggregate.getMax()); - updateMin(hostAggregate.getMin()); - updateSum(hostAggregate.getSum()); - updateNumberOfSamples(hostAggregate.getNumberOfSamples()); - } - - @Override - public String toString() { - return "MetricHostAggregate{" + - "sum=" + sum + - ", numberOfSamples=" + numberOfSamples + - ", deviation=" + deviation + - ", max=" + max + - ", min=" + min + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java index 44aca03..9eaf456 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java @@ -21,6 +21,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsFilter; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java index 0934356..ba16b43 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java index a5a3499..34b1f9b 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java @@ -38,6 +38,7 @@ import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java index 0ea9c08..a17433b 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java index b5f49fb..672f85f 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java index 9da921a..50cfb08 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; @@ -285,6 +286,36 @@ public class TimelineWebServices { } } + /** + * Store the given metrics into the timeline store, and return errors that + * happened during storing. + */ + @Path("/metrics/aggregated") + @POST + @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) + public TimelinePutResponse postAggregatedMetrics( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + AggregationResult metrics) { + + init(res); + if (metrics == null) { + return new TimelinePutResponse(); + } + + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Storing aggregated metrics: " + + TimelineUtils.dumpTimelineRecordtoJSON(metrics, true)); + } + + return timelineMetricStore.putHostAggregatedMetrics(metrics); + } catch (Exception e) { + LOG.error("Error saving metrics.", e); + throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + @Path("/containermetrics") @POST @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java index 0087fd9..d5baaef 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java @@ -26,12 +26,12 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java index 37ec134..7eeb9c4 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java @@ -17,9 +17,9 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; import java.util.Arrays; http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java index a910cc2..d668178 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java @@ -22,11 +22,11 @@ import com.google.common.collect.Multimap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java index 44f48e8..3009163 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java @@ -18,7 +18,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics .timeline; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -34,7 +34,7 @@ public class TestMetricHostAggregate { assertThat(aggregate.getSum()).isEqualTo(3.0); assertThat(aggregate.getMin()).isEqualTo(1.0); assertThat(aggregate.getMax()).isEqualTo(2.0); - assertThat(aggregate.getAvg()).isEqualTo(3.0 / 2); + assertThat(aggregate.calculateAverage()).isEqualTo(3.0 / 2); } @Test @@ -50,7 +50,7 @@ public class TestMetricHostAggregate { assertThat(aggregate.getSum()).isEqualTo(12.0); assertThat(aggregate.getMin()).isEqualTo(0.5); assertThat(aggregate.getMax()).isEqualTo(7.5); - assertThat(aggregate.getAvg()).isEqualTo((3.0 + 8.0 + 1.0) / 5); + assertThat(aggregate.calculateAverage()).isEqualTo((3.0 + 8.0 + 1.0) / 5); } static MetricHostAggregate createAggregate (Double sum, Double min, @@ -63,4 +63,4 @@ public class TestMetricHostAggregate { aggregate.setNumberOfSamples(samplesCount); return aggregate; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java index f00906e..ac2f9d7 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; @@ -92,6 +93,11 @@ public class TestTimelineMetricStore implements TimelineMetricStore { } @Override + public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException { + return null; + } + + @Override public Map> getHostAppsMetadata() throws SQLException, IOException { return Collections.emptyMap(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java index fa0cfe9..53f6f6c 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java @@ -17,10 +17,10 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; import java.util.Collections; http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java index f083731..07fd85d 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java @@ -20,13 +20,13 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java index 9873643..75b3f91 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest; @@ -124,14 +125,14 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { assertEquals(0.0, currentHostAggregate.getMin()); assertEquals(20, currentHostAggregate.getNumberOfSamples()); assertEquals(15.0, currentHostAggregate.getSum()); - assertEquals(15.0 / 20, currentHostAggregate.getAvg()); + assertEquals(15.0 / 20, currentHostAggregate.calculateAverage()); count++; } else if ("mem_free".equals(currentMetric.getMetricName())) { assertEquals(2.0, currentHostAggregate.getMax()); assertEquals(0.0, currentHostAggregate.getMin()); assertEquals(20, currentHostAggregate.getNumberOfSamples()); assertEquals(15.0, currentHostAggregate.getSum()); - assertEquals(15.0 / 20, currentHostAggregate.getAvg()); + assertEquals(15.0 / 20, currentHostAggregate.calculateAverage()); count++; } else { fail("Unexpected entry"); @@ -198,7 +199,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { assertEquals(0.0, currentHostAggregate.getMin()); assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples()); assertEquals(12 * 15.0, currentHostAggregate.getSum()); - assertEquals(15.0 / 20, currentHostAggregate.getAvg()); + assertEquals(15.0 / 20, currentHostAggregate.calculateAverage()); } } } @@ -260,7 +261,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { assertEquals(0.0, currentHostAggregate.getMin()); assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples()); assertEquals(12 * 15.0, currentHostAggregate.getSum()); - assertEquals(15.0 / 20, currentHostAggregate.getAvg()); + assertEquals(15.0 / 20, currentHostAggregate.calculateAverage()); } } } @@ -309,14 +310,14 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { assertEquals(0.0, currentHostAggregate.getMin()); assertEquals(20, currentHostAggregate.getNumberOfSamples()); assertEquals(15.0, currentHostAggregate.getSum()); - assertEquals(15.0 / 20, currentHostAggregate.getAvg()); + assertEquals(15.0 / 20, currentHostAggregate.calculateAverage()); count++; } else if ("mem_free".equals(currentMetric.getMetricName())) { assertEquals(2.0, currentHostAggregate.getMax()); assertEquals(0.0, currentHostAggregate.getMin()); assertEquals(20, currentHostAggregate.getNumberOfSamples()); assertEquals(15.0, currentHostAggregate.getSum()); - assertEquals(15.0 / 20, currentHostAggregate.getAvg()); + assertEquals(15.0 / 20, currentHostAggregate.calculateAverage()); count++; } else { fail("Unexpected entry"); http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java index 78db11d..6541b2c 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml index 2d88912..02f9574 100644 --- a/ambari-metrics/pom.xml +++ b/ambari-metrics/pom.xml @@ -33,6 +33,7 @@ ambari-metrics-host-monitoring ambari-metrics-grafana ambari-metrics-assembly + ambari-metrics-host-aggregator UTF-8 http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java index 8d1f63f..a0765bf 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java @@ -300,6 +300,16 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements return hostName; } + @Override + protected boolean isHostInMemoryAggregationEnabled() { + return false; + } + + @Override + protected int getHostInMemoryAggregationPort() { + return 0; + } + private List getFilteredMetricList(List metrics) { final List metricList = new ArrayList<>(); for (SingleMetric metric : metrics) { http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py index 150b0a8..5d21514 100644 --- a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py +++ b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py @@ -153,6 +153,8 @@ if has_metric_collector: pass metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60) metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10) +host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True) +host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888) # if accumulo is selected accumulo_tserver_hosts should not be empty, but still default just in case if 'slave_hosts' in config['clusterHostInfo']: http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2 b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2 index 6873c85..742ea3c 100644 --- a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2 +++ b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2 @@ -16,6 +16,9 @@ # Poll collectors every {{metrics_report_interval}} seconds *.period={{metrics_collection_period}} +*.host_in_memory_aggregation = {{host_in_memory_aggregation}} +*.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}} + {% if has_metric_collector %} *.timeline.plugin.urls=file:///usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml index cb66537..4d33661 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml @@ -101,6 +101,14 @@ + timeline.metrics.host.inmemory.aggregation.jvm.arguments + -Xmx256m -Xms128m -XX:PermSize=68m + + Local aggregator jvm extra arguments separated with spaces + + + + timeline.metrics.skip.network.interfaces.patterns None http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml index 8e1671e..1b085f6 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml @@ -787,4 +787,15 @@ {{cluster_zookeeper_clientPort}} + + timeline.metrics.host.inmemory.aggregation + false + if set to "true" host metrics will be aggregated in memory on each host + + + + timeline.metrics.host.inmemory.aggregation.port + 61888 + + http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml index 740a91a..9031b46 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml @@ -93,6 +93,9 @@ true + + ams-site + http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py index a929847..f49d47d 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py @@ -163,6 +163,20 @@ def ams(name=None): create_parents = True ) + if params.host_in_memory_aggregation and params.log4j_props is not None: + File(os.path.join(params.ams_monitor_conf_dir, "log4j.properties"), + owner=params.ams_user, + content=params.log4j_props + ) + + XmlConfig("ams-site.xml", + conf_dir=params.ams_monitor_conf_dir, + configurations=params.config['configurations']['ams-site'], + configuration_attributes=params.config['configuration_attributes']['ams-site'], + owner=params.ams_user, + group=params.user_group + ) + TemplateConfig( os.path.join(params.ams_monitor_conf_dir, "metric_monitor.ini"), owner=params.ams_user, @@ -366,6 +380,22 @@ def ams(name=None, action=None): create_parents = True ) + if params.host_in_memory_aggregation and params.log4j_props is not None: + File(format("{params.ams_monitor_conf_dir}/log4j.properties"), + mode=0644, + group=params.user_group, + owner=params.ams_user, + content=InlineTemplate(params.log4j_props) + ) + + XmlConfig("ams-site.xml", + conf_dir=params.ams_monitor_conf_dir, + configurations=params.config['configurations']['ams-site'], + configuration_attributes=params.config['configuration_attributes']['ams-site'], + owner=params.ams_user, + group=params.user_group + ) + Execute(format("{sudo} chown -R {ams_user}:{user_group} {ams_monitor_log_dir}") ) http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py index 50dde1c..b8c14f4 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py @@ -224,6 +224,11 @@ metrics_collector_heapsize = check_append_heap_property(str(metrics_collector_he master_heapsize = check_append_heap_property(str(master_heapsize), "m") regionserver_heapsize = check_append_heap_property(str(regionserver_heapsize), "m") +host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True) +host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888) +host_in_memory_aggregation_jvm_arguments = default("/configurations/ams-env/timeline.metrics.host.inmemory.aggregation.jvm.arguments", + "-Xmx256m -Xms128m -XX:PermSize=68m") + regionserver_xmn_max = default('/configurations/ams-hbase-env/hbase_regionserver_xmn_max', None) if regionserver_xmn_max: regionserver_xmn_max = int(trim_heap_property(str(regionserver_xmn_max), "m")) http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2 index 9729bbe..bb0db4f 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2 +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2 @@ -58,6 +58,9 @@ rpc.protocol={{metric_collector_protocol}} *.timeline.plugin.urls=file:///usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar *.sink.timeline.slave.host.name={{hostname}} +*.host_in_memory_aggregation = {{host_in_memory_aggregation}} +*.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}} + hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink hbase.sink.timeline.period={{metrics_collection_period}} hbase.sink.timeline.sendInterval={{metrics_report_interval}}000 http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2 index 769ad67..b7dee50 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2 +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2 @@ -38,3 +38,10 @@ failover_strategy = {{failover_strategy}} failover_strategy_blacklisted_interval_seconds = {{failover_strategy_blacklisted_interval_seconds}} port = {{metric_collector_port}} https_enabled = {{metric_collector_https_enabled}} + +[aggregation] +host_in_memory_aggregation = {{host_in_memory_aggregation}} +host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}} +java_home = {{java64_home}} +jvm_arguments = {{host_in_memory_aggregation_jvm_arguments}} +ams_monitor_log_dir = {{ams_monitor_log_dir}} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py index 86a290f..0e0c9aa 100644 --- a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py +++ b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py @@ -124,6 +124,9 @@ if has_metric_collector: metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60) metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10) +host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True) +host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888) + # Cluster Zookeeper quorum zookeeper_quorum = None if not len(default("/clusterHostInfo/zookeeper_hosts", [])) == 0: