flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [6/7] flink git commit: [hotfix] [py] Code cleanup - SerializationUtils
Date Thu, 04 May 2017 12:44:24 GMT
[hotfix] [py] Code cleanup - SerializationUtils


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/154bb3bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/154bb3bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/154bb3bb

Branch: refs/heads/master
Commit: 154bb3bb09dcc6dbc3dd7e1a6984ff5afd41431b
Parents: 3695062
Author: zentol <chesnay@apache.org>
Authored: Thu Apr 20 13:27:15 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200

----------------------------------------------------------------------
 .../api/streaming/plan/PythonPlanSender.java    |  1 -
 .../api/streaming/util/SerializationUtils.java  | 67 ++++++++++++--------
 2 files changed, 40 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/154bb3bb/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
index 8b8366a..331c67e 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
@@ -28,7 +28,6 @@ public class PythonPlanSender {
 		this.output = new DataOutputStream(output);
 	}
 
-	@SuppressWarnings("unchecked")
 	public void sendRecord(Object record) throws IOException {
 		byte[] data = SerializationUtils.getSerializer(record).serialize(record);
 		output.write(data);

http://git-wip-us.apache.org/repos/asf/flink/blob/154bb3bb/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
index f228327..721746a 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
@@ -20,21 +20,22 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
 public class SerializationUtils {
-	public static final byte TYPE_BOOLEAN = (byte) 34;
-	public static final byte TYPE_BYTE = (byte) 33;
-	public static final byte TYPE_INTEGER = (byte) 32;
-	public static final byte TYPE_LONG = (byte) 31;
-	public static final byte TYPE_DOUBLE = (byte) 30;
-	public static final byte TYPE_FLOAT = (byte) 29;
-	public static final byte TYPE_STRING = (byte) 28;
-	public static final byte TYPE_BYTES = (byte) 27;
-	public static final byte TYPE_NULL = (byte) 26;
+	public static final byte TYPE_BOOLEAN = 34;
+	public static final byte TYPE_BYTE = 33;
+	public static final byte TYPE_INTEGER = 32;
+	public static final byte TYPE_LONG = 31;
+	public static final byte TYPE_DOUBLE = 30;
+	public static final byte TYPE_FLOAT = 29;
+	public static final byte TYPE_STRING = 28;
+	public static final byte TYPE_BYTES = 27;
+	public static final byte TYPE_NULL = 26;
 
 	private enum SupportedTypes {
 		TUPLE, BOOLEAN, BYTE, BYTES, INTEGER, LONG, FLOAT, DOUBLE, STRING, NULL, CUSTOMTYPEWRAPPER
 	}
 
-	public static Serializer getSerializer(Object value) {
+	@SuppressWarnings("unchecked")
+	public static <IN> Serializer<IN> getSerializer(IN value) {
 		String className = value.getClass().getSimpleName().toUpperCase();
 		if (className.startsWith("TUPLE")) {
 			className = "TUPLE";
@@ -43,35 +44,48 @@ public class SerializationUtils {
 			className = "BYTES";
 		}
 		SupportedTypes type = SupportedTypes.valueOf(className);
+		Serializer<?> serializer;
 		switch (type) {
 			case TUPLE:
-				return new TupleSerializer((Tuple) value);
+				serializer = new TupleSerializer((Tuple) value);
+				break;
 			case BOOLEAN:
-				return new BooleanSerializer();
+				serializer = new BooleanSerializer();
+				break;
 			case BYTE:
-				return new ByteSerializer();
+				serializer = new ByteSerializer();
+				break;
 			case BYTES:
-				return new BytesSerializer();
+				serializer = new BytesSerializer();
+				break;
 			case INTEGER:
-				return new IntSerializer();
+				serializer = new IntSerializer();
+				break;
 			case LONG:
-				return new LongSerializer();
+				serializer = new LongSerializer();
+				break;
 			case STRING:
-				return new StringSerializer();
+				serializer = new StringSerializer();
+				break;
 			case FLOAT:
-				return new FloatSerializer();
+				serializer = new FloatSerializer();
+				break;
 			case DOUBLE:
-				return new DoubleSerializer();
+				serializer = new DoubleSerializer();
+				break;
 			case NULL:
-				return new NullSerializer();
+				serializer = new NullSerializer();
+				break;
 			case CUSTOMTYPEWRAPPER:
-				return new CustomTypeWrapperSerializer((CustomTypeWrapper) value);
+				serializer = new CustomTypeWrapperSerializer((CustomTypeWrapper) value);
+				break;
 			default:
 				throw new IllegalArgumentException("Unsupported Type encountered: " + type);
 		}
+		return (Serializer<IN>) serializer;
 	}
 
-	public static abstract class Serializer<IN> {
+	public abstract static class Serializer<IN> {
 		private byte[] typeInfo = null;
 
 		public byte[] serialize(IN value) {
@@ -130,7 +144,7 @@ public class SerializationUtils {
 	public static class BooleanSerializer extends Serializer<Boolean> {
 		@Override
 		public byte[] serializeWithoutTypeInfo(Boolean value) {
-			return new byte[]{value ? (byte) 1 : (byte) 0};
+			return new byte[]{(byte) (value ? 1 : 0)};
 		}
 
 		@Override
@@ -237,7 +251,7 @@ public class SerializationUtils {
 	}
 
 	public static class TupleSerializer extends Serializer<Tuple> {
-		private final Serializer[] serializer;
+		private final Serializer<Object>[] serializer;
 
 		public TupleSerializer(Tuple value) {
 			serializer = new Serializer[value.getArity()];
@@ -246,7 +260,6 @@ public class SerializationUtils {
 			}
 		}
 
-		@SuppressWarnings("unchecked")
 		@Override
 		public byte[] serializeWithoutTypeInfo(Tuple value) {
 			ArrayList<byte[]> bits = new ArrayList<>();
@@ -269,7 +282,7 @@ public class SerializationUtils {
 		@Override
 		public void putTypeInfo(ByteBuffer buffer) {
 			buffer.put((byte) serializer.length);
-			for (Serializer s : serializer) {
+			for (Serializer<Object> s : serializer) {
 				s.putTypeInfo(buffer);
 			}
 		}
@@ -277,7 +290,7 @@ public class SerializationUtils {
 		@Override
 		public int getTypeInfoSize() {
 			int size = 1;
-			for (Serializer s : serializer) {
+			for (Serializer<Object> s : serializer) {
 				size += s.getTypeInfoSize();
 			}
 			return size;


Mime
View raw message