flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [1/3] flink git commit: [FLINK-4093] Expose metric interfaces
Date Mon, 27 Jun 2016 13:59:18 GMT
Repository: flink
Updated Branches:
  refs/heads/master 62cb954d9 -> ee3c7a88b


[FLINK-4093] Expose metric interfaces

This closes #2134


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d43bf8d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d43bf8d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d43bf8d9

Branch: refs/heads/master
Commit: d43bf8d9b3085d1341bfca61e05c2a77e5426226
Parents: 62cb954
Author: zentol <chesnay@apache.org>
Authored: Wed Jun 22 10:37:03 2016 +0200
Committer: zentol <chesnay@apache.org>
Committed: Mon Jun 27 15:23:27 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/metrics/Counter.java  | 24 ++-----
 .../java/org/apache/flink/metrics/Gauge.java    |  5 +-
 .../org/apache/flink/metrics/MetricGroup.java   | 32 +++++++--
 .../org/apache/flink/metrics/SimpleCounter.java | 71 ++++++++++++++++++++
 .../metrics/groups/AbstractMetricGroup.java     | 16 ++++-
 .../groups/UnregisteredMetricsGroup.java        | 19 ++++--
 .../flink/runtime/taskmanager/TaskManager.scala | 41 ++++++-----
 .../partition/consumer/InputChannelTest.java    |  4 +-
 8 files changed, 154 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
index acc37cf..ffb1cc7 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
@@ -24,48 +24,36 @@ import org.apache.flink.annotation.PublicEvolving;
  * A Counter is a {@link Metric} that measures a count.
  */
 @PublicEvolving
