flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [6/9] flink git commit: [FLINK-1464] [java-api] Added ResultTypeQueryable interface implementation to TypeSerializerInputFormat.
Date Tue, 03 Feb 2015 12:23:45 GMT
[FLINK-1464] [java-api] Added ResultTypeQueryable interface implementation to TypeSerializerInputFormat.

This closes #349


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3f6c9ba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3f6c9ba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3f6c9ba

Branch: refs/heads/master
Commit: e3f6c9ba69a3e545fdd8f18b7b652fa111ade93e
Parents: 9906cba
Author: Alexander Alexandrov <alexander.s.alexandrov@gmail.com>
Authored: Thu Jan 29 15:41:01 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Feb 3 12:20:08 2015 +0100

----------------------------------------------------------------------
 .../api/java/io/TypeSerializerInputFormat.java  | 27 +++++++++++++-------
 .../api/java/io/TypeSerializerFormatTest.java   |  8 +++---
 2 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3f6c9ba/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
index 0fca3e2..8e92c27 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *	 http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,7 +19,9 @@
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.api.common.io.BinaryInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.core.memory.DataInputView;
 
 import java.io.IOException;
@@ -28,23 +30,30 @@ import java.io.IOException;
  * Reads elements by deserializing them with a given type serializer.
  * @param <T>
  */
-public class TypeSerializerInputFormat<T> extends BinaryInputFormat<T> {
+public class TypeSerializerInputFormat<T> extends BinaryInputFormat<T> implements
ResultTypeQueryable<T> {
 
 	private static final long serialVersionUID = 2123068581665107480L;
 
+	private transient TypeInformation<T> resultType;
+
 	private TypeSerializer<T> serializer;
 
-	public TypeSerializerInputFormat(TypeSerializer<T> serializer){
-		this.serializer = serializer;
+	public TypeSerializerInputFormat(TypeInformation<T> resultType) {
+		this.resultType = resultType;
+		this.serializer = resultType.createSerializer();
 	}
 
 	@Override
 	protected T deserialize(T reuse, DataInputView dataInput) throws IOException {
-		if(serializer == null){
-			throw new RuntimeException("TypeSerializerInputFormat requires a type serializer to "
+
-					"be defined.");
-		}
-
 		return serializer.deserialize(reuse, dataInput);
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Typing
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return resultType;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3f6c9ba/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
index ef271e7..7dd1135 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
@@ -40,6 +40,8 @@ import java.io.IOException;
 @RunWith(Parameterized.class)
 public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<Integer,
String>> {
 
+	TypeInformation<Tuple2<Integer, String>> resultType = TypeExtractor.getForObject(getRecord(0));
+
 	private TypeSerializer<Tuple2<Integer, String>> serializer;
 
 	private BlockInfo block;
@@ -47,9 +49,9 @@ public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<In
 	public TypeSerializerFormatTest(int numberOfTuples, long blockSize, int degreeOfParallelism)
{
 		super(numberOfTuples, blockSize, degreeOfParallelism);
 
-		TypeInformation<Tuple2<Integer, String>> tti = TypeExtractor.getForObject(getRecord(0));
+        resultType = TypeExtractor.getForObject(getRecord(0));
 
-		serializer = tti.createSerializer();
+		serializer = resultType.createSerializer();
 	}
 
 	@Before
@@ -63,7 +65,7 @@ public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<In
 		configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
 
 		final TypeSerializerInputFormat<Tuple2<Integer, String>> inputFormat = new
-				TypeSerializerInputFormat<Tuple2<Integer, String>>(serializer);
+				TypeSerializerInputFormat<Tuple2<Integer, String>>(resultType);
 		inputFormat.setFilePath(this.tempFile.toURI().toString());
 
 		inputFormat.configure(configuration);


Mime
View raw message