Return-Path:
X-Original-To: apmail-flink-commits-archive@minotaur.apache.org
Delivered-To: apmail-flink-commits-archive@minotaur.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
by minotaur.apache.org (Postfix) with SMTP id 3A368F5A4
for ;
Sun, 14 Dec 2014 22:26:58 +0000 (UTC)
Received: (qmail 50176 invoked by uid 500); 14 Dec 2014 22:26:58 -0000
Delivered-To: apmail-flink-commits-archive@flink.apache.org
Received: (qmail 50153 invoked by uid 500); 14 Dec 2014 22:26:57 -0000
Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@flink.incubator.apache.org
Delivered-To: mailing list commits@flink.incubator.apache.org
Received: (qmail 50144 invoked by uid 99); 14 Dec 2014 22:26:57 -0000
Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230)
by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 14 Dec 2014 22:26:57 +0000
X-ASF-Spam-Status: No, hits=-2000.0 required=5.0
tests=ALL_TRUSTED,T_RP_MATCHES_RCVD
X-Spam-Check-By: apache.org
Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3)
by apache.org (qpsmtpd/0.29) with SMTP; Sun, 14 Dec 2014 22:26:33 +0000
Received: (qmail 50063 invoked by uid 99); 14 Dec 2014 22:26:31 -0000
Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org)
(140.211.11.114)
by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 14 Dec 2014 22:26:31 +0000
Received: by tyr.zones.apache.org (Postfix, from userid 65534)
id D6D8E955F32; Sun, 14 Dec 2014 22:26:30 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: sewen@apache.org
To: commits@flink.incubator.apache.org
Date: Sun, 14 Dec 2014 22:26:33 -0000
Message-Id: <2546bb336860411db16f53335685921c@git.apache.org>
In-Reply-To: <7cc434c62aa24c2eae9e809c7b70f594@git.apache.org>
References: <7cc434c62aa24c2eae9e809c7b70f594@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [4/4] incubator-flink git commit: [FLINK-1305] [FLINK-1304] Test for
HadoopInputWrapper and NullWritable support
X-Virus-Checked: Checked by ClamAV on apache.org
[FLINK-1305] [FLINK-1304] Test for HadoopInputWrapper and NullWritable support
This closes #252
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/13968cd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/13968cd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/13968cd4
Branch: refs/heads/master
Commit: 13968cd4de446b4f565a094554380eb8559b6cf9
Parents: de7f478
Author: Robert Metzger
Authored: Fri Dec 5 19:19:29 2014 +0100
Committer: Stephan Ewen
Committed: Sun Dec 14 16:38:32 2014 +0100
----------------------------------------------------------------------
.../mapred/HadoopIOFormatsITCase.java | 230 +++++++++++++++++++
flink-java/pom.xml | 35 +++
.../typeutils/runtime/WritableSerializer.java | 4 +
.../java/org/apache/hadoop/io/Writable.java | 105 ---------
4 files changed, 269 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/13968cd4/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
new file mode 100644
index 0000000..6ef0f2e
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+
+@RunWith(Parameterized.class)
+public class HadoopIOFormatsITCase extends JavaProgramTestBase {
+
+ private static int NUM_PROGRAMS = 2; // NOTE(review): presumably the number of program ids produced by the @Parameters method; its use is not visible in this excerpt
+
+ private int curProgId = config.getInteger("ProgramId", -1); // program id read from the "ProgramId" config key, with -1 passed as the default
+ private String[] resultPath; // output locations, assigned in preSubmit() ("result0", "result1")
+ private String[] expectedResult; // expected outputs, assigned in testProgram() from HadoopIOFormatPrograms.runProgram
+ private String sequenceFileInPath; // URI string of the LongWritable/Text sequence file written in preSubmit()
+ private String sequenceFileInPathNull; // URI string of the NullWritable/LongWritable sequence file written in preSubmit()
+
+ public HadoopIOFormatsITCase(Configuration config) { // one instance per configuration supplied by the Parameterized runner
+ super(config); // forward the configuration to JavaProgramTestBase
+ }
+
+ @Override
+ protected void preSubmit() throws Exception { // prepares the result paths and writes the two input sequence files
+ resultPath = new String[] {getTempDirPath("result0"), getTempDirPath("result1") };
+
+ File sequenceFile = createAndRegisterTempFile("seqFile");
+ sequenceFileInPath = sequenceFile.toURI().toString();
+
+ // Set up the Hadoop configuration, file system and path used to write the first sequence file
+ org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+ FileSystem fs = FileSystem.get(URI.create(sequenceFile.getAbsolutePath()), conf);
+ Path path = new Path(sequenceFile.getAbsolutePath());
+
+ // ------------------ Long / Text Key Value pair: ------------
+ int kvCount = 4; // loop bound for both writers below: i runs over 0..3
+
+ LongWritable key = new LongWritable();
+ Text value = new Text();
+ SequenceFile.Writer writer = null;
+ try {
+ writer = SequenceFile.createWriter( fs, conf, path, key.getClass(), value.getClass());
+ for (int i = 0; i < kvCount; i ++) {
+ if(i == 1) {
+ // write key = 1 a bit more often: 15 extra records, so key 1 appears 16 times in total
+ for(int a = 0;a < 15; a++) {
+ key.set(i);
+ value.set(i+" - somestring");
+ writer.append(key, value);
+ }
+ }
+ key.set(i);
+ value.set(i+" - somestring");
+ writer.append(key, value);
+ }
+ } finally {
+ IOUtils.closeStream(writer); // close the writer whether or not writing succeeded
+ }
+
+
+ // ------------------ Null / Long Key Value pair: ------------
+
+ File sequenceFileNull = createAndRegisterTempFile("seqFileNullKey");
+ sequenceFileInPathNull = sequenceFileNull.toURI().toString();
+ path = new Path(sequenceFileInPathNull); // note: built from the URI string here, unlike the absolute path used for the first file
+
+ LongWritable value1 = new LongWritable();
+ SequenceFile.Writer writer1 = null;
+ try {
+ writer1 = SequenceFile.createWriter( fs, conf, path, NullWritable.class, value1.getClass());
+ for (int i = 0; i < kvCount; i ++) {
+ value1.set(i);
+ writer1.append(NullWritable.get(), value1); // every record uses the NullWritable singleton as its key
+ }
+ } finally {
+ IOUtils.closeStream(writer1); // close the writer whether or not writing succeeded
+ }
+ }
+
+ @Override
+ protected void testProgram() throws Exception { // runs the program selected by curProgId and keeps its expected output
+ expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, resultPath, sequenceFileInPath, sequenceFileInPathNull); // the returned array is compared against resultPath in postSubmit()
+ }
+
+ @Override
+ protected void postSubmit() throws Exception { // checks every result path against the expected output at the same index
+ for(int i = 0; i < resultPath.length; i++) {
+ compareResultsByLinesInMemory(expectedResult[i], resultPath[i]); // expectedResult[i] pairs with resultPath[i]
+ }
+ }
+
+ @Parameters
+ public static Collection
- */
-public interface Writable {
- /**
- * Serialize the fields of this object to out.
- *
- * @param out
- * DataOutput to serialize this object into.
- * @throws IOException
- */
- void write(DataOutput out) throws IOException;
-
- /**
- * Deserialize the fields of this object from in.
- *
- *
- * For efficiency, implementations should attempt to re-use storage in the
- * existing object where possible.
- *
- *
- * @param in
- * DataInput to deserialize this object from.
- * @throws IOException
- */
- void readFields(DataInput in) throws IOException;
-}