Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3A76417D6D for ; Tue, 3 Feb 2015 12:23:48 +0000 (UTC) Received: (qmail 71701 invoked by uid 500); 3 Feb 2015 12:23:41 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 71600 invoked by uid 500); 3 Feb 2015 12:23:41 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 71445 invoked by uid 99); 3 Feb 2015 12:23:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Feb 2015 12:23:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A938DE0496; Tue, 3 Feb 2015 12:23:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Tue, 03 Feb 2015 12:23:45 -0000 Message-Id: In-Reply-To: <60d51516b942432ab771cd609afa9245@git.apache.org> References: <60d51516b942432ab771cd609afa9245@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [6/9] flink git commit: [FLINK-1464] [java-api] Added ResultTypeQueryable interface implementation to TypeSerializerInputFormat. [FLINK-1464] [java-api] Added ResultTypeQueryable interface implementation to TypeSerializerInputFormat. 
This closes #349 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3f6c9ba Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3f6c9ba Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3f6c9ba Branch: refs/heads/master Commit: e3f6c9ba69a3e545fdd8f18b7b652fa111ade93e Parents: 9906cba Author: Alexander Alexandrov Authored: Thu Jan 29 15:41:01 2015 +0100 Committer: Stephan Ewen Committed: Tue Feb 3 12:20:08 2015 +0100 ---------------------------------------------------------------------- .../api/java/io/TypeSerializerInputFormat.java | 27 +++++++++++++------- .../api/java/io/TypeSerializerFormatTest.java | 8 +++--- 2 files changed, 23 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e3f6c9ba/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java index 0fca3e2..8e92c27 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,7 +19,9 @@ package org.apache.flink.api.java.io; import org.apache.flink.api.common.io.BinaryInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.core.memory.DataInputView; import java.io.IOException; @@ -28,23 +30,30 @@ import java.io.IOException; * Reads elements by deserializing them with a given type serializer. * @param <T> */ -public class TypeSerializerInputFormat<T> extends BinaryInputFormat<T> { +public class TypeSerializerInputFormat<T> extends BinaryInputFormat<T> implements ResultTypeQueryable<T> { private static final long serialVersionUID = 2123068581665107480L; + private transient TypeInformation<T> resultType; + private TypeSerializer<T> serializer; - public TypeSerializerInputFormat(TypeSerializer<T> serializer){ - this.serializer = serializer; + public TypeSerializerInputFormat(TypeInformation<T> resultType) { + this.resultType = resultType; + this.serializer = resultType.createSerializer(); } @Override protected T deserialize(T reuse, DataInputView dataInput) throws IOException { - if(serializer == null){ - throw new RuntimeException("TypeSerializerInputFormat requires a type serializer to " + - "be defined."); - } - return serializer.deserialize(reuse, dataInput); } + + // -------------------------------------------------------------------------------------------- + // Typing + // -------------------------------------------------------------------------------------------- + + @Override + public TypeInformation<T> getProducedType() { + return resultType; + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/e3f6c9ba/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java index ef271e7..7dd1135 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java @@ -40,6 +40,8 @@ import java.io.IOException; @RunWith(Parameterized.class) public class TypeSerializerFormatTest extends SequentialFormatTestBase> { + TypeInformation> resultType = TypeExtractor.getForObject(getRecord(0)); + private TypeSerializer> serializer; private BlockInfo block; @@ -47,9 +49,9 @@ public class TypeSerializerFormatTest extends SequentialFormatTestBase> tti = TypeExtractor.getForObject(getRecord(0)); + resultType = TypeExtractor.getForObject(getRecord(0)); - serializer = tti.createSerializer(); + serializer = resultType.createSerializer(); } @Before @@ -63,7 +65,7 @@ public class TypeSerializerFormatTest extends SequentialFormatTestBase> inputFormat = new - TypeSerializerInputFormat>(serializer); + TypeSerializerInputFormat>(resultType); inputFormat.setFilePath(this.tempFile.toURI().toString()); inputFormat.configure(configuration);