ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From avija...@apache.org
Subject [4/5] ambari git commit: Revert "AMBARI-15623. Support distributed aggregation for multiple AMS instances. (swagle)"
Date Thu, 19 May 2016 20:59:43 GMT
Revert "AMBARI-15623. Support distributed aggregation for multiple AMS instances. (swagle)"

This reverts commit dfa4454e7d69fb160924a3877d3f3f1a6314c7bd.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/342c510e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/342c510e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/342c510e

Branch: refs/heads/branch-2.4
Commit: 342c510e5fd0605c4263a53de83510083857ea9c
Parents: 75c1981
Author: Aravindan Vijayan <avijayan@hortonworks.com>
Authored: Wed May 18 14:08:13 2016 -0700
Committer: Aravindan Vijayan <avijayan@hortonworks.com>
Committed: Thu May 19 13:44:25 2016 -0700

----------------------------------------------------------------------
 .../ambari-metrics-timelineservice/pom.xml      |  29 +-
 .../timeline/HBaseTimelineMetricStore.java      |  70 +----
 .../timeline/TimelineMetricConfiguration.java   |  52 ----
 .../metrics/timeline/TimelineMetricStore.java   |   6 -
 .../aggregators/AbstractTimelineAggregator.java | 106 ++-----
 .../aggregators/TimelineMetricAggregator.java   |  26 +-
 .../TimelineMetricAggregatorFactory.java        | 100 +++----
 .../TimelineMetricClusterAggregator.java        |   9 +-
 .../TimelineMetricClusterAggregatorSecond.java  |  13 +-
 .../TimelineMetricHostAggregator.java           |  11 +-
 .../v2/TimelineMetricClusterAggregator.java     |  11 +-
 .../v2/TimelineMetricHostAggregator.java        |  10 +-
 .../availability/AggregationTaskRunner.java     | 144 ----------
 .../availability/CheckpointManager.java         |  98 -------
 .../OnlineOfflineStateModelFactory.java         |  69 -----
 .../TimelineMetricHAController.java             | 276 -------------------
 .../TimelineMetricMetadataManager.java          |   4 -
 .../query/DefaultPhoenixDataSource.java         |   2 +-
 .../webapp/TimelineWebServices.java             |  12 -
 .../timeline/ITPhoenixHBaseAccessor.java        |  12 +-
 .../timeline/TestTimelineMetricStore.java       |   5 -
 .../AbstractTimelineAggregatorTest.java         |   7 +-
 .../aggregators/ITClusterAggregator.java        |  20 +-
 .../aggregators/ITMetricAggregator.java         |  19 +-
 ...melineMetricClusterAggregatorSecondTest.java |   9 +-
 .../server/upgrade/UpgradeCatalog240.java       |   2 +-
 .../0.1.0/configuration/ams-env.xml             |   3 -
 .../AMBARI_METRICS/0.1.0/metainfo.xml           |   2 +-
 28 files changed, 143 insertions(+), 984 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
index d962b8b..9fc3679 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
@@ -34,9 +34,9 @@
     <!-- Needed for generating FindBugs warnings using parent pom -->
     <!--<yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>-->
     <protobuf.version>2.5.0</protobuf.version>
-    <hadoop.version>[2.7.1.2.5.0.0,2.7.1.2.5.0.0-9999)</hadoop.version>
-    <phoenix.version>[4.4.0.2.5.0.0,4.4.0.2.5.0.0-9999)</phoenix.version>
-    <hbase.version>[1.1.2.2.5.0.0,1.1.2.2.5.0.0-9999)</hbase.version>
+    <hadoop.version>2.7.1.2.3.4.0-3347</hadoop.version>
+    <phoenix.version>4.4.0.2.3.4.0-3347</phoenix.version>
+    <hbase.version>1.1.2.2.3.4.0-3347</hbase.version>
   </properties>
 
   <build>
@@ -249,25 +249,6 @@
   </build>
 
   <dependencies>
-
-    <dependency>
-      <groupId>org.apache.helix</groupId>
-      <artifactId>helix-core</artifactId>
-      <version>0.6.5</version>
-      <exclusions>
-        <exclusion>
-          <artifactId>zookeeper</artifactId>
-          <groupId>org.apache.zookeeper</groupId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <artifactId>zookeeper</artifactId>
-      <groupId>org.apache.zookeeper</groupId>
-      <version>3.4.8</version>
-    </dependency>
-
     <dependency>
       <groupId>org.apache.phoenix</groupId>
       <artifactId>phoenix-core</artifactId>
@@ -584,10 +565,6 @@
             <groupId>org.jruby</groupId>
             <artifactId>jruby-complete</artifactId>
           </exclusion>
-          <exclusion>
-            <artifactId>zookeeper</artifactId>
-            <groupId>org.apache.zookeeper</groupId>
-          </exclusion>
         </exclusions>
       </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index 974f951..83aa36b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
