flink-commits mailing list archives

From aljos...@apache.org
Subject [05/20] flink git commit: [FLINK-4048] Remove Hadoop from DataSet API
Date Wed, 27 Sep 2017 11:09:10 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java
deleted file mode 100644
index 16d8b08..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapred;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.util.Progressable;
-import org.junit.Test;
-import org.mockito.Matchers;
-
-import java.io.IOException;
-
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link HadoopOutputFormat}.
- */
-public class HadoopOutputFormatTest {
-
-	@Test
-	public void testOpen() throws Exception {
-
-		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
-		DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
-		JobConf jobConf = spy(new JobConf());
-		when(jobConf.getOutputCommitter()).thenReturn(outputCommitter);
-
-		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
-
-		outputFormat.open(1, 1);
-
-		verify(jobConf, times(2)).getOutputCommitter();
-		verify(outputCommitter, times(1)).setupJob(any(JobContext.class));
-		verify(dummyOutputFormat, times(1)).getRecordWriter(any(FileSystem.class), any(JobConf.class), anyString(), any(Progressable.class));
-	}
-
-	@Test
-	public void testConfigureWithConfigurable() {
-		ConfigurableDummyOutputFormat dummyOutputFormat = mock(ConfigurableDummyOutputFormat.class);
-		JobConf jobConf = mock(JobConf.class);
-
-		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
-
-		outputFormat.configure(Matchers.<org.apache.flink.configuration.Configuration>any());
-
-		verify(dummyOutputFormat, times(1)).setConf(any(Configuration.class));
-	}
-
-	@Test
-	public void testConfigureWithJobConfigurable() {
-		JobConfigurableDummyOutputFormat dummyOutputFormat = mock(JobConfigurableDummyOutputFormat.class);
-		JobConf jobConf = mock(JobConf.class);
-
-		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
-
-		outputFormat.configure(Matchers.<org.apache.flink.configuration.Configuration>any());
-
-		verify(dummyOutputFormat, times(1)).configure(any(JobConf.class));
-	}
-
-	@Test
-	public void testCloseWithTaskCommit() throws Exception {
-		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
-		DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
-		when(outputCommitter.needsTaskCommit(any(TaskAttemptContext.class))).thenReturn(true);
-		DummyRecordWriter recordWriter = mock(DummyRecordWriter.class);
-		JobConf jobConf = mock(JobConf.class);
-
-		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
-		outputFormat.recordWriter = recordWriter;
-		outputFormat.outputCommitter = outputCommitter;
-
-		outputFormat.close();
-
-		verify(recordWriter, times(1)).close(any(Reporter.class));
-		verify(outputCommitter, times(1)).commitTask(any(TaskAttemptContext.class));
-	}
-
-	@Test
-	public void testCloseWithoutTaskCommit() throws Exception {
-		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
-		DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
-		when(outputCommitter.needsTaskCommit(any(TaskAttemptContext.class))).thenReturn(false);
-		DummyRecordWriter recordWriter = mock(DummyRecordWriter.class);
-		JobConf jobConf = mock(JobConf.class);
-
-		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
-		outputFormat.recordWriter = recordWriter;
-		outputFormat.outputCommitter = outputCommitter;
-
-		outputFormat.close();
-
-		verify(recordWriter, times(1)).close(any(Reporter.class));
-		verify(outputCommitter, times(0)).commitTask(any(TaskAttemptContext.class));
-	}
-
-	@Test
-	public void testWriteRecord() throws Exception {
-		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
-		DummyRecordWriter recordWriter = mock(DummyRecordWriter.class);
-		JobConf jobConf = mock(JobConf.class);
-
-		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
-		outputFormat.recordWriter = recordWriter;
-
-		outputFormat.writeRecord(new Tuple2<>("key", 1L));
-
-		verify(recordWriter, times(1)).write(anyString(), anyLong());
-	}
-
-	@Test
-	public void testFinalizeGlobal() throws Exception {
-		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
-		DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
-		JobConf jobConf = spy(new JobConf());
-		when(jobConf.getOutputCommitter()).thenReturn(outputCommitter);
-
-		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
-
-		outputFormat.finalizeGlobal(1);
-
-		verify(outputCommitter, times(1)).commitJob(any(JobContext.class));
-	}
-
-	private class DummyOutputFormat implements OutputFormat<String, Long> {
-
-		@Override
-		public RecordWriter<String, Long> getRecordWriter(FileSystem fileSystem, JobConf jobConf, String s, Progressable progressable) throws IOException {
-			return null;
-		}
-
-		@Override
-		public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf) throws IOException {
-
-		}
-	}
-
-	private class ConfigurableDummyOutputFormat extends DummyOutputFormat implements Configurable {
-
-		@Override
-		public void setConf(Configuration configuration) {
-
-		}
-
-		@Override
-		public Configuration getConf() {
-			return null;
-		}
-	}
-
-	private class JobConfigurableDummyOutputFormat extends DummyOutputFormat implements JobConfigurable {
-
-		@Override
-		public void configure(JobConf jobConf) {
-
-		}
-	}
-
-	private class DummyOutputCommitter extends OutputCommitter {
-
-		@Override
-		public void setupJob(JobContext jobContext) throws IOException {
-
-		}
-
-		@Override
-		public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
-
-		}
-
-		@Override
-		public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
-			return false;
-		}
-
-		@Override
-		public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
-
-		}
-
-		@Override
-		public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
-
-		}
-	}
-
-	private class DummyRecordWriter implements RecordWriter<String, Long> {
-
-		@Override
-		public void write(String s, Long aLong) throws IOException {
-
-		}
-
-		@Override
-		public void close(Reporter reporter) throws IOException {
-
-		}
-	}
-}
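
The test removed above drove the mapred-API HadoopOutputFormat wrapper through its open/writeRecord/close/finalizeGlobal lifecycle. For orientation, a minimal end-to-end usage sketch of that wrapper, mirroring the WordCountMapredITCase deleted further below; the class name and output path are illustrative, and the wrapper is assumed to remain available (e.g. via flink-hadoop-compatibility) after this change:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

public class MapredOutputFormatSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Illustrative data; any DataSet<Tuple2<Text, LongWritable>> works here.
		DataSet<Tuple2<Text, LongWritable>> words = env.fromElements(
			new Tuple2<>(new Text("hello"), new LongWritable(1L)),
			new Tuple2<>(new Text("flink"), new LongWritable(1L)));

		// Wrap Hadoop's mapred TextOutputFormat; the JobConf carries the output settings.
		HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
			new HadoopOutputFormat<>(new TextOutputFormat<Text, LongWritable>(), new JobConf());
		hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
		TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path("/tmp/mapred-out"));

		words.output(hadoopOutputFormat);
		env.execute("mapred HadoopOutputFormat sketch");
	}
}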

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
deleted file mode 100644
index 941eb1f..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapreduce;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import java.io.IOException;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link HadoopInputFormat}.
- */
-public class HadoopInputFormatTest {
-
-	@Rule
-	public final ExpectedException exception = ExpectedException.none();
-
-	@Test
-	public void testConfigure() throws Exception {
-
-		ConfigurableDummyInputFormat inputFormat = mock(ConfigurableDummyInputFormat.class);
-
-		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
-		hadoopInputFormat.configure(new org.apache.flink.configuration.Configuration());
-
-		verify(inputFormat, times(1)).setConf(any(Configuration.class));
-	}
-
-	@Test
-	public void testCreateInputSplits() throws Exception {
-		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
-
-		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
-		hadoopInputFormat.createInputSplits(2);
-
-		verify(inputFormat, times(1)).getSplits(any(JobContext.class));
-	}
-
-	@Test
-	public void testOpen() throws Exception {
-		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
-		when(inputFormat.createRecordReader(any(InputSplit.class), any(TaskAttemptContext.class))).thenReturn(new DummyRecordReader());
-		HadoopInputSplit inputSplit = mock(HadoopInputSplit.class);
-
-		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
-		hadoopInputFormat.open(inputSplit);
-
-		verify(inputFormat, times(1)).createRecordReader(any(InputSplit.class), any(TaskAttemptContext.class));
-		assertThat(hadoopInputFormat.fetched, is(false));
-	}
-
-	@Test
-	public void testClose() throws Exception {
-
-		DummyRecordReader recordReader = mock(DummyRecordReader.class);
-
-		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
-		hadoopInputFormat.close();
-
-		verify(recordReader, times(1)).close();
-	}
-
-	@Test
-	public void testCloseWithoutOpen() throws Exception {
-		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(new DummyInputFormat(), String.class, Long.class, Job.getInstance());
-		hadoopInputFormat.close();
-	}
-
-	@Test
-	public void testFetchNextInitialState() throws Exception {
-		DummyRecordReader recordReader = new DummyRecordReader();
-
-		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
-		hadoopInputFormat.fetchNext();
-
-		assertThat(hadoopInputFormat.fetched, is(true));
-		assertThat(hadoopInputFormat.hasNext, is(false));
-	}
-
-	@Test
-	public void testFetchNextRecordReaderHasNewValue() throws Exception {
-
-		DummyRecordReader recordReader = mock(DummyRecordReader.class);
-		when(recordReader.nextKeyValue()).thenReturn(true);
-
-		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
-		hadoopInputFormat.fetchNext();
-
-		assertThat(hadoopInputFormat.fetched, is(true));
-		assertThat(hadoopInputFormat.hasNext, is(true));
-	}
-
-	@Test
-	public void testFetchNextRecordReaderThrowsException() throws Exception {
-
-		DummyRecordReader recordReader = mock(DummyRecordReader.class);
-		when(recordReader.nextKeyValue()).thenThrow(new InterruptedException());
-
-		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
-
-		exception.expect(IOException.class);
-		hadoopInputFormat.fetchNext();
-
-		assertThat(hadoopInputFormat.hasNext, is(true));
-	}
-
-	@Test
-	public void checkTypeInformation() throws Exception {
-
-		HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<>(
-				new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, Job.getInstance());
-
-		TypeInformation<Tuple2<Void, Long>> tupleType = hadoopInputFormat.getProducedType();
-		TypeInformation<Tuple2<Void, Long>> expectedType = new TupleTypeInfo<>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
-
-		assertThat(tupleType.isTupleType(), is(true));
-		assertThat(tupleType, is(equalTo(expectedType)));
-	}
-
-	private HadoopInputFormat<String, Long> setupHadoopInputFormat(InputFormat<String, Long> inputFormat, Job job,
-																	RecordReader<String, Long> recordReader) {
-
-		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat,
-				String.class, Long.class, job);
-		hadoopInputFormat.recordReader = recordReader;
-
-		return hadoopInputFormat;
-	}
-
-	private class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
-
-		public DummyVoidKeyInputFormat() {}
-
-		@Override
-		public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-			return null;
-		}
-	}
-
-	private class DummyRecordReader extends RecordReader<String, Long> {
-
-		@Override
-		public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-
-		}
-
-		@Override
-		public boolean nextKeyValue() throws IOException, InterruptedException {
-			return false;
-		}
-
-		@Override
-		public String getCurrentKey() throws IOException, InterruptedException {
-			return null;
-		}
-
-		@Override
-		public Long getCurrentValue() throws IOException, InterruptedException {
-			return null;
-		}
-
-		@Override
-		public float getProgress() throws IOException, InterruptedException {
-			return 0;
-		}
-
-		@Override
-		public void close() throws IOException {
-
-		}
-	}
-
-	private class DummyInputFormat extends InputFormat<String, Long> {
-
-		@Override
-		public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
-			return null;
-		}
-
-		@Override
-		public RecordReader<String, Long> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-			return new DummyRecordReader();
-		}
-	}
-
-	private class ConfigurableDummyInputFormat extends DummyInputFormat implements Configurable {
-
-		@Override
-		public void setConf(Configuration configuration) {}
-
-		@Override
-		public Configuration getConf() {
-			return null;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java
deleted file mode 100644
index 8e2329a..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapreduce;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.anyLong;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link HadoopOutputFormat}.
- */
-public class HadoopOutputFormatTest {
-
-	private static final String MAPRED_OUTPUT_PATH = "an/ignored/file/";
-	private static final String MAPRED_OUTPUT_DIR_KEY = "mapred.output.dir";
-
-	@Test
-	public void testWriteRecord() throws Exception {
-
-		RecordWriter<String, Long> recordWriter = mock(DummyRecordWriter.class);
-		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
-			Job.getInstance(), recordWriter, null, new Configuration());
-
-		hadoopOutputFormat.writeRecord(new Tuple2<String, Long>());
-
-		verify(recordWriter, times(1)).write(anyString(), anyLong());
-	}
-
-	@Test
-	public void testOpen() throws Exception {
-
-		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
-		OutputCommitter outputCommitter = setupOutputCommitter(true);
-		when(dummyOutputFormat.getOutputCommitter(any(TaskAttemptContext.class))).thenReturn(outputCommitter);
-
-		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(dummyOutputFormat,
-			Job.getInstance(), new DummyRecordWriter(), setupOutputCommitter(true), new Configuration());
-
-		hadoopOutputFormat.open(1, 4);
-
-		verify(hadoopOutputFormat.outputCommitter, times(1)).setupJob(any(JobContext.class));
-		verify(hadoopOutputFormat.mapreduceOutputFormat, times(1)).getRecordWriter(any(TaskAttemptContext.class));
-	}
-
-	@Test
-	public void testCloseWithNeedsTaskCommitTrue() throws Exception {
-
-		RecordWriter<String, Long> recordWriter = Mockito.mock(DummyRecordWriter.class);
-		OutputCommitter outputCommitter = setupOutputCommitter(true);
-
-		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
-			Job.getInstance(), recordWriter, outputCommitter, new Configuration());
-
-		hadoopOutputFormat.close();
-
-		verify(outputCommitter, times(1)).commitTask(any(TaskAttemptContext.class));
-		verify(recordWriter, times(1)).close(any(TaskAttemptContext.class));
-	}
-
-	@Test
-	public void testCloseWithNeedsTaskCommitFalse() throws Exception {
-
-		RecordWriter<String, Long> recordWriter = Mockito.mock(DummyRecordWriter.class);
-		OutputCommitter outputCommitter = setupOutputCommitter(false);
-
-		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
-			Job.getInstance(), recordWriter, outputCommitter, new Configuration());
-
-		hadoopOutputFormat.close();
-
-		verify(outputCommitter, times(0)).commitTask(any(TaskAttemptContext.class));
-		verify(recordWriter, times(1)).close(any(TaskAttemptContext.class));
-	}
-
-	@Test
-	public void testConfigure() throws Exception {
-
-		ConfigurableDummyOutputFormat outputFormat = mock(ConfigurableDummyOutputFormat.class);
-
-		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(outputFormat, Job.getInstance(),
-			null, null, new Configuration());
-
-		hadoopOutputFormat.configure(new org.apache.flink.configuration.Configuration());
-
-		verify(outputFormat, times(1)).setConf(any(Configuration.class));
-	}
-
-	@Test
-	public void testFinalizedGlobal() throws Exception {
-
-		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
-			Job.getInstance(), null, null, new Configuration());
-
-		hadoopOutputFormat.finalizeGlobal(1);
-
-		verify(hadoopOutputFormat.outputCommitter, times(1)).commitJob(any(JobContext.class));
-	}
-
-	private OutputCommitter setupOutputCommitter(boolean needsTaskCommit) throws IOException {
-		OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class);
-		when(outputCommitter.needsTaskCommit(any(TaskAttemptContext.class))).thenReturn(needsTaskCommit);
-		doNothing().when(outputCommitter).commitTask(any(TaskAttemptContext.class));
-
-		return outputCommitter;
-	}
-
-	private HadoopOutputFormat<String, Long> setupHadoopOutputFormat(
-		OutputFormat<String, Long> outputFormat,
-		Job job,
-		RecordWriter<String, Long> recordWriter,
-		OutputCommitter outputCommitter,
-		Configuration configuration) {
-
-		HadoopOutputFormat<String, Long> hadoopOutputFormat = new HadoopOutputFormat<>(outputFormat, job);
-		hadoopOutputFormat.recordWriter = recordWriter;
-		hadoopOutputFormat.outputCommitter = outputCommitter;
-		hadoopOutputFormat.configuration = configuration;
-		hadoopOutputFormat.configuration.set(MAPRED_OUTPUT_DIR_KEY, MAPRED_OUTPUT_PATH);
-
-		return hadoopOutputFormat;
-	}
-
-	class DummyRecordWriter extends RecordWriter<String, Long> {
-		@Override
-		public void write(String key, Long value) throws IOException, InterruptedException {
-		}
-
-		@Override
-		public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-
-		}
-	}
-
-	class DummyOutputFormat extends OutputFormat<String, Long> {
-		@Override
-		public RecordWriter<String, Long> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-			return null;
-		}
-
-		@Override
-		public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
-
-		}
-
-		@Override
-		public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-			final OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class);
-			doNothing().when(outputCommitter).setupJob(any(JobContext.class));
-
-			return outputCommitter;
-		}
-	}
-
-	class ConfigurableDummyOutputFormat extends DummyOutputFormat implements Configurable {
-
-		@Override
-		public void setConf(Configuration configuration) {}
-
-		@Override
-		public Configuration getConf() {
-			return null;
-		}
-	}
-}
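
The test removed above covers the mapreduce-API ("new" Hadoop API) variant of the wrapper, which is configured through a Job rather than a JobConf. A hedged sketch of writing a DataSet through it, under the same assumption that the wrapper stays available outside the DataSet API; class name and output path are illustrative:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MapreduceOutputFormatSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Text, LongWritable>> counts = env.fromElements(
			new Tuple2<>(new Text("hello"), new LongWritable(1L)));

		// The mapreduce-API wrapper takes a Job; the output path is set on that Job.
		Job job = Job.getInstance();
		HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
			new HadoopOutputFormat<>(new TextOutputFormat<Text, LongWritable>(), job);
		TextOutputFormat.setOutputPath(job, new Path("/tmp/mapreduce-out"));

		counts.output(hadoopOutputFormat);
		env.execute("mapreduce HadoopOutputFormat sketch");
	}
}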

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 6d70438..bc24ad0 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -29,15 +29,10 @@ import org.apache.flink.api.java.operators.DataSource
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfoBase, ValueTypeInfo}
 import org.apache.flink.api.java.{CollectionEnvironment, ExecutionEnvironment => JavaEnv}
-import org.apache.flink.api.scala.hadoop.{mapred, mapreduce}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.Path
 import org.apache.flink.types.StringValue
 import org.apache.flink.util.{NumberSequenceIterator, Preconditions, SplittableIterator}
-import org.apache.hadoop.fs.{Path => HadoopPath}
-import org.apache.hadoop.mapred.{FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat, JobConf}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
-import org.apache.hadoop.mapreduce.{InputFormat => MapreduceInputFormat, Job}
 
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
@@ -410,147 +405,6 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
-   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. The
-   * given inputName is set on the given job.
-   *
-   * @deprecated Please use
-   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
-   * from the flink-hadoop-compatibility module.
-   */
-  @Deprecated
-  @PublicEvolving
-  def readHadoopFile[K, V](
-      mapredInputFormat: MapredFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String,
-      job: JobConf)
-      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
-    val result = createHadoopInput(mapredInputFormat, key, value, job)
-    MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
-    result
-  }
-
-  /**
-   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. A
-   * [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created.
-   *
-   * @deprecated Please use
-   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
-   * from the flink-hadoop-compatibility module.
-   */
-  @Deprecated
-  @PublicEvolving
-  def readHadoopFile[K, V](
-      mapredInputFormat: MapredFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String)
-      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
-    readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf)
-  }
-
-  /**
-   * Creates a [[DataSet]] from [[org.apache.hadoop.mapred.SequenceFileInputFormat]]
-   * A [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created.
-   *
-   * @deprecated Please use
-   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readSequenceFile]]
-   * from the flink-hadoop-compatibility module.
-   */
-  @Deprecated
-  @PublicEvolving
-  def readSequenceFile[K, V](
-      key: Class[K],
-      value: Class[V],
-      inputPath: String)
-      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
-    readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V],
-      key, value, inputPath)
-  }
-
-  /**
-   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.InputFormat]].
-   *
-   * @deprecated Please use
-   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#createHadoopInput]]
-   * from the flink-hadoop-compatibility module.
-   */
-  @Deprecated
-  @PublicEvolving
-  def createHadoopInput[K, V](
-      mapredInputFormat: MapredInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      job: JobConf)
-      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
-    val hadoopInputFormat = new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
-    createInput(hadoopInputFormat)
-  }
-
-  /**
-   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
-   * The given inputName is set on the given job.
-   *
-   * @deprecated Please use
-   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
-   * from the flink-hadoop-compatibility module.
-   */
-  @Deprecated
-  @PublicEvolving
-  def readHadoopFile[K, V](
-      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String,
-      job: Job)
-      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
-    val result = createHadoopInput(mapreduceInputFormat, key, value, job)
-    MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
-    result
-  }
-
-  /**
-   * Creates a [[DataSet]] from the given
-   * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. A
-   * [[org.apache.hadoop.mapreduce.Job]] with the given inputPath will be created.
-   *
-   * @deprecated Please use
-   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
-   * from the flink-hadoop-compatibility module.
-   */
-  @Deprecated
-  @PublicEvolving
-  def readHadoopFile[K, V](
-      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String)
-      (implicit tpe: TypeInformation[(K, V)]): DataSet[Tuple2[K, V]] = {
-    readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance)
-  }
-
-  /**
-   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.InputFormat]].
-   *
-   * @deprecated Please use
-   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#createHadoopInput]]
-   * from the flink-hadoop-compatibility module.
-   */
-  @Deprecated
-  @PublicEvolving
-  def createHadoopInput[K, V](
-      mapreduceInputFormat: MapreduceInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      job: Job)
-      (implicit tpe: TypeInformation[(K, V)]): DataSet[Tuple2[K, V]] = {
-    val hadoopInputFormat =
-      new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, job)
-    createInput(hadoopInputFormat)
-  }
-
-  /**
    * Creates a DataSet from the given non-empty [[Iterable]].
    *
    * Note that this operation will result in a non-parallel data source, i.e. a data source with

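The deprecation notes on the removed readHadoopFile/readSequenceFile/createHadoopInput helpers point to HadoopInputs in the flink-hadoop-compatibility module. A Java sketch of that replacement path (the Scala counterpart lives under org.apache.flink.hadoopcompatibility.scala), mirroring the non-deprecated branch of the WordCountMapredITCase deleted further below; the class name and input path are illustrative:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;

public class HadoopInputsMigrationSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Replacement for the removed env.readHadoopFile(new TextInputFormat(), ...) helper:
		// build the wrapping input format via HadoopInputs and hand it to createInput.
		DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
			readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, "/tmp/text.txt"));

		input.first(5).print();
	}
}
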
http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
deleted file mode 100644
index 9b614fd..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.hadoop.mapred
-
-import org.apache.flink.annotation.Public
-import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase
-import org.apache.hadoop.mapred.{JobConf, InputFormat}
-
-@Public
-class HadoopInputFormat[K, V](
-    mapredInputFormat: InputFormat[K, V],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    job: JobConf)
-  extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, valueClass, job) {
-
-  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]) = {
-    this(mapredInputFormat, keyClass, valueClass, new JobConf)
-  }
-
-  def nextRecord(reuse: (K, V)): (K, V) = {
-    if (!fetched) {
-      fetchNext()
-    }
-    if (!hasNext) {
-      null
-    } else {
-      fetched = false
-      (key, value)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
deleted file mode 100644
index ad5f282..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.hadoop.mapred
-
-import org.apache.flink.annotation.Public
-import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase
-import org.apache.hadoop.mapred.{OutputCommitter, JobConf, OutputFormat}
-
-@Public
-class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: JobConf)
-  extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {
-
-  def this(
-      mapredOutputFormat: OutputFormat[K, V],
-      outputCommitterClass: Class[OutputCommitter],
-      job: JobConf) {
-
-    this(mapredOutputFormat, job)
-    this.getJobConf.setOutputCommitter(outputCommitterClass)
-  }
-
-  def writeRecord(record: (K, V)) {
-    this.recordWriter.write(record._1, record._2)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
deleted file mode 100644
index 8efb53d..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.hadoop.mapreduce
-
-import org.apache.flink.annotation.Public
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase
-import org.apache.hadoop.mapreduce.{InputFormat, Job}
-
-@Public
-class HadoopInputFormat[K, V](
-    mapredInputFormat: InputFormat[K, V],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    job: Job)
-  extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, valueClass, job) {
-
-  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]) = {
-    this(mapredInputFormat, keyClass, valueClass, Job.getInstance())
-  }
-
-  def nextRecord(reuse: (K, V)): (K, V) = {
-    if (!fetched) {
-      fetchNext()
-    }
-    if (!hasNext) {
-      null
-    } else {
-      fetched = false
-      (recordReader.getCurrentKey, recordReader.getCurrentValue)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
deleted file mode 100644
index 8095304..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.hadoop.mapreduce
-
-import org.apache.flink.annotation.Public
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase
-import org.apache.hadoop.mapreduce.{Job, OutputFormat}
-
-@Public
-class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: Job)
-  extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {
-
-  def writeRecord(record: (K, V)) {
-    this.recordWriter.write(record._1, record._2)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
deleted file mode 100644
index 07b4d76..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoop.mapred;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.util.OperatingSystem;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.LinkedList;
-
-/**
- * Integration tests for Hadoop IO formats.
- */
-@RunWith(Parameterized.class)
-public class HadoopIOFormatsITCase extends JavaProgramTestBase {
-
-	private static final int NUM_PROGRAMS = 2;
-
-	private int curProgId = config.getInteger("ProgramId", -1);
-	private String[] resultPath;
-	private String[] expectedResult;
-	private String sequenceFileInPath;
-	private String sequenceFileInPathNull;
-
-	public HadoopIOFormatsITCase(Configuration config) {
-		super(config);
-	}
-
-	@Before
-	public void checkOperatingSystem() {
-		// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
-		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
-	}
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = new String[] {getTempDirPath("result0"), getTempDirPath("result1") };
-
-		File sequenceFile = createAndRegisterTempFile("seqFile");
-		sequenceFileInPath = sequenceFile.toURI().toString();
-
-		// Create a sequence file
-		org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-		FileSystem fs = FileSystem.get(URI.create(sequenceFile.getAbsolutePath()), conf);
-		Path path = new Path(sequenceFile.getAbsolutePath());
-
-		//  ------------------ Long / Text Key Value pair: ------------
-		int kvCount = 4;
-
-		LongWritable key = new LongWritable();
-		Text value = new Text();
-		SequenceFile.Writer writer = null;
-		try {
-			writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
-			for (int i = 0; i < kvCount; i++) {
-				if (i == 1) {
-					// write key = 0 a bit more often.
-					for (int a = 0; a < 15; a++) {
-						key.set(i);
-						value.set(i + " - somestring");
-						writer.append(key, value);
-					}
-				}
-				key.set(i);
-				value.set(i + " - somestring");
-				writer.append(key, value);
-			}
-		} finally {
-			IOUtils.closeStream(writer);
-		}
-
-		//  ------------------ Long / Text Key Value pair: ------------
-
-		File sequenceFileNull = createAndRegisterTempFile("seqFileNullKey");
-		sequenceFileInPathNull = sequenceFileNull.toURI().toString();
-		path = new Path(sequenceFileInPathNull);
-
-		LongWritable value1 = new LongWritable();
-		SequenceFile.Writer writer1 = null;
-		try {
-			writer1 = SequenceFile.createWriter(fs, conf, path, NullWritable.class, value1.getClass());
-			for (int i = 0; i < kvCount; i++) {
-				value1.set(i);
-				writer1.append(NullWritable.get(), value1);
-			}
-		} finally {
-			IOUtils.closeStream(writer1);
-		}
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, resultPath, sequenceFileInPath, sequenceFileInPathNull);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		for (int i = 0; i < resultPath.length; i++) {
-			compareResultsByLinesInMemory(expectedResult[i], resultPath[i]);
-		}
-	}
-
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
-		for (int i = 1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
-
-		return TestBaseUtils.toParameterList(tConfigs);
-	}
-
-	private static class HadoopIOFormatPrograms {
-
-		public static String[] runProgram(int progId, String[] resultPath, String sequenceFileInPath, String sequenceFileInPathNull) throws Exception {
-
-			switch(progId) {
-			case 1: {
-				/**
-				 * Test sequence file, including a key access.
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-				SequenceFileInputFormat<LongWritable, Text> sfif = new SequenceFileInputFormat<LongWritable, Text>();
-				JobConf hdconf = new JobConf();
-				SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPath));
-				HadoopInputFormat<LongWritable, Text> hif = new HadoopInputFormat<LongWritable, Text>(sfif, LongWritable.class, Text.class, hdconf);
-				DataSet<Tuple2<LongWritable, Text>> ds = env.createInput(hif);
-				DataSet<Tuple2<Long, Text>> sumed = ds.map(new MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>() {
-					@Override
-					public Tuple2<Long, Text> map(Tuple2<LongWritable, Text> value) throws Exception {
-						return new Tuple2<Long, Text>(value.f0.get(), value.f1);
-					}
-				}).sum(0);
-				sumed.writeAsText(resultPath[0]);
-				DataSet<String> res = ds.distinct(0).map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
-					@Override
-					public String map(Tuple2<LongWritable, Text> value) throws Exception {
-						return value.f1 + " - " + value.f0.get();
-					}
-				});
-				res.writeAsText(resultPath[1]);
-				env.execute();
-
-				// return expected result
-				return 	new String [] {"(21,3 - somestring)", "0 - somestring - 0\n" +
-						"1 - somestring - 1\n" +
-						"2 - somestring - 2\n" +
-						"3 - somestring - 3\n"};
-
-			}
-			case 2: {
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-				SequenceFileInputFormat<NullWritable, LongWritable> sfif = new SequenceFileInputFormat<NullWritable, LongWritable>();
-				JobConf hdconf = new JobConf();
-				SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPathNull));
-				HadoopInputFormat<NullWritable, LongWritable> hif = new HadoopInputFormat<NullWritable, LongWritable>(sfif, NullWritable.class, LongWritable.class, hdconf);
-				DataSet<Tuple2<NullWritable, LongWritable>> ds = env.createInput(hif);
-				DataSet<Tuple2<Void, Long>> res = ds.map(new MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>>() {
-					@Override
-					public Tuple2<Void, Long> map(Tuple2<NullWritable, LongWritable> value) throws Exception {
-						return new Tuple2<Void, Long>(null, value.f1.get());
-					}
-				});
-				DataSet<Tuple2<Void, Long>> res1 = res.groupBy(1).sum(1);
-				res1.writeAsText(resultPath[1]);
-				res.writeAsText(resultPath[0]);
-				env.execute();
-
-				// return expected result
-				return 	new String [] {"(null,2)\n" +
-						"(null,0)\n" +
-						"(null,1)\n" +
-						"(null,3)",
-						"(null,0)\n" +
-						"(null,1)\n" +
-						"(null,2)\n" +
-						"(null,3)"};
-			}
-			default:
-				throw new IllegalArgumentException("Invalid program id");
-			}
-
-		}
-
-	}
-}
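
The integration test removed above read SequenceFiles by wrapping Hadoop's mapred SequenceFileInputFormat in the HadoopInputFormat wrapper. A standalone sketch of that pattern, assuming the wrapper remains available after this commit series; the class name and input path are illustrative:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

public class SequenceFileReadSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Wrap Hadoop's mapred SequenceFileInputFormat; the JobConf carries the input path.
		SequenceFileInputFormat<LongWritable, Text> sfif = new SequenceFileInputFormat<>();
		JobConf jobConf = new JobConf();
		SequenceFileInputFormat.addInputPath(jobConf, new Path("/tmp/seqfile"));

		HadoopInputFormat<LongWritable, Text> hif =
			new HadoopInputFormat<>(sfif, LongWritable.class, Text.class, jobConf);

		DataSet<Tuple2<LongWritable, Text>> records = env.createInput(hif);
		records.first(5).print();
	}
}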

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
deleted file mode 100644
index 409bb40..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoop.mapred;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.testfunctions.Tokenizer;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.util.OperatingSystem;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.junit.Assume;
-import org.junit.Before;
-
-import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
-
-/**
- * Test WordCount with Hadoop input and output "mapred" (legacy) formats.
- */
-public class WordCountMapredITCase extends JavaProgramTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-
-	@Before
-	public void checkOperatingSystem() {
-		// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
-		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
-	}
-
-	@Override
-	protected void preSubmit() throws Exception {
-		textPath = createTempFile("text.txt", WordCountData.TEXT);
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[] {".", "_"});
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		internalRun(true);
-		postSubmit();
-		resultPath = getTempDirPath("result2");
-		internalRun(false);
-		postSubmit();
-	}
-
-	private void internalRun(boolean isTestDeprecatedAPI) throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<LongWritable, Text>> input;
-
-		if (isTestDeprecatedAPI) {
-			input = env.readHadoopFile(new TextInputFormat(),
-				LongWritable.class, Text.class, textPath);
-		} else {
-			input = env.createInput(readHadoopFile(new TextInputFormat(),
-				LongWritable.class, Text.class, textPath));
-		}
-
-		DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
-			@Override
-			public String map(Tuple2<LongWritable, Text> value) throws Exception {
-				return value.f1.toString();
-			}
-		});
-
-		DataSet<Tuple2<String, Integer>> counts =
-				// split up the lines in pairs (2-tuples) containing: (word,1)
-				text.flatMap(new Tokenizer())
-						// group by the tuple field "0" and sum up tuple field "1"
-						.groupBy(0)
-						.sum(1);
-
-		DataSet<Tuple2<Text, LongWritable>> words = counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() {
-
-			@Override
-			public Tuple2<Text, LongWritable> map(Tuple2<String, Integer> value) throws Exception {
-				return new Tuple2<Text, LongWritable>(new Text(value.f0), new LongWritable(value.f1));
-			}
-		});
-
-		// Set up Hadoop Output Format
-		HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
-				new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf());
-		hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
-		TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(resultPath));
-
-		// Output & Execute
-		words.output(hadoopOutputFormat);
-		env.execute("Hadoop Compat WordCount");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
deleted file mode 100644
index 632b406..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoop.mapreduce;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.testfunctions.Tokenizer;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.util.OperatingSystem;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.junit.Assume;
-import org.junit.Before;
-
-import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
-
-/**
- * Test WordCount with Hadoop input and output "mapreduce" (modern) formats.
- */
-public class WordCountMapreduceITCase extends JavaProgramTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-
-	@Before
-	public void checkOperatingSystem() {
-		// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
-		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
-	}
-
-	@Override
-	protected void preSubmit() throws Exception {
-		textPath = createTempFile("text.txt", WordCountData.TEXT);
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[] {".", "_"});
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		internalRun(true);
-		postSubmit();
-		resultPath = getTempDirPath("result2");
-		internalRun(false);
-		postSubmit();
-	}
-
-	private void internalRun(boolean isTestDeprecatedAPI) throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<LongWritable, Text>> input;
-		if (isTestDeprecatedAPI) {
-			input = env.readHadoopFile(new TextInputFormat(),
-				LongWritable.class, Text.class, textPath);
-		} else {
-			input = env.createInput(readHadoopFile(new TextInputFormat(),
-				LongWritable.class, Text.class, textPath));
-		}
-
-		DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
-			@Override
-			public String map(Tuple2<LongWritable, Text> value) throws Exception {
-				return value.f1.toString();
-			}
-		});
-
-		DataSet<Tuple2<String, Integer>> counts =
-				// split up the lines in pairs (2-tuples) containing: (word,1)
-				text.flatMap(new Tokenizer())
-						// group by the tuple field "0" and sum up tuple field "1"
-						.groupBy(0)
-						.sum(1);
-
-		DataSet<Tuple2<Text, LongWritable>> words = counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() {
-
-			@Override
-			public Tuple2<Text, LongWritable> map(Tuple2<String, Integer> value) throws Exception {
-				return new Tuple2<Text, LongWritable>(new Text(value.f0), new LongWritable(value.f1));
-			}
-		});
-
-		// Set up Hadoop Output Format
-		Job job = Job.getInstance();
-		HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
-				new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), job);
-		job.getConfiguration().set("mapred.textoutputformat.separator", " ");
-		TextOutputFormat.setOutputPath(job, new Path(resultPath));
-
-		// Output & Execute
-		words.output(hadoopOutputFormat);
-		env.execute("Hadoop Compat WordCount");
-	}
-}
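
The "mapreduce" (modern) flavor removed below differs only in how the output side is configured: a `Job` instance instead of a `JobConf`, with the separator set on `job.getConfiguration()`. A short sketch under the same assumptions as above (the class and method names `MapreduceWiringSketch`, `read`, `write`, and the `inputPath`/`outputPath`/`pairs` parameters are illustrative):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;

    public class MapreduceWiringSketch {

        /** Reads with the "mapreduce" TextInputFormat via the HadoopInputs helper. */
        static DataSet<Tuple2<LongWritable, Text>> read(ExecutionEnvironment env, String inputPath)
                throws Exception {
            return env.createInput(
                    readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, inputPath));
        }

        /** Writes (Text, LongWritable) pairs with the "mapreduce" HadoopOutputFormat configured through a Job. */
        static void write(DataSet<Tuple2<Text, LongWritable>> pairs, String outputPath) throws Exception {
            Job job = Job.getInstance();
            HadoopOutputFormat<Text, LongWritable> out =
                    new HadoopOutputFormat<>(new TextOutputFormat<Text, LongWritable>(), job);
            job.getConfiguration().set("mapred.textoutputformat.separator", " ");
            TextOutputFormat.setOutputPath(job, new Path(outputPath));
            pairs.output(out);
        }
    }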

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
deleted file mode 100644
index e7069e1..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.hadoop.mapred
-
-import org.apache.flink.api.scala._
-import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
-import org.apache.flink.test.testdata.WordCountData
-import org.apache.flink.test.util.{JavaProgramTestBase, TestBaseUtils}
-import org.apache.flink.util.OperatingSystem
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{LongWritable, Text}
-import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextInputFormat, TextOutputFormat}
-import org.junit.{Assume, Before}
-
-class WordCountMapredITCase extends JavaProgramTestBase {
-  protected var textPath: String = null
-  protected var resultPath: String = null
-
-  @Before
-  def checkOperatingSystem() {
-    // FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
-    Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows)
-  }
-
-  protected override def preSubmit() {
-    textPath = createTempFile("text.txt", WordCountData.TEXT)
-    resultPath = getTempDirPath("result")
-  }
-
-  protected override def postSubmit() {
-    TestBaseUtils.compareResultsByLinesInMemory(WordCountData.COUNTS,
-                                                resultPath, Array[String](".", "_"))
-  }
-
-  private def internalRun (testDeprecatedAPI: Boolean): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val input =
-      if (testDeprecatedAPI) {
-        env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
-      } else {
-        env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable],
-          classOf[Text], textPath))
-      }
-
-    val counts = input
-      .map(_._2.toString)
-      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map( (_, 1)))
-      .groupBy(0)
-      .sum(1)
-
-    val words = counts
-      .map( t => (new Text(t._1), new LongWritable(t._2)) )
-
-    val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable](
-      new TextOutputFormat[Text, LongWritable],
-      new JobConf)
-    hadoopOutputFormat.getJobConf.set("mapred.textoutputformat.separator", " ")
-
-    FileOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf, new Path(resultPath))
-
-    words.output(hadoopOutputFormat)
-
-    env.execute("Hadoop Compat WordCount")
-  }
-
-  protected def testProgram() {
-    internalRun(testDeprecatedAPI = true)
-    postSubmit()
-    resultPath = getTempDirPath("result2")
-    internalRun(testDeprecatedAPI = false)
-    postSubmit()
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
deleted file mode 100644
index c3031e9..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.hadoop.mapreduce
-
-import org.apache.flink.api.scala._
-import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
-import org.apache.flink.test.testdata.WordCountData
-import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase}
-import org.apache.flink.util.OperatingSystem
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{Text, LongWritable}
-import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
-import org.junit.{Assume, Before}
-
-class WordCountMapreduceITCase extends JavaProgramTestBase {
-  protected var textPath: String = null
-  protected var resultPath: String = null
-
-  @Before
-  def checkOperatingSystem() {
-    // FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
-    Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows)
-  }
-
-  protected override def preSubmit() {
-    textPath = createTempFile("text.txt", WordCountData.TEXT)
-    resultPath = getTempDirPath("result")
-  }
-
-  protected override def postSubmit() {
-    TestBaseUtils.compareResultsByLinesInMemory(WordCountData.COUNTS,
-                                                resultPath, Array[String](".", "_"))
-  }
-
-  protected def testProgram() {
-    internalRun(testDeprecatedAPI = true)
-    postSubmit()
-    resultPath = getTempDirPath("result2")
-    internalRun(testDeprecatedAPI = false)
-    postSubmit()
-  }
-
-  private def internalRun (testDeprecatedAPI: Boolean): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val input =
-      if (testDeprecatedAPI) {
-        env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
-      } else {
-        env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable],
-          classOf[Text], textPath))
-      }
-
-    val counts = input
-      .map(_._2.toString)
-      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map( (_, 1)))
-      .groupBy(0)
-      .sum(1)
-
-    val words = counts
-      .map( t => (new Text(t._1), new LongWritable(t._2)) )
-
-    val job = Job.getInstance()
-    val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable](
-      new TextOutputFormat[Text, LongWritable],
-      job)
-    hadoopOutputFormat.getConfiguration.set("mapred.textoutputformat.separator", " ")
-
-    FileOutputFormat.setOutputPath(job, new Path(resultPath))
-
-    words.output(hadoopOutputFormat)
-
-    env.execute("Hadoop Compat WordCount")
-  }
-}
-

