ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject ambari git commit: AMBARI-16766 Implement in ams collector batch insert operations to ams-hbase (dsen)
Date Mon, 23 May 2016 15:29:29 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 476f943e9 -> 7dfce9b11


AMBARI-16766 Implement in ams collector batch insert operations to ams-hbase (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7dfce9b1
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7dfce9b1
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7dfce9b1

Branch: refs/heads/branch-2.4
Commit: 7dfce9b11c60f8ba1f9dbffacd57c07fca89f82b
Parents: 476f943
Author: Dmytro Sen <dsen@apache.org>
Authored: Mon May 23 18:26:58 2016 +0300
Committer: Dmytro Sen <dsen@apache.org>
Committed: Mon May 23 18:29:20 2016 +0300

----------------------------------------------------------------------
 .../timeline/HBaseTimelineMetricStore.java      |   2 +-
 .../timeline/MetricsCacheCommitterThread.java   |  38 ++++
 .../metrics/timeline/PhoenixHBaseAccessor.java  | 223 ++++++++++++-------
 .../timeline/TimelineMetricConfiguration.java   |   9 +
 .../timeline/PhoenixHBaseAccessorTest.java      |  39 +++-
 .../timeline/discovery/TestMetadataManager.java |   2 +-
 .../0.1.0/configuration/ams-site.xml            |  24 ++
 .../0.1.0/package/scripts/service_check.py      |  78 ++++---
 8 files changed, 303 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/7dfce9b1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index 83aa36b..8b8b796 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -285,7 +285,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements
Timelin
     // Error indicated by the Sql exception
     TimelinePutResponse response = new TimelinePutResponse();
 
-    hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics);
+    hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false);
 
     return response;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7dfce9b1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsCacheCommitterThread.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsCacheCommitterThread.java
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsCacheCommitterThread.java
new file mode 100644
index 0000000..d858b84
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsCacheCommitterThread.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Periodic task that flushes the {@link PhoenixHBaseAccessor} insert cache:
+ * each run commits the cached metrics if the cache is not empty.
+ */
+public class MetricsCacheCommitterThread implements Runnable {
+
+    private static final Log LOG = LogFactory.getLog(MetricsCacheCommitterThread.class);
+
+    // Per-instance and final: a static field assigned from the constructor would be
+    // overwritten by every new instance, so all committers would share the last accessor.
+    private final PhoenixHBaseAccessor phoenixHBaseAccessor;
+
+    /**
+     * @param phoenixHBaseAccessor accessor whose insert cache this task flushes
+     */
+    public MetricsCacheCommitterThread(PhoenixHBaseAccessor phoenixHBaseAccessor) {
+        this.phoenixHBaseAccessor = phoenixHBaseAccessor;
+    }
+
+    /** Commits the cached metrics when the insert cache holds any. */
+    @Override
+    public void run() {
+        LOG.debug("Checking if metrics cache is empty");
+        if (!phoenixHBaseAccessor.isInsertCacheEmpty()) {
+            phoenixHBaseAccessor.commitMetricsFromCache();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/7dfce9b1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 52ab083..47962cb 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -60,14 +60,20 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -91,6 +97,9 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_SPLIT_POINTS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CONTAINER_METRICS_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_ENABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL;
@@ -149,6 +158,12 @@ public class PhoenixHBaseAccessor {
   private final RetryCounterFactory retryCounterFactory;
   private final PhoenixConnectionProvider dataSource;
   private final long outOfBandTimeAllowance;
+  private final int cacheSize;
+  private final boolean cacheEnabled;
+  private final BlockingQueue<TimelineMetrics> insertCache;
+  private ScheduledExecutorService scheduledExecutorService;
+  private MetricsCacheCommitterThread metricsCommiterThread;
+  private final int cacheCommitInterval;
   private final boolean skipBlockCacheForAggregatorsEnabled;
   private final String timelineMetricsTablesDurability;
 
@@ -182,9 +197,13 @@ public class PhoenixHBaseAccessor {
     }
     this.dataSource = dataSource;
     this.retryCounterFactory = new RetryCounterFactory(metricsConf.getInt(GLOBAL_MAX_RETRIES,
10),
-      (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 5)));
+      (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 3)));
     this.outOfBandTimeAllowance = metricsConf.getLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE,
       DEFAULT_OUT_OF_BAND_TIME_ALLOWANCE);
