flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [6/7] flink git commit: [FLINK-8805][runtime] Optimize EvenSerializer.isEvent method
Date Wed, 28 Feb 2018 16:17:52 GMT
[FLINK-8805][runtime] Optimize EvenSerializer.isEvent method

For example, previously if the method was used to check for EndOfPartitionEvent
and the Buffer contained huge custom event, the even had to be deserialized before
performing the actual check. Now we are quickly entering the correct if/else branch
and doing full costly deserialization only if we have to.

Other calls to isEvent() then checking against EndOfPartitionEvent were not used.

(cherry picked from commit 767027f)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61a34a69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61a34a69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61a34a69

Branch: refs/heads/release-1.5
Commit: 61a34a691e7d5233f18ac72a1ab8fb09b53c4753
Parents: 8eb6a30
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Mon Feb 26 16:13:06 2018 +0100
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Wed Feb 28 17:17:04 2018 +0100

----------------------------------------------------------------------
 .../api/serialization/EventSerializer.java      | 57 ++++++--------------
 .../io/network/netty/PartitionRequestQueue.java |  3 +-
 .../api/serialization/EventSerializerTest.java  | 39 +++++++++-----
 3 files changed, 42 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/61a34a69/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index d7fb7e8..8d76bb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -101,16 +101,15 @@ public class EventSerializer {
 	}
 
 	/**
-	 * Identifies whether the given buffer encodes the given event.
+	 * Identifies whether the given buffer encodes the given event. Custom events are not supported.
 	 *
 	 * <p><strong>Pre-condition</strong>: This buffer must encode some event!</p>
 	 *
 	 * @param buffer the buffer to peak into
 	 * @param eventClass the expected class of the event type
-	 * @param classLoader the class loader to use for custom event classes
 	 * @return whether the event class of the <tt>buffer</tt> matches the given
<tt>eventClass</tt>
 	 */