@@ -43,21 +41,17 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
 
 public class HBaseTimelineMetricStore extends AbstractService implements TimelineMetricStore {
 
@@ -65,10 +59,8 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
   private final TimelineMetricConfiguration configuration;
   private PhoenixHBaseAccessor hBaseAccessor;
   private static volatile boolean isInitialized = false;
-  private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor();
-  private final Map<AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>();
+  private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
   private TimelineMetricMetadataManager metricMetadataManager;
-  private TimelineMetricHAController haController;
   private Integer defaultTopNHostsLimit;
 
   /**
@@ -97,18 +89,6 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
       metricMetadataManager.initializeMetadata();
       // Initialize policies before TTL update
       hBaseAccessor.initPoliciesAndTTL();
-      // Start HA service
-      if (configuration.isDistributedOperationModeEnabled()) {
-        // Start the controller
-        haController = new TimelineMetricHAController(configuration);
-        try {
-          haController.initializeHAController();
-        } catch (Exception e) {
-          LOG.error(e);
-          throw new MetricsSystemInitializationException("Unable to " +
-            "initialize HA controller", e);
-        }
-      }
 
       defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
       if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
@@ -117,53 +97,46 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
 
       // Start the cluster aggregator second
       TimelineMetricAggregator secondClusterAggregator =
-        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
-          hBaseAccessor, metricsConf, metricMetadataManager, haController);
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf, metricMetadataManager);
       scheduleAggregatorThread(secondClusterAggregator);
 
       // Start the minute cluster aggregator
       TimelineMetricAggregator minuteClusterAggregator =
-        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(
-          hBaseAccessor, metricsConf, haController);
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
       scheduleAggregatorThread(minuteClusterAggregator);
 
       // Start the hourly cluster aggregator
       TimelineMetricAggregator hourlyClusterAggregator =
-        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
-          hBaseAccessor, metricsConf, haController);
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf);
       scheduleAggregatorThread(hourlyClusterAggregator);
 
       // Start the daily cluster aggregator
       TimelineMetricAggregator dailyClusterAggregator =
-        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(
-          hBaseAccessor, metricsConf, haController);
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf);
       scheduleAggregatorThread(dailyClusterAggregator);
 
       // Start the minute host aggregator
       TimelineMetricAggregator minuteHostAggregator =
-        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
-          hBaseAccessor, metricsConf, haController);
+        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
       scheduleAggregatorThread(minuteHostAggregator);
 
       // Start the hourly host aggregator
       TimelineMetricAggregator hourlyHostAggregator =
-        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(
-          hBaseAccessor, metricsConf, haController);
+        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
       scheduleAggregatorThread(hourlyHostAggregator);
 
       // Start the daily host aggregator
       TimelineMetricAggregator dailyHostAggregator =
-        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(
-          hBaseAccessor, metricsConf, haController);
+        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf);
       scheduleAggregatorThread(dailyHostAggregator);
 
       if (!configuration.isTimelineMetricsServiceWatcherDisabled()) {
         int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay();
         int delay = configuration.getTimelineMetricsServiceWatcherDelay();
         // Start the watchdog
-        watchdogExecutorService.scheduleWithFixedDelay(
-          new TimelineMetricStoreWatcher(this, configuration),
-          initDelay, delay, TimeUnit.SECONDS);
+        executorService.scheduleWithFixedDelay(
+          new TimelineMetricStoreWatcher(this, configuration), initDelay, delay,
+          TimeUnit.SECONDS);
         LOG.info("Started watchdog for timeline metrics store with initial " +
           "delay = " + initDelay + ", delay = " + delay);
       }
@@ -349,30 +322,13 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
     return metricMetadataManager.getHostedAppsCache();
   }
 
-  @Override
-  public List<String> getLiveInstances() {
-    return haController.getLiveInstanceHostNames();
-  }
-
-  private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator) {
+  private void scheduleAggregatorThread(TimelineMetricAggregator aggregator) {
+    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
     if (!aggregator.isDisabled()) {
-      ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactory() {
-          @Override
-          public Thread newThread(Runnable r) {
-            return new Thread(r, ACTUAL_AGGREGATOR_NAMES.get(aggregator.getName()));
-          }
-        }
-      );
-      scheduledExecutors.put(aggregator.getName(), executorService);
       executorService.scheduleAtFixedRate(aggregator,
         0l,
         aggregator.getSleepIntervalMillis(),
         TimeUnit.MILLISECONDS);
-      LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " +
-        + aggregator.getSleepIntervalMillis() + " milliseconds.");
-    } else {
-      LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled.");
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 5a04ad2..8e9a8d6 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -23,11 +23,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 
-import java.net.InetAddress;
 import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.net.URL;
-import java.net.UnknownHostException;
 
 /**
  * Configuration class that reads properties from ams-site.xml. All values
@@ -40,7 +38,6 @@ public class TimelineMetricConfiguration {
 
   public static final String HBASE_SITE_CONFIGURATION_FILE = "hbase-site.xml";
   public static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
-  public static final String METRICS_ENV_CONFIGURATION_FILE = "ams-env.xml";
 
   public static final String TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR =
     "timeline.metrics.aggregator.checkpoint.dir";
@@ -228,25 +225,16 @@ public class TimelineMetricConfiguration {
   public static final String TIMELINE_METRICS_TABLES_DURABILITY =
     "timeline.metrics.tables.durability";
 
-<<<<<<< HEAD
-  public static final String TIMELINE_METRIC_METADATA_FILTERS =
-    "timeline.metrics.service.metadata.filters";
-
   public static final String HBASE_BLOCKING_STORE_FILES =
     "hbase.hstore.blockingStoreFiles";
 
   public static final String DEFAULT_TOPN_HOSTS_LIMIT =
     "timeline.metrics.default.topn.hosts.limit";
 
-=======
->>>>>>> parent of e3c9816... AMBARI-15902. Refactor Metadata manager for supporting distributed collector. (swagle)
   public static final String HOST_APP_ID = "HOST";
 
-  public static final String DEFAULT_INSTANCE_PORT = "12001";
-
   private Configuration hbaseConf;
   private Configuration metricsConf;
-  private Configuration amsEnvConf;
   private volatile boolean isInitialized = false;
 
   public void initialize() throws URISyntaxException, MalformedURLException {
@@ -273,7 +261,6 @@ public class TimelineMetricConfiguration {
     hbaseConf.addResource(hbaseResUrl.toURI().toURL());
     metricsConf = new Configuration(true);
     metricsConf.addResource(amsResUrl.toURI().toURL());
-
     isInitialized = true;
   }
 
@@ -291,37 +278,6 @@ public class TimelineMetricConfiguration {
     return metricsConf;
   }
 
-  public String getZKClientPort() throws MalformedURLException, URISyntaxException {
-    if (!isInitialized) {
-      initialize();
-    }
-    return hbaseConf.getTrimmed("hbase.zookeeper.property.clientPort", "2181");
-  }
-
-  public String getZKQuorum() throws MalformedURLException, URISyntaxException {
-    if (!isInitialized) {
-      initialize();
-    }
-    return hbaseConf.getTrimmed("hbase.zookeeper.quorum");
-  }
-
-  public String getInstanceHostnameFromEnv() throws UnknownHostException {
-    String amsInstanceName = System.getProperty("AMS_INSTANCE_NAME");
-    if (amsInstanceName == null) {
-      amsInstanceName = InetAddress.getLocalHost().getHostName();
-    }
-    return amsInstanceName;
-  }
-
-  public String getInstancePort() throws MalformedURLException, URISyntaxException {
-    String amsInstancePort = System.getProperty("AMS_INSTANCE_PORT");
-    if (amsInstancePort == null) {
-      // Check config
-      return getMetricsConf().get("timeline.metrics.availability.instance.port", DEFAULT_INSTANCE_PORT);
-    }
-    return DEFAULT_INSTANCE_PORT;
-  }
-
   public String getWebappAddress() {
     String defaultHttpAddress = "0.0.0.0:6188";
     if (metricsConf != null) {
@@ -379,12 +335,4 @@ public class TimelineMetricConfiguration {
     }
     return defaultRpcAddress;
   }
-
-  public boolean isDistributedOperationModeEnabled() {
-    try {
-      return getMetricsConf().get("timeline.metrics.service.operation.mode").equals("distributed");
-    } catch (Exception e) {
-      return false;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
index e37bc4d..3e70330 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -85,10 +85,4 @@ public interface TimelineMetricStore {
    * @throws IOException
    */
   Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException;
-
-  /**
-   * Return a list of known live collector nodes
-   * @return [ hostname ]
-   */
-  List<String> getLiveInstances();
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index ae87cf1..ba7807b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -20,9 +20,6 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import org.slf4j.LoggerFactory;
@@ -37,7 +34,6 @@ import java.util.Date;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
 
 /**
  * Base class for all runnable aggregators. Provides common functions like
@@ -56,12 +52,11 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
   protected String tableName;
   protected String outputTableName;
   protected Long nativeTimeRangeDelay;
-  protected AggregationTaskRunner taskRunner;
 
   // Explicitly name aggregators for logging needs
-  private final AGGREGATOR_NAME aggregatorName;
+  private final String aggregatorName;
 
-  AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName,
+  AbstractTimelineAggregator(String aggregatorName,
                              PhoenixHBaseAccessor hBaseAccessor,
                              Configuration metricsConf) {
     this.aggregatorName = aggregatorName;
@@ -69,10 +64,10 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
     this.metricsConf = metricsConf;
     this.checkpointDelayMillis = SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120));
     this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000);
-    this.LOG = LoggerFactory.getLogger(ACTUAL_AGGREGATOR_NAMES.get(aggregatorName));
+    this.LOG = LoggerFactory.getLogger(aggregatorName);
   }
 
-  public AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName,
+  public AbstractTimelineAggregator(String aggregatorName,
                                     PhoenixHBaseAccessor hBaseAccessor,
                                     Configuration metricsConf,
                                     String checkpointLocation,
@@ -81,8 +76,7 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
                                     String aggregatorDisableParam,
                                     String tableName,
                                     String outputTableName,
-                                    Long nativeTimeRangeDelay,
-                                    TimelineMetricHAController haController) {
+                                    Long nativeTimeRangeDelay) {
     this(aggregatorName, hBaseAccessor, metricsConf);
     this.checkpointLocation = checkpointLocation;
     this.sleepIntervalMillis = sleepIntervalMillis;
@@ -90,9 +84,7 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
     this.aggregatorDisableParam = aggregatorDisableParam;
     this.tableName = tableName;
     this.outputTableName = outputTableName;
-    this.nativeTimeRangeDelay = nativeTimeRangeDelay;
-    this.taskRunner = haController != null && haController.isInitialized() ?
-      haController.getAggregationTaskRunner() : null;
+    this.nativeTimeRangeDelay =  nativeTimeRangeDelay;
   }
 
   @Override
@@ -106,39 +98,25 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
    * Access relaxed for tests
    */
   public void runOnce(Long SLEEP_INTERVAL) {
-    boolean performAggregationFunction = true;
-    if (taskRunner != null) {
-      switch (getAggregatorType()) {
-        case HOST:
-          performAggregationFunction = taskRunner.performsHostAggregation();
-          break;
-        case CLUSTER:
-          performAggregationFunction = taskRunner.performsClusterAggregation();
-      }
-    }
 
-    if (performAggregationFunction) {
-      long currentTime = System.currentTimeMillis();
-      long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime);
+    long currentTime = System.currentTimeMillis();
+    long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime);
 
-      if (lastCheckPointTime != -1) {
-        LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
-          + ((currentTime - lastCheckPointTime) / 1000)
-          + " seconds.");
-
-        boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
-
-        if (success) {
-          try {
-            saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
-          } catch (IOException io) {
-            LOG.warn("Error saving checkpoint, restarting aggregation at " +
-              "previous checkpoint.");
-          }
+    if (lastCheckPointTime != -1) {
+      LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
+        + ((currentTime - lastCheckPointTime) / 1000)
+        + " seconds.");
+
+      boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
+
+      if (success) {
+        try {
+          saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
+        } catch (IOException io) {
+          LOG.warn("Error saving checkpoint, restarting aggregation at " +
+            "previous checkpoint.");
         }
       }
-    } else {
-      LOG.info("Skipping aggregation function not owned by this instance.");
     }
   }
 
