flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [4/4] flink git commit: [FLINK-5464] [metrics] Improve MetricDumpSerialization exception handling
Date Mon, 23 Jan 2017 21:18:26 GMT
[FLINK-5464] [metrics] Improve MetricDumpSerialization exception handling

This closes #3128.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a8e85a2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a8e85a2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a8e85a2d

Branch: refs/heads/master
Commit: a8e85a2d5abcaf2defab27be0027190ac3ecb5d5
Parents: 7704724
Author: zentol <chesnay@apache.org>
Authored: Thu Jan 12 12:42:26 2017 +0100
Committer: zentol <chesnay@apache.org>
Committed: Mon Jan 23 22:18:20 2017 +0100

----------------------------------------------------------------------
 .../webmonitor/metrics/MetricFetcher.java       |   6 +-
 .../webmonitor/metrics/MetricFetcherTest.java   |   6 +-
 .../metrics/dump/MetricDumpSerialization.java   | 321 ++++++++++++-------
 .../metrics/dump/MetricQueryService.java        |   2 +-
 .../metrics/dump/MetricDumpSerializerTest.java  |  47 ++-
 .../metrics/dump/MetricQueryServiceTest.java    |  23 +-
 6 files changed, 264 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
index 621f4d9..7ffadce 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
@@ -42,7 +43,6 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -191,8 +191,8 @@ public class MetricFetcher {
 		logErrorOnFailure(metricQueryFuture, "Fetching metrics failed.");
 	}
 