-	private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass, ClassLoader
classLoader) throws IOException {
+	private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass) throws IOException
{
 		if (buffer.remaining() < 4) {
 			throw new IOException("Incomplete event");
 		}
@@ -122,38 +121,16 @@ public class EventSerializer {
 		try {
 			int type = buffer.getInt();
 
-			switch (type) {
-				case END_OF_PARTITION_EVENT:
-					return eventClass.equals(EndOfPartitionEvent.class);
-				case CHECKPOINT_BARRIER_EVENT:
-					return eventClass.equals(CheckpointBarrier.class);
-				case END_OF_SUPERSTEP_EVENT:
-					return eventClass.equals(EndOfSuperstepEvent.class);
-				case CANCEL_CHECKPOINT_MARKER_EVENT:
-					return eventClass.equals(CancelCheckpointMarker.class);
-				case OTHER_EVENT:
-					try {
-						final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
-						final String className = deserializer.readUTF();
-
-						final Class<? extends AbstractEvent> clazz;
-						try {
-							clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
-						}
-						catch (ClassNotFoundException e) {
-							throw new IOException("Could not load event class '" + className + "'.", e);
-						}
-						catch (ClassCastException e) {
-							throw new IOException("The class '" + className + "' is not a valid subclass of '"
-								+ AbstractEvent.class.getName() + "'.", e);
-						}
-						return eventClass.equals(clazz);
-					}
-					catch (Exception e) {
-						throw new IOException("Error while deserializing or instantiating event.", e);
-					}
-				default:
-					throw new IOException("Corrupt byte stream for event");
+			if (eventClass.equals(EndOfPartitionEvent.class)) {
+				return type == END_OF_PARTITION_EVENT;
+			} else if (eventClass.equals(CheckpointBarrier.class)) {
+				return type == CHECKPOINT_BARRIER_EVENT;
+			} else if (eventClass.equals(EndOfSuperstepEvent.class)) {
+				return type == END_OF_SUPERSTEP_EVENT;
+			} else if (eventClass.equals(CancelCheckpointMarker.class)) {
+				return type == CANCEL_CHECKPOINT_MARKER_EVENT;
+			} else {
+				throw new UnsupportedOperationException("Unsupported eventClass = " + eventClass);
 			}
 		}
 		finally {
@@ -314,17 +291,13 @@ public class EventSerializer {
 	}
 
 	/**
-	 * Identifies whether the given buffer encodes the given event.
+	 * Identifies whether the given buffer encodes the given event. Custom events are not supported.
 	 *
 	 * @param buffer the buffer to peak into
 	 * @param eventClass the expected class of the event type
-	 * @param classLoader the class loader to use for custom event classes
 	 * @return whether the event class of the <tt>buffer</tt> matches the given
<tt>eventClass</tt>
 	 */
-	public static boolean isEvent(final Buffer buffer,
-		final Class<?> eventClass,
-		final ClassLoader classLoader) throws IOException {
-		return !buffer.isBuffer() &&
-			isEvent(buffer.getNioBufferReadable(), eventClass, classLoader);
+	public static boolean isEvent(Buffer buffer, Class<?> eventClass) throws IOException
{
+		return !buffer.isBuffer() && isEvent(buffer.getNioBufferReadable(), eventClass);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/61a34a69/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 8d43815..d63a88e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -287,8 +287,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 	}
 
 	private boolean isEndOfPartitionEvent(Buffer buffer) throws IOException {
-		return EventSerializer.isEvent(buffer, EndOfPartitionEvent.class,
-			getClass().getClassLoader());
+		return EventSerializer.isEvent(buffer, EndOfPartitionEvent.class);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/61a34a69/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index de5f4a8..c00fea7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -33,11 +33,13 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link EventSerializer}.
@@ -95,7 +97,7 @@ public class EventSerializerTest {
 	}
 
 	/**
-	 * Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)}
+	 * Tests {@link EventSerializer#isEvent(Buffer, Class)}
 	 * whether it peaks into the buffer only, i.e. after the call, the buffer
 	 * is still de-serializable.
 	 */
@@ -106,8 +108,7 @@ public class EventSerializerTest {
 		try {
 			final ClassLoader cl = getClass().getClassLoader();
 			assertTrue(
-				EventSerializer
-					.isEvent(serializedEvent, EndOfPartitionEvent.class, cl));
+				EventSerializer.isEvent(serializedEvent, EndOfPartitionEvent.class));
 			EndOfPartitionEvent event = (EndOfPartitionEvent) EventSerializer
 				.fromBuffer(serializedEvent, cl);
 			assertEquals(EndOfPartitionEvent.INSTANCE, event);
@@ -117,7 +118,7 @@ public class EventSerializerTest {
 	}
 
 	/**
-	 * Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} returns
+	 * Tests {@link EventSerializer#isEvent(Buffer, Class)} returns
 	 * the correct answer for various encoded event buffers.
 	 */
 	@Test
@@ -130,12 +131,25 @@ public class EventSerializerTest {
 			new CancelCheckpointMarker(287087987329842L)
 		};
 
+		Class[] expectedClasses = Arrays.stream(events)
+			.map(AbstractEvent::getClass)
+			.toArray(Class[]::new);
+
 		for (AbstractEvent evt : events) {
-			for (AbstractEvent evt2 : events) {
-				if (evt == evt2) {
-					assertTrue(checkIsEvent(evt, evt2.getClass()));
+			for (Class<?> expectedClass: expectedClasses) {
+				if (expectedClass.equals(TestTaskEvent.class)) {
+					try {
+						checkIsEvent(evt, expectedClass);
+						fail("This should fail");
+					}
+					catch (UnsupportedOperationException ex) {
+						// expected
+					}
+				}
+				else if (evt.getClass().equals(expectedClass)) {
+					assertTrue(checkIsEvent(evt, expectedClass));
 				} else {
-					assertFalse(checkIsEvent(evt, evt2.getClass()));
+					assertFalse(checkIsEvent(evt, expectedClass));
 				}
 			}
 		}
@@ -143,23 +157,22 @@ public class EventSerializerTest {
 
 	/**
 	 * Returns the result of
-	 * {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} on a buffer
+	 * {@link EventSerializer#isEvent(Buffer, Class)} on a buffer
 	 * that encodes the given <tt>event</tt>.
 	 *
 	 * @param event the event to encode
 	 * @param eventClass the event class to check against
 	 *
-	 * @return whether {@link EventSerializer#isEvent(ByteBuffer, Class, ClassLoader)}
+	 * @return whether {@link EventSerializer#isEvent(ByteBuffer, Class)}
 	 * 		thinks the encoded buffer matches the class
 	 */
 	private boolean checkIsEvent(
 			AbstractEvent event,
-			Class<? extends AbstractEvent> eventClass) throws IOException {
+			Class<?> eventClass) throws IOException {
 
 		final Buffer serializedEvent = EventSerializer.toBuffer(event);
 		try {
-			final ClassLoader cl = getClass().getClassLoader();
-			return EventSerializer.isEvent(serializedEvent, eventClass, cl);
+			return EventSerializer.isEvent(serializedEvent, eventClass);
 		} finally {
 			serializedEvent.recycleBuffer();
 		}


Mime
View raw message