-public final class Counter implements Metric {
-
-	private long count;
+public interface Counter extends Metric {
 
 	/**
 	 * Increment the current count by 1.
 	 */
-	public void inc() {
-		count++;
-	}
+	void inc();
 
 	/**
 	 * Increment the current count by the given value.
 	 *
 	 * @param n value to increment the current count by
 	 */
-	public void inc(long n) {
-		count += n;
-	}
+	void inc(long n);
 
 	/**
 	 * Decrement the current count by 1.
 	 */
-	public void dec() {
-		count--;
-	}
+	void dec();
 
 	/**
 	 * Decrement the current count by the given value.
 	 *
 	 * @param n value to decrement the current count by
 	 */
-	public void dec(long n) {
-		count -= n;
-	}
+	void dec(long n);
 
 	/**
 	 * Returns the current count.
 	 *
 	 * @return current count
 	 */
-	public long getCount() {
-		return count;
-	}
+	long getCount();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
index aad8deb..740645d 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
@@ -24,12 +24,11 @@ import org.apache.flink.annotation.PublicEvolving;
  * A Gauge is a {@link Metric} that calculates a specific value at a point in time.
  */
 @PublicEvolving
-public abstract class Gauge<T> implements Metric {
-
+public interface Gauge<T> extends Metric {
 	/**
 	 * Calculates and returns the measured value.
 	 *
 	 * @return calculated value
 	 */
-	public abstract T getValue();
+	T getValue();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
index 6c9e044..b131949 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
@@ -63,7 +63,7 @@ public interface MetricGroup {
 	 * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink.
 	 *
 	 * @param name name of the counter
-	 * @return the registered counter
+	 * @return the created counter
 	 */
 	Counter counter(int name);
 
@@ -71,19 +71,39 @@ public interface MetricGroup {
 	 * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink.
 	 *
 	 * @param name name of the counter
-	 * @return the registered counter
+	 * @return the created counter
 	 */
 	Counter counter(String name);
 
 	/**
+	 * Registers a {@link org.apache.flink.metrics.Counter} with Flink.
+	 *
+	 * @param name    name of the counter
+	 * @param counter counter to register
+	 * @param <C>     counter type
+	 * @return the given counter
+	 */
+	<C extends Counter> C counter(int name, C counter);
+
+	/**
+	 * Registers a {@link org.apache.flink.metrics.Counter} with Flink.
+	 *
+	 * @param name    name of the counter
+	 * @param counter counter to register
+	 * @param <C>     counter type
+	 * @return the given counter
+	 */
+	<C extends Counter> C counter(String name, C counter);
+	
+	/**
 	 * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
 	 *
 	 * @param name  name of the gauge
 	 * @param gauge gauge to register
 	 * @param <T>   return type of the gauge
-	 * @return the registered gauge
+	 * @return the given gauge
 	 */
-	<T> Gauge<T> gauge(int name, Gauge<T> gauge);
+	<T, G extends Gauge<T>> G gauge(int name, G gauge);
 
 	/**
 	 * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
@@ -91,9 +111,9 @@ public interface MetricGroup {
 	 * @param name  name of the gauge
 	 * @param gauge gauge to register
 	 * @param <T>   return type of the gauge
-	 * @return the registered gauge
+	 * @return the given gauge
 	 */
-	<T> Gauge<T> gauge(String name, Gauge<T> gauge);
+	<T, G extends Gauge<T>> G gauge(String name, G gauge);
 
 	// ------------------------------------------------------------------------
 	// Groups

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java b/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
new file mode 100644
index 0000000..9720b08
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+/**
+ * A simple low-overhead {@link org.apache.flink.metrics.Counter} that is not thread-safe.
+ */
+public class SimpleCounter implements Counter {
+	private long count;
+
+	/**
+	 * Increment the current count by 1.
+	 */
+	@Override
+	public void inc() {
+		count++;
+	}
+
+	/**
+	 * Increment the current count by the given value.
+	 *
+	 * @param n value to increment the current count by
+	 */
+	@Override
+	public void inc(long n) {
+		count += n;
+	}
+
+	/**
+	 * Decrement the current count by 1.
+	 */
+	@Override
+	public void dec() {
+		count--;
+	}
+
+	/**
+	 * Decrement the current count by the given value.
+	 *
+	 * @param n value to decrement the current count by
+	 */
+	@Override
+	public void dec(long n) {
+		count -= n;
+	}
+
+	/**
+	 * Returns the current count.
+	 *
+	 * @return current count
+	 */
+	@Override
+	public long getCount() {
+		return count;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
index 032fa04..93eb734 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
@@ -24,6 +24,7 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.SimpleCounter;
 
 import org.apache.flink.metrics.groups.scope.ScopeFormat;
 import org.slf4j.Logger;
@@ -146,18 +147,27 @@ public abstract class AbstractMetricGroup implements MetricGroup {
 
 	@Override
 	public Counter counter(String name) {
-		Counter counter = new Counter();
+		return counter(name, new SimpleCounter());
+	}
+	
+	@Override
+	public <C extends Counter> C counter(int name, C counter) {
+		return counter(String.valueOf(name), counter);
+	}
+
+	@Override
+	public <C extends Counter> C counter(String name, C counter) {
 		addMetric(name, counter);
 		return counter;
 	}
 
 	@Override
-	public <T> Gauge<T> gauge(int name, Gauge<T> gauge) {
+	public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
 		return gauge(String.valueOf(name), gauge);
 	}
 
 	@Override
-	public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
+	public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
 		addMetric(name, gauge);
 		return gauge;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
index 961bcce..29d71d9 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
 
 /**
  * A special {@link MetricGroup} that does not register any metrics at the metrics registry
@@ -42,21 +43,31 @@ public class UnregisteredMetricsGroup implements MetricGroup {
 
 	@Override
 	public Counter counter(int name) {
-		return new Counter();
+		return new SimpleCounter();
 	}
 
 	@Override
 	public Counter counter(String name) {
-		return new Counter();
+		return new SimpleCounter();
 	}
 
 	@Override
-	public <T> Gauge<T> gauge(int name, Gauge<T> gauge) {
+	public <C extends Counter> C counter(int name, C counter) {
+		return counter;
+	}
+
+	@Override
+	public <C extends Counter> C counter(String name, C counter) {
+		return counter;
+	}
+
+	@Override
+	public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
 		return gauge;
 	}
 
 	@Override
-	public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
+	public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
 		return gauge;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8ef22af..1fb0e09 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2294,11 +2294,10 @@ object TaskManager {
   private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
     val mxBean = ManagementFactory.getClassLoadingMXBean
 
-    metrics
-      .gauge("ClassesLoaded", new FlinkGauge[Long] {
+    metrics.gauge[Long, FlinkGauge[Long]]("ClassesLoaded", new FlinkGauge[Long] {
       override def getValue: Long = mxBean.getTotalLoadedClassCount
     })
-    metrics.gauge("ClassesUnloaded", new FlinkGauge[Long] {
+    metrics.gauge[Long, FlinkGauge[Long]]("ClassesUnloaded", new FlinkGauge[Long] {
       override def getValue: Long = mxBean.getUnloadedClassCount
     })
   }
@@ -2308,10 +2307,10 @@ object TaskManager {
 
     for (garbageCollector <- garbageCollectors) {
       val gcGroup = metrics.addGroup("\"" + garbageCollector.getName + "\"")
-      gcGroup.gauge("Count", new FlinkGauge[Long] {
+      gcGroup.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
         override def getValue: Long = garbageCollector.getCollectionCount
       })
-      gcGroup.gauge("Time", new FlinkGauge[Long] {
+      gcGroup.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
         override def getValue: Long = garbageCollector.getCollectionTime
       })
     }
@@ -2320,24 +2319,24 @@ object TaskManager {
   private def instantiateMemoryMetrics(metrics: MetricGroup) {
     val mxBean = ManagementFactory.getMemoryMXBean
     val heap = metrics.addGroup("Heap")
-    heap.gauge("Used", new FlinkGauge[Long] {
+    heap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
       override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed
     })
-    heap.gauge("Committed", new FlinkGauge[Long] {
+    heap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
         override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted
       })
-    heap.gauge("Max", new FlinkGauge[Long] {
+    heap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
         override def getValue: Long = mxBean.getHeapMemoryUsage.getMax
       })
 
     val nonHeap = metrics.addGroup("NonHeap")
-    nonHeap.gauge("Used", new FlinkGauge[Long] {
+    nonHeap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
         override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed
       })
-    nonHeap.gauge("Committed", new FlinkGauge[Long] {
+    nonHeap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
         override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted
       })
-    nonHeap.gauge("Max", new FlinkGauge[Long] {
+    nonHeap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
         override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax
       })
 
@@ -2346,15 +2345,15 @@ object TaskManager {
     val directObjectName = new ObjectName("java.nio:type=BufferPool,name=direct")
 
     val direct = metrics.addGroup("Direct")
-    direct.gauge("Count", new FlinkGauge[Long] {
+    direct.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
         override def getValue: Long = con
           .getAttribute(directObjectName, "Count").asInstanceOf[Long]
       })
-    direct.gauge("MemoryUsed", new FlinkGauge[Long] {
+    direct.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
         override def getValue: Long = con
           .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long]
       })
-    direct.gauge("TotalCapacity", new FlinkGauge[Long] {
+    direct.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] {
         override def getValue: Long = con
           .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long]
       })
@@ -2362,15 +2361,15 @@ object TaskManager {
     val mappedObjectName = new ObjectName("java.nio:type=BufferPool,name=direct")
 
     val mapped = metrics.addGroup("Mapped")
-    mapped.gauge("Count", new FlinkGauge[Long] {
+    mapped.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
         override def getValue: Long = con
           .getAttribute(mappedObjectName, "Count").asInstanceOf[Long]
       })
-    mapped.gauge("MemoryUsed", new FlinkGauge[Long] {
+    mapped.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
         override def getValue: Long = con
           .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long]
       })
-    mapped.gauge("TotalCapacity", new FlinkGauge[Long] {
+    mapped.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] {
         override def getValue: Long = con
           .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long]
       })
@@ -2379,8 +2378,7 @@ object TaskManager {
   private def instantiateThreadMetrics(metrics: MetricGroup): Unit = {
     val mxBean = ManagementFactory.getThreadMXBean
 
-    metrics
-      .gauge("Count", new FlinkGauge[Int] {
+    metrics.gauge[Int, FlinkGauge[Int]]("Count", new FlinkGauge[Int] {
       override def getValue: Int = mxBean.getThreadCount
     })
   }
@@ -2390,11 +2388,10 @@ object TaskManager {
       val mxBean = ManagementFactory.getOperatingSystemMXBean
         .asInstanceOf[com.sun.management.OperatingSystemMXBean]
 
-      metrics
-        .gauge("Load", new FlinkGauge[Double] {
+      metrics.gauge[Double, FlinkGauge[Double]]("Load", new FlinkGauge[Double] {
           override def getValue: Double = mxBean.getProcessCpuLoad
         })
-      metrics.gauge("Time", new FlinkGauge[Long] {
+      metrics.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
           override def getValue: Long = mxBean.getProcessCpuTime
         })
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index da15f08..0868398 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -119,7 +119,7 @@ public class InputChannelTest {
 				ResultPartitionID partitionId,
 				Tuple2<Integer, Integer> initialAndMaxBackoff) {
 
-			super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new Counter());
+			super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new SimpleCounter());
 		}
 
 		@Override


Mime
View raw message