From: sewen@apache.org
To: commits@flink.incubator.apache.org
Date: Sun, 14 Dec 2014 22:26:33 -0000
Subject: [4/4] incubator-flink git commit: [FLINK-1305] [FLINK-1304] Test for HadoopInputWrapper and NullWritable support

[FLINK-1305] [FLINK-1304] Test for HadoopInputWrapper and NullWritable support

This closes #252

Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/13968cd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/13968cd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/13968cd4

Branch: refs/heads/master
Commit: 13968cd4de446b4f565a094554380eb8559b6cf9
Parents: de7f478
Author: Robert Metzger
Authored: Fri Dec 5 19:19:29 2014 +0100
Committer: Stephan Ewen
Committed: Sun Dec 14 16:38:32 2014 +0100

----------------------------------------------------------------------
 .../mapred/HadoopIOFormatsITCase.java           | 230 +++++++++++++++++++
 flink-java/pom.xml                              |  35 +++
 .../typeutils/runtime/WritableSerializer.java   |   4 +
 .../java/org/apache/hadoop/io/Writable.java     | 105 ---------
 4 files changed, 269 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
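The two issues are related: FLINK-1304 lets Flink's WritableSerializer handle NullWritable, which cannot be created reflectively, and FLINK-1305 adds the integration test below for the mapred HadoopInputFormat wrapper. As a rough sketch of what this enables, a job along the following lines can now read a SequenceFile with NullWritable keys; the class name and paths are placeholders, while the HadoopInputFormat constructor and createInput() call mirror the test code in this commit.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

public class NullWritableSequenceFileSketch {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// A SequenceFile with NullWritable keys and LongWritable values; the paths are placeholders.
		JobConf jobConf = new JobConf();
		SequenceFileInputFormat<NullWritable, LongWritable> sfif =
				new SequenceFileInputFormat<NullWritable, LongWritable>();
		SequenceFileInputFormat.addInputPath(jobConf, new Path("hdfs:///path/to/input"));

		// Wrap the mapred input format; the NullWritable key type only works because
		// WritableSerializer#createInstance() now returns NullWritable.get() for it.
		DataSet<Tuple2<NullWritable, LongWritable>> input = env.createInput(
				new HadoopInputFormat<NullWritable, LongWritable>(
						sfif, NullWritable.class, LongWritable.class, jobConf));

		input.writeAsText("file:///tmp/out");
		env.execute();
	}
}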

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/13968cd4/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
new file mode 100644
index 0000000..6ef0f2e
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+
+@RunWith(Parameterized.class)
+public class HadoopIOFormatsITCase extends JavaProgramTestBase {
+
+	private static int NUM_PROGRAMS = 2;
+
+	private int curProgId = config.getInteger("ProgramId", -1);
+	private String[] resultPath;
+	private String[] expectedResult;
+	private String sequenceFileInPath;
+	private String sequenceFileInPathNull;
+
+	public HadoopIOFormatsITCase(Configuration config) {
+		super(config);
+	}
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = new String[] { getTempDirPath("result0"), getTempDirPath("result1") };
+
+		File sequenceFile = createAndRegisterTempFile("seqFile");
+		sequenceFileInPath = sequenceFile.toURI().toString();
+
+		// Create a sequence file
+		org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+		FileSystem fs = FileSystem.get(URI.create(sequenceFile.getAbsolutePath()), conf);
+		Path path = new Path(sequenceFile.getAbsolutePath());
+
+		// ------------------ Long / Text key-value pairs: ------------
+		int kvCount = 4;
+
+		LongWritable key = new LongWritable();
+		Text value = new Text();
+		SequenceFile.Writer writer = null;
+		try {
+			writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
+			for (int i = 0; i < kvCount; i++) {
+				if (i == 1) {
+					// write key = 1 a bit more often.
+					for (int a = 0; a < 15; a++) {
+						key.set(i);
+						value.set(i + " - somestring");
+						writer.append(key, value);
+					}
+				}
+				key.set(i);
+				value.set(i + " - somestring");
+				writer.append(key, value);
+			}
+		} finally {
+			IOUtils.closeStream(writer);
+		}
+
+		// ------------------ NullWritable / LongWritable key-value pairs: ------------
+
+		File sequenceFileNull = createAndRegisterTempFile("seqFileNullKey");
+		sequenceFileInPathNull = sequenceFileNull.toURI().toString();
+		path = new Path(sequenceFileInPathNull);
+
+		LongWritable value1 = new LongWritable();
+		SequenceFile.Writer writer1 = null;
+		try {
+			writer1 = SequenceFile.createWriter(fs, conf, path, NullWritable.class, value1.getClass());
+			for (int i = 0; i < kvCount; i++) {
+				value1.set(i);
+				writer1.append(NullWritable.get(), value1);
+			}
+		} finally {
+			IOUtils.closeStream(writer1);
+		}
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, resultPath, sequenceFileInPath, sequenceFileInPathNull);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		for (int i = 0; i < resultPath.length; i++) {
+			compareResultsByLinesInMemory(expectedResult[i], resultPath[i]);
+		}
+	}
+
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for (int i = 1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+
+		return toParameterList(tConfigs);
+	}
+
+	public static class HadoopIOFormatPrograms {
+
+		public static String[] runProgram(int progId, String[] resultPath, String sequenceFileInPath, String sequenceFileInPathNull) throws Exception {
+
+			switch (progId) {
+			case 1: {
+				/**
+				 * Test sequence file, including a key access.
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				SequenceFileInputFormat<LongWritable, Text> sfif = new SequenceFileInputFormat<LongWritable, Text>();
+				JobConf hdconf = new JobConf();
+				SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPath));
+				HadoopInputFormat<LongWritable, Text> hif = new HadoopInputFormat<LongWritable, Text>(sfif, LongWritable.class, Text.class, hdconf);
+				DataSet<Tuple2<LongWritable, Text>> ds = env.createInput(hif);
+				DataSet<Tuple2<Long, Text>> sumed = ds.map(new MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>() {
+					@Override
+					public Tuple2<Long, Text> map(Tuple2<LongWritable, Text> value) throws Exception {
+						return new Tuple2<Long, Text>(value.f0.get(), value.f1);
+					}
+				}).sum(0);
+				sumed.writeAsText(resultPath[0]);
+				DataSet<String> res = ds.distinct(0).map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
+					@Override
+					public String map(Tuple2<LongWritable, Text> value) throws Exception {
+						return value.f1 + " - " + value.f0.get();
+					}
+				});
+				res.writeAsText(resultPath[1]);
+				env.execute();
+
+				// return expected result
+				return new String[] {"(21,3 - somestring)", "0 - somestring - 0\n" +
+						"1 - somestring - 1\n" +
+						"2 - somestring - 2\n" +
+						"3 - somestring - 3\n"};
+
+			}
+			case 2: {
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				SequenceFileInputFormat<NullWritable, LongWritable> sfif = new SequenceFileInputFormat<NullWritable, LongWritable>();
+				JobConf hdconf = new JobConf();
+				SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPathNull));
+				HadoopInputFormat<NullWritable, LongWritable> hif = new HadoopInputFormat<NullWritable, LongWritable>(sfif, NullWritable.class, LongWritable.class, hdconf);
+				DataSet<Tuple2<NullWritable, LongWritable>> ds = env.createInput(hif);
+				DataSet<Tuple2<Void, Long>> res = ds.map(new MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>>() {
+					@Override
+					public Tuple2<Void, Long> map(Tuple2<NullWritable, LongWritable> value) throws Exception {
+						return new Tuple2<Void, Long>(null, value.f1.get());
+					}
+				});
+				DataSet<Tuple2<Void, Long>> res1 = res.groupBy(1).sum(1);
+				res1.writeAsText(resultPath[1]);
+				res.writeAsText(resultPath[0]);
+				env.execute();
+
+				// return expected result
+				return new String[] {"(null,2)\n" +
+						"(null,0)\n" +
+						"(null,1)\n" +
+						"(null,3)",
+						"(null,0)\n" +
+						"(null,1)\n" +
+						"(null,2)\n" +
+						"(null,3)"};
+			}
+			default:
+				throw new IllegalArgumentException("Invalid program id");
+			}
+
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/13968cd4/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 4ae2cc3..21a0b68 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -86,4 +86,39 @@ under the License.
+	<profiles>
+		<profile>
+			<id>hadoop-1</id>
+			<activation>
+				<property>
+					<name>hadoop.profile</name>
+					<value>1</value>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-core</artifactId>
+				</dependency>
+			</dependencies>
+		</profile>
+
+		<profile>
+			<id>hadoop-2</id>
+			<activation>
+				<property>
+					<name>!hadoop.profile</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/13968cd4/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index 0fe8fdf..e838d27 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 
 import com.esotericsoftware.kryo.Kryo;
@@ -44,6 +45,9 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
 
 	@Override
 	public T createInstance() {
+		if(typeClass == NullWritable.class) {
+			return (T) NullWritable.get();
+		}
 		return InstantiationUtil.instantiate(typeClass);
 	}
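A note on the createInstance() change above: NullWritable is a stateless singleton whose constructor is private, so it cannot be created reflectively, and the only way to obtain an instance is NullWritable.get(), which is presumably why the serializer special-cases it. A minimal stand-alone sketch of that behaviour (the class name is a placeholder; only Hadoop's NullWritable API is used):

import org.apache.hadoop.io.NullWritable;

public class NullWritableInstantiationSketch {

	public static void main(String[] args) {
		// Reflective instantiation fails because the NullWritable() constructor is private;
		// this is the case the patched WritableSerializer#createInstance() has to handle.
		try {
			NullWritable.class.newInstance();
		} catch (InstantiationException | IllegalAccessException e) {
			System.out.println("reflective instantiation failed: " + e);
		}

		// The supported way: the singleton accessor that the serializer now returns.
		NullWritable nullKey = NullWritable.get();
		System.out.println("singleton instance: " + nullKey);
	}
}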

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/13968cd4/flink-java/src/main/java/org/apache/hadoop/io/Writable.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/hadoop/io/Writable.java b/flink-java/src/main/java/org/apache/hadoop/io/Writable.java
deleted file mode 100644
index 16efe7f..0000000
--- a/flink-java/src/main/java/org/apache/hadoop/io/Writable.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// ================================================================================================
-//  !!! NOTICE !!!
-//
-//  This interface has been directly copied from the Apache Hadoop project.
-//  It has been added to this project to allow compiling against the type "Writable"
-//  without adding the heavyweight Hadoop dependency. This keeps the project dependencies
-//  lightweight.
-//
-//  At runtime, the JVM will load either this interface, or the interface from a Hadoop jar,
-//  if present. In both cases, the dynamic class loading, linking, and method lookup will
-//  allow the types to interoperate as long as package name, class name, and method signature
-//  of this interface are kept strictly in sync with the version packaged with Hadoop.
-//
-//  This is a core interface of the Hadoop project and has been stable across all releases.
-//
-// ================================================================================================
-
-package org.apache.hadoop.io;
-
-import java.io.DataOutput;
-import java.io.DataInput;
-import java.io.IOException;
-
-
-/**
- * A serializable object which implements a simple, efficient, serialization
- * protocol, based on {@link DataInput} and {@link DataOutput}.
- *
- * <p>Any key or value type in the Hadoop Map-Reduce
- * framework implements this interface.</p>
- *
- * <p>Implementations typically implement a static read(DataInput)
- * method which constructs a new instance, calls {@link #readFields(DataInput)}
- * and returns the instance.</p>
- *
- * <p>Example:</p>
- * <p><blockquote><pre>
- *     public class MyWritable implements Writable {
- *       // Some data
- *       private int counter;
- *       private long timestamp;
- *
- *       // Default constructor to allow (de)serialization
- *       MyWritable() { }
- *
- *       public void write(DataOutput out) throws IOException {
- *         out.writeInt(counter);
- *         out.writeLong(timestamp);
- *       }
- *
- *       public void readFields(DataInput in) throws IOException {
- *         counter = in.readInt();
- *         timestamp = in.readLong();
- *       }
- *
- *       public static MyWritable read(DataInput in) throws IOException {
- *         MyWritable w = new MyWritable();
- *         w.readFields(in);
- *         return w;
- *       }
- *     }
- * </pre></blockquote></p>
- */
-public interface Writable {
-	/**
-	 * Serialize the fields of this object to out.
-	 *
-	 * @param out
-	 *            DataOutput to serialize this object into.
-	 * @throws IOException
-	 */
-	void write(DataOutput out) throws IOException;
-
-	/**
-	 * Deserialize the fields of this object from in.
-	 *
-	 * <p>For efficiency, implementations should attempt to re-use storage in the
-	 * existing object where possible.</p>
-	 *
-	 * @param in
-	 *            DataInput to deserialize this object from.
-	 * @throws IOException
-	 */
-	void readFields(DataInput in) throws IOException;
-}