-	private void addMetrics(Object result) throws IOException {
-		byte[] data = (byte[]) result;
+	private void addMetrics(Object result) {
+		MetricDumpSerialization.MetricSerializationResult data = (MetricDumpSerialization.MetricSerializationResult) result;
 		List<MetricDump> dumpedMetrics = deserializer.deserialize(data);
 		for (MetricDump metric : dumpedMetrics) {
 			metrics.add(metric);

http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
index 58048e6..d644c23 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
@@ -115,7 +115,7 @@ public class MetricFetcherTest extends TestLogger {
 
 		MetricFetcher.BasicGateway jmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
 		when(jmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
-			.thenReturn(Future$.MODULE$.successful((Object) new byte[16]));
+			.thenReturn(Future$.MODULE$.successful((Object) new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
 
 		MetricFetcher.BasicGateway tmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
 		when(tmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
@@ -171,7 +171,7 @@ public class MetricFetcherTest extends TestLogger {
 		}
 	}
 
-	private static byte[] createRequestDumpAnswer(InstanceID tmID, JobID jobID) throws IOException {
+	private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(InstanceID tmID, JobID jobID) throws IOException {
 		Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
 		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
 		Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
@@ -213,7 +213,7 @@ public class MetricFetcherTest extends TestLogger {
 		histograms.put(new TestingHistogram(), new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"),
"hist"));
 
 		MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
-		byte[] dump = serializer.serialize(counters, gauges, histograms, meters);
+		MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
 		serializer.close();
 
 		return dump;

http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index 143faaf..e57a0d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -17,20 +17,23 @@
  */
 package org.apache.flink.runtime.metrics.dump;
 
-import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.Meter;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -45,208 +48,287 @@ import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY
  * Utility class for the serialization of metrics.
  */
 public class MetricDumpSerialization {
+
 	private static final Logger LOG = LoggerFactory.getLogger(MetricDumpSerialization.class);
 
 	private MetricDumpSerialization() {
 	}
 
+	/**
+	 * This class encapsulates all serialized metrics and a count for each metric type.
+	 * 
+	 * The counts are stored separately from the metrics since the final count for any given type can only be
+	 * determined after all metrics of that type were serialized. Storing them together in a single byte[] would
+	 * require an additional copy of all serialized metrics, as you would first have to serialize the metrics into a
+	 * temporary buffer to calculate the counts, write the counts to the final output and copy all metrics from the
+	 * temporary buffer.
+	 * 
+	 * Note that while one could implement the serialization in such a way so that at least 1 byte (a validity flag)
+	 * is written for each metric, this would require more bandwidth due to the sheer number of metrics.
+	 */
+	public static class MetricSerializationResult implements Serializable {
+
+		private static final long serialVersionUID = 6928770855951536906L;
+
+		public final byte[] serializedMetrics;
+		public final int numCounters;
+		public final int numGauges;
+		public final int numMeters;
+		public final int numHistograms;
+		
+		public MetricSerializationResult(byte[] serializedMetrics, int numCounters, int numGauges, int numMeters, int numHistograms) {
+			Preconditions.checkNotNull(serializedMetrics);
+			Preconditions.checkArgument(numCounters >= 0);
+			Preconditions.checkArgument(numGauges >= 0);
+			Preconditions.checkArgument(numMeters >= 0); 
+			Preconditions.checkArgument(numHistograms >= 0);
+			this.serializedMetrics = serializedMetrics;
+			this.numCounters = numCounters;
+			this.numGauges = numGauges;
+			this.numMeters = numMeters;
+			this.numHistograms = numHistograms;
+		}
+	}
+
 	//-------------------------------------------------------------------------
 	// Serialization
 	//-------------------------------------------------------------------------
+
 	public static class MetricDumpSerializer {
-		private ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-		private DataOutputStream dos = new DataOutputStream(baos);
+
+		private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);
 
 		/**
 		 * Serializes the given metrics and returns the resulting byte array.
+		 * 
+		 * Should a {@link Metric} accessed in this method throw an exception it will be omitted
from the returned
+		 * {@link MetricSerializationResult}.
+		 * 
+		 * If the serialization of any primitive or String fails then the returned {@link MetricSerializationResult}
+		 * is partially corrupted. Such a result can be deserialized safely by 
+		 * {@link MetricDumpDeserializer#deserialize(MetricSerializationResult)}; however only
metrics that were
+		 * fully serialized before the failure will be returned.
 		 *
 		 * @param counters   counters to serialize
 		 * @param gauges     gauges to serialize
 		 * @param histograms histograms to serialize
-		 * @return byte array containing the serialized metrics
-		 * @throws IOException
+		 * @return MetricSerializationResult containing the serialized metrics and the count of
each metric type
 		 */
-		public byte[] serialize(
+		public MetricSerializationResult serialize(
 			Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
 			Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
 			Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
-			Map<Meter, Tuple2<QueryScopeInfo, String>> meters) throws IOException {
-				
-			baos.reset();
-			dos.writeInt(counters.size());
-			dos.writeInt(gauges.size());
-			dos.writeInt(histograms.size());
-			dos.writeInt(meters.size());
+			Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
+
+			buffer.clear();
 
+			int numCounters = 0;
 			for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet())
{
-				serializeMetricInfo(dos, entry.getValue().f0);
-				serializeString(dos, entry.getValue().f1);
-				serializeCounter(dos, entry.getKey());
+				try {
+					serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+					numCounters++;
+				} catch (Exception e) {
+					LOG.debug("Failed to serialize counter.", e);
+				}
 			}
 
+			int numGauges = 0;
 			for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet())
{
-				serializeMetricInfo(dos, entry.getValue().f0);
-				serializeString(dos, entry.getValue().f1);
-				serializeGauge(dos, entry.getKey());
+				try {
+					serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+					numGauges++;
+				} catch (Exception e) {
+					LOG.debug("Failed to serialize gauge.", e);
+				}
 			}
 
+			int numHistograms = 0;
 			for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet())
{
-				serializeMetricInfo(dos, entry.getValue().f0);
-				serializeString(dos, entry.getValue().f1);
-				serializeHistogram(dos, entry.getKey());
+				try {
+					serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+					numHistograms++;
+				} catch (Exception e) {
+					LOG.debug("Failed to serialize histogram.", e);
+				}
 			}
 
+			int numMeters = 0;
 			for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet())
{
-				serializeMetricInfo(dos, entry.getValue().f0);
-				serializeString(dos, entry.getValue().f1);
-				serializeMeter(dos, entry.getKey());
+				try {
+					serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+					numMeters++;
+				} catch (Exception e) {
+					LOG.debug("Failed to serialize meter.", e);
+				}
 			}
-			return baos.toByteArray();
+
+			return new MetricSerializationResult(buffer.getCopyOfBuffer(), numCounters, numGauges, numMeters, numHistograms);
 		}
 
 		public void close() {
-			try {
-				dos.close();
-			} catch (Exception e) {
-				LOG.debug("Failed to close OutputStream.", e);
-			}
-			try {
-				baos.close();
-			} catch (Exception e) {
-				LOG.debug("Failed to close OutputStream.", e);
-			}
+			buffer = null;
 		}
 	}
 
-	private static void serializeMetricInfo(DataOutputStream dos, QueryScopeInfo info) throws
IOException {
-		serializeString(dos, info.scope);
-		dos.writeByte(info.getCategory());
+	private static void serializeMetricInfo(DataOutput out, QueryScopeInfo info) throws IOException
{
+		out.writeUTF(info.scope);
+		out.writeByte(info.getCategory());
 		switch (info.getCategory()) {
 			case INFO_CATEGORY_JM:
 				break;
 			case INFO_CATEGORY_TM:
 				String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
-				serializeString(dos, tmID);
+				out.writeUTF(tmID);
 				break;
 			case INFO_CATEGORY_JOB:
 				QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
-				serializeString(dos, jobInfo.jobID);
+				out.writeUTF(jobInfo.jobID);
 				break;
 			case INFO_CATEGORY_TASK:
 				QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
-				serializeString(dos, taskInfo.jobID);
-				serializeString(dos, taskInfo.vertexID);
-				dos.writeInt(taskInfo.subtaskIndex);
+				out.writeUTF(taskInfo.jobID);
+				out.writeUTF(taskInfo.vertexID);
+				out.writeInt(taskInfo.subtaskIndex);
 				break;
 			case INFO_CATEGORY_OPERATOR:
 				QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo)
info;
-				serializeString(dos, operatorInfo.jobID);
-				serializeString(dos, operatorInfo.vertexID);
-				dos.writeInt(operatorInfo.subtaskIndex);
-				serializeString(dos, operatorInfo.operatorName);
+				out.writeUTF(operatorInfo.jobID);
+				out.writeUTF(operatorInfo.vertexID);
+				out.writeInt(operatorInfo.subtaskIndex);
+				out.writeUTF(operatorInfo.operatorName);
 				break;
+			default:
+				throw new IOException("Unknown scope category: " + info.getCategory());
 		}
 	}
 
-	private static void serializeString(DataOutputStream dos, String string) throws IOException
{
-		byte[] bytes = string.getBytes();
-		dos.writeInt(bytes.length);
-		dos.write(bytes);
+	private static void serializeCounter(DataOutput out, QueryScopeInfo info, String name, Counter counter) throws IOException {
+		long count = counter.getCount();
+		serializeMetricInfo(out, info);
+		out.writeUTF(name);
+		out.writeLong(count);
 	}
 
-	private static void serializeCounter(DataOutputStream dos, Counter counter) throws IOException
{
-		dos.writeLong(counter.getCount());
-	}
+	private static void serializeGauge(DataOutput out, QueryScopeInfo info, String name, Gauge<?>
gauge) throws IOException {
+		Object value = gauge.getValue();
+		if (value == null) {
+			throw new NullPointerException("Value returned by gauge " + name + " was null.");
+		}
+		String stringValue = gauge.getValue().toString();
+		if (stringValue == null) {
+			throw new NullPointerException("toString() of the value returned by gauge " + name + " returned null.");
+		}
 
-	private static void serializeGauge(DataOutputStream dos, Gauge<?> gauge) throws IOException
{
-		serializeString(dos, gauge.getValue().toString());
+		serializeMetricInfo(out, info);
+		out.writeUTF(name);
+		out.writeUTF(stringValue);
 	}
 
-	private static void serializeHistogram(DataOutputStream dos, Histogram histogram) throws
IOException {
+	private static void serializeHistogram(DataOutput out, QueryScopeInfo info, String name,
Histogram histogram) throws IOException {
 		HistogramStatistics stat = histogram.getStatistics();
-
-		dos.writeLong(stat.getMin());
-		dos.writeLong(stat.getMax());
-		dos.writeDouble(stat.getMean());
-		dos.writeDouble(stat.getQuantile(0.5));
-		dos.writeDouble(stat.getStdDev());
-		dos.writeDouble(stat.getQuantile(0.75));
-		dos.writeDouble(stat.getQuantile(0.90));
-		dos.writeDouble(stat.getQuantile(0.95));
-		dos.writeDouble(stat.getQuantile(0.98));
-		dos.writeDouble(stat.getQuantile(0.99));
-		dos.writeDouble(stat.getQuantile(0.999));
+		long min = stat.getMin();
+		long max = stat.getMax();
+		double mean = stat.getMean();
+		double median = stat.getQuantile(0.5);
+		double stddev = stat.getStdDev();
+		double p75 = stat.getQuantile(0.75);
+		double p90 = stat.getQuantile(0.90);
+		double p95 = stat.getQuantile(0.95);
+		double p98 = stat.getQuantile(0.98);
+		double p99 = stat.getQuantile(0.99);
+		double p999 = stat.getQuantile(0.999);
+
+		serializeMetricInfo(out, info);
+		out.writeUTF(name);
+		out.writeLong(min);
+		out.writeLong(max);
+		out.writeDouble(mean);
+		out.writeDouble(median);
+		out.writeDouble(stddev);
+		out.writeDouble(p75);
+		out.writeDouble(p90);
+		out.writeDouble(p95);
+		out.writeDouble(p98);
+		out.writeDouble(p99);
+		out.writeDouble(p999);
 	}
 
-	private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOException
{
-		dos.writeDouble(meter.getRate());
+	private static void serializeMeter(DataOutput out, QueryScopeInfo info, String name, Meter
meter) throws IOException {
+		serializeMetricInfo(out, info);
+		out.writeUTF(name);
+		out.writeDouble(meter.getRate());
 	}
 
 	//-------------------------------------------------------------------------
 	// Deserialization
 	//-------------------------------------------------------------------------
+
 	public static class MetricDumpDeserializer {
 		/**
 		 * De-serializes metrics from the given byte array and returns them as a list of {@link
MetricDump}.
 		 *
 		 * @param data serialized metrics
 		 * @return A list containing the deserialized metrics.
-		 * @throws IOException
 		 */
-		public List<MetricDump> deserialize(byte[] data) throws IOException {
-			ByteArrayInputStream bais = new ByteArrayInputStream(data);
-			DataInputStream dis = new DataInputStream(bais);
-
-			int numCounters = dis.readInt();
-			int numGauges = dis.readInt();
-			int numHistograms = dis.readInt();
-			int numMeters = dis.readInt();
+		public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
+			DataInputView in = new DataInputDeserializer(data.serializedMetrics, 0, data.serializedMetrics.length);
 
-			List<MetricDump> metrics = new ArrayList<>(numCounters + numGauges + numHistograms);
+			List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);
 
-			for (int x = 0; x < numCounters; x++) {
-				metrics.add(deserializeCounter(dis));
+			for (int x = 0; x < data.numCounters; x++) {
+				try {
+					metrics.add(deserializeCounter(in));
+				} catch (Exception e) {
+					LOG.debug("Failed to deserialize counter.", e);
+				}
 			}
 
-			for (int x = 0; x < numGauges; x++) {
-				metrics.add(deserializeGauge(dis));
+			for (int x = 0; x < data.numGauges; x++) {
+				try {
+					metrics.add(deserializeGauge(in));
+				} catch (Exception e) {
+					LOG.debug("Failed to deserialize gauge.", e);
+				}
 			}
 
-			for (int x = 0; x < numHistograms; x++) {
-				metrics.add(deserializeHistogram(dis));
+			for (int x = 0; x < data.numHistograms; x++) {
+				try {
+					metrics.add(deserializeHistogram(in));
+				} catch (Exception e) {
+					LOG.debug("Failed to deserialize histogram.", e);
+				}
 			}
 
-			for (int x = 0; x < numMeters; x++) {
-				metrics.add(deserializeMeter(dis));
+			for (int x = 0; x < data.numMeters; x++) {
+				try {
+					metrics.add(deserializeMeter(in));
+				} catch (Exception e) {
+					LOG.debug("Failed to deserialize meter.", e);
+				}
 			}
 
 			return metrics;
 		}
 	}
 
-	private static String deserializeString(DataInputStream dis) throws IOException {
-		int stringLength = dis.readInt();
-		byte[] bytes = new byte[stringLength];
-		dis.readFully(bytes);
-		return new String(bytes);
-	}
 
-	private static MetricDump.CounterDump deserializeCounter(DataInputStream dis) throws IOException
{
+	private static MetricDump.CounterDump deserializeCounter(DataInputView dis) throws IOException
{
 		QueryScopeInfo scope = deserializeMetricInfo(dis);
-		String name = deserializeString(dis);
-		return new MetricDump.CounterDump(scope, name, dis.readLong());
+		String name = dis.readUTF();
+		long count = dis.readLong();
+		return new MetricDump.CounterDump(scope, name, count);
 	}
 
-	private static MetricDump.GaugeDump deserializeGauge(DataInputStream dis) throws IOException
{
+	private static MetricDump.GaugeDump deserializeGauge(DataInputView dis) throws IOException
{
 		QueryScopeInfo scope = deserializeMetricInfo(dis);
-		String name = deserializeString(dis);
-		String value = deserializeString(dis);
+		String name = dis.readUTF();
+		String value = dis.readUTF();
 		return new MetricDump.GaugeDump(scope, name, value);
 	}
 
-	private static MetricDump.HistogramDump deserializeHistogram(DataInputStream dis) throws
IOException {
+	private static MetricDump.HistogramDump deserializeHistogram(DataInputView dis) throws IOException
{
 		QueryScopeInfo info = deserializeMetricInfo(dis);
-		String name = deserializeString(dis);
+		String name = dis.readUTF();
 		long min = dis.readLong();
 		long max = dis.readLong();
 		double mean = dis.readDouble();
@@ -258,45 +340,46 @@ public class MetricDumpSerialization {
 		double p98 = dis.readDouble();
 		double p99 = dis.readDouble();
 		double p999 = dis.readDouble();
+
 		return new MetricDump.HistogramDump(info, name, min, max, mean, median, stddev, p75, p90,
p95, p98, p99, p999);
 	}
 
-	private static MetricDump.MeterDump deserializeMeter(DataInputStream dis) throws IOException
{
+	private static MetricDump.MeterDump deserializeMeter(DataInputView dis) throws IOException
{
 		QueryScopeInfo info = deserializeMetricInfo(dis);
-		String name = deserializeString(dis);
+		String name = dis.readUTF();
 		double rate = dis.readDouble();
 		return new MetricDump.MeterDump(info, name, rate);
 	}
 
-	private static QueryScopeInfo deserializeMetricInfo(DataInputStream dis) throws IOException
{
+	private static QueryScopeInfo deserializeMetricInfo(DataInput dis) throws IOException {
 		String jobID;
 		String vertexID;
 		int subtaskIndex;
 
-		String scope = deserializeString(dis);
+		String scope = dis.readUTF();
 		byte cat = dis.readByte();
 		switch (cat) {
 			case INFO_CATEGORY_JM:
 				return new QueryScopeInfo.JobManagerQueryScopeInfo(scope);
 			case INFO_CATEGORY_TM:
-				String tmID = deserializeString(dis);
+				String tmID = dis.readUTF();
 				return new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID, scope);
 			case INFO_CATEGORY_JOB:
-				jobID = deserializeString(dis);
+				jobID = dis.readUTF();
 				return new QueryScopeInfo.JobQueryScopeInfo(jobID, scope);
 			case INFO_CATEGORY_TASK:
-				jobID = deserializeString(dis);
-				vertexID = deserializeString(dis);
+				jobID = dis.readUTF();
+				vertexID = dis.readUTF();
 				subtaskIndex = dis.readInt();
 				return new QueryScopeInfo.TaskQueryScopeInfo(jobID, vertexID, subtaskIndex, scope);
 			case INFO_CATEGORY_OPERATOR:
-				jobID = deserializeString(dis);
-				vertexID = deserializeString(dis);
+				jobID = dis.readUTF();
+				vertexID = dis.readUTF();
 				subtaskIndex = dis.readInt();
-				String operatorName = deserializeString(dis);
+				String operatorName = dis.readUTF();
 				return new QueryScopeInfo.OperatorQueryScopeInfo(jobID, vertexID, subtaskIndex, operatorName,
scope);
 			default:
-				throw new IOException("sup");
+				throw new IOException("Unknown scope category: " + cat);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index 20bc258..2229139 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -106,7 +106,7 @@ public class MetricQueryService extends UntypedActor {
 					this.meters.remove(metric);
 				}
 			} else if (message instanceof CreateDump) {
-				byte[] dump = serializer.serialize(counters, gauges, histograms, meters);
+				MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
 				getSender().tell(dump, getSelf());
 			} else {
 				LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());

http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index 18f03e3..6e3d8f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -24,9 +24,13 @@ import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,6 +46,47 @@ import static org.junit.Assert.fail;
 
 public class MetricDumpSerializerTest {
 	@Test
+	public void testNullGaugeHandling() throws IOException {
+		MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
+		MetricDumpSerialization.MetricDumpDeserializer deserializer = new MetricDumpSerialization.MetricDumpDeserializer();
+
+		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
+		
+		gauges.put(new Gauge<Object>() {
+			@Override
+			public Object getValue() {
+				return null;
+			}
+		}, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("A"),
"g"));
+		
+		MetricDumpSerialization.MetricSerializationResult output = serializer.serialize(
+			Collections.<Counter, Tuple2<QueryScopeInfo,String>>emptyMap(),
+			gauges,
+			Collections.<Histogram, Tuple2<QueryScopeInfo, String>>emptyMap(),
+			Collections.<Meter, Tuple2<QueryScopeInfo, String>>emptyMap());
+		
+		// no metrics should be serialized
+		Assert.assertEquals(0, output.serializedMetrics.length);
+
+		List<MetricDump> deserialized = deserializer.deserialize(output);
+		Assert.assertEquals(0, deserialized.size());
+	}
+
+	@Test
+	public void testJavaSerialization() throws IOException {
+		MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
+
+		final ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
+		final ObjectOutputStream oos = new ObjectOutputStream(bos);
+
+		oos.writeObject(serializer.serialize(
+			new HashMap<Counter, Tuple2<QueryScopeInfo,String>>(),
+			new HashMap<Gauge<?>, Tuple2<QueryScopeInfo,String>>(),
+			new HashMap<Histogram, Tuple2<QueryScopeInfo,String>>(),
+			new HashMap<Meter, Tuple2<QueryScopeInfo,String>>()));
+	}
+
+	@Test
 	public void testSerialization() throws IOException {
 		MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
 		MetricDumpSerialization.MetricDumpDeserializer deserializer = new MetricDumpSerialization.MetricDumpDeserializer();
@@ -92,7 +137,7 @@ public class MetricDumpSerializerTest {
 		gauges.put(g1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo("jid",
"vid", 2, "D"), "g1"));
 		histograms.put(h1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo("jid",
"vid", 2, "opname", "E"), "h1"));
 
-		byte[] serialized = serializer.serialize(counters, gauges, histograms, meters);
+		MetricDumpSerialization.MetricSerializationResult serialized = serializer.serialize(counters,
gauges, histograms, meters);
 		List<MetricDump> deserialized = deserializer.deserialize(serialized);
 
 		// ===== Counters ==============================================================================================

http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 0104e3e..2243495 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -84,13 +84,6 @@ public class MetricQueryServiceTest extends TestLogger {
 		MetricQueryService.notifyOfAddedMetric(serviceActor, g, "gauge", tm);
 		MetricQueryService.notifyOfAddedMetric(serviceActor, h, "histogram", tm);
 		MetricQueryService.notifyOfAddedMetric(serviceActor, m, "meter", tm);
-
-		// these metrics will be removed *after* the first query
-		MetricQueryService.notifyOfRemovedMetric(serviceActor, c);
-		MetricQueryService.notifyOfRemovedMetric(serviceActor, g);
-		MetricQueryService.notifyOfRemovedMetric(serviceActor, h);
-		MetricQueryService.notifyOfRemovedMetric(serviceActor, m);
-
 		serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
 		synchronized (testActor.lock) {
 			if (testActor.message == null) {
@@ -98,9 +91,14 @@ public class MetricQueryServiceTest extends TestLogger {
 			}
 		}
 
-		byte[] dump = (byte[]) testActor.message;
+		MetricDumpSerialization.MetricSerializationResult dump = (MetricDumpSerialization.MetricSerializationResult)
testActor.message;
 		testActor.message = null;
-		assertTrue(dump.length > 0);
+		assertTrue(dump.serializedMetrics.length > 0);
+
+		MetricQueryService.notifyOfRemovedMetric(serviceActor, c);
+		MetricQueryService.notifyOfRemovedMetric(serviceActor, g);
+		MetricQueryService.notifyOfRemovedMetric(serviceActor, h);
+		MetricQueryService.notifyOfRemovedMetric(serviceActor, m);
 
 		serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
 		synchronized (testActor.lock) {
@@ -109,12 +107,9 @@ public class MetricQueryServiceTest extends TestLogger {
 			}
 		}
 
-		byte[] emptyDump = (byte[]) testActor.message;
+		MetricDumpSerialization.MetricSerializationResult emptyDump = (MetricDumpSerialization.MetricSerializationResult)
testActor.message;
 		testActor.message = null;
-		assertEquals(16, emptyDump.length);
-		for (int x = 0; x < 16; x++) {
-			assertEquals(0, emptyDump[x]);
-		}
+		assertEquals(0, emptyDump.serializedMetrics.length);
 
 		s.shutdown();
 	}


Mime
View raw message