ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From avija...@apache.org
Subject [1/2] ambari git commit: AMBARI-16440 : Flush metrics to collector if metric system is stopped gracefully in the Sink daemon. (avijayan)
Date Mon, 16 May 2016 17:07:07 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 3e0426d6c -> 928a03ba6


AMBARI-16440 : Flush metrics to collector if metric system is stopped gracefully in the Sink
daemon. (avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/fd85458c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/fd85458c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/fd85458c

Branch: refs/heads/branch-2.4
Commit: fd85458c7829ad565d94a59e490a10f75ea15377
Parents: 3e0426d
Author: Aravindan Vijayan <avijayan@hortonworks.com>
Authored: Mon May 16 09:55:09 2016 -0700
Committer: Aravindan Vijayan <avijayan@hortonworks.com>
Committed: Mon May 16 09:55:09 2016 -0700

----------------------------------------------------------------------
 .../timeline/cache/TimelineMetricsCache.java    | 25 ++++++++++++++++++++
 .../timeline/HadoopTimelineMetricsSink.java     | 21 +++++++++++++++-
 2 files changed, 45 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/fd85458c/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
index 3316a54..0bed7d0 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
@@ -22,8 +22,12 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -124,6 +128,23 @@ public class TimelineMetricsCache {
       return timelineMetric;
     }
 
+    public TimelineMetrics evictAll() {
+      List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+
+      for (Iterator<Map.Entry<String, TimelineMetricWrapper>> it = this.entrySet().iterator();
it.hasNext();) {
+        Map.Entry<String, TimelineMetricWrapper> cacheEntry = it.next();
+        TimelineMetricWrapper metricWrapper = cacheEntry.getValue();
+        if (metricWrapper != null) {
+          TimelineMetric timelineMetric = cacheEntry.getValue().getTimelineMetric();
+          metricList.add(timelineMetric);
+        }
+        it.remove();
+      }
+      TimelineMetrics timelineMetrics = new TimelineMetrics();
+      timelineMetrics.setMetrics(metricList);
+      return timelineMetrics;
+    }
+
     public void put(String metricName, TimelineMetric timelineMetric) {
       if (isDuplicate(timelineMetric)) {
         return;
@@ -157,6 +178,10 @@ public class TimelineMetricsCache {
     return null;
   }
 
+  public TimelineMetrics getAllMetrics() {
+    return timelineMetricCache.evictAll();
+  }
+
   /**
    * Getter method to help testing eviction
    * @return @int

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd85458c/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index 35b9459..8966978 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.util.Servers;
 import org.apache.hadoop.net.DNS;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
@@ -44,10 +45,12 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink implements MetricsSink
{
+public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink implements MetricsSink,
Closeable {
   private Map<String, Set<String>> useTagsMap = new HashMap<String, Set<String>>();
   private TimelineMetricsCache metricsCache;
   private String hostName = "UNKNOWN.example.com";
@@ -63,6 +66,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink
imple
   private SubsetConfiguration conf;
   // Cache the rpc port used and the suffix to use if the port tag is found
   private Map<String, String> rpcPortSuffixes = new HashMap<>(10);
+  private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
 
   @Override
   public void init(SubsetConfiguration conf) {
@@ -386,4 +391,18 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink
imple
     // TODO: Buffering implementation
   }
 
+  @Override
+  public void close() throws IOException {
+
+    executorService.submit(new Runnable() {
+      @Override
+      public void run() {
+        LOG.info("Closing HadoopTimelineMetricSink. Flushing metrics to collector...");
+        TimelineMetrics metrics = metricsCache.getAllMetrics();
+        if (metrics != null) {
+          emitMetrics(metrics);
+        }
+      }
+    });
+  }
 }


Mime
View raw message