+    this.cacheEnabled = Boolean.valueOf(metricsConf.get(TIMELINE_METRICS_CACHE_ENABLED, "true"));
+    this.cacheSize = Integer.valueOf(metricsConf.get(TIMELINE_METRICS_CACHE_SIZE, "150"));
+    this.cacheCommitInterval = Integer.valueOf(metricsConf.get(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL,
"3"));
+    this.insertCache = new ArrayBlockingQueue<TimelineMetrics>(cacheSize);
     this.skipBlockCacheForAggregatorsEnabled = metricsConf.getBoolean(AGGREGATORS_SKIP_BLOCK_CACHE,
false);
     this.timelineMetricsTablesDurability = metricsConf.get(TIMELINE_METRICS_TABLES_DURABILITY,
"");
 
@@ -197,6 +216,107 @@ public class PhoenixHBaseAccessor {
     tableTTL.put(METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, metricsConf.get(CLUSTER_MINUTE_TABLE_TTL,
String.valueOf(30 * 86400))); //30 days
     tableTTL.put(METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, metricsConf.get(CLUSTER_HOUR_TABLE_TTL,
String.valueOf(365 * 86400))); //1 year
     tableTTL.put(METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, metricsConf.get(CLUSTER_DAILY_TABLE_TTL,
String.valueOf(730 * 86400))); //2 years
+
+    if (cacheEnabled) {
+      LOG.debug("Initialising and starting metrics cache committer thread...");
+      metricsCommiterThread = new MetricsCacheCommitterThread(this);
+      scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+      scheduledExecutorService.scheduleWithFixedDelay(metricsCommiterThread, 0, cacheCommitInterval,
TimeUnit.SECONDS);
+    }
+  }
+
+  /**
+   * Tells whether the insert cache currently holds no metrics.
+   *
+   * @return true when the insert cache is empty
+   */
+  public boolean isInsertCacheEmpty() {
+    final boolean nothingCached = insertCache.isEmpty();
+    return nothingCached;
+  }
+
+  /**
+   * Removes every batch currently held in the insert cache and commits them to
+   * the store in a single {@link #commitMetrics(Collection)} call. Does nothing
+   * when the cache is empty.
+   */
+  public void commitMetricsFromCache() {
+    LOG.debug("Clearing metrics cache");
+    List<TimelineMetrics> metricsArray = new ArrayList<TimelineMetrics>(insertCache.size());
+    // drainTo() only transfers elements that are actually present. An
+    // isEmpty()/poll() loop could add a null (poll() on an already emptied queue)
+    // when the committer thread and an inserting thread drain the cache at the
+    // same time, which would fail the whole batch in commitMetrics().
+    insertCache.drainTo(metricsArray);
+    if (!metricsArray.isEmpty()) {
+      commitMetrics(metricsArray);
+    }
+  }
+
+  /**
+   * Commits one batch of metrics by delegating to {@link #commitMetrics(Collection)}.
+   *
+   * @param timelineMetrics the batch to commit
+   */
+  public void commitMetrics(TimelineMetrics timelineMetrics) {
+    List<TimelineMetrics> singleBatch = Collections.singletonList(timelineMetrics);
+    commitMetrics(singleBatch);
+  }
+
+  /**
+   * Upserts every metric of the given batches into the metrics record table and
+   * commits them on a single connection.
+   * <p>
+   * Metrics whose start time differs from the current time by more than the
+   * out-of-band allowance are discarded. A failed upsert of one metric is logged
+   * and the remaining metrics are still processed. Any other failure is logged
+   * and not propagated to the caller.
+   *
+   * @param timelineMetricsCollection batches of metrics to write
+   */
+  public void commitMetrics(Collection<TimelineMetrics> timelineMetricsCollection) {
+    LOG.debug("Committing metrics to store");
+    Connection conn = null;
+    PreparedStatement metricRecordStmt = null;
+    long currentTime = System.currentTimeMillis();
+
+    try {
+      conn = getConnection();
+      metricRecordStmt = conn.prepareStatement(String.format(
+              UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
+      for (TimelineMetrics timelineMetrics : timelineMetricsCollection) {
+        for (TimelineMetric metric : timelineMetrics.getMetrics()) {
+          if (Math.abs(currentTime - metric.getStartTime()) > outOfBandTimeAllowance) {
+            // Start time is too far from the current time (in the past or in
+            // the future, since the absolute difference is compared): discard
+            LOG.debug("Discarding out of band timeseries, currentTime = "
+                    + currentTime + ", startTime = " + metric.getStartTime()
+                    + ", hostname = " + metric.getHostName());
+            continue;
+          }
+
+          metricRecordStmt.clearParameters();
+
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("host: " + metric.getHostName() + ", " +
+                    "metricName = " + metric.getMetricName() + ", " +
+                    "values: " + metric.getMetricValues());
+          }
+          double[] aggregates = AggregatorUtils.calculateAggregates(
+                  metric.getMetricValues());
+
+          metricRecordStmt.setString(1, metric.getMetricName());
+          metricRecordStmt.setString(2, metric.getHostName());
+          metricRecordStmt.setString(3, metric.getAppId());
+          metricRecordStmt.setString(4, metric.getInstanceId());
+          metricRecordStmt.setLong(5, currentTime);
+          metricRecordStmt.setLong(6, metric.getStartTime());
+          metricRecordStmt.setString(7, metric.getUnits());
+          metricRecordStmt.setDouble(8, aggregates[0]);
+          metricRecordStmt.setDouble(9, aggregates[1]);
+          metricRecordStmt.setDouble(10, aggregates[2]);
+          metricRecordStmt.setLong(11, (long) aggregates[3]);
+          String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+          metricRecordStmt.setString(12, json);
+
+          try {
+            metricRecordStmt.executeUpdate();
+          } catch (SQLException sql) {
+            // One bad record must not abort the rest of the batch.
+            LOG.error("Failed on insert records to store.", sql);
+          }
+        }
+      }
+
+      // commit() blocked if HBase unavailable
+      conn.commit();
+    } catch (Exception exception) {
+      // Report through the logger instead of stderr. The failure is deliberately
+      // not rethrown: this method is also reached from the scheduled committer
+      // task, and an exception escaping that task would cancel its future runs.
+      LOG.error("Failed to commit metrics to store.", exception);
+    } finally {
+      if (metricRecordStmt != null) {
+        try {
+          metricRecordStmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
   }
 
   private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs)
@@ -553,94 +673,45 @@ public class PhoenixHBaseAccessor {
   }
 
   public void insertMetricRecordsWithMetadata(TimelineMetricMetadataManager metadataManager,
-                                              TimelineMetrics metrics) throws SQLException,
IOException {
+                                              TimelineMetrics metrics, boolean skipCache)
throws SQLException, IOException {
     List<TimelineMetric> timelineMetrics = metrics.getMetrics();
     if (timelineMetrics == null || timelineMetrics.isEmpty()) {
       LOG.debug("Empty metrics insert request.");
       return;
     }
-
-    Connection conn = getConnection();
-    PreparedStatement metricRecordStmt = null;
-    long currentTime = System.currentTimeMillis();
-
-    try {
-      metricRecordStmt = conn.prepareStatement(String.format(
-        UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
-
-      for (TimelineMetric metric : timelineMetrics) {
-        if (Math.abs(currentTime - metric.getStartTime()) > outOfBandTimeAllowance) {
-          // If timeseries start time is way in the past : discard
-          LOG.debug("Discarding out of band timeseries, currentTime = "
-            + currentTime + ", startTime = " + metric.getStartTime()
-            + ", hostname = " + metric.getHostName());
-          continue;
-        }
-
-        metricRecordStmt.clearParameters();
-
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("host: " + metric.getHostName() + ", " +
-            "metricName = " + metric.getMetricName() + ", " +
-            "values: " + metric.getMetricValues());
-        }
-        double[] aggregates =  AggregatorUtils.calculateAggregates(
-          metric.getMetricValues());
-
-        metricRecordStmt.setString(1, metric.getMetricName());
-        metricRecordStmt.setString(2, metric.getHostName());
-        metricRecordStmt.setString(3, metric.getAppId());
-        metricRecordStmt.setString(4, metric.getInstanceId());
-        metricRecordStmt.setLong(5, currentTime);
-        metricRecordStmt.setLong(6, metric.getStartTime());
-        metricRecordStmt.setString(7, metric.getUnits());
-        metricRecordStmt.setDouble(8, aggregates[0]);
-        metricRecordStmt.setDouble(9, aggregates[1]);
-        metricRecordStmt.setDouble(10, aggregates[2]);
-        metricRecordStmt.setLong(11, (long) aggregates[3]);
-        String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
-        metricRecordStmt.setString(12, json);
-
-        try {
-          metricRecordStmt.executeUpdate();
-
-          if (metadataManager != null) {
-            // Write to metadata cache on successful write to store
-            metadataManager.putIfModifiedTimelineMetricMetadata(
-              metadataManager.getTimelineMetricMetadata(metric));
-
-            metadataManager.putIfModifiedHostedAppsMetadata(
-              metric.getHostName(), metric.getAppId());
-          }
-
-        } catch (SQLException sql) {
-          LOG.error("Failed on insert records to store.", sql);
-        }
+    for (TimelineMetric tm: timelineMetrics) {
+      // Update the metadata cache up front; this no longer waits for a successful write to the store
+      if (metadataManager != null) {
+        metadataManager.putIfModifiedTimelineMetricMetadata(
+                metadataManager.getTimelineMetricMetadata(tm));
+
+        metadataManager.putIfModifiedHostedAppsMetadata(
+                tm.getHostName(), tm.getAppId());
       }
+    }
 
-      // commit() blocked if HBase unavailable
-      conn.commit();
-
-    } finally {
-      if (metricRecordStmt != null) {
-        try {
-          metricRecordStmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
+    if  (!skipCache && cacheEnabled) {
+      LOG.debug("Adding metrics to cache");
+      if (insertCache.size() >= cacheSize) {
+        commitMetricsFromCache();
       }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
+      try {
+        insertCache.put(metrics); // blocked while the queue is full
+      } catch (InterruptedException e) {
+        e.printStackTrace();
       }
+    } else {
+      LOG.debug("Skipping metrics cache");
+      commitMetrics(metrics);
     }
   }
 
+  /**
+   * Inserts the given metrics without a metadata manager: delegates to
+   * insertMetricRecordsWithMetadata with a null manager.
+   *
+   * @param metrics   the batch of metrics to insert
+   * @param skipCache passed through unchanged to insertMetricRecordsWithMetadata
+   */
+  public void insertMetricRecords(TimelineMetrics metrics, boolean skipCache)
+      throws SQLException, IOException {
+    final TimelineMetricMetadataManager noMetadataManager = null;
+    insertMetricRecordsWithMetadata(noMetadataManager, metrics, skipCache);
+  }
+
   public void insertMetricRecords(TimelineMetrics metrics) throws SQLException, IOException
{
-    insertMetricRecordsWithMetadata(null, metrics);
+    insertMetricRecords(metrics, false);
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/ambari/blob/7dfce9b1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 8e9a8d6..cb78e02 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -42,6 +42,15 @@ public class TimelineMetricConfiguration {
   public static final String TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR =
     "timeline.metrics.aggregator.checkpoint.dir";
 
+  public static final String TIMELINE_METRICS_CACHE_SIZE =
+    "timeline.metrics.cache.size";
+
+  public static final String TIMELINE_METRICS_CACHE_COMMIT_INTERVAL =
+    "timeline.metrics.cache.commit.interval";
+
+  public static final String TIMELINE_METRICS_CACHE_ENABLED =
+    "timeline.metrics.cache.enabled";
+
   public static final String DEFAULT_CHECKPOINT_LOCATION =
     System.getProperty("java.io.tmpdir");
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7dfce9b1/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
index 290a98a..a86fa11 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
@@ -21,10 +21,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
@@ -41,6 +41,8 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -206,4 +208,39 @@ public class PhoenixHBaseAccessorTest {
     PowerMock.verifyAll();
   }
 
+  /**
+   * Pushes three batches through an accessor configured with a cache size of 1
+   * and a commit interval of 100 seconds, with commitMetrics(Collection)
+   * overridden to only call commit() on a mocked connection, then verifies
+   * the mocks.
+   */
+  @Test
+  public void testMetricsCacheCommittingWhenFull() throws IOException, SQLException {
+    final Connection mockConnection = EasyMock.createNiceMock(Connection.class);
+
+    Configuration hbaseConf = new Configuration();
+    hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
+
+    Configuration metricsConf = new Configuration();
+    metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE, "1");
+    metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, "100");
+
+    // Stand in for the real store write with a commit() on the mock so it can be verified.
+    // NOTE(review): the accessor schedules its committer with an initial delay of 0, so the
+    // background task may also touch the cache during this test - confirm that the expected
+    // commit count is not timing dependent.
+    PhoenixHBaseAccessor cachingAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf) {
+      @Override
+      public void commitMetrics(Collection<TimelineMetrics> timelineMetricsCollection) {
+        try {
+          mockConnection.commit();
+        } catch (SQLException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+
+    TimelineMetrics batch = EasyMock.createNiceMock(TimelineMetrics.class);
+    List<TimelineMetric> singleMetric = Collections.singletonList(new TimelineMetric());
+    EasyMock.expect(batch.getMetrics()).andReturn(singleMetric).anyTimes();
+    mockConnection.commit();
+    EasyMock.expectLastCall().once();
+
+    EasyMock.replay(batch, mockConnection);
+
+    for (int attempt = 0; attempt < 3; attempt++) {
+      cachingAccessor.insertMetricRecords(batch);
+    }
+
+    EasyMock.verify(batch, mockConnection);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7dfce9b1/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
index 92e4dfc..7363a61 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
@@ -69,7 +69,7 @@ public class TestMetadataManager extends AbstractMiniHBaseClusterTest {
     }});
     timelineMetrics.getMetrics().add(metric2);
 
-    hdb.insertMetricRecordsWithMetadata(metadataManager, timelineMetrics);
+    hdb.insertMetricRecordsWithMetadata(metadataManager, timelineMetrics, true);
   }
 
   @Test(timeout = 180000)

http://git-wip-us.apache.org/repos/asf/ambari/blob/7dfce9b1/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
index e5758bf..77d5b31 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
@@ -553,6 +553,30 @@
     </description>
   </property>
   <property>
+    <name>timeline.metrics.cache.commit.interval</name>
+    <value>3</value>
+    <description>
+      Time in seconds between committing metrics from cache
+    </description>
+    <value-attributes>
+      <type>int</type>
+    </value-attributes>
+  </property>
+  <property>
+    <name>timeline.metrics.cache.size</name>
+    <value>150</value>
+    <description>
+      Size of array blocking queue used to cache metrics
+    </description>
+  </property>
+  <property>
+    <name>timeline.metrics.cache.enabled</name>
+    <value>true</value>
+    <description>
+      If set to true PhoenixHBaseAccessor will use cache to store metrics before committing
them
+    </description>
+  </property>
+  <property>
     <name>timeline.metrics.service.http.policy</name>
     <value>HTTP_ONLY</value>
     <description>

http://git-wip-us.apache.org/repos/asf/ambari/blob/7dfce9b1/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
index 8f369f7..ddd3e42 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
@@ -41,6 +41,8 @@ class AMSServiceCheck(Script):
   AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics?%s"
   AMS_CONNECT_TRIES = 30
   AMS_CONNECT_TIMEOUT = 15
+  AMS_READ_TRIES = 10
+  AMS_READ_TIMEOUT = 5
 
   @OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
   def service_check(self, env):
@@ -131,41 +133,51 @@ class AMSServiceCheck(Script):
     Logger.info("Connecting (GET) to %s:%s%s" % (params.metric_collector_host,
                                                  params.metric_collector_port,
                                               self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters))
+    for i in xrange(0, self.AMS_READ_TRIES):
+      conn = network.get_http_connection(params.metric_collector_host,
+                                         int(params.metric_collector_port),
+                                         params.metric_collector_https_enabled,
+                                         ca_certs)
+      conn.request("GET", self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
+      response = conn.getresponse()
+      Logger.info("Http response: %s %s" % (response.status, response.reason))
 
-    conn = network.get_http_connection(params.metric_collector_host,
-                                       int(params.metric_collector_port),
-                                       params.metric_collector_https_enabled,
-                                       ca_certs)
-    conn.request("GET", self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
-    response = conn.getresponse()
-    Logger.info("Http response: %s %s" % (response.status, response.reason))
-
-    data = response.read()
-    Logger.info("Http data: %s" % data)
-    conn.close()
-
-    if response.status == 200:
-      Logger.info("Metrics were retrieved.")
-    else:
-      Logger.info("Metrics were not retrieved. Service check has failed.")
-      raise Fail("Metrics were not retrieved. Service check has failed. GET request status:
%s %s \n%s" %
-                 (response.status, response.reason, data))
-    data_json = json.loads(data)
-
-    def floats_eq(f1, f2, delta):
-      return abs(f1-f2) < delta
-
-    for metrics_data in data_json["metrics"]:
-      if (str(current_time) in metrics_data["metrics"] and str(current_time + 1000) in metrics_data["metrics"]
-          and floats_eq(metrics_data["metrics"][str(current_time)], random_value1, 0.0000001)
-          and floats_eq(metrics_data["metrics"][str(current_time + 1000)], current_time,
1)):
-        Logger.info("Values %s and %s were found in the response." % (random_value1, current_time))
-        break
-      pass
-    else:
-      Logger.info("Values %s and %s were not found in the response." % (random_value1, current_time))
-      raise Fail("Values %s and %s were not found in the response." % (random_value1, current_time))
+      data = response.read()
+      Logger.info("Http data: %s" % data)
+      conn.close()
 
+      if response.status == 200:
+        Logger.info("Metrics were retrieved.")
+      else:
+        Logger.info("Metrics were not retrieved. Service check has failed.")
+        raise Fail("Metrics were not retrieved. Service check has failed. GET request status:
%s %s \n%s" %
+                   (response.status, response.reason, data))
+      data_json = json.loads(data)
+
+      def floats_eq(f1, f2, delta):
+        return abs(f1-f2) < delta
+
+      values_are_present = False
+      for metrics_data in data_json["metrics"]:
+        if (str(current_time) in metrics_data["metrics"] and str(current_time + 1000) in
metrics_data["metrics"]
+            and floats_eq(metrics_data["metrics"][str(current_time)], random_value1, 0.0000001)
+            and floats_eq(metrics_data["metrics"][str(current_time + 1000)], current_time,
1)):
+          Logger.info("Values %s and %s were found in the response." % (random_value1, current_time))
+          values_are_present = True
+          break
+          pass
+
+      if not values_are_present:
+        if i < self.AMS_READ_TRIES - 1:  #range/xrange returns items from start to end-1
+          Logger.info("Values weren't stored yet. Retrying in %s seconds."
+                    % (self.AMS_READ_TIMEOUT))
+          time.sleep(self.AMS_READ_TIMEOUT)
+        else:
+          Logger.info("Values %s and %s were not found in the response." % (random_value1,
current_time))
+          raise Fail("Values %s and %s were not found in the response." % (random_value1,
current_time))
+      else:
+        break
+        pass
     Logger.info("Ambari Metrics service check is finished.")
 
 if __name__ == "__main__":


Mime
View raw message