flink-commits mailing list archives

From aljos...@apache.org
Subject [1/3] flink git commit: [FLINK-1396][FLINK-1303] Hadoop Input/Output directly in API
Date Mon, 09 Feb 2015 14:38:25 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.8 944e2e3d5 -> cd2f88afd


http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
new file mode 100644
index 0000000..236d149
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapreduce;
+
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+
+
+public class HadoopInputFormatTest {
+
+
+	public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
+
+		public DummyVoidKeyInputFormat() {
+		}
+
+		@Override
+		public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+			return null;
+		}
+	}
+	
+	
+	@Test
+	public void checkTypeInformation() {
+		try {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			// Set up the Hadoop Input Format
+			Job job = Job.getInstance();
+			HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>(new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, job);
+
+			TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
+			TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+			
+			if(tupleType.isTupleType()) {
+				if(!((TupleTypeInfo)tupleType).equals(testTupleType)) {
+					fail("Tuple type information was not set correctly!");
+				}
+			} else {
+				fail("Type information was not set to tuple type information!");
+			}
+
+		}
+		catch (Exception ex) {
+			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+		}
+
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 43f8609..089412e 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -25,16 +25,23 @@ import org.apache.flink.api.java.io._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.java.typeutils.{ValueTypeInfo, TupleTypeInfoBase}
+import org.apache.flink.api.scala.hadoop.mapred
+import org.apache.flink.api.scala.hadoop.mapreduce
 import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
 import org.apache.flink.core.fs.Path
 
-import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv,
-CollectionEnvironment}
+import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv, CollectionEnvironment}
 import org.apache.flink.api.common.io.{InputFormat, FileInputFormat}
 
 import org.apache.flink.api.java.operators.DataSource
 import org.apache.flink.types.StringValue
 import org.apache.flink.util.{NumberSequenceIterator, SplittableIterator}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
+import org.apache.hadoop.mapreduce.{InputFormat => MapreduceInputFormat, Job}
+import org.apache.hadoop.mapred.{FileInputFormat => MapredFileInputFormat,
+InputFormat => MapredInputFormat, JobConf}
+import org.apache.hadoop.fs.{Path => HadoopPath}
+
 
 import scala.collection.JavaConverters._
 
@@ -269,6 +276,92 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
+   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. The
+   * given inputPath is added to the given JobConf.
+   */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapredFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String,
+      job: JobConf)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+    val result = createHadoopInput(mapredInputFormat, key, value, job)
+    MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
+    result
+  }
+
+  /**
+   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. A
+   * [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created.
+   */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapredFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+    readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf)
+  }
+
+  /**
+   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.InputFormat]].
+   */
+  def createHadoopInput[K, V](
+      mapredInputFormat: MapredInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      job: JobConf)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+    val hadoopInputFormat = new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
+    createInput(hadoopInputFormat)
+  }
+
+  /**
+   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
+   * The given inputPath is added to the given job.
+   */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapreduceFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String,
+      job: Job)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+    val result = createHadoopInput(mapredInputFormat, key, value, job)
+    MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
+    result
+  }
+
+  /**
+   * Creates a [[DataSet]] from the given
+   * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. A
+   * [[org.apache.hadoop.mapreduce.Job]] with the given inputPath will be created.
+   */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapreduceFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+    readHadoopFile(mapredInputFormat, key, value, inputPath, Job.getInstance)
+  }
+
+  /**
+   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.InputFormat]].
+   */
+  def createHadoopInput[K, V](
+      mapredInputFormat: MapreduceInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      job: Job)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+    val hadoopInputFormat =
+      new mapreduce.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
+    createInput(hadoopInputFormat)
+  }
+
+  /**
    * Creates a DataSet from the given non-empty [[Seq]]. The elements need to be serializable
    * because the framework may move the elements into the cluster if needed.
    *

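A minimal usage sketch of the new Scala input API added above (not part of the diff; the input path is a placeholder, and the format/types mirror the ITCases further down; createHadoopInput is the analogous entry point for non-file InputFormats):

    import org.apache.flink.api.scala._
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat

    val env = ExecutionEnvironment.getExecutionEnvironment

    // readHadoopFile wraps the given mapred FileInputFormat; this overload
    // creates a JobConf internally and adds the input path to it.
    val input: DataSet[(LongWritable, Text)] =
      env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text],
        "file:///path/to/input.txt") // placeholder path

    input.map(_._2.toString).print()
    env.execute("Hadoop input sketch")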
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
new file mode 100644
index 0000000..5170d14
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.hadoop.mapred
+
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase
+import org.apache.hadoop.mapred.{JobConf, InputFormat}
+
+class HadoopInputFormat[K, V](
+    mapredInputFormat: InputFormat[K, V],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    job: JobConf)
+  extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, valueClass, job) {
+
+  def nextRecord(reuse: (K, V)): (K, V) = {
+    if (!fetched) {
+      fetchNext()
+    }
+    if (!hasNext) {
+      return null
+    }
+    fetched = false
+    (key, value)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
new file mode 100644
index 0000000..180a8bf
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.hadoop.mapred
+
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+
+class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: JobConf)
+  extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {
+
+  def writeRecord(record: (K, V)) {
+    this.recordWriter.write(record._1, record._2)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
new file mode 100644
index 0000000..cafbdcb
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.hadoop.mapreduce
+
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase
+import org.apache.hadoop.mapreduce.{InputFormat, Job}
+
+class HadoopInputFormat[K, V](
+    mapredInputFormat: InputFormat[K, V],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    job: Job)
+  extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, valueClass, job) {
+
+  def nextRecord(reuse: (K, V)): (K, V) = {
+    if (!fetched) {
+      fetchNext()
+    }
+    if (!hasNext) {
+      return null
+    }
+    fetched = false
+    (recordReader.getCurrentKey, recordReader.getCurrentValue)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
new file mode 100644
index 0000000..51db9de
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.hadoop.mapreduce
+
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase
+import org.apache.hadoop.mapreduce.{Job, OutputFormat}
+
+class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: Job)
+  extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {
+
+  def writeRecord(record: (K, V)) {
+    this.recordWriter.write(record._1, record._2)
+  }
+}

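The output side, as the ITCases below exercise it (a minimal sketch; `words: DataSet[(Text, LongWritable)]` and `env` are assumed to exist, and the output path is a placeholder):

    import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}

    val job = Job.getInstance()
    val hadoopOutputFormat =
      new HadoopOutputFormat[Text, LongWritable](new TextOutputFormat[Text, LongWritable], job)
    FileOutputFormat.setOutputPath(job, new Path("file:///path/to/result")) // placeholder path

    // writeRecord (defined above) splits each Scala tuple into the Hadoop key and value.
    words.output(hadoopOutputFormat)
    env.execute("Hadoop output sketch")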
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 3ee7a26..b94283d 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -116,12 +116,12 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 		
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-			<scope>test</scope>
-		</dependency>
+		<!--<dependency>-->
+			<!--<groupId>com.google.guava</groupId>-->
+			<!--<artifactId>guava</artifactId>-->
+			<!--<version>${guava.version}</version>-->
+			<!--<scope>test</scope>-->
+		<!--</dependency>-->
 		
 		<dependency>
 			<groupId>org.scalatest</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
new file mode 100644
index 0000000..037610e
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoop.mapred;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+public class WordCountMapredITCase extends JavaProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	public WordCountMapredITCase(){
+//		setDegreeOfParallelism(4);
+//		setNumTaskManagers(2);
+//		setTaskManagerNumSlots(2);
+	}
+
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[] {".", "_"});
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+//		env.setDegreeOfParallelism(1);
+
+
+		DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(new TextInputFormat(),
+				LongWritable.class, Text.class, textPath);
+
+		DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
+			@Override
+			public String map(Tuple2<LongWritable, Text> value) throws Exception {
+				return value.f1.toString();
+			}
+		});
+
+
+		DataSet<Tuple2<String, Integer>> counts =
+				// split up the lines into pairs (2-tuples) containing: (word, 1)
+				text.flatMap(new Tokenizer())
+						// group by the tuple field "0" and sum up tuple field "1"
+						.groupBy(0)
+						.sum(1);
+
+		DataSet<Tuple2<Text, LongWritable>> words = counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() {
+
+
+			@Override
+			public Tuple2<Text, LongWritable> map(Tuple2<String, Integer> value) throws Exception {
+				return new Tuple2<Text, LongWritable>(new Text(value.f0), new LongWritable(value.f1));
+			}
+		});
+
+		// Set up Hadoop Output Format
+		HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
+				new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf());
+		hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
+		TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(resultPath));
+
+		// Output & Execute
+		words.output(hadoopOutputFormat);
+		env.execute("Hadoop Compat WordCount");
+	}
+
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
new file mode 100644
index 0000000..3bdaa22
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoop.mapreduce;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+public class WordCountMapreduceITCase extends JavaProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	public WordCountMapreduceITCase(){
+//		setDegreeOfParallelism(4);
+//		setNumTaskManagers(2);
+//		setTaskManagerNumSlots(2);
+	}
+
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[] {".", "_"});
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+
+		DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(new TextInputFormat(),
+				LongWritable.class, Text.class, textPath);
+
+		DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
+			@Override
+			public String map(Tuple2<LongWritable, Text> value) throws Exception {
+				return value.f1.toString();
+			}
+		});
+
+
+		DataSet<Tuple2<String, Integer>> counts =
+				// split up the lines into pairs (2-tuples) containing: (word, 1)
+				text.flatMap(new Tokenizer())
+						// group by the tuple field "0" and sum up tuple field "1"
+						.groupBy(0)
+						.sum(1);
+
+		DataSet<Tuple2<Text, LongWritable>> words = counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() {
+
+
+			@Override
+			public Tuple2<Text, LongWritable> map(Tuple2<String, Integer> value) throws Exception {
+				return new Tuple2<Text, LongWritable>(new Text(value.f0), new LongWritable(value.f1));
+			}
+		});
+
+		// Set up Hadoop Output Format
+		Job job = Job.getInstance();
+		HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
+				new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), job);
+		job.getConfiguration().set("mapred.textoutputformat.separator", " ");
+		TextOutputFormat.setOutputPath(job, new Path(resultPath));
+
+		// Output & Execute
+		words.output(hadoopOutputFormat);
+		env.execute("Hadoop Compat WordCount");
+	}
+
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
new file mode 100644
index 0000000..25c878f
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.hadoop.mapred
+
+import org.apache.flink.api.scala._
+
+import org.apache.flink.test.testdata.WordCountData
+import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{Text, LongWritable}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat, TextInputFormat}
+
+class WordCountMapredITCase extends JavaProgramTestBase {
+  protected var textPath: String = null
+  protected var resultPath: String = null
+
+  protected override def preSubmit() {
+    textPath = createTempFile("text.txt", WordCountData.TEXT)
+    resultPath = getTempDirPath("result")
+  }
+
+  protected override def postSubmit() {
+    compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, Array[String](".", "_"))
+  }
+
+  protected def testProgram() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val input =
+      env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+
+    val text = input map { _._2.toString }
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+      .map { (_, 1) }
+      .groupBy(0)
+      .sum(1)
+
+    val words = counts map { t => (new Text(t._1), new LongWritable(t._2)) }
+
+    val hadoopOutputFormat = new HadoopOutputFormat[Text,LongWritable](
+      new TextOutputFormat[Text, LongWritable],
+      new JobConf)
+    hadoopOutputFormat.getJobConf.set("mapred.textoutputformat.separator", " ")
+
+    FileOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf, new Path(resultPath))
+
+    words.output(hadoopOutputFormat)
+
+    env.execute("Hadoop Compat WordCount")
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
new file mode 100644
index 0000000..4178267
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.hadoop.mapreduce
+
+import org.apache.flink.api.scala._
+import org.apache.flink.test.testdata.WordCountData
+import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{Text, LongWritable}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
+
+class WordCountMapreduceITCase extends JavaProgramTestBase {
+  protected var textPath: String = null
+  protected var resultPath: String = null
+
+  protected override def preSubmit() {
+    textPath = createTempFile("text.txt", WordCountData.TEXT)
+    resultPath = getTempDirPath("result")
+  }
+
+  protected override def postSubmit() {
+    compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, Array[String](".", "_"))
+  }
+
+  protected def testProgram() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val input =
+      env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+
+    val text = input map { _._2.toString }
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+      .map { (_, 1) }
+      .groupBy(0)
+      .sum(1)
+
+    val words = counts map { t => (new Text(t._1), new LongWritable(t._2)) }
+
+    val job = Job.getInstance()
+    val hadoopOutputFormat = new HadoopOutputFormat[Text,LongWritable](
+      new TextOutputFormat[Text, LongWritable],
+      job)
+    hadoopOutputFormat.getConfiguration.set("mapred.textoutputformat.separator", " ")
+
+    FileOutputFormat.setOutputPath(job, new Path(resultPath))
+
+    words.output(hadoopOutputFormat)
+
+    env.execute("Hadoop Compat WordCount")
+  }
+}
+

