Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AF3BA200C05 for ; Mon, 23 Jan 2017 22:18:25 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id AD690160B3C; Mon, 23 Jan 2017 21:18:25 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3AA6C160B53 for ; Mon, 23 Jan 2017 22:18:24 +0100 (CET) Received: (qmail 38195 invoked by uid 500); 23 Jan 2017 21:18:23 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 38019 invoked by uid 99); 23 Jan 2017 21:18:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Jan 2017 21:18:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4F1ABDFCCC; Mon, 23 Jan 2017 21:18:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chesnay@apache.org To: commits@flink.apache.org Date: Mon, 23 Jan 2017 21:18:26 -0000 Message-Id: <4a359c2be6fd4529aca81d323159e90b@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [4/4] flink git commit: [FLINK-5464] [metrics] Improve MetricDumpSerialization exception handling archived-at: Mon, 23 Jan 2017 21:18:25 -0000 [FLINK-5464] [metrics] Improve MetricDumpSerialization exception handling This closes #3128. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a8e85a2d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a8e85a2d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a8e85a2d Branch: refs/heads/master Commit: a8e85a2d5abcaf2defab27be0027190ac3ecb5d5 Parents: 7704724 Author: zentol Authored: Thu Jan 12 12:42:26 2017 +0100 Committer: zentol Committed: Mon Jan 23 22:18:20 2017 +0100 ---------------------------------------------------------------------- .../webmonitor/metrics/MetricFetcher.java | 6 +- .../webmonitor/metrics/MetricFetcherTest.java | 6 +- .../metrics/dump/MetricDumpSerialization.java | 321 ++++++++++++------- .../metrics/dump/MetricQueryService.java | 2 +- .../metrics/dump/MetricDumpSerializerTest.java | 47 ++- .../metrics/dump/MetricQueryServiceTest.java | 23 +- 6 files changed, 264 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java index 621f4d9..7ffadce 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails; +import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization; import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.metrics.dump.MetricDump; import org.apache.flink.runtime.webmonitor.JobManagerRetriever; @@ -42,7 +43,6 @@ import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -191,8 +191,8 @@ public class MetricFetcher { logErrorOnFailure(metricQueryFuture, "Fetching metrics failed."); } - private void addMetrics(Object result) throws IOException { - byte[] data = (byte[]) result; + private void addMetrics(Object result) { + MetricDumpSerialization.MetricSerializationResult data = (MetricDumpSerialization.MetricSerializationResult) result; List dumpedMetrics = deserializer.deserialize(data); for (MetricDump metric : dumpedMetrics) { metrics.add(metric); http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java index 58048e6..d644c23 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java @@ -115,7 +115,7 @@ public class MetricFetcherTest extends TestLogger { MetricFetcher.BasicGateway jmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class); when(jmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class))) - .thenReturn(Future$.MODULE$.successful((Object) new byte[16])); + .thenReturn(Future$.MODULE$.successful((Object) new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0))); MetricFetcher.BasicGateway tmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class); when(tmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class))) @@ -171,7 +171,7 @@ public class MetricFetcherTest extends TestLogger { } } - private static byte[] createRequestDumpAnswer(InstanceID tmID, JobID jobID) throws IOException { + private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(InstanceID tmID, JobID jobID) throws IOException { Map> counters = new HashMap<>(); Map, Tuple2> gauges = new HashMap<>(); Map> histograms = new HashMap<>(); @@ -213,7 +213,7 @@ public class MetricFetcherTest extends TestLogger { histograms.put(new TestingHistogram(), new Tuple2(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist")); MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer(); - byte[] dump = serializer.serialize(counters, gauges, histograms, meters); + MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters); serializer.close(); return dump; http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java index 143faaf..e57a0d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java @@ -17,20 +17,23 @@ */ package org.apache.flink.runtime.metrics.dump; -import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.HistogramStatistics; import org.apache.flink.metrics.Meter; +import org.apache.flink.runtime.util.DataInputDeserializer; +import org.apache.flink.runtime.util.DataOutputSerializer; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -45,208 +48,287 @@ import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY * Utility class for the serialization of metrics. */ public class MetricDumpSerialization { + private static final Logger LOG = LoggerFactory.getLogger(MetricDumpSerialization.class); private MetricDumpSerialization() { } + /** + * This class encapsulates all serialized metrics and a count for each metric type. + * + * The counts are stored separately from the metrics since the final count for any given type can only be + * determined after all metrics of that type were serialized. Storing them together in a single byte[] would + * require an additional copy of all serialized metrics, as you would first have to serialize the metrics into a + * temporary buffer to calculate the counts, write the counts to the final output and copy all metrics from the + * temporary buffer. + * + * Note that while one could implement the serialization in such a way so that at least 1 byte (a validity flag) + * is written for each metric, this would require more bandwidth due to the sheer number of metrics. + */ + public static class MetricSerializationResult implements Serializable { + + private static final long serialVersionUID = 6928770855951536906L; + + public final byte[] serializedMetrics; + public final int numCounters; + public final int numGauges; + public final int numMeters; + public final int numHistograms; + + public MetricSerializationResult(byte[] serializedMetrics, int numCounters, int numGauges, int numMeters, int numHistograms) { + Preconditions.checkNotNull(serializedMetrics); + Preconditions.checkArgument(numCounters >= 0); + Preconditions.checkArgument(numGauges >= 0); + Preconditions.checkArgument(numMeters >= 0); + Preconditions.checkArgument(numHistograms >= 0); + this.serializedMetrics = serializedMetrics; + this.numCounters = numCounters; + this.numGauges = numGauges; + this.numMeters = numMeters; + this.numHistograms = numHistograms; + } + } + //------------------------------------------------------------------------- // Serialization //------------------------------------------------------------------------- + public static class MetricDumpSerializer { - private ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); - private DataOutputStream dos = new DataOutputStream(baos); + + private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32); /** * Serializes the given metrics and returns the resulting byte array. + * + * Should a {@link Metric} accessed in this method throw an exception it will be omitted from the returned + * {@link MetricSerializationResult}. + * + * If the serialization of any primitive or String fails then the returned {@link MetricSerializationResult} + * is partially corrupted. Such a result can be deserialized safely by + * {@link MetricDumpDeserializer#deserialize(MetricSerializationResult)}; however only metrics that were + * fully serialized before the failure will be returned. * * @param counters counters to serialize * @param gauges gauges to serialize * @param histograms histograms to serialize - * @return byte array containing the serialized metrics - * @throws IOException + * @return MetricSerializationResult containing the serialized metrics and the count of each metric type */ - public byte[] serialize( + public MetricSerializationResult serialize( Map> counters, Map, Tuple2> gauges, Map> histograms, - Map> meters) throws IOException { - - baos.reset(); - dos.writeInt(counters.size()); - dos.writeInt(gauges.size()); - dos.writeInt(histograms.size()); - dos.writeInt(meters.size()); + Map> meters) { + + buffer.clear(); + int numCounters = 0; for (Map.Entry> entry : counters.entrySet()) { - serializeMetricInfo(dos, entry.getValue().f0); - serializeString(dos, entry.getValue().f1); - serializeCounter(dos, entry.getKey()); + try { + serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + numCounters++; + } catch (Exception e) { + LOG.debug("Failed to serialize counter.", e); + } } + int numGauges = 0; for (Map.Entry, Tuple2> entry : gauges.entrySet()) { - serializeMetricInfo(dos, entry.getValue().f0); - serializeString(dos, entry.getValue().f1); - serializeGauge(dos, entry.getKey()); + try { + serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + numGauges++; + } catch (Exception e) { + LOG.debug("Failed to serialize gauge.", e); + } } + int numHistograms = 0; for (Map.Entry> entry : histograms.entrySet()) { - serializeMetricInfo(dos, entry.getValue().f0); - serializeString(dos, entry.getValue().f1); - serializeHistogram(dos, entry.getKey()); + try { + serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + numHistograms++; + } catch (Exception e) { + LOG.debug("Failed to serialize histogram.", e); + } } + int numMeters = 0; for (Map.Entry> entry : meters.entrySet()) { - serializeMetricInfo(dos, entry.getValue().f0); - serializeString(dos, entry.getValue().f1); - serializeMeter(dos, entry.getKey()); + try { + serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + numMeters++; + } catch (Exception e) { + LOG.debug("Failed to serialize meter.", e); + } } - return baos.toByteArray(); + + return new MetricSerializationResult(buffer.getCopyOfBuffer(), numCounters, numGauges, numMeters, numHistograms); } public void close() { - try { - dos.close(); - } catch (Exception e) { - LOG.debug("Failed to close OutputStream.", e); - } - try { - baos.close(); - } catch (Exception e) { - LOG.debug("Failed to close OutputStream.", e); - } + buffer = null; } } - private static void serializeMetricInfo(DataOutputStream dos, QueryScopeInfo info) throws IOException { - serializeString(dos, info.scope); - dos.writeByte(info.getCategory()); + private static void serializeMetricInfo(DataOutput out, QueryScopeInfo info) throws IOException { + out.writeUTF(info.scope); + out.writeByte(info.getCategory()); switch (info.getCategory()) { case INFO_CATEGORY_JM: break; case INFO_CATEGORY_TM: String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID; - serializeString(dos, tmID); + out.writeUTF(tmID); break; case INFO_CATEGORY_JOB: QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info; - serializeString(dos, jobInfo.jobID); + out.writeUTF(jobInfo.jobID); break; case INFO_CATEGORY_TASK: QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info; - serializeString(dos, taskInfo.jobID); - serializeString(dos, taskInfo.vertexID); - dos.writeInt(taskInfo.subtaskIndex); + out.writeUTF(taskInfo.jobID); + out.writeUTF(taskInfo.vertexID); + out.writeInt(taskInfo.subtaskIndex); break; case INFO_CATEGORY_OPERATOR: QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info; - serializeString(dos, operatorInfo.jobID); - serializeString(dos, operatorInfo.vertexID); - dos.writeInt(operatorInfo.subtaskIndex); - serializeString(dos, operatorInfo.operatorName); + out.writeUTF(operatorInfo.jobID); + out.writeUTF(operatorInfo.vertexID); + out.writeInt(operatorInfo.subtaskIndex); + out.writeUTF(operatorInfo.operatorName); break; + default: + throw new IOException("Unknown scope category: " + info.getCategory()); } } - private static void serializeString(DataOutputStream dos, String string) throws IOException { - byte[] bytes = string.getBytes(); - dos.writeInt(bytes.length); - dos.write(bytes); + private static void serializeCounter(DataOutput out, QueryScopeInfo info, String name, Counter counter) throws IOException { + long count = counter.getCount(); + serializeMetricInfo(out, info); + out.writeUTF(name); + out.writeLong(count); } - private static void serializeCounter(DataOutputStream dos, Counter counter) throws IOException { - dos.writeLong(counter.getCount()); - } + private static void serializeGauge(DataOutput out, QueryScopeInfo info, String name, Gauge gauge) throws IOException { + Object value = gauge.getValue(); + if (value == null) { + throw new NullPointerException("Value returned by gauge " + name + " was null."); + } + String stringValue = gauge.getValue().toString(); + if (stringValue == null) { + throw new NullPointerException("toString() of the value returned by gauge " + name + " returned null."); + } - private static void serializeGauge(DataOutputStream dos, Gauge gauge) throws IOException { - serializeString(dos, gauge.getValue().toString()); + serializeMetricInfo(out, info); + out.writeUTF(name); + out.writeUTF(stringValue); } - private static void serializeHistogram(DataOutputStream dos, Histogram histogram) throws IOException { + private static void serializeHistogram(DataOutput out, QueryScopeInfo info, String name, Histogram histogram) throws IOException { HistogramStatistics stat = histogram.getStatistics(); - - dos.writeLong(stat.getMin()); - dos.writeLong(stat.getMax()); - dos.writeDouble(stat.getMean()); - dos.writeDouble(stat.getQuantile(0.5)); - dos.writeDouble(stat.getStdDev()); - dos.writeDouble(stat.getQuantile(0.75)); - dos.writeDouble(stat.getQuantile(0.90)); - dos.writeDouble(stat.getQuantile(0.95)); - dos.writeDouble(stat.getQuantile(0.98)); - dos.writeDouble(stat.getQuantile(0.99)); - dos.writeDouble(stat.getQuantile(0.999)); + long min = stat.getMin(); + long max = stat.getMax(); + double mean = stat.getMean(); + double median = stat.getQuantile(0.5); + double stddev = stat.getStdDev(); + double p75 = stat.getQuantile(0.75); + double p90 = stat.getQuantile(0.90); + double p95 = stat.getQuantile(0.95); + double p98 = stat.getQuantile(0.98); + double p99 = stat.getQuantile(0.99); + double p999 = stat.getQuantile(0.999); + + serializeMetricInfo(out, info); + out.writeUTF(name); + out.writeLong(min); + out.writeLong(max); + out.writeDouble(mean); + out.writeDouble(median); + out.writeDouble(stddev); + out.writeDouble(p75); + out.writeDouble(p90); + out.writeDouble(p95); + out.writeDouble(p98); + out.writeDouble(p99); + out.writeDouble(p999); } - private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOException { - dos.writeDouble(meter.getRate()); + private static void serializeMeter(DataOutput out, QueryScopeInfo info, String name, Meter meter) throws IOException { + serializeMetricInfo(out, info); + out.writeUTF(name); + out.writeDouble(meter.getRate()); } //------------------------------------------------------------------------- // Deserialization //------------------------------------------------------------------------- + public static class MetricDumpDeserializer { /** * De-serializes metrics from the given byte array and returns them as a list of {@link MetricDump}. * * @param data serialized metrics * @return A list containing the deserialized metrics. - * @throws IOException */ - public List deserialize(byte[] data) throws IOException { - ByteArrayInputStream bais = new ByteArrayInputStream(data); - DataInputStream dis = new DataInputStream(bais); - - int numCounters = dis.readInt(); - int numGauges = dis.readInt(); - int numHistograms = dis.readInt(); - int numMeters = dis.readInt(); + public List deserialize(MetricDumpSerialization.MetricSerializationResult data) { + DataInputView in = new DataInputDeserializer(data.serializedMetrics, 0, data.serializedMetrics.length); - List metrics = new ArrayList<>(numCounters + numGauges + numHistograms); + List metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters); - for (int x = 0; x < numCounters; x++) { - metrics.add(deserializeCounter(dis)); + for (int x = 0; x < data.numCounters; x++) { + try { + metrics.add(deserializeCounter(in)); + } catch (Exception e) { + LOG.debug("Failed to deserialize counter.", e); + } } - for (int x = 0; x < numGauges; x++) { - metrics.add(deserializeGauge(dis)); + for (int x = 0; x < data.numGauges; x++) { + try { + metrics.add(deserializeGauge(in)); + } catch (Exception e) { + LOG.debug("Failed to deserialize gauge.", e); + } } - for (int x = 0; x < numHistograms; x++) { - metrics.add(deserializeHistogram(dis)); + for (int x = 0; x < data.numHistograms; x++) { + try { + metrics.add(deserializeHistogram(in)); + } catch (Exception e) { + LOG.debug("Failed to deserialize histogram.", e); + } } - for (int x = 0; x < numMeters; x++) { - metrics.add(deserializeMeter(dis)); + for (int x = 0; x < data.numMeters; x++) { + try { + metrics.add(deserializeMeter(in)); + } catch (Exception e) { + LOG.debug("Failed to deserialize meter.", e); + } } return metrics; } } - private static String deserializeString(DataInputStream dis) throws IOException { - int stringLength = dis.readInt(); - byte[] bytes = new byte[stringLength]; - dis.readFully(bytes); - return new String(bytes); - } - private static MetricDump.CounterDump deserializeCounter(DataInputStream dis) throws IOException { + private static MetricDump.CounterDump deserializeCounter(DataInputView dis) throws IOException { QueryScopeInfo scope = deserializeMetricInfo(dis); - String name = deserializeString(dis); - return new MetricDump.CounterDump(scope, name, dis.readLong()); + String name = dis.readUTF(); + long count = dis.readLong(); + return new MetricDump.CounterDump(scope, name, count); } - private static MetricDump.GaugeDump deserializeGauge(DataInputStream dis) throws IOException { + private static MetricDump.GaugeDump deserializeGauge(DataInputView dis) throws IOException { QueryScopeInfo scope = deserializeMetricInfo(dis); - String name = deserializeString(dis); - String value = deserializeString(dis); + String name = dis.readUTF(); + String value = dis.readUTF(); return new MetricDump.GaugeDump(scope, name, value); } - private static MetricDump.HistogramDump deserializeHistogram(DataInputStream dis) throws IOException { + private static MetricDump.HistogramDump deserializeHistogram(DataInputView dis) throws IOException { QueryScopeInfo info = deserializeMetricInfo(dis); - String name = deserializeString(dis); + String name = dis.readUTF(); long min = dis.readLong(); long max = dis.readLong(); double mean = dis.readDouble(); @@ -258,45 +340,46 @@ public class MetricDumpSerialization { double p98 = dis.readDouble(); double p99 = dis.readDouble(); double p999 = dis.readDouble(); + return new MetricDump.HistogramDump(info, name, min, max, mean, median, stddev, p75, p90, p95, p98, p99, p999); } - private static MetricDump.MeterDump deserializeMeter(DataInputStream dis) throws IOException { + private static MetricDump.MeterDump deserializeMeter(DataInputView dis) throws IOException { QueryScopeInfo info = deserializeMetricInfo(dis); - String name = deserializeString(dis); + String name = dis.readUTF(); double rate = dis.readDouble(); return new MetricDump.MeterDump(info, name, rate); } - private static QueryScopeInfo deserializeMetricInfo(DataInputStream dis) throws IOException { + private static QueryScopeInfo deserializeMetricInfo(DataInput dis) throws IOException { String jobID; String vertexID; int subtaskIndex; - String scope = deserializeString(dis); + String scope = dis.readUTF(); byte cat = dis.readByte(); switch (cat) { case INFO_CATEGORY_JM: return new QueryScopeInfo.JobManagerQueryScopeInfo(scope); case INFO_CATEGORY_TM: - String tmID = deserializeString(dis); + String tmID = dis.readUTF(); return new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID, scope); case INFO_CATEGORY_JOB: - jobID = deserializeString(dis); + jobID = dis.readUTF(); return new QueryScopeInfo.JobQueryScopeInfo(jobID, scope); case INFO_CATEGORY_TASK: - jobID = deserializeString(dis); - vertexID = deserializeString(dis); + jobID = dis.readUTF(); + vertexID = dis.readUTF(); subtaskIndex = dis.readInt(); return new QueryScopeInfo.TaskQueryScopeInfo(jobID, vertexID, subtaskIndex, scope); case INFO_CATEGORY_OPERATOR: - jobID = deserializeString(dis); - vertexID = deserializeString(dis); + jobID = dis.readUTF(); + vertexID = dis.readUTF(); subtaskIndex = dis.readInt(); - String operatorName = deserializeString(dis); + String operatorName = dis.readUTF(); return new QueryScopeInfo.OperatorQueryScopeInfo(jobID, vertexID, subtaskIndex, operatorName, scope); default: - throw new IOException("sup"); + throw new IOException("Unknown scope category: " + cat); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java index 20bc258..2229139 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java @@ -106,7 +106,7 @@ public class MetricQueryService extends UntypedActor { this.meters.remove(metric); } } else if (message instanceof CreateDump) { - byte[] dump = serializer.serialize(counters, gauges, histograms, meters); + MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters); getSender().tell(dump, getSelf()); } else { LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString()); http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java index 18f03e3..6e3d8f4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java @@ -24,9 +24,13 @@ import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.metrics.util.TestingHistogram; +import org.junit.Assert; import org.junit.Test; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,6 +46,47 @@ import static org.junit.Assert.fail; public class MetricDumpSerializerTest { @Test + public void testNullGaugeHandling() throws IOException { + MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer(); + MetricDumpSerialization.MetricDumpDeserializer deserializer = new MetricDumpSerialization.MetricDumpDeserializer(); + + Map, Tuple2> gauges = new HashMap<>(); + + gauges.put(new Gauge() { + @Override + public Object getValue() { + return null; + } + }, new Tuple2(new QueryScopeInfo.JobManagerQueryScopeInfo("A"), "g")); + + MetricDumpSerialization.MetricSerializationResult output = serializer.serialize( + Collections.>emptyMap(), + gauges, + Collections.>emptyMap(), + Collections.>emptyMap()); + + // no metrics should be serialized + Assert.assertEquals(0, output.serializedMetrics.length); + + List deserialized = deserializer.deserialize(output); + Assert.assertEquals(0, deserialized.size()); + } + + @Test + public void testJavaSerialization() throws IOException { + MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer(); + + final ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); + final ObjectOutputStream oos = new ObjectOutputStream(bos); + + oos.writeObject(serializer.serialize( + new HashMap>(), + new HashMap, Tuple2>(), + new HashMap>(), + new HashMap>())); + } + + @Test public void testSerialization() throws IOException { MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer(); MetricDumpSerialization.MetricDumpDeserializer deserializer = new MetricDumpSerialization.MetricDumpDeserializer(); @@ -92,7 +137,7 @@ public class MetricDumpSerializerTest { gauges.put(g1, new Tuple2(new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "D"), "g1")); histograms.put(h1, new Tuple2(new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "E"), "h1")); - byte[] serialized = serializer.serialize(counters, gauges, histograms, meters); + MetricDumpSerialization.MetricSerializationResult serialized = serializer.serialize(counters, gauges, histograms, meters); List deserialized = deserializer.deserialize(serialized); // ===== Counters ============================================================================================== http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java index 0104e3e..2243495 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java @@ -84,13 +84,6 @@ public class MetricQueryServiceTest extends TestLogger { MetricQueryService.notifyOfAddedMetric(serviceActor, g, "gauge", tm); MetricQueryService.notifyOfAddedMetric(serviceActor, h, "histogram", tm); MetricQueryService.notifyOfAddedMetric(serviceActor, m, "meter", tm); - - // these metrics will be removed *after* the first query - MetricQueryService.notifyOfRemovedMetric(serviceActor, c); - MetricQueryService.notifyOfRemovedMetric(serviceActor, g); - MetricQueryService.notifyOfRemovedMetric(serviceActor, h); - MetricQueryService.notifyOfRemovedMetric(serviceActor, m); - serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef); synchronized (testActor.lock) { if (testActor.message == null) { @@ -98,9 +91,14 @@ public class MetricQueryServiceTest extends TestLogger { } } - byte[] dump = (byte[]) testActor.message; + MetricDumpSerialization.MetricSerializationResult dump = (MetricDumpSerialization.MetricSerializationResult) testActor.message; testActor.message = null; - assertTrue(dump.length > 0); + assertTrue(dump.serializedMetrics.length > 0); + + MetricQueryService.notifyOfRemovedMetric(serviceActor, c); + MetricQueryService.notifyOfRemovedMetric(serviceActor, g); + MetricQueryService.notifyOfRemovedMetric(serviceActor, h); + MetricQueryService.notifyOfRemovedMetric(serviceActor, m); serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef); synchronized (testActor.lock) { @@ -109,12 +107,9 @@ public class MetricQueryServiceTest extends TestLogger { } } - byte[] emptyDump = (byte[]) testActor.message; + MetricDumpSerialization.MetricSerializationResult emptyDump = (MetricDumpSerialization.MetricSerializationResult) testActor.message; testActor.message = null; - assertEquals(16, emptyDump.length); - for (int x = 0; x < 16; x++) { - assertEquals(0, emptyDump[x]); - } + assertEquals(0, emptyDump.serializedMetrics.length); s.shutdown(); }