flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/4] flink git commit: [FLINK-8730][REST] JSON serialize entire SerializedThrowable
Date Wed, 28 Feb 2018 12:27:32 GMT
Repository: flink
Updated Branches:
  refs/heads/master f89fce83e -> e8d168509


[FLINK-8730][REST] JSON serialize entire SerializedThrowable

Do not only serialize the serialized exception but the entire
SerializedThrowable object. This makes it possible to throw the
SerializedThrowable itself without deserializing it.

This closes #5546.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/970d94e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/970d94e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/970d94e8

Branch: refs/heads/master
Commit: 970d94e821d2341d200baf692ee2fbbc85e395b5
Parents: 71baa27
Author: gyao <gary@data-artisans.com>
Authored: Wed Feb 21 16:02:01 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Feb 28 13:27:05 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/util/SerializedThrowable.java  | 14 ----
 .../json/SerializedThrowableDeserializer.java   | 15 +++--
 .../json/SerializedThrowableSerializer.java     |  7 +-
 .../json/SerializedThrowableSerializerTest.java | 71 ++++++++++++++++++++
 4 files changed, 83 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/970d94e8/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
index de6358c..13f8d77 100644
--- a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
+++ b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
@@ -25,8 +25,6 @@ import java.lang.ref.WeakReference;
 import java.util.HashSet;
 import java.util.Set;
 
-import static java.util.Objects.requireNonNull;
-
 /**
  * Utility class for dealing with user-defined Throwable types that are serialized (for
  * example during RPC/Actor communication), but cannot be resolved with the default
@@ -64,18 +62,6 @@ public class SerializedThrowable extends Exception implements Serializable {
 		this(exception, new HashSet<>());
 	}
 
-	/**
-	 * Creates a new SerializedThrowable from a serialized exception provided as a byte array.
-	 */
-	public SerializedThrowable(
-			final byte[] serializedException,
-			final String originalErrorClassName,
-			final String fullStringifiedStackTrace) {
-		this.serializedException = requireNonNull(serializedException);
-		this.originalErrorClassName = requireNonNull(originalErrorClassName);
-		this.fullStringifiedStackTrace = requireNonNull(fullStringifiedStackTrace);
-	}
-
 	private SerializedThrowable(Throwable exception, Set<Throwable> alreadySeen) {
 		super(getMessageOrError(exception));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/970d94e8/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java
index 3217cce..d0f71ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest.messages.json;
 
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedThrowable;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -27,9 +28,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std
 
 import java.io.IOException;
 
-import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_CLASS;
-import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_SERIALIZED_EXCEPTION;
-import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_STACK_TRACE;
+import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_SERIALIZED_THROWABLE;
 
 /**
  * JSON deserializer for {@link SerializedThrowable}.
@@ -48,10 +47,12 @@ public class SerializedThrowableDeserializer extends StdDeserializer<SerializedT
 			final DeserializationContext ctxt) throws IOException {
 		final JsonNode root = p.readValueAsTree();
 
-		final String exceptionClassName = root.get(FIELD_NAME_CLASS).asText();
-		final String stackTrace = root.get(FIELD_NAME_STACK_TRACE).asText();
-		final byte[] serializedException = root.get(FIELD_NAME_SERIALIZED_EXCEPTION).binaryValue();
-		return new SerializedThrowable(serializedException, exceptionClassName, stackTrace);
+		final byte[] serializedException = root.get(FIELD_NAME_SERIALIZED_THROWABLE).binaryValue();
+		try {
+			return InstantiationUtil.deserializeObject(serializedException, ClassLoader.getSystemClassLoader());
+		} catch (ClassNotFoundException e) {
+			throw new IOException("Failed to deserialize " + SerializedThrowable.class.getCanonicalName(), e);
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/970d94e8/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializer.java
index cb921a9..51f111f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest.messages.json;
 
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedThrowable;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -33,12 +34,12 @@ public class SerializedThrowableSerializer extends StdSerializer<SerializedThrow
 
 	private static final long serialVersionUID = 1L;
 
-	static final String FIELD_NAME_SERIALIZED_EXCEPTION = "serialized-exception";
-
 	static final String FIELD_NAME_CLASS = "class";
 
 	static final String FIELD_NAME_STACK_TRACE = "stack-trace";
 
+	static final String FIELD_NAME_SERIALIZED_THROWABLE = "serialized-throwable";
+
 	public SerializedThrowableSerializer() {
 		super(SerializedThrowable.class);
 	}
@@ -48,7 +49,7 @@ public class SerializedThrowableSerializer extends StdSerializer<SerializedThrow
 		gen.writeStartObject();
 		gen.writeStringField(FIELD_NAME_CLASS, value.getOriginalErrorClassName());
 		gen.writeStringField(FIELD_NAME_STACK_TRACE, value.getFullStringifiedStackTrace());
-		gen.writeBinaryField(FIELD_NAME_SERIALIZED_EXCEPTION, value.getSerializedException());
+		gen.writeBinaryField(FIELD_NAME_SERIALIZED_THROWABLE, InstantiationUtil.serializeObject(value));
 		gen.writeEndObject();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/970d94e8/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java
new file mode 100644
index 0000000..6274e62
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link SerializedThrowableSerializer} and {@link SerializedThrowableDeserializer}.
+ */
+public class SerializedThrowableSerializerTest extends TestLogger {
+
+	private ObjectMapper objectMapper = new ObjectMapper();
+
+	@Before
+	public void setUp() {
+		final SimpleModule simpleModule = new SimpleModule();
+		simpleModule.addDeserializer(SerializedThrowable.class, new SerializedThrowableDeserializer());
+		simpleModule.addSerializer(SerializedThrowable.class, new SerializedThrowableSerializer());
+
+		objectMapper = new ObjectMapper();
+		objectMapper.registerModule(simpleModule);
+	}
+
+	@Test
+	public void testSerializationDeserialization() throws Exception {
+		final String lastExceptionMessage = "message";
+		final String causeMessage = "cause";
+
+		final SerializedThrowable serializedThrowable = new SerializedThrowable(
+			new RuntimeException(lastExceptionMessage,
+				new RuntimeException(causeMessage)));
+		final String json = objectMapper.writeValueAsString(serializedThrowable);
+		final SerializedThrowable deserializedSerializedThrowable = objectMapper.readValue(
+			json,
+			SerializedThrowable.class);
+
+		assertThat(deserializedSerializedThrowable.getMessage(), equalTo(lastExceptionMessage));
+		assertThat(deserializedSerializedThrowable.getFullStringifiedStackTrace(), equalTo(serializedThrowable.getFullStringifiedStackTrace()));
+
+		assertThat(deserializedSerializedThrowable.getCause().getMessage(), equalTo(causeMessage));
+		assertThat(deserializedSerializedThrowable.getCause(), instanceOf(SerializedThrowable.class));
+	}
+
+}


Mime
View raw message