@@ -196,9 +174,6 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
   }
 
   protected long readCheckPoint() {
-    if (taskRunner != null) {
-      return taskRunner.getCheckpointManager().readCheckpoint(aggregatorName);
-    }
     try {
       File checkpoint = new File(getCheckpointLocation());
       if (checkpoint.exists()) {
@@ -214,23 +189,15 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
   }
 
   protected void saveCheckPoint(long checkpointTime) throws IOException {
-    if (taskRunner != null) {
-      boolean success = taskRunner.getCheckpointManager().writeCheckpoint(aggregatorName, checkpointTime);
-      if (!success) {
-        LOG.error("Error saving checkpoint with AggregationTaskRunner, " +
-          "aggregator = " + aggregatorName + "value = " + checkpointTime);
+    File checkpoint = new File(getCheckpointLocation());
+    if (!checkpoint.exists()) {
+      boolean done = checkpoint.createNewFile();
+      if (!done) {
+        throw new IOException("Could not create checkpoint at location, " +
+          getCheckpointLocation());
       }
-    } else {
-      File checkpoint = new File(getCheckpointLocation());
-      if (!checkpoint.exists()) {
-        boolean done = checkpoint.createNewFile();
-        if (!done) {
-          throw new IOException("Could not create checkpoint at location, " +
-            getCheckpointLocation());
-        }
-      }
-      FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
     }
+    FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
   }
 
   /**
@@ -350,21 +317,4 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
     return currentTime - (currentTime % aggregatorPeriod);
   }
 
-  /**
-   * Get @AGGREGATOR_TYPE based on the output table.
-   * This is solely used by the HAController to determine which lock to acquire.
-   */
-  public AGGREGATOR_TYPE getAggregatorType() {
-    if (outputTableName.contains("RECORD")) {
-      return AGGREGATOR_TYPE.HOST;
-    } else if (outputTableName.contains("AGGREGATE")) {
-      return AGGREGATOR_TYPE.CLUSTER;
-    }
-    return null;
-  }
-
-  @Override
-  public AGGREGATOR_NAME getName() {
-    return aggregatorName;
-  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
index 150e3f1..295db0e 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
@@ -1,7 +1,5 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -22,38 +20,22 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 public interface TimelineMetricAggregator extends Runnable {
   /**
    * Aggregate metric data within the time bounds.
-   *
    * @param startTime start time millis
-   * @param endTime   end time millis
+   * @param endTime end time millis
    * @return success
    */
-  boolean doWork(long startTime, long endTime);
+  public boolean doWork(long startTime, long endTime);
 
   /**
    * Is aggregator is disabled by configuration.
-   *
    * @return true/false
    */
-  boolean isDisabled();
+  public boolean isDisabled();
 
   /**
    * Return aggregator Interval
-   *
    * @return Interval in Millis
    */
-  Long getSleepIntervalMillis();
-
-  /**
-   * Get aggregator name
-   * @return @AGGREGATOR_NAME
-   */
-  AGGREGATOR_NAME getName();
+  public Long getSleepIntervalMillis();
 
-  /**
-   * Known aggregator types
-   */
-  enum AGGREGATOR_TYPE {
-    CLUSTER,
-    HOST
   }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
index 4c44f9e..cc85c56 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -30,12 +29,12 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER;
@@ -49,13 +48,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
@@ -94,8 +86,7 @@ public class TimelineMetricAggregatorFactory {
    * Interval : 5 mins
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
-    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-     TimelineMetricHAController haController) {
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -113,7 +104,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
-        METRIC_RECORD_MINUTE,
+        "TimelineMetricHostAggregatorMinute",
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -121,13 +112,12 @@ public class TimelineMetricAggregatorFactory {
         hostAggregatorDisabledParam,
         inputTableName,
         outputTableName,
-        120000l,
-        haController
+        120000l
       );
     }
 
     return new TimelineMetricHostAggregator(
-      METRIC_RECORD_MINUTE,
+      "TimelineMetricHostAggregatorMinute",
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -135,8 +125,7 @@ public class TimelineMetricAggregatorFactory {
       hostAggregatorDisabledParam,
       inputTableName,
       outputTableName,
-      120000l,
-      haController);
+      120000l);
   }
 
   /**
@@ -144,8 +133,7 @@ public class TimelineMetricAggregatorFactory {
    * Interval : 1 hour
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
-    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-     TimelineMetricHAController haController) {
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -163,7 +151,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
-        METRIC_RECORD_HOURLY,
+        "TimelineMetricHostAggregatorHourly",
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -171,13 +159,12 @@ public class TimelineMetricAggregatorFactory {
         hostAggregatorDisabledParam,
         inputTableName,
         outputTableName,
-        3600000l,
-        haController
+        3600000l
       );
     }
 
     return new TimelineMetricHostAggregator(
-      METRIC_RECORD_HOURLY,
+      "TimelineMetricHostAggregatorHourly",
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -185,8 +172,7 @@ public class TimelineMetricAggregatorFactory {
       hostAggregatorDisabledParam,
       inputTableName,
       outputTableName,
-      3600000l,
-      haController);
+      3600000l);
   }
 
   /**
@@ -194,8 +180,7 @@ public class TimelineMetricAggregatorFactory {
    * Interval : 1 day
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorDaily
-    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-     TimelineMetricHAController haController) {
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -213,7 +198,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
-        METRIC_RECORD_DAILY,
+        "TimelineMetricHostAggregatorDaily",
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -221,13 +206,12 @@ public class TimelineMetricAggregatorFactory {
         hostAggregatorDisabledParam,
         inputTableName,
         outputTableName,
-        3600000l,
-        haController
+        3600000l
       );
     }
 
     return new TimelineMetricHostAggregator(
-      METRIC_RECORD_DAILY,
+      "TimelineMetricHostAggregatorDaily",
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -235,8 +219,7 @@ public class TimelineMetricAggregatorFactory {
       hostAggregatorDisabledParam,
       inputTableName,
       outputTableName,
-      3600000l,
-      haController);
+      3600000l);
   }
 
   /**
@@ -246,8 +229,7 @@ public class TimelineMetricAggregatorFactory {
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorSecond(
     PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-    TimelineMetricMetadataManager metadataManager,
-    TimelineMetricHAController haController) {
+    TimelineMetricMetadataManager metadataManager) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -270,7 +252,7 @@ public class TimelineMetricAggregatorFactory {
 
     // Second based aggregation have added responsibility of time slicing
     return new TimelineMetricClusterAggregatorSecond(
-      METRIC_AGGREGATE_SECOND,
+      "TimelineClusterAggregatorSecond",
       metadataManager,
       hBaseAccessor, metricsConf,
       checkpointLocation,
@@ -280,8 +262,7 @@ public class TimelineMetricAggregatorFactory {
       inputTableName,
       outputTableName,
       120000l,
-      timeSliceIntervalMillis,
-      haController
+      timeSliceIntervalMillis
     );
   }
 
@@ -290,8 +271,7 @@ public class TimelineMetricAggregatorFactory {
    * Interval : 5 mins
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
-    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-    TimelineMetricHAController haController) {
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -311,7 +291,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
-        METRIC_AGGREGATE_MINUTE,
+        "TimelineClusterAggregatorMinute",
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -319,13 +299,12 @@ public class TimelineMetricAggregatorFactory {
         aggregatorDisabledParam,
         inputTableName,
         outputTableName,
-        120000l,
-        haController
+        120000l
       );
     }
 
     return new TimelineMetricClusterAggregator(
-      METRIC_AGGREGATE_MINUTE,
+      "TimelineClusterAggregatorMinute",
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -333,8 +312,7 @@ public class TimelineMetricAggregatorFactory {
       aggregatorDisabledParam,
       inputTableName,
       outputTableName,
-      120000l,
-      haController
+      120000l
     );
   }
 
@@ -343,8 +321,7 @@ public class TimelineMetricAggregatorFactory {
    * Interval : 1 hour
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorHourly(
-    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-    TimelineMetricHAController haController) {
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -364,7 +341,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
-        METRIC_AGGREGATE_HOURLY,
+        "TimelineClusterAggregatorHourly",
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -372,13 +349,12 @@ public class TimelineMetricAggregatorFactory {
         aggregatorDisabledParam,
         inputTableName,
         outputTableName,
-        120000l,
-        haController
+        120000l
       );
     }
 
     return new TimelineMetricClusterAggregator(
-      METRIC_AGGREGATE_HOURLY,
+      "TimelineClusterAggregatorHourly",
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -386,8 +362,7 @@ public class TimelineMetricAggregatorFactory {
       aggregatorDisabledParam,
       inputTableName,
       outputTableName,
-      120000l,
-      haController
+      120000l
     );
   }
 
@@ -396,8 +371,7 @@ public class TimelineMetricAggregatorFactory {
    * Interval : 1 day
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorDaily(
-    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-    TimelineMetricHAController haController) {
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -417,7 +391,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
-        METRIC_AGGREGATE_DAILY,
+        "TimelineClusterAggregatorDaily",
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -425,13 +399,12 @@ public class TimelineMetricAggregatorFactory {
         aggregatorDisabledParam,
         inputTableName,
         outputTableName,
-        120000l,
-        haController
+        120000l
       );
     }
 
     return new TimelineMetricClusterAggregator(
-      METRIC_AGGREGATE_DAILY,
+      "TimelineClusterAggregatorDaily",
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -439,8 +412,7 @@ public class TimelineMetricAggregatorFactory {
       aggregatorDisabledParam,
       inputTableName,
       outputTableName,
-      120000l,
-      haController
+      120000l
     );
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
index 6438256..f90b01f 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 
@@ -38,7 +36,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
   private final TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(true);
   private final boolean isClusterPrecisionInputTable;
 
-  public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
+  public TimelineMetricClusterAggregator(String aggregatorName,
                                          PhoenixHBaseAccessor hBaseAccessor,
                                          Configuration metricsConf,
                                          String checkpointLocation,
@@ -47,12 +45,11 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
                                          String hostAggregatorDisabledParam,
                                          String inputTableName,
                                          String outputTableName,
-                                         Long nativeTimeRangeDelay,
-                                         TimelineMetricHAController haController) {
+                                         Long nativeTimeRangeDelay) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier,
       hostAggregatorDisabledParam, inputTableName, outputTableName,
-      nativeTimeRangeDelay, haController);
+      nativeTimeRangeDelay);
     isClusterPrecisionInputTable = inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index 6a16ee6..a8d3086 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -22,12 +22,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -41,6 +39,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
 
 /**
  * Aggregates a metric across all hosts in the cluster. Reads metrics from
@@ -55,7 +54,8 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
   private final Long serverTimeShiftAdjustment;
   private final boolean interpolationEnabled;
 
-  public TimelineMetricClusterAggregatorSecond(AGGREGATOR_NAME aggregatorName,
+
+  public TimelineMetricClusterAggregatorSecond(String aggregatorName,
                                                TimelineMetricMetadataManager metadataManager,
                                                PhoenixHBaseAccessor hBaseAccessor,
                                                Configuration metricsConf,
@@ -66,11 +66,10 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
                                                String tableName,
                                                String outputTableName,
                                                Long nativeTimeRangeDelay,
-                                               Long timeSliceInterval,
-                                               TimelineMetricHAController haController) {
+                                               Long timeSliceInterval) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam,
-      tableName, outputTableName, nativeTimeRangeDelay, haController);
+      tableName, outputTableName, nativeTimeRangeDelay);
 
     appAggregator = new TimelineMetricAppAggregator(metadataManager, metricsConf);
     this.timeSliceIntervalMillis = timeSliceInterval;

http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
index 364a4b5..26e73b0 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -22,24 +22,20 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
-
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
 
 public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
   private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class);
   TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
-  public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName,
+  public TimelineMetricHostAggregator(String aggregatorName,
                                       PhoenixHBaseAccessor hBaseAccessor,
                                       Configuration metricsConf,
                                       String checkpointLocation,
@@ -48,11 +44,10 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
                                       String hostAggregatorDisabledParam,
                                       String tableName,
                                       String outputTableName,
-                                      Long nativeTimeRangeDelay,
-                                      TimelineMetricHAController haController) {
+                                      Long nativeTimeRangeDelay) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
-      tableName, outputTableName, nativeTimeRangeDelay, haController);
+      tableName, outputTableName, nativeTimeRangeDelay);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
index aeddf06..c056d79 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
@@ -20,23 +20,19 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
-
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Date;
-
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_APP_METRIC_GROUPBY_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 
 public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
   private final String aggregateColumnName;
 
-  public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
+  public TimelineMetricClusterAggregator(String aggregatorName,
                                          PhoenixHBaseAccessor hBaseAccessor,
                                          Configuration metricsConf,
                                          String checkpointLocation,
@@ -45,12 +41,11 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
                                          String hostAggregatorDisabledParam,
                                          String inputTableName,
                                          String outputTableName,
-                                         Long nativeTimeRangeDelay,
-                                         TimelineMetricHAController haController) {
+                                         Long nativeTimeRangeDelay) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier,
       hostAggregatorDisabledParam, inputTableName, outputTableName,
-      nativeTimeRangeDelay, haController);
+      nativeTimeRangeDelay);
 
     if (inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME)) {
       aggregateColumnName = "HOSTS_COUNT";

http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
index 0df8329..118c695 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
@@ -20,11 +20,8 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
-
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -34,7 +31,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 
 public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
 
-  public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName,
+  public TimelineMetricHostAggregator(String aggregatorName,
                                       PhoenixHBaseAccessor hBaseAccessor,
                                       Configuration metricsConf,
                                       String checkpointLocation,
@@ -43,11 +40,10 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
                                       String hostAggregatorDisabledParam,
                                       String tableName,
                                       String outputTableName,
-                                      Long nativeTimeRangeDelay,
-                                      TimelineMetricHAController haController) {
+                                      Long nativeTimeRangeDelay) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
-      tableName, outputTableName, nativeTimeRangeDelay, haController);
+      tableName, outputTableName, nativeTimeRangeDelay);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
deleted file mode 100644
index 4a1f17b..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.participant.StateMachineEngine;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE.CLUSTER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE.HOST;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.CLUSTER_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.METRIC_AGGREGATORS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.STATE_MODEL_NAME;
-
-public class AggregationTaskRunner {
-  private final String instanceName;
-  private final String zkAddress;
-  private HelixManager manager;
-  private static final Log LOG = LogFactory.getLog(AggregationTaskRunner.class);
-  private CheckpointManager checkpointManager;
-  // Map partition name to an aggregator dimension
-  static final Map<String, AGGREGATOR_TYPE> PARTITION_AGGREGATION_TYPES = new HashMap<>();
-  // Ownership flags to be set by the State transitions
-  private final AtomicBoolean performsClusterAggregation = new AtomicBoolean(false);
-  private final AtomicBoolean performsHostAggregation = new AtomicBoolean(false);
-
-  public enum AGGREGATOR_NAME {
-    METRIC_RECORD_MINUTE,
-    METRIC_RECORD_HOURLY,
-    METRIC_RECORD_DAILY,
-    METRIC_AGGREGATE_SECOND,
-    METRIC_AGGREGATE_MINUTE,
-    METRIC_AGGREGATE_HOURLY,
-    METRIC_AGGREGATE_DAILY,
-  }
-
-  public static final Map<AGGREGATOR_NAME, String> ACTUAL_AGGREGATOR_NAMES = new HashMap<>();
-
-  static {
-    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_MINUTE, "TimelineMetricHostAggregatorMinute");
-    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_HOURLY, "TimelineMetricHostAggregatorHourly");
-    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_DAILY, "TimelineMetricHostAggregatorDaily");
-    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_SECOND, "TimelineClusterAggregatorSecond");
-    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_MINUTE, "TimelineClusterAggregatorMinute");
-    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_HOURLY, "TimelineClusterAggregatorHourly");
-    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_DAILY, "TimelineClusterAggregatorDaily");
-
-    // Partition name to task assignment
-    PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_0", CLUSTER);
-    PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_1", HOST);
-  }
-
-  public AggregationTaskRunner(String instanceName, String zkAddress) {
-    this.instanceName = instanceName;
-    this.zkAddress = zkAddress;
-  }
-
-  public void initialize() throws Exception {
-    manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, instanceName,
-      InstanceType.PARTICIPANT, zkAddress);
-
-    OnlineOfflineStateModelFactory stateModelFactory =
-      new OnlineOfflineStateModelFactory(instanceName, this);
-
-    StateMachineEngine stateMach = manager.getStateMachineEngine();
-    stateMach.registerStateModelFactory(STATE_MODEL_NAME, stateModelFactory);
-    manager.connect();
-
-    checkpointManager = new CheckpointManager(manager.getHelixPropertyStore());
-  }
-
-  public boolean performsClusterAggregation() {
-    return performsClusterAggregation.get();
-  }
-
-  public boolean performsHostAggregation() {
-    return performsHostAggregation.get();
-  }
-
-  public CheckpointManager getCheckpointManager() {
-    return checkpointManager;
-  }
-
-  public void setPartitionAggregationFunction(AGGREGATOR_TYPE type) {
-    switch (type) {
-      case HOST:
-        performsHostAggregation.set(true);
-        LOG.info("Set host aggregator function for : " + instanceName);
-        break;
-      case CLUSTER:
-        performsClusterAggregation.set(true);
-        LOG.info("Set cluster aggregator function for : " + instanceName);
-    }
-  }
-
-  public void unsetPartitionAggregationFunction(AGGREGATOR_TYPE type) {
-    switch (type) {
-      case HOST:
-        performsHostAggregation.set(false);
-        LOG.info("Unset host aggregator function for : " + instanceName);
-        break;
-      case CLUSTER:
-        performsClusterAggregation.set(false);
-        LOG.info("Unset cluster aggregator function for : " + instanceName);
-    }
-  }
-
-  /**
-   * Disconnect participant before controller shutdown
-   */
-  void stop() {
-    manager.disconnect();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java
deleted file mode 100644
index 439102f..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import org.apache.helix.AccessOption;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.zookeeper.data.Stat;
-
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
-
-public class CheckpointManager {
-  private final ZkHelixPropertyStore<ZNRecord> propertyStore;
-  private static final Log LOG = LogFactory.getLog(CheckpointManager.class);
-
-  static final String ZNODE_FIELD = "checkpoint";
-  static final String CHECKPOINT_PATH_PREFIX = "CHECKPOINTS";
-
-  public CheckpointManager(ZkHelixPropertyStore<ZNRecord> propertyStore) {
-    this.propertyStore = propertyStore;
-  }
-
-  /**
-   * Read aggregator checkpoint from zookeeper
-   *
-   * @return timestamp
-   */
-  public long readCheckpoint(AGGREGATOR_NAME aggregatorName) {
-    String path = getCheckpointZKPath(aggregatorName);
-    LOG.debug("Reading checkpoint at " + path);
-    Stat stat = new Stat();
-    ZNRecord znRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Stat => " + stat);
-    }
-    long checkpoint = znRecord != null ? znRecord.getLongField(ZNODE_FIELD, -1) : -1;
-    LOG.debug("Checkpoint value = " + checkpoint);
-    return checkpoint;
-  }
-
-  /**
-   * Write aggregator checkpoint in zookeeper
-   *
-   * @param value timestamp
-   * @return sucsess
-   */
-  public boolean writeCheckpoint(AGGREGATOR_NAME aggregatorName, long value) {
-    String path = getCheckpointZKPath(aggregatorName);
-    LOG.debug(String.format("Saving checkpoint at %s with value %s", path, value));
-    return propertyStore.update(path, new CheckpointDataUpdater(path, value), AccessOption.PERSISTENT);
-  }
-
-  static class CheckpointDataUpdater implements DataUpdater<ZNRecord> {
-    final String path;
-    final Long value;
-
-    public CheckpointDataUpdater(String path, Long value) {
-      this.path = path;
-      this.value = value;
-    }
-
-    @Override
-    public ZNRecord update(ZNRecord currentData) {
-      if (currentData == null) {
-        currentData = new ZNRecord(path);
-      }
-      currentData.setLongField(ZNODE_FIELD, value);
-      return currentData;
-    }
-  }
-
-  String getCheckpointZKPath(AGGREGATOR_NAME aggregatorName) {
-    StringBuilder sb = new StringBuilder("/");
-    sb.append(CHECKPOINT_PATH_PREFIX);
-    sb.append("/");
-    sb.append(ACTUAL_AGGREGATOR_NAMES.get(aggregatorName));
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java
deleted file mode 100644
index 7d3350b..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.PARTITION_AGGREGATION_TYPES;
-
-public class OnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
-  private static final Log LOG = LogFactory.getLog(OnlineOfflineStateModelFactory.class);
-  private final String instanceName;
-  private final AggregationTaskRunner taskRunner;
-
-  public OnlineOfflineStateModelFactory(String instanceName, AggregationTaskRunner taskRunner) {
-    this.instanceName = instanceName;
-    this.taskRunner = taskRunner;
-  }
-
-  @Override
-  public StateModel createNewStateModel(String resourceName, String partition) {
-    LOG.info("Received request to process partition = " + partition + ", for " +
-      "resource = " + resourceName + ", at " + instanceName);
-    return new OnlineOfflineStateModel();
-  }
-
-  public class OnlineOfflineStateModel extends StateModel {
-    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
-      String partitionName = message.getPartitionName();
-      LOG.info("Received transition to Online from Offline for partition: " + partitionName);
-      AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName);
-      taskRunner.setPartitionAggregationFunction(type);
-    }
-
-    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
-      String partitionName = message.getPartitionName();
-      LOG.info("Received transition to Offline from Online for partition: " + partitionName);
-      AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName);
-      taskRunner.unsetPartitionAggregationFunction(type);
-    }
-
-    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
-      String partitionName = message.getPartitionName();
-      LOG.info("Received transition to Dropped from Offline for partition: " + partitionName);
-      AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName);
-      taskRunner.unsetPartitionAggregationFunction(type);
-    }
-  }
-}
\ No newline at end of file


Mime
View raw message