From: swa...@apache.org
Subject: ambari git commit: AMBARI-12983. Optimize aggregator queries by performing GROUP BY on server. (swagle)
Date: Thu, 03 Sep 2015 17:31:26 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk c2d4b1db5 -> 3e0b8f07c


AMBARI-12983. Optimize aggregator queries by performing GROUP BY on server. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3e0b8f07
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3e0b8f07
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3e0b8f07

Branch: refs/heads/trunk
Commit: 3e0b8f07c388ad2bd67ef23b6dc313f3fef4d117
Parents: c2d4b1d
Author: Siddharth Wagle <swagle@hortonworks.com>
Authored: Thu Sep 3 10:31:22 2015 -0700
Committer: Siddharth Wagle <swagle@hortonworks.com>
Committed: Thu Sep 3 10:31:22 2015 -0700

----------------------------------------------------------------------
 .../conf/unix/ambari-metrics-collector          |   2 +-
 .../ambari-metrics-timelineservice/pom.xml      |   2 +-
 .../ApplicationHistoryClientService.java        |   4 +-
 .../timeline/HBaseTimelineMetricStore.java      | 107 +++++++-------
 .../timeline/TimelineMetricConfiguration.java   |  13 ++
 .../aggregators/AbstractTimelineAggregator.java |  16 ++-
 .../TimelineMetricAggregatorFactory.java        |  72 +++++++++-
 .../v2/TimelineMetricClusterAggregator.java     | 100 +++++++++++++
 .../v2/TimelineMetricHostAggregator.java        |  73 ++++++++++
 .../metrics/timeline/query/Condition.java       |   1 +
 .../timeline/query/DefaultCondition.java        |   5 +
 .../metrics/timeline/query/EmptyCondition.java  | 139 +++++++++++++++++++
 .../timeline/query/PhoenixTransactSQL.java      | 121 +++++++++-------
 .../query/SplitByMetricNamesCondition.java      |   5 +
 .../metrics/timeline/ITClusterAggregator.java   |  85 ++++++++++--
 .../metrics/timeline/ITMetricAggregator.java    |  84 ++++++++++-
 .../server/configuration/Configuration.java     |   2 +-
 .../AmbariManagementControllerImpl.java         |   2 +-
 .../0.1.0/configuration/ams-hbase-site.xml      |  21 ++-
 .../0.1.0/configuration/ams-site.xml            |   8 ++
 20 files changed, 736 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
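
At a high level, this change lets the metric aggregators push the roll-up work into HBase/Phoenix instead of reading raw rows back into the collector and aggregating them in Java. When timeline.metrics.service.use.groupBy.aggregators is enabled (the default), each aggregation cycle issues a single server-side UPSERT ... SELECT ... GROUP BY, which Phoenix can run as an N-way parallel scan across regions. For reference, this is the hourly cluster roll-up quoted from the comment in the new v2 TimelineMetricClusterAggregator further down, shown here as a Java string; the timestamps are the example values used in that comment:

    // The hourly cluster roll-up as one server-side statement (table and column names
    // are from the diff; the time bounds are the illustrative values from its comment).
    String hourlyClusterRollup =
        "UPSERT INTO METRIC_AGGREGATE_HOURLY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
        "SERVER_TIME, UNITS, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " +
        "SELECT METRIC_NAME, APP_ID, INSTANCE_ID, MAX(SERVER_TIME), UNITS, " +
        "SUM(METRIC_SUM), SUM(HOSTS_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " +
        "FROM METRIC_AGGREGATE " +
        "WHERE SERVER_TIME >= 1441155600000 AND SERVER_TIME < 1441159200000 " +
        "GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS";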


http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-metrics/ambari-metrics-timelineservice/conf/unix/ambari-metrics-collector
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/conf/unix/ambari-metrics-collector b/ambari-metrics/ambari-metrics-timelineservice/conf/unix/ambari-metrics-collector
index bdf00b5..fe73ff1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/conf/unix/ambari-metrics-collector
+++ b/ambari-metrics/ambari-metrics-timelineservice/conf/unix/ambari-metrics-collector
@@ -176,7 +176,7 @@ function start()
     rm -f "${PIDFILE}" >/dev/null 2>&1
   fi
 
-  nohup "${JAVA}" "-Xmx$AMS_COLLECTOR_HEAPSIZE" "${AMS_COLLECTOR_OPTS}" "-cp" "/usr/lib/ambari-metrics-collector/*:${COLLECTOR_CONF_DIR}" "-Djava.net.preferIPv4Stack=true" "-Dams.log.dir=${AMS_COLLECTOR_LOG_DIR}" "-Dproc_${DAEMON_NAME}" "${CLASS}" "$@" > $OUTFILE 2>&1 &
+  nohup "${JAVA}" "-Xms$AMS_COLLECTOR_HEAPSIZE" "-Xmx$AMS_COLLECTOR_HEAPSIZE" "${AMS_COLLECTOR_OPTS}" "-cp" "/usr/lib/ambari-metrics-collector/*:${COLLECTOR_CONF_DIR}" "-Djava.net.preferIPv4Stack=true" "-Dams.log.dir=${AMS_COLLECTOR_LOG_DIR}" "-Dproc_${DAEMON_NAME}" "${CLASS}" "$@" > $OUTFILE 2>&1 &
   PID=$!
   write_pidfile "${PIDFILE}"
   sleep 2

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-metrics/ambari-metrics-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
index a630fcc..705ea0f 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
@@ -35,7 +35,7 @@
     <!--<yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>-->
     <protobuf.version>2.5.0</protobuf.version>
     <hadoop.version>(2.6.0.2.2.0.0, 2.6.0.2.2.1.0)</hadoop.version>
-    <phoenix.version>4.2.0.2.2.0.0-2041</phoenix.version>
+    <phoenix.version>4.2.0.2.2.1.0-2340</phoenix.version>
     <hbase.version>0.98.4.2.2.0.0-2041-hadoop2</hbase.version>
   </properties>
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java
index 8a37a57..a12e373 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java
@@ -90,9 +90,7 @@ public class ApplicationHistoryClientService extends AbstractService {
 
     server =
         rpc.getServer(ApplicationHistoryProtocol.class, protocolHandler,
-          address, conf, null, conf.getInt(
-            YarnConfiguration.TIMELINE_SERVICE_HANDLER_THREAD_COUNT,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_THREAD_COUNT));
+          address, conf, null, metricConfiguration.getTimelineMetricsServiceHandlerThreadCount());
 
     server.start();
     this.bindAddress =

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index c615804..9c0b94d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -40,12 +40,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-public class HBaseTimelineMetricStore extends AbstractService
-    implements TimelineMetricStore {
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
+
+public class HBaseTimelineMetricStore extends AbstractService implements TimelineMetricStore {
 
   static final Log LOG = LogFactory.getLog(HBaseTimelineMetricStore.class);
   private final TimelineMetricConfiguration configuration;
   private PhoenixHBaseAccessor hBaseAccessor;
+  private static volatile boolean isInitialized = false;
 
   /**
    * Construct the service.
@@ -62,58 +64,67 @@ public class HBaseTimelineMetricStore extends AbstractService
     initializeSubsystem(configuration.getHbaseConf(), configuration.getMetricsConf());
   }
 
-  private void initializeSubsystem(Configuration hbaseConf,
-                                   Configuration metricsConf) {
-    hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf);
-    hBaseAccessor.initMetricSchema();
-
-    // Start the cluster aggregator minute
-    TimelineMetricAggregator minuteClusterAggregator =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
-    if (!minuteClusterAggregator.isDisabled()) {
-      Thread aggregatorThread = new Thread(minuteClusterAggregator);
-      aggregatorThread.start();
-    }
+  private synchronized void initializeSubsystem(Configuration hbaseConf,
+                                                Configuration metricsConf) {
+    if (!isInitialized) {
+      hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf);
+      hBaseAccessor.initMetricSchema();
 
-    // Start the hourly cluster aggregator
-    TimelineMetricAggregator hourlyClusterAggregator =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf);
-    if (!hourlyClusterAggregator.isDisabled()) {
-      Thread aggregatorThread = new Thread(hourlyClusterAggregator);
-      aggregatorThread.start();
-    }
+      if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
+        LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
+      }
 
-    // Start the daily cluster aggregator
-    TimelineMetricAggregator dailyClusterAggregator =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf);
-    if (!dailyClusterAggregator.isDisabled()) {
-      Thread aggregatorThread = new Thread(dailyClusterAggregator);
-      aggregatorThread.start();
-    }
+      // Start the cluster aggregator minute
+      TimelineMetricAggregator minuteClusterAggregator =
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
+      if (!minuteClusterAggregator.isDisabled()) {
+        Thread aggregatorThread = new Thread(minuteClusterAggregator);
+        aggregatorThread.start();
+      }
 
-    // Start the minute host aggregator
-    TimelineMetricAggregator minuteHostAggregator =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
-    if (!minuteHostAggregator.isDisabled()) {
-      Thread minuteAggregatorThread = new Thread(minuteHostAggregator);
-      minuteAggregatorThread.start();
-    }
+      // Start the hourly cluster aggregator
+      TimelineMetricAggregator hourlyClusterAggregator =
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf);
+      if (!hourlyClusterAggregator.isDisabled()) {
+        Thread aggregatorThread = new Thread(hourlyClusterAggregator);
+        aggregatorThread.start();
+      }
 
-    // Start the hourly host aggregator
-    TimelineMetricAggregator hourlyHostAggregator =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
-    if (!hourlyHostAggregator.isDisabled()) {
-      Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator);
-      aggregatorHourlyThread.start();
-    }
+      // Start the daily cluster aggregator
+      TimelineMetricAggregator dailyClusterAggregator =
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf);
+      if (!dailyClusterAggregator.isDisabled()) {
+        Thread aggregatorThread = new Thread(dailyClusterAggregator);
+        aggregatorThread.start();
+      }
+
+      // Start the minute host aggregator
+      TimelineMetricAggregator minuteHostAggregator =
+        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
+      if (!minuteHostAggregator.isDisabled()) {
+        Thread minuteAggregatorThread = new Thread(minuteHostAggregator);
+        minuteAggregatorThread.start();
+      }
+
+      // Start the hourly host aggregator
+      TimelineMetricAggregator hourlyHostAggregator =
+        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
+      if (!hourlyHostAggregator.isDisabled()) {
+        Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator);
+        aggregatorHourlyThread.start();
+      }
+
+      // Start the daily host aggregator
+      TimelineMetricAggregator dailyHostAggregator =
+        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf);
+      if (!dailyHostAggregator.isDisabled()) {
+        Thread aggregatorDailyThread = new Thread(dailyHostAggregator);
+        aggregatorDailyThread.start();
+      }
 
-    // Start the daily host aggregator
-    TimelineMetricAggregator dailyHostAggregator =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf);
-    if (!dailyHostAggregator.isDisabled()) {
-      Thread aggregatorDailyThread = new Thread(dailyHostAggregator);
-      aggregatorDailyThread.start();
+      isInitialized = true;
     }
+
   }
 
   @Override
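
Two things to note in HBaseTimelineMetricStore: initialization is now guarded by a static isInitialized flag so the subsystem starts only once, and the store reads timeline.metrics.service.use.groupBy.aggregators (default "true") to log which aggregation path is active. A minimal sketch of switching the new path off through the metrics configuration, mirroring what getConfigurationForTest() does in ITClusterAggregator further down; the property name is from the diff, everything else is illustrative:

    // Sketch: disable the GROUP BY aggregators; the aggregator factory then builds the
    // original (v1) implementations that aggregate inside the collector.
    Configuration metricsConf = new Configuration();
    metricsConf.set("timeline.metrics.service.use.groupBy.aggregators", "false");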

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index d4f919e..d70d77d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -162,6 +162,12 @@ public class TimelineMetricConfiguration {
   public static final String OUT_OFF_BAND_DATA_TIME_ALLOWANCE =
     "timeline.metrics.service.outofband.time.allowance.millis";
 
+  public static final String USE_GROUPBY_AGGREGATOR_QUERIES =
+    "timeline.metrics.service.use.groupBy.aggregators";
+
+  public static final String HANDLER_THREAD_COUNT =
+    "timeline.metrics.service.handler.thread.count";
+
   public static final String HOST_APP_ID = "HOST";
 
   private Configuration hbaseConf;
@@ -217,6 +223,13 @@ public class TimelineMetricConfiguration {
     return defaultHttpAddress;
   }
 
+  public int getTimelineMetricsServiceHandlerThreadCount() {
+    if (metricsConf != null) {
+      return Integer.parseInt(metricsConf.get(HANDLER_THREAD_COUNT, "20"));
+    }
+    return 20;
+  }
+
   public String getTimelineServiceRpcAddress() {
     String defaultRpcAddress = "0.0.0.0:60200";
     if (metricsConf != null) {
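
TimelineMetricConfiguration also gains timeline.metrics.service.handler.thread.count, which ApplicationHistoryClientService (above) now uses in place of the YARN timeline-service constant. A sketch of the lookup the new getter performs; the property name and the default of 20 come from the diff, and metricsConf stands for the loaded metrics configuration:

    // Equivalent of getTimelineMetricsServiceHandlerThreadCount().
    int handlerThreadCount = (metricsConf != null)
        ? Integer.parseInt(metricsConf.get("timeline.metrics.service.handler.thread.count", "20"))
        : 20;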

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index 37fb088..8bdddf2 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -122,8 +122,7 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
         + " seconds.");
 
       long startTime = clock.getTime();
-      boolean success = doWork(lastCheckPointTime,
-        lastCheckPointTime + SLEEP_INTERVAL);
+      boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
       long executionTime = clock.getTime() - startTime;
       long delta = SLEEP_INTERVAL - executionTime;
 
@@ -242,16 +241,19 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
       stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
 
       LOG.debug("Query issued @: " + new Date());
-      rs = stmt.executeQuery();
+      if (condition.doUpdate()) {
+        int rows = stmt.executeUpdate();
+        conn.commit();
+        LOG.info(rows + " row(s) updated.");
+      } else {
+        rs = stmt.executeQuery();
+      }
       LOG.debug("Query returned @: " + new Date());
 
       aggregate(rs, startTime, endTime);
       LOG.info("End aggregation cycle @ " + new Date());
 
-    } catch (SQLException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    } catch (IOException e) {
+    } catch (SQLException | IOException e) {
       LOG.error("Exception during aggregating metrics.", e);
       success = false;
     } finally {
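
The Condition interface gains doUpdate() (see Condition.java and its implementations below), and the aggregator loop uses it to decide between executing a self-contained write statement and the original read-then-aggregate path. A condensed restatement of that branch with the intent spelled out in comments; variable names match the diff. Note that aggregate(rs, ...) still runs afterwards, which is why the v2 aggregators implement it as a log-only method (the UPSERT has already written the rolled-up rows):

    ResultSet rs = null;
    PreparedStatement stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
    if (condition.doUpdate()) {
      int rows = stmt.executeUpdate(); // statement is a full UPSERT ... SELECT ... GROUP BY
      conn.commit();                   // committed explicitly, as in the diff
      LOG.info(rows + " row(s) updated.");
    } else {
      rs = stmt.executeQuery();        // v1 conditions: fetch rows, aggregate in the collector
    }
    aggregate(rs, startTime, endTime); // rs stays null on the update path; v2 aggregators only log here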

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
index 642fcfe..f07918c 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -43,6 +43,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
@@ -53,7 +54,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 
 /**
  * Factory class that knows how to create a aggregator instance using
- * @TimelineMetricConfiguration
+ * TimelineMetricConfiguration
  */
 public class TimelineMetricAggregatorFactory {
   private static final String HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE =
@@ -69,6 +70,10 @@ public class TimelineMetricAggregatorFactory {
   private static final String CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE =
     "timeline-metrics-cluster-aggregator-daily-checkpoint";
 
+  private static boolean useGroupByAggregator(Configuration metricsConf) {
+    return Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"));
+  }
+
   /**
    * Minute based aggregation for hosts.
    */
@@ -89,6 +94,19 @@ public class TimelineMetricAggregatorFactory {
     String inputTableName = METRICS_RECORD_TABLE_NAME;
     String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
 
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        hostAggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        120000l
+      );
+    }
+
     return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -119,6 +137,19 @@ public class TimelineMetricAggregatorFactory {
     String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
     String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
 
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        hostAggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        3600000l
+      );
+    }
+
     return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -149,6 +180,19 @@ public class TimelineMetricAggregatorFactory {
     String inputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
     String outputTableName = METRICS_AGGREGATE_DAILY_TABLE_NAME;
 
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        hostAggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        3600000l
+      );
+    }
+
     return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -220,6 +264,19 @@ public class TimelineMetricAggregatorFactory {
     String outputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
     String aggregatorDisabledParam = CLUSTER_AGGREGATOR_HOUR_DISABLED;
 
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        aggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        120000l
+      );
+    }
+
     return new TimelineMetricClusterAggregator(
       hBaseAccessor, metricsConf,
       checkpointLocation,
@@ -254,6 +311,19 @@ public class TimelineMetricAggregatorFactory {
     String outputTableName = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
     String aggregatorDisabledParam = CLUSTER_AGGREGATOR_DAILY_DISABLED;
 
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        aggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        120000l
+      );
+    }
+
     return new TimelineMetricClusterAggregator(
       hBaseAccessor, metricsConf,
       checkpointLocation,
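
Each create* method in the factory now makes the same choice: when the GROUP BY flag is on, build the v2 aggregator from the aggregators.v2 package (same arguments plus a native time range delay of 120000 ms or 3600000 ms depending on the interval); otherwise keep the original implementation. A hypothetical helper condensing that choice, with the two concrete aggregators passed in rather than constructed, since only the selection matters here:

    // Hypothetical condensation of the repeated per-interval checks in the diff.
    static TimelineMetricAggregator selectAggregator(Configuration metricsConf,
                                                     TimelineMetricAggregator groupByAggregator,
                                                     TimelineMetricAggregator legacyAggregator) {
      boolean useGroupBy = Boolean.parseBoolean(
          metricsConf.get("timeline.metrics.service.use.groupBy.aggregators", "true"));
      // v2 issues one UPSERT ... SELECT ... GROUP BY per cycle;
      // v1 reads rows back and aggregates them in the collector JVM.
      return useGroupBy ? groupByAggregator : legacyAggregator;
    }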

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
new file mode 100644
index 0000000..ca95206
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_APP_METRIC_GROUPBY_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_TIME_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+
+public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class);
+  private final String aggregateColumnName;
+
+  public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                         Configuration metricsConf,
+                                         String checkpointLocation,
+                                         Long sleepIntervalMillis,
+                                         Integer checkpointCutOffMultiplier,
+                                         String hostAggregatorDisabledParam,
+                                         String inputTableName,
+                                         String outputTableName,
+                                         Long nativeTimeRangeDelay) {
+    super(hBaseAccessor, metricsConf, checkpointLocation,
+      sleepIntervalMillis, checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam, inputTableName, outputTableName,
+      nativeTimeRangeDelay);
+
+    if (inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME)) {
+      aggregateColumnName = "HOSTS_COUNT";
+    } else {
+      aggregateColumnName = "METRIC_COUNT";
+    }
+  }
+
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+    EmptyCondition condition = new EmptyCondition();
+    condition.setDoUpdate(true);
+
+    /*
+    UPSERT INTO METRIC_AGGREGATE_HOURLY (METRIC_NAME, APP_ID, INSTANCE_ID,
+    SERVER_TIME, UNITS, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN)
+    SELECT METRIC_NAME, APP_ID, INSTANCE_ID, MAX(SERVER_TIME), UNITS,
+    SUM(METRIC_SUM), SUM(HOSTS_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN)
+    FROM METRIC_AGGREGATE WHERE SERVER_TIME >= 1441155600000 AND
+    SERVER_TIME < 1441159200000 GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS;
+     */
+
+    condition.setStatement(String.format(GET_AGGREGATED_APP_METRIC_GROUPBY_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay),
+      outputTableName, aggregateColumnName, tableName,
+      startTime, endTime));
+
+    return condition;
+  }
+
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
+    LOG.info("Aggregated cluster metrics for " + outputTableName +
+      ", with startTime = " + new Date(startTime) +
+      ", endTime = " + new Date(endTime));
+  }
+}
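
One subtlety in the new cluster aggregator: the first roll-up reads METRIC_AGGREGATE, whose per-row count column is HOSTS_COUNT, while the hourly and daily cluster tables store METRIC_COUNT, so the constructor picks which column name is substituted into the SUM(%s) slot of the template. A minimal sketch of that formatting, assuming the constants and argument order shown in the diff; the timestamps are illustrative and 120000 ms matches the delay the factory passes for the cluster aggregators:

    String inputTable  = PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;        // METRIC_AGGREGATE
    String outputTable = PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; // METRIC_AGGREGATE_HOURLY
    String countColumn = PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME.equals(inputTable)
        ? "HOSTS_COUNT"    // reading the base cluster table
        : "METRIC_COUNT";  // reading an already rolled-up table
    String sql = String.format(
        PhoenixTransactSQL.GET_AGGREGATED_APP_METRIC_GROUPBY_SQL,
        PhoenixTransactSQL.getNaiveTimeRangeHint(1441155600000L, 120000L),
        outputTable, countColumn, inputTable,
        1441155600000L, 1441159200000L);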

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
new file mode 100644
index 0000000..1e1712f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL;
+
+public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class);
+
+  public TimelineMetricHostAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                      Configuration metricsConf,
+                                      String checkpointLocation,
+                                      Long sleepIntervalMillis,
+                                      Integer checkpointCutOffMultiplier,
+                                      String hostAggregatorDisabledParam,
+                                      String tableName,
+                                      String outputTableName,
+                                      Long nativeTimeRangeDelay) {
+    super(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis,
+      checkpointCutOffMultiplier, hostAggregatorDisabledParam, tableName,
+      outputTableName, nativeTimeRangeDelay);
+  }
+
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
+
+    LOG.info("Aggregated host metrics for " + outputTableName +
+      ", with startTime = " + new Date(startTime) +
+      ", endTime = " + new Date(endTime));
+  }
+
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+    EmptyCondition condition = new EmptyCondition();
+    condition.setDoUpdate(true);
+
+    condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay),
+      outputTableName, tableName, startTime, endTime));
+
+    return condition;
+  }
+
+}
\ No newline at end of file
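
The host-level v2 aggregator is simpler: input and output tables share the same columns, so the template only needs the native time range hint, the two table names, and the time window. A sketch of the statement it would build for the minute-to-hourly roll-up, assuming the constants and argument order from the diff; the timestamps are illustrative and 3600000 ms is the delay the factory passes for this interval:

    long startTime = 1441155600000L;
    long endTime   = 1441159200000L;
    String sql = String.format(
        PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
        PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, 3600000L), // /*+ NATIVE_TIME_RANGE(...) */
        PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME,        // output (host hourly table)
        PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME,        // input  (METRIC_RECORD_MINUTE)
        startTime, endTime);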

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java
index e0cb3d0..06a3d79 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java
@@ -43,4 +43,5 @@ public interface Condition {
   void setFetchSize(Integer fetchSize);
   void addOrderByColumn(String column);
   void setNoLimit();
+  boolean doUpdate();
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
index 462c8d9..98af2aa 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
@@ -199,6 +199,11 @@ public class DefaultCondition implements Condition {
     this.noLimit = true;
   }
 
+  @Override
+  public boolean doUpdate() {
+    return false;
+  }
+
   public Integer getLimit() {
     if (noLimit) {
       return null;

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java
new file mode 100644
index 0000000..cf4395f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision;
+
+import java.util.List;
+
+/**
+ * Encapsulate a Condition with pre-formatted and pre-parsed query string.
+ */
+public class EmptyCondition implements Condition {
+  String statement;
+  boolean doUpdate = false;
+
+  @Override
+  public boolean isEmpty() {
+    return false;
+  }
+
+  @Override
+  public List<String> getMetricNames() {
+    return null;
+  }
+
+  @Override
+  public boolean isPointInTime() {
+    return false;
+  }
+
+  @Override
+  public boolean isGrouped() {
+    return true;
+  }
+
+  @Override
+  public void setStatement(String statement) {
+    this.statement = statement;
+  }
+
+  @Override
+  public List<String> getHostnames() {
+    return null;
+  }
+
+  @Override
+  public Precision getPrecision() {
+    return null;
+  }
+
+  @Override
+  public void setPrecision(Precision precision) {
+
+  }
+
+  @Override
+  public String getAppId() {
+    return null;
+  }
+
+  @Override
+  public String getInstanceId() {
+    return null;
+  }
+
+  @Override
+  public StringBuilder getConditionClause() {
+    return null;
+  }
+
+  @Override
+  public String getOrderByClause(boolean asc) {
+    return null;
+  }
+
+  @Override
+  public String getStatement() {
+    return statement;
+  }
+
+  @Override
+  public Long getStartTime() {
+    return null;
+  }
+
+  @Override
+  public Long getEndTime() {
+    return null;
+  }
+
+  @Override
+  public Integer getLimit() {
+    return null;
+  }
+
+  @Override
+  public Integer getFetchSize() {
+    return null;
+  }
+
+  @Override
+  public void setFetchSize(Integer fetchSize) {
+
+  }
+
+  @Override
+  public void addOrderByColumn(String column) {
+
+  }
+
+  @Override
+  public void setNoLimit() {
+
+  }
+
+  public void setDoUpdate(boolean doUpdate) {
+    this.doUpdate = doUpdate;
+  }
+
+  @Override
+  public boolean doUpdate() {
+    return doUpdate;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
index ef0f4ce..77652dc 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -56,50 +56,50 @@ public class PhoenixTransactSQL {
 
   public static final String CREATE_METRICS_AGGREGATE_TABLE_SQL =
     "CREATE TABLE IF NOT EXISTS %s " +
-      "(METRIC_NAME VARCHAR, " +
-      "HOSTNAME VARCHAR, " +
-      "APP_ID VARCHAR, " +
-      "INSTANCE_ID VARCHAR, " +
-      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
-      "UNITS CHAR(20), " +
-      "METRIC_SUM DOUBLE," +
-      "METRIC_COUNT UNSIGNED_INT, " +
-      "METRIC_MAX DOUBLE," +
-      "METRIC_MIN DOUBLE CONSTRAINT pk " +
-      "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
-      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
-      " COMPRESSION='%s'";
+    "(METRIC_NAME VARCHAR, " +
+    "HOSTNAME VARCHAR, " +
+    "APP_ID VARCHAR, " +
+    "INSTANCE_ID VARCHAR, " +
+    "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+    "UNITS CHAR(20), " +
+    "METRIC_SUM DOUBLE," +
+    "METRIC_COUNT UNSIGNED_INT, " +
+    "METRIC_MAX DOUBLE," +
+    "METRIC_MIN DOUBLE CONSTRAINT pk " +
+    "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+    "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
+    " COMPRESSION='%s'";
 
   public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
     "CREATE TABLE IF NOT EXISTS %s " +
-      "(METRIC_NAME VARCHAR, " +
-      "APP_ID VARCHAR, " +
-      "INSTANCE_ID VARCHAR, " +
-      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
-      "UNITS CHAR(20), " +
-      "METRIC_SUM DOUBLE, " +
-      "HOSTS_COUNT UNSIGNED_INT, " +
-      "METRIC_MAX DOUBLE, " +
-      "METRIC_MIN DOUBLE " +
-      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
-      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
-      "TTL=%s, COMPRESSION='%s'";
+    "(METRIC_NAME VARCHAR, " +
+    "APP_ID VARCHAR, " +
+    "INSTANCE_ID VARCHAR, " +
+    "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+    "UNITS CHAR(20), " +
+    "METRIC_SUM DOUBLE, " +
+    "HOSTS_COUNT UNSIGNED_INT, " +
+    "METRIC_MAX DOUBLE, " +
+    "METRIC_MIN DOUBLE " +
+    "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+    "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+    "TTL=%s, COMPRESSION='%s'";
 
   // HOSTS_COUNT vs METRIC_COUNT
   public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
     "CREATE TABLE IF NOT EXISTS %s " +
-      "(METRIC_NAME VARCHAR, " +
-      "APP_ID VARCHAR, " +
-      "INSTANCE_ID VARCHAR, " +
-      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
-      "UNITS CHAR(20), " +
-      "METRIC_SUM DOUBLE, " +
-      "METRIC_COUNT UNSIGNED_INT, " +
-      "METRIC_MAX DOUBLE, " +
-      "METRIC_MIN DOUBLE " +
-      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
-      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
-      "TTL=%s, COMPRESSION='%s'";
+    "(METRIC_NAME VARCHAR, " +
+    "APP_ID VARCHAR, " +
+    "INSTANCE_ID VARCHAR, " +
+    "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+    "UNITS CHAR(20), " +
+    "METRIC_SUM DOUBLE, " +
+    "METRIC_COUNT UNSIGNED_INT, " +
+    "METRIC_MAX DOUBLE, " +
+    "METRIC_MIN DOUBLE " +
+    "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+    "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+    "TTL=%s, COMPRESSION='%s'";
 
   /**
    * ALTER table to set new options
@@ -234,6 +234,29 @@ public class PhoenixTransactSQL {
     "METRIC_MIN " +
     "FROM %s";
 
+  /**
+   * Aggregate host metrics using a GROUP BY clause to take advantage of
+   * N - way parallel scan where N = number of regions.
+   */
+  public static final String GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL = "UPSERT %s " +
+    "INTO %s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, UNITS, " +
+    "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " +
+    "SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, MAX(SERVER_TIME), UNITS, " +
+    "SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " +
+    "FROM %s WHERE SERVER_TIME >= %s AND SERVER_TIME < %s " +
+    "GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, UNITS";
+
+  /**
+   * Aggregate app metrics using a GROUP BY clause to take advantage of
+   * N - way parallel scan where N = number of regions.
+   */
+  public static final String GET_AGGREGATED_APP_METRIC_GROUPBY_SQL = "UPSERT %s " +
+    "INTO %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, UNITS, " +
+    "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) SELECT METRIC_NAME, APP_ID, " +
+    "INSTANCE_ID, MAX(SERVER_TIME), UNITS, SUM(METRIC_SUM), SUM(%s), " +
+    "MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE SERVER_TIME >= %s AND " +
+    "SERVER_TIME < %s GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS";
+
   public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
   public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
     "METRIC_RECORD_MINUTE";
@@ -263,8 +286,8 @@ public class PhoenixTransactSQL {
     return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta));
   }
 
-  public static PreparedStatement prepareGetMetricsSqlStmt(
-    Connection connection, Condition condition) throws SQLException {
+  public static PreparedStatement prepareGetMetricsSqlStmt(Connection connection,
+      Condition condition) throws SQLException {
 
     validateConditionIsNotEmpty(condition);
     validateRowCountLimit(condition);
@@ -323,15 +346,18 @@ public class PhoenixTransactSQL {
     }
 
     StringBuilder sb = new StringBuilder(stmtStr);
-    sb.append(" WHERE ");
-    sb.append(condition.getConditionClause());
-    String orderByClause = condition.getOrderByClause(true);
 
-    if (orderByClause != null) {
-      sb.append(orderByClause);
-    } else {
-      sb.append(" ORDER BY METRIC_NAME, SERVER_TIME ");
+    if (!(condition instanceof EmptyCondition)) {
+      sb.append(" WHERE ");
+      sb.append(condition.getConditionClause());
+      String orderByClause = condition.getOrderByClause(true);
+      if (orderByClause != null) {
+        sb.append(orderByClause);
+      } else {
+        sb.append(" ORDER BY METRIC_NAME, SERVER_TIME ");
+      }
     }
+
     if (condition.getLimit() != null) {
       sb.append(" LIMIT ").append(condition.getLimit());
     }
@@ -339,6 +365,7 @@ public class PhoenixTransactSQL {
     if (LOG.isDebugEnabled()) {
       LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
     }
+
     PreparedStatement stmt = null;
     try {
       stmt = connection.prepareStatement(sb.toString());
@@ -404,7 +431,7 @@ public class PhoenixTransactSQL {
 
   private static void validateRowCountLimit(Condition condition) {
     if (condition.getMetricNames() == null
-      || condition.getMetricNames().isEmpty() ) {
+        || condition.getMetricNames().isEmpty() ) {
       //aggregator can use empty metrics query
       return;
     }
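
Tying the pieces together: the formatted GROUP BY statements are carried through the existing plumbing by EmptyCondition, which holds a fully formed statement, reports doUpdate() = true, and, with the change above, no longer has a WHERE or ORDER BY clause appended by prepareGetMetricsSqlStmt. A sketch of that flow, under the assumption that a pre-set statement is prepared verbatim (as the existing method does); rollupSql stands for one of the formatted statements sketched earlier:

    EmptyCondition condition = new EmptyCondition();
    condition.setDoUpdate(true);        // the aggregator loop will executeUpdate() and commit
    condition.setStatement(rollupSql);  // used as-is: no WHERE / ORDER BY for EmptyCondition

    PreparedStatement stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
    int rows = stmt.executeUpdate();
    conn.commit();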

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java
index c8b8709..969215b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java
@@ -168,6 +168,11 @@ public class SplitByMetricNamesCondition implements Condition {
     adaptee.setNoLimit();
   }
 
+  @Override
+  public boolean doUpdate() {
+    return false;
+  }
+
   public List<String> getOriginalMetricNames() {
     return adaptee.getMetricNames();
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
index 13fa348..b7b1737 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -64,6 +66,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
 
   @Before
   public void setUp() throws Exception {
+    Logger.getLogger("org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline").setLevel(Level.DEBUG);
     hdb = createTestableHBaseAccessor();
     // inits connection, starts mini cluster
     conn = getConnection(getUrl());
@@ -87,11 +90,17 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     conn.close();
   }
 
+  private Configuration getConfigurationForTest(boolean useGroupByAggregators) {
+    Configuration configuration = new Configuration();
+    configuration.set("timeline.metrics.service.use.groupBy.aggregators", String.valueOf(useGroupByAggregators));
+    return configuration;
+  }
+
   @Test
   public void testShouldAggregateClusterProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -143,7 +152,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testShouldAggregateClusterIgnoringInstance() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -218,7 +227,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testShouldAggregateDifferentMetricsOnClusterProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // here we put some metrics tha will be aggregated
@@ -282,7 +291,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testAggregateDailyClusterMetrics() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, getConfigurationForTest(false));
 
     // this time can be virtualized! or made independent from real clock
     long startTime = System.currentTimeMillis();
@@ -327,7 +336,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testShouldAggregateClusterOnHourProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false));
 
     // this time can be virtualized! or made independent from real clock
     long startTime = System.currentTimeMillis();
@@ -371,7 +380,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testShouldAggregateDifferentMetricsOnHourProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false));
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
@@ -431,7 +440,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
 
   @Test
   public void testAppLevelHostMetricAggregates() throws Exception {
-    Configuration conf = new Configuration();
+    Configuration conf = getConfigurationForTest(false);
     conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, conf);
@@ -485,7 +494,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   @Test
   public void testClusterAggregateMetricNormalization() throws Exception {
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // Sample data
@@ -558,6 +567,66 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     Assert.assertEquals(9, recordCount);
   }
 
+  @Test
+  public void testAggregationUsingGroupByQuery() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(true));
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    hdb.saveClusterAggregateRecords(records);
+
+    // WHEN
+    agg.doWork(startTime, ctime + minute);
+
+    // THEN
+    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
+    int count = 0;
+    while (rs.next()) {
+      if ("disk_used".equals(rs.getString("METRIC_NAME"))) {
+        assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+        assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+        assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+        assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+      } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) {
+        assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+        assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
+        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+        assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX"));
+        assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN"));
+      }
+      count++;
+    }
+    assertEquals("Two hourly aggregated rows expected", 2, count);
+  }
+
   private ResultSet executeQuery(String query) throws SQLException {
     Connection conn = getConnection(getUrl());
     Statement stmt = conn.createStatement();

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
index b480b7a..a3640d0 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -58,6 +60,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
 
   @Before
   public void setUp() throws Exception {
+    Logger.getLogger("org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline").setLevel(Level.DEBUG);
     hdb = createTestableHBaseAccessor();
     // inits connection, starts mini cluster
     conn = getConnection(getUrl());
@@ -105,11 +108,18 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
       .containsExactlyElementsOf(recordRead.getMetrics());
   }
 
+  private Configuration getConfigurationForTest(boolean useGroupByAggregators) {
+    Configuration configuration = new Configuration();
+    configuration.set("timeline.metrics.service.use.groupBy.aggregators", String.valueOf(useGroupByAggregators));
+    return configuration;
+  }
+
   @Test
   public void testShouldAggregateMinuteProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator aggregatorMinute =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb,
+        getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -132,8 +142,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
       METRICS_AGGREGATE_MINUTE_TABLE_NAME));
 
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
     ResultSet rs = pstmt.executeQuery();
     MetricHostAggregate expectedAggregate =
       MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
@@ -170,7 +179,8 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
    public void testShouldAggregateHourProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator aggregator =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb,
+        getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
     long startTime = System.currentTimeMillis();
 
@@ -209,8 +219,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
       METRICS_AGGREGATE_HOURLY_TABLE_NAME));
 
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
     ResultSet rs = pstmt.executeQuery();
 
     while (rs.next()) {
@@ -233,7 +242,8 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
   public void testMetricAggregateDaily() throws Exception {
     // GIVEN
     TimelineMetricAggregator aggregator =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hdb,
+        getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
     long startTime = System.currentTimeMillis();
 
@@ -290,6 +300,66 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
     }
   }
 
+  @Test
+  public void testAggregationUsingGroupByQuery() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator aggregatorMinute =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb,
+        getConfigurationForTest(true));
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+
+    long endTime = startTime + 1000 * 60 * 4;
+    boolean success = aggregatorMinute.doWork(startTime, endTime);
+    assertTrue(success);
+
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_AGGREGATE_MINUTE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+    MetricHostAggregate expectedAggregate =
+      MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+
+    int count = 0;
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        readHelper.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate currentHostAggregate =
+        readHelper.getMetricHostAggregateFromResultSet(rs);
+
+      if ("disk_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        count++;
+      } else if ("mem_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        count++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+    assertEquals("Two aggregated entries expected", 2, count);
+  }
+
   private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR =
     new Comparator<TimelineMetric>() {
       @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 06e153b..813b3d9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -418,7 +418,7 @@ public class Configuration {
   private static final String TIMELINE_METRICS_CACHE_TTL = "server.timeline.metrics.cache.entry.ttl.seconds";
   private static final String DEFAULT_TIMELINE_METRICS_CACHE_TTL = "3600";
   private static final String TIMELINE_METRICS_CACHE_IDLE_TIME = "server.timeline.metrics.cache.entry.idle.seconds";
-  private static final String DEFAULT_TIMELINE_METRICS_CACHE_IDLE_TIME = "300";
+  private static final String DEFAULT_TIMELINE_METRICS_CACHE_IDLE_TIME = "1800";
   private static final String TIMELINE_METRICS_REQUEST_READ_TIMEOUT = "server.timeline.metrics.cache.read.timeout.millis";
   private static final String DEFAULT_TIMELINE_METRICS_REQUEST_READ_TIMEOUT = "10000";
   private static final String TIMELINE_METRICS_REQUEST_INTERVAL_READ_TIMEOUT = "server.timeline.metrics.cache.interval.read.timeout.millis";

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index 6d98c01..859b361 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -4209,7 +4209,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
       widgetEntity.setProperties(gson.toJson(layoutInfo.getProperties()));
       widgetEntity.setWidgetValues(gson.toJson(layoutInfo.getValues()));
       widgetEntity.setListWidgetLayoutUserWidgetEntity(new LinkedList<WidgetLayoutUserWidgetEntity>());
-      LOG.debug("Creating cluster widget with: name = " +
+      LOG.info("Creating cluster widget with: name = " +
         layoutInfo.getWidgetName() + ", type = " + layoutInfo.getType() + ", " +
         "cluster = " + clusterEntity.getClusterName());
       // Persisting not visible widgets

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-hbase-site.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-hbase-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-hbase-site.xml
index 2d11412..b2cfccd 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-hbase-site.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-hbase-site.xml
@@ -109,7 +109,7 @@
   </property>
   <property>
     <name>phoenix.query.spoolThresholdBytes</name>
-    <value>12582912</value>
+    <value>20971520</value>
     <description>
       Threshold size in bytes after which results from parallel query
       execution are spooled to disk. Default is 20 MB.
@@ -293,4 +293,23 @@
       different mount point from the one for hbase.rootdir in embedded mode.
     </description>
   </property>
+  <property>
+    <name>phoenix.mutate.batchSize</name>
+    <value>10000</value>
+    <description>
+      The number of rows that are batched together and automatically committed
+      during the execution of an UPSERT SELECT or DELETE statement.
+      This affects the performance of the GROUP BY aggregators when they are
+      enabled (a client-side connection sketch follows below).
+    </description>
+  </property>
+  <property>
+    <name>phoenix.query.rowKeyOrderSaltedTable</name>
+    <value>true</value>
+    <description>
+      When set, user-specified split points on a salted table are disallowed,
+      so that each bucket only contains entries with the same salt byte.
+      When this property is turned on, the salted table behaves just like a
+      normal table and returns items in row key order for scans.
+    </description>
+  </property>
 </configuration>
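
The two Phoenix properties added above tune the new server-side aggregation path: phoenix.mutate.batchSize bounds how many rows Phoenix commits at a time while the GROUP BY aggregators run their UPSERT SELECT statements. As a rough sketch only (not part of this patch), the same property can also be supplied per connection through plain JDBC; the ZooKeeper quorum in the URL is a placeholder and the Phoenix client jar is assumed to be on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class PhoenixBatchSizeSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Client-side override of how many rows are batched per commit while
    // executing UPSERT SELECT / DELETE statements (mirrors the server value above).
    props.setProperty("phoenix.mutate.batchSize", "10000");
    // Placeholder quorum; adjust to the actual AMS embedded/distributed HBase.
    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181", props)) {
      System.out.println("connected, autoCommit=" + conn.getAutoCommit());
    }
  }
}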

http://git-wip-us.apache.org/repos/asf/ambari/blob/3e0b8f07/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
index c716bea..fa30e19 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
@@ -318,4 +318,12 @@
       an application. Example: bytes_read across Yarn Nodemanagers.
     </description>
   </property>
+  <property>
+    <name>timeline.metrics.service.use.groupBy.aggregators</name>
+    <value>true</value>
+    <description>
+      Use a GROUP BY aggregate query to perform host-level aggregations on
+      the server side instead of in-memory aggregations in the collector
+      (an illustrative query sketch follows below).
+    </description>
+  </property>
 </configuration>
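
The flag above switches host- and cluster-level rollups from in-memory aggregation in the collector to a single server-side GROUP BY query per aggregation run. The sketch below only illustrates the shape of such a statement; it is not the SQL built by PhoenixTransactSQL in this commit, and the source table name plus the SERVER_TIME, INSTANCE_ID and UNITS columns are assumptions (only METRIC_AGGREGATE_HOURLY and the METRIC_SUM/METRIC_COUNT/METRIC_MAX/METRIC_MIN columns are visible in the tests above).

public class GroupByRollupSketch {
  public static void main(String[] args) {
    long endTime = System.currentTimeMillis();
    long startTime = endTime - 60 * 60 * 1000L; // roll up the previous hour
    // Hypothetical source table METRIC_AGGREGATE and columns SERVER_TIME,
    // INSTANCE_ID, UNITS; target table and aggregate columns match the test assertions.
    String sql = String.format(
      "UPSERT INTO METRIC_AGGREGATE_HOURLY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
        "SERVER_TIME, UNITS, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " +
      "SELECT METRIC_NAME, APP_ID, INSTANCE_ID, %d AS SERVER_TIME, UNITS, " +
        "SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " +
      "FROM METRIC_AGGREGATE WHERE SERVER_TIME >= %d AND SERVER_TIME < %d " +
      "GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS",
      endTime, startTime, endTime);
    System.out.println(sql);
  }
}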

