flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [1/2] flink git commit: [FLINK-4074] Make metric reporters less blocking
Date Mon, 27 Jun 2016 11:08:41 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9487fcbfb -> 19ff8db68


[FLINK-4074] Make metric reporters less blocking

This closes #2105


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56cdec7d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56cdec7d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56cdec7d

Branch: refs/heads/master
Commit: 56cdec7d81a53f9ea9578ca274c149f9758b5ffd
Parents: 9487fcb
Author: zentol <chesnay@apache.org>
Authored: Wed Jun 15 12:23:41 2016 +0200
Committer: zentol <chesnay@apache.org>
Committed: Mon Jun 27 13:08:16 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/metrics/MetricRegistry.java    | 39 ++++++++++++--------
 .../flink/metrics/statsd/StatsDReporter.java    |  9 +++++
 2 files changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/56cdec7d/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
index b3422e1..0abcdec 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
@@ -31,6 +31,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -60,7 +62,7 @@ public class MetricRegistry {
 	static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
 	
 	private final MetricReporter reporter;
-	private final java.util.Timer timer;
+	private final ScheduledExecutorService executor;
 
 	private final ScopeFormats scopeFormats;
 
@@ -86,12 +88,11 @@ public class MetricRegistry {
 			// by default, create JMX metrics
 			LOG.info("No metrics reporter configured, exposing metrics via JMX");
 			this.reporter = new JMXReporter();
-			this.timer = null;
+			this.executor = null;
 		}
 		else {
 			MetricReporter reporter;
-			java.util.Timer timer;
-			
+			ScheduledExecutorService executor = null;
 			try {
 				String configuredPeriod = config.getString(KEY_METRICS_REPORTER_INTERVAL, null);
 				TimeUnit timeunit = TimeUnit.SECONDS;
@@ -117,24 +118,20 @@ public class MetricRegistry {
 				reporter.open(reporterConfig);
 
 				if (reporter instanceof Scheduled) {
+					executor = Executors.newSingleThreadScheduledExecutor();
 					LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
-					long millis = timeunit.toMillis(period);
 					
-					timer = new java.util.Timer("Periodic Metrics Reporter", true);
-					timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis);
-				}
-				else {
-					timer = null;
+					executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
 				}
 			}
 			catch (Throwable t) {
 				reporter = new JMXReporter();
-				timer = null;
+				shutdownExecutor();
 				LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t);
 			}
 
 			this.reporter = reporter;
-			this.timer = timer;
+			this.executor = executor;
 		}
 	}
 
@@ -142,9 +139,6 @@ public class MetricRegistry {
 	 * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
 	 */
 	public void shutdown() {
-		if (timer != null) {
-			timer.cancel();
-		}
 		if (reporter != null) {
 			try {
 				reporter.close();
@@ -152,6 +146,21 @@ public class MetricRegistry {
 				LOG.warn("Metrics reporter did not shut down cleanly", t);
 			}
 		}
+		shutdownExecutor();
+	}
+	
+	private void shutdownExecutor() {
+		if (executor != null) {
+			executor.shutdown();
+
+			try {
+				if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+					executor.shutdownNow();
+				}
+			} catch (InterruptedException e) {
+				executor.shutdownNow();
+			}
+		}
 	}
 
 	public ScopeFormats getScopeFormats() {

http://git-wip-us.apache.org/repos/asf/flink/blob/56cdec7d/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
index ae57f55..087a265 100644
--- a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -53,6 +53,8 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 //	public static final String ARG_CONVERSION_RATE = "rateConversion";
 //	public static final String ARG_CONVERSION_DURATION = "durationConversion";
 
+	private boolean closed = false;
+
 	private DatagramSocket socket;
 	private InetSocketAddress address;
 
@@ -81,6 +83,7 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 
 	@Override
 	public void close() {
+		closed = true;
 		if (socket != null && !socket.isClosed()) {
 			socket.close();
 		}
@@ -95,10 +98,16 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 		// operator creation and shutdown
 		try {
 			for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
+				if (closed) {
+					return;
+				}
 				reportGauge(entry.getValue(), entry.getKey());
 			}
 
 			for (Map.Entry<Counter, String> entry : counters.entrySet()) {
+				if (closed) {
+					return;
+				}
 				reportCounter(entry.getValue(), entry.getKey());
 			}
 		}


Mime
View raw message