flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [01/10] flink git commit: [FLINK-2386] [kafka] Add new Kafka Consumer for Flink
Date Wed, 26 Aug 2015 19:13:51 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.9 3cdbb8014 -> 9f2f6b7bb


http://git-wip-us.apache.org/repos/asf/flink/blob/940a7c8a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
new file mode 100644
index 0000000..720a01e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class TypeInformationSerializationSchemaTest {
+
+	@Test
+	public void testDeSerialization() {
+		try {
+			TypeInformation<MyPOJO> info = TypeExtractor.getForClass(MyPOJO.class);
+
+			TypeInformationSerializationSchema<MyPOJO> schema =
+					new TypeInformationSerializationSchema<MyPOJO>(info, new ExecutionConfig());
+
+			MyPOJO[] types = {
+					new MyPOJO(72, new Date(763784523L), new Date(88234L)),
+					new MyPOJO(-1, new Date(11111111111111L)),
+					new MyPOJO(42),
+					new MyPOJO(17, new Date(222763784523L))
+			};
+
+			for (MyPOJO val : types) {
+				byte[] serialized = schema.serialize(val);
+				MyPOJO deser = schema.deserialize(serialized);
+				assertEquals(val, deser);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSerializability() {
+		try {
+			TypeInformation<MyPOJO> info = TypeExtractor.getForClass(MyPOJO.class);
+			TypeInformationSerializationSchema<MyPOJO> schema =
+					new TypeInformationSerializationSchema<MyPOJO>(info, new ExecutionConfig());
+
+			// this needs to succeed
+			CommonTestUtils.createCopySerializable(schema);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Test data types
+	// ------------------------------------------------------------------------
+
+	public static class MyPOJO {
+
+		public int aField;
+		public List<Date> aList;
+
+		public MyPOJO() {}
+
+		public MyPOJO(int iVal, Date... dates) {
+			this.aField = iVal;
+			this.aList = new ArrayList<Date>(Arrays.asList(dates));
+		}
+
+		@Override
+		public int hashCode() {
+			return aField;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof MyPOJO) {
+				MyPOJO that = (MyPOJO) obj;
+				return this.aField == that.aField && (this.aList == null ?
+						that.aList == null :
+						that.aList != null && this.aList.equals(that.aList));
+			}
+			return super.equals(obj);
+		}
+
+		@Override
+		public String toString() {
+			return String.format("MyPOJO " + aField + " " + aList);
+		}
+	}
+}
\ No newline at end of file


Mime
View raw message