flink-commits mailing list archives

From aljos...@apache.org
Subject [06/20] flink git commit: [FLINK-4048] Remove Hadoop from DataSet API
Date Wed, 27 Sep 2017 11:09:11 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
deleted file mode 100644
index 79f0a98..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapred;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import org.apache.hadoop.mapred.JobConf;
-
-import java.io.IOException;
-
-/**
- * Wrapper for using HadoopInputFormats (mapred-variant) with Flink.
- *
- * <p>The InputFormat returns a {@code Tuple2<K,V>}.
- *
- * @param <K> Type of the key
- * @param <V> Type of the value.
- */
-@Public
-public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>> implements ResultTypeQueryable<Tuple2<K, V>> {
-
-	private static final long serialVersionUID = 1L;
-
-	public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
-		super(mapredInputFormat, key, value, job);
-	}
-
-	public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value) {
-		super(mapredInputFormat, key, value, new JobConf());
-	}
-
-	@Override
-	public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
-		if (!fetched) {
-			fetchNext();
-		}
-		if (!hasNext) {
-			return null;
-		}
-		record.f0 = key;
-		record.f1 = value;
-		fetched = false;
-		return record;
-	}
-
-	@Override
-	public TypeInformation<Tuple2<K, V>> getProducedType() {
-		return new TupleTypeInfo<>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
-	}
-}
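
For context, the mapred-variant wrapper deleted above was typically wired into a DataSet program as sketched below. This is an illustrative usage sketch only; the class name MapredInputExample, the input path and the key/value types are placeholders and not part of this commit.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class MapredInputExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Configure the Hadoop (mapred) input through a JobConf; the path is a placeholder.
		JobConf jobConf = new JobConf();
		FileInputFormat.addInputPath(jobConf, new Path("hdfs:///path/to/input"));

		// Wrap the mapred TextInputFormat so the DataSet API reads Tuple2<key, value> records.
		HadoopInputFormat<LongWritable, Text> hadoopIF =
				new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, jobConf);

		DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(hadoopIF);
		lines.first(10).print();
	}
}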

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
deleted file mode 100644
index 27a477c..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapred;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.InputSplitAssigner;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-
-/**
- * Common base for the Java and Scala APIs for using Hadoop input formats with Flink.
- *
- * @param <K> Type of key
- * @param <V> Type of value
- * @param <T> The type itself
- */
-@Internal
-public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCommonBase<T, HadoopInputSplit> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class);
-
-	// Mutexes to avoid concurrent operations on Hadoop InputFormats.
-	// Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
-	// In contrast, Flink parallelizes using Threads, so multiple Hadoop InputFormat instances
-	// might be used in the same JVM.
-	private static final Object OPEN_MUTEX = new Object();
-	private static final Object CONFIGURE_MUTEX = new Object();
-	private static final Object CLOSE_MUTEX = new Object();
-
-	private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat;
-	protected Class<K> keyClass;
-	protected Class<V> valueClass;
-	private JobConf jobConf;
-
-	protected transient K key;
-	protected transient V value;
-
-	private transient RecordReader<K, V> recordReader;
-	protected transient boolean fetched = false;
-	protected transient boolean hasNext;
-
-	public HadoopInputFormatBase(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
-		super(job.getCredentials());
-		this.mapredInputFormat = mapredInputFormat;
-		this.keyClass = key;
-		this.valueClass = value;
-		HadoopUtils.mergeHadoopConf(job);
-		this.jobConf = job;
-		ReflectionUtils.setConf(mapredInputFormat, jobConf);
-	}
-
-	public JobConf getJobConf() {
-		return jobConf;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  InputFormat
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void configure(Configuration parameters) {
-
-		// enforce sequential configuration() calls
-		synchronized (CONFIGURE_MUTEX) {
-			// configure MR InputFormat if necessary
-			if (this.mapredInputFormat instanceof Configurable) {
-				((Configurable) this.mapredInputFormat).setConf(this.jobConf);
-			} else if (this.mapredInputFormat instanceof JobConfigurable) {
-				((JobConfigurable) this.mapredInputFormat).configure(this.jobConf);
-			}
-		}
-	}
-
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
-		// only gather base statistics for FileInputFormats
-		if (!(mapredInputFormat instanceof FileInputFormat)) {
-			return null;
-		}
-
-		final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
-				(FileBaseStatistics) cachedStats : null;
-
-		try {
-			final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(this.jobConf);
-
-			return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
-		} catch (IOException ioex) {
-			if (LOG.isWarnEnabled()) {
-				LOG.warn("Could not determine statistics due to an io error: "
-						+ ioex.getMessage());
-			}
-		} catch (Throwable t) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Unexpected problem while getting the file statistics: "
-						+ t.getMessage(), t);
-			}
-		}
-
-		// no statistics available
-		return null;
-	}
-
-	@Override
-	public HadoopInputSplit[] createInputSplits(int minNumSplits)
-			throws IOException {
-		org.apache.hadoop.mapred.InputSplit[] splitArray = mapredInputFormat.getSplits(jobConf, minNumSplits);
-		HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
-		for (int i = 0; i < splitArray.length; i++) {
-			hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf);
-		}
-		return hiSplit;
-	}
-
-	@Override
-	public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
-		return new LocatableInputSplitAssigner(inputSplits);
-	}
-
-	@Override
-	public void open(HadoopInputSplit split) throws IOException {
-
-		// enforce sequential open() calls
-		synchronized (OPEN_MUTEX) {
-
-			this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
-			if (this.recordReader instanceof Configurable) {
-				((Configurable) this.recordReader).setConf(jobConf);
-			}
-			key = this.recordReader.createKey();
-			value = this.recordReader.createValue();
-			this.fetched = false;
-		}
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		if (!fetched) {
-			fetchNext();
-		}
-		return !hasNext;
-	}
-
-	protected void fetchNext() throws IOException {
-		hasNext = this.recordReader.next(key, value);
-		fetched = true;
-	}
-
-	@Override
-	public void close() throws IOException {
-		if (this.recordReader != null) {
-
-			// enforce sequential close() calls
-			synchronized (CLOSE_MUTEX) {
-				this.recordReader.close();
-			}
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Helper methods
-	// --------------------------------------------------------------------------------------------
-
-	private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
-			ArrayList<FileStatus> files) throws IOException {
-
-		long latestModTime = 0L;
-
-		// get the file info and check whether the cached statistics are still valid.
-		for (org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
-
-			final Path filePath = new Path(hadoopPath.toUri());
-			final FileSystem fs = FileSystem.get(filePath.toUri());
-
-			final FileStatus file = fs.getFileStatus(filePath);
-			latestModTime = Math.max(latestModTime, file.getModificationTime());
-
-			// enumerate all files and check their modification time stamp.
-			if (file.isDir()) {
-				FileStatus[] fss = fs.listStatus(filePath);
-				files.ensureCapacity(files.size() + fss.length);
-
-				for (FileStatus s : fss) {
-					if (!s.isDir()) {
-						files.add(s);
-						latestModTime = Math.max(s.getModificationTime(), latestModTime);
-					}
-				}
-			} else {
-				files.add(file);
-			}
-		}
-
-		// check whether the cached statistics are still valid, if we have any
-		if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
-			return cachedStats;
-		}
-
-		// calculate the whole length
-		long len = 0;
-		for (FileStatus s : files) {
-			len += s.getLen();
-		}
-
-		// sanity check
-		if (len <= 0) {
-			len = BaseStatistics.SIZE_UNKNOWN;
-		}
-
-		return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Custom serialization methods
-	// --------------------------------------------------------------------------------------------
-
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		super.write(out);
-		out.writeUTF(mapredInputFormat.getClass().getName());
-		out.writeUTF(keyClass.getName());
-		out.writeUTF(valueClass.getName());
-		jobConf.write(out);
-	}
-
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		super.read(in);
-
-		String hadoopInputFormatClassName = in.readUTF();
-		String keyClassName = in.readUTF();
-		String valueClassName = in.readUTF();
-		if (jobConf == null) {
-			jobConf = new JobConf();
-		}
-		jobConf.readFields(in);
-		try {
-			this.mapredInputFormat = (org.apache.hadoop.mapred.InputFormat<K, V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate the hadoop input format", e);
-		}
-		try {
-			this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader());
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to find key class.", e);
-		}
-		try {
-			this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader());
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to find value class.", e);
-		}
-		ReflectionUtils.setConf(mapredInputFormat, jobConf);
-
-		jobConf.getCredentials().addAll(this.credentials);
-		Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
-		if (currentUserCreds != null) {
-			jobConf.getCredentials().addAll(currentUserCreds);
-		}
-	}
-
-}
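
The base class above leaves record emission to subclasses: nextRecord() is expected to call fetchNext(), check hasNext and then clear the fetched flag, exactly as HadoopInputFormat does. As a hypothetical illustration of that protocol (the class HadoopValueInputFormat is not part of Flink), a subclass that emits only the Hadoop values could look roughly like this:

import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;

import org.apache.hadoop.mapred.JobConf;

public class HadoopValueInputFormat<K, V> extends HadoopInputFormatBase<K, V, V>
		implements ResultTypeQueryable<V> {

	private static final long serialVersionUID = 1L;

	public HadoopValueInputFormat(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
			Class<K> key, Class<V> value, JobConf job) {
		super(mapredInputFormat, key, value, job);
	}

	@Override
	public V nextRecord(V reuse) throws IOException {
		if (!fetched) {
			fetchNext();        // advances the wrapped RecordReader and sets hasNext
		}
		if (!hasNext) {
			return null;        // end of the current split
		}
		fetched = false;        // force a fresh fetch on the next call
		return value;           // 'value' is the reuse object filled by the base class
	}

	@Override
	public TypeInformation<V> getProducedType() {
		return TypeExtractor.getForClass(valueClass);
	}
}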

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
deleted file mode 100644
index dbbead2..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapred;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCommitter;
-
-import java.io.IOException;
-
-/**
- * Wrapper for using HadoopOutputFormats (mapred-variant) with Flink.
- *
- * <p>The OutputFormat consumes a {@code Tuple2<K,V>}.
- *
- * @param <K> Type of the key
- * @param <V> Type of the value.
- */
-@Public
-public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
-
-	private static final long serialVersionUID = 1L;
-
-	public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job) {
-		super(mapredOutputFormat, job);
-	}
-
-	public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, Class<OutputCommitter> outputCommitterClass, JobConf job) {
-		this(mapredOutputFormat, job);
-		super.getJobConf().setOutputCommitter(outputCommitterClass);
-	}
-
-	@Override
-	public void writeRecord(Tuple2<K, V> record) throws IOException {
-		this.recordWriter.write(record.f0, record.f1);
-	}
-}
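
Correspondingly, the removed mapred output wrapper was attached to a DataSet roughly as follows. Again an illustrative sketch: the class name MapredOutputExample, the output path and the record types are placeholders.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

public class MapredOutputExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Text, IntWritable>> result =
				env.fromElements(new Tuple2<>(new Text("flink"), new IntWritable(1)));

		// Configure the Hadoop (mapred) output through a JobConf; the path is a placeholder.
		JobConf jobConf = new JobConf();
		FileOutputFormat.setOutputPath(jobConf, new Path("hdfs:///path/to/output"));

		// Wrap the mapred TextOutputFormat so the DataSet is emitted as key/value pairs.
		HadoopOutputFormat<Text, IntWritable> hadoopOF =
				new HadoopOutputFormat<>(new TextOutputFormat<Text, IntWritable>(), jobConf);

		result.output(hadoopOF);
		env.execute("mapred output example");
	}
}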

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
deleted file mode 100644
index 3cc1749..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapred;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.io.FinalizeOnMaster;
-import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.configuration.Configuration;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.JobContextImpl;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TaskAttemptContextImpl;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import static org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI;
-
-/**
- * Common base for the mapred HadoopOutputFormat wrappers. There are implementations for Java and Scala.
- *
- * @param <K> Type of Key
- * @param <V> Type of Value
- * @param <T> Record type.
- */
-@Internal
-public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormatCommonBase<T> implements FinalizeOnMaster {
-
-	private static final long serialVersionUID = 1L;
-
-	// Mutexes to avoid concurrent operations on Hadoop OutputFormats.
-	// Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
-	// In contrast, Flink parallelizes using Threads, so multiple Hadoop OutputFormat instances
-	// might be used in the same JVM.
-	protected static final Object OPEN_MUTEX = new Object();
-	protected static final Object CONFIGURE_MUTEX = new Object();
-	protected static final Object CLOSE_MUTEX = new Object();
-
-	protected JobConf jobConf;
-	protected org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat;
-	protected transient RecordWriter<K, V> recordWriter;
-	protected transient OutputCommitter outputCommitter;
-	protected transient TaskAttemptContext context;
-
-	public HadoopOutputFormatBase(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job) {
-		super(job.getCredentials());
-		this.mapredOutputFormat = mapredOutputFormat;
-		HadoopUtils.mergeHadoopConf(job);
-		this.jobConf = job;
-	}
-
-	public JobConf getJobConf() {
-		return jobConf;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  OutputFormat
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void configure(Configuration parameters) {
-
-		// enforce sequential configure() calls
-		synchronized (CONFIGURE_MUTEX) {
-			// configure MR OutputFormat if necessary
-			if (this.mapredOutputFormat instanceof Configurable) {
-				((Configurable) this.mapredOutputFormat).setConf(this.jobConf);
-			} else if (this.mapredOutputFormat instanceof JobConfigurable) {
-				((JobConfigurable) this.mapredOutputFormat).configure(this.jobConf);
-			}
-		}
-	}
-
-	/**
-	 * create the temporary output file for hadoop RecordWriter.
-	 * @param taskNumber The number of the parallel instance.
-	 * @param numTasks The number of parallel tasks.
-	 * @throws java.io.IOException
-	 */
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-
-		// enforce sequential open() calls
-		synchronized (OPEN_MUTEX) {
-			if (Integer.toString(taskNumber + 1).length() > 6) {
-				throw new IOException("Task id too large.");
-			}
-
-			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
-					+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
-					+ Integer.toString(taskNumber + 1)
-					+ "_0");
-
-			this.jobConf.set("mapred.task.id", taskAttemptID.toString());
-			this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
-			// for hadoop 2.2
-			this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
-			this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
-
-			this.context = new TaskAttemptContextImpl(this.jobConf, taskAttemptID);
-
-			this.outputCommitter = this.jobConf.getOutputCommitter();
-
-			JobContext jobContext = new JobContextImpl(this.jobConf, new JobID());
-
-			this.outputCommitter.setupJob(jobContext);
-
-			this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
-		}
-	}
-
-	/**
-	 * commit the task by moving the output file out from the temporary directory.
-	 * @throws java.io.IOException
-	 */
-	@Override
-	public void close() throws IOException {
-
-		// enforce sequential close() calls
-		synchronized (CLOSE_MUTEX) {
-			this.recordWriter.close(new HadoopDummyReporter());
-
-			if (this.outputCommitter.needsTaskCommit(this.context)) {
-				this.outputCommitter.commitTask(this.context);
-			}
-		}
-	}
-
-	@Override
-	public void finalizeGlobal(int parallelism) throws IOException {
-
-		try {
-			JobContext jobContext = new JobContextImpl(this.jobConf, new JobID());
-			OutputCommitter outputCommitter = this.jobConf.getOutputCommitter();
-
-			// finalize HDFS output format
-			outputCommitter.commitJob(jobContext);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Custom serialization methods
-	// --------------------------------------------------------------------------------------------
-
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		super.write(out);
-		out.writeUTF(mapredOutputFormat.getClass().getName());
-		jobConf.write(out);
-	}
-
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		super.read(in);
-		String hadoopOutputFormatName = in.readUTF();
-		if (jobConf == null) {
-			jobConf = new JobConf();
-		}
-		jobConf.readFields(in);
-		try {
-			this.mapredOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K, V>) Class.forName(hadoopOutputFormatName, true, Thread.currentThread().getContextClassLoader()).newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate the hadoop output format", e);
-		}
-		ReflectionUtils.setConf(mapredOutputFormat, jobConf);
-
-		jobConf.getCredentials().addAll(this.credentials);
-		Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
-		if (currentUserCreds != null) {
-			jobConf.getCredentials().addAll(currentUserCreds);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
deleted file mode 100644
index 8760968..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapred.utils;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * Utility class to work with Apache Hadoop MapRed classes.
- */
-@Internal
-public final class HadoopUtils {
-
-	private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
-
-	private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
-
-	/**
-	 * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
-	 */
-	public static void mergeHadoopConf(JobConf jobConf) {
-		// we have to load the global configuration here, because the HadoopInputFormatBase does not
-		// have access to a Flink configuration object
-		org.apache.flink.configuration.Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
-
-		Configuration hadoopConf = getHadoopConfiguration(flinkConfiguration);
-		for (Map.Entry<String, String> e : hadoopConf) {
-			if (jobConf.get(e.getKey()) == null) {
-				jobConf.set(e.getKey(), e.getValue());
-			}
-		}
-	}
-
-	/**
-	 * Returns a new Hadoop Configuration object using the path to the hadoop conf configured
-	 * in the main configuration (flink-conf.yaml).
-	 * This method is public because it is used in the HadoopDataSource.
-	 *
-	 * @param flinkConfiguration Flink configuration object
-	 * @return A Hadoop configuration instance
-	 */
-	public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) {
-
-		Configuration retConf = new Configuration();
-
-		// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
-		// the hdfs configuration
-		// Try to load HDFS configuration from Hadoop's own configuration files
-		// 1. approach: Flink configuration
-		final String hdfsDefaultPath = flinkConfiguration.getString(ConfigConstants
-				.HDFS_DEFAULT_CONFIG, null);
-		if (hdfsDefaultPath != null) {
-			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
-		} else {
-			LOG.debug("Cannot find hdfs-default configuration file");
-		}
-
-		final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
-		if (hdfsSitePath != null) {
-			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
-		} else {
-			LOG.debug("Cannot find hdfs-site configuration file");
-		}
-
-		// 2. Approach environment variables
-		String[] possibleHadoopConfPaths = new String[4];
-		possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
-		possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
-
-		if (System.getenv("HADOOP_HOME") != null) {
-			possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME") + "/conf";
-			possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME") + "/etc/hadoop"; // hadoop 2.2
-		}
-
-		for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
-			if (possibleHadoopConfPath != null) {
-				if (new File(possibleHadoopConfPath).exists()) {
-					if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
-						retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
-
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
-						}
-					}
-					if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
-						retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
-
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
-						}
-					}
-				}
-			}
-		}
-		return retConf;
-	}
-
-	/**
-	 * Indicates whether the current user has an HDFS delegation token.
-	 */
-	public static boolean hasHDFSDelegationToken() throws Exception {
-		UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
-		Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
-		for (Token<? extends TokenIdentifier> token : usrTok) {
-			if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
-				return true;
-			}
-		}
-		return false;
-	}
-
-	/**
-	 * Private constructor to prevent instantiation.
-	 */
-	private HadoopUtils() {
-		throw new RuntimeException();
-	}
-}
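
Note that mergeHadoopConf() above only copies keys that are still unset on the JobConf, so explicit JobConf settings take precedence over whatever is discovered via flink-conf.yaml, HADOOP_CONF_DIR or HADOOP_HOME. A minimal sketch of that precedence (the key and value shown are illustrative):

import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;

import org.apache.hadoop.mapred.JobConf;

public class MergeHadoopConfExample {
	public static void main(String[] args) {
		JobConf jobConf = new JobConf();

		// An explicitly set key is kept as-is ...
		jobConf.set("fs.defaultFS", "hdfs://namenode:8020");

		// ... while mergeHadoopConf only fills in keys that are still null,
		// using entries from the discovered core-site.xml / hdfs-site.xml.
		HadoopUtils.mergeHadoopConf(jobConf);

		System.out.println(jobConf.get("fs.defaultFS")); // still hdfs://namenode:8020
	}
}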

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java
deleted file mode 100644
index ba404f8..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapred.wrapper;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import org.apache.hadoop.util.Progressable;
-
-/**
- * This is a dummy {@link Progressable} that does nothing.
- *
- */
-@PublicEvolving
-public class HadoopDummyProgressable implements Progressable {
-	@Override
-	public void progress() {
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java
deleted file mode 100644
index f6bc76e..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapred.wrapper;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This is a dummy progress monitor / reporter.
- *
- */
-@PublicEvolving
-public class HadoopDummyReporter implements Reporter {
-
-	@Override
-	public void progress() {
-	}
-
-	@Override
-	public void setStatus(String status) {
-
-	}
-
-	@Override
-	public Counter getCounter(Enum<?> name) {
-		return null;
-	}
-
-	@Override
-	public Counter getCounter(String group, String name) {
-		return null;
-	}
-
-	@Override
-	public void incrCounter(Enum<?> key, long amount) {
-
-	}
-
-	@Override
-	public void incrCounter(String group, String counter, long amount) {
-
-	}
-
-	@Override
-	public InputSplit getInputSplit() throws UnsupportedOperationException {
-		return null;
-	}
-
-	// There should be an @Override, but some CDH4 dependency does not contain this method
-	public float getProgress() {
-		return 0;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
deleted file mode 100644
index 0cca558..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapred.wrapper;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.LocatableInputSplit;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-/**
- * A wrapper that represents an input split from the Hadoop mapred API as
- * a Flink {@link InputSplit}.
- */
-@Internal
-public class HadoopInputSplit extends LocatableInputSplit {
-
-	private static final long serialVersionUID = -6990336376163226160L;
-
-	private final Class<? extends org.apache.hadoop.mapred.InputSplit> splitType;
-
-	private transient JobConf jobConf;
-
-	private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit;
-
-	public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
-		super(splitNumber, (String) null);
-
-		if (hInputSplit == null) {
-			throw new NullPointerException("Hadoop input split must not be null");
-		}
-		if (jobconf == null) {
-			throw new NullPointerException("Hadoop JobConf must not be null");
-		}
-
-		this.splitType = hInputSplit.getClass();
-
-		this.jobConf = jobconf;
-		this.hadoopInputSplit = hInputSplit;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Properties
-	// ------------------------------------------------------------------------
-
-	@Override
-	public String[] getHostnames() {
-		try {
-			return this.hadoopInputSplit.getLocations();
-		}
-		catch (IOException e) {
-			return new String[0];
-		}
-	}
-
-	public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() {
-		return hadoopInputSplit;
-	}
-
-	public JobConf getJobConf() {
-		return this.jobConf;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Serialization
-	// ------------------------------------------------------------------------
-
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		// serialize the parent fields and the final fields
-		out.defaultWriteObject();
-
-		// the job conf knows how to serialize itself
-		jobConf.write(out);
-
-		// write the input split
-		hadoopInputSplit.write(out);
-	}
-
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		// read the parent fields and the final fields
-		in.defaultReadObject();
-
-		// the job conf knows how to deserialize itself
-		jobConf = new JobConf();
-		jobConf.readFields(in);
-
-		try {
-			hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(splitType);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate Hadoop InputSplit", e);
-		}
-
-		if (hadoopInputSplit instanceof Configurable) {
-			((Configurable) hadoopInputSplit).setConf(this.jobConf);
-		}
-		else if (hadoopInputSplit instanceof JobConfigurable) {
-			((JobConfigurable) hadoopInputSplit).configure(this.jobConf);
-		}
-		hadoopInputSplit.readFields(in);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
deleted file mode 100644
index a4badfc..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapreduce;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import org.apache.hadoop.mapreduce.Job;
-
-import java.io.IOException;
-
-/**
- * InputFormat implementation that allows using Hadoop (mapreduce) InputFormats with Flink.
- *
- * @param <K> Key Type
- * @param <V> Value Type
- */
-@Public
-public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>> implements ResultTypeQueryable<Tuple2<K, V>> {
-
-	private static final long serialVersionUID = 1L;
-
-	public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
-		super(mapreduceInputFormat, key, value, job);
-	}
-
-	public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value) throws IOException {
-		super(mapreduceInputFormat, key, value, Job.getInstance());
-	}
-
-	@Override
-	public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
-		if (!this.fetched) {
-			fetchNext();
-		}
-		if (!this.hasNext) {
-			return null;
-		}
-		try {
-			record.f0 = recordReader.getCurrentKey();
-			record.f1 = recordReader.getCurrentValue();
-		} catch (InterruptedException e) {
-			throw new IOException("Could not get KeyValue pair.", e);
-		}
-		this.fetched = false;
-
-		return record;
-	}
-
-	@Override
-	public TypeInformation<Tuple2<K, V>> getProducedType() {
-		return new TupleTypeInfo<Tuple2<K, V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
-	}
-}
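
The mapreduce-variant wrapper deleted above was used analogously to its mapred counterpart, but is configured through a Job instead of a JobConf. Again an illustrative sketch; the class name, input path and types are placeholders.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class MapreduceInputExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// The new-API wrapper is configured through a Job; the path is a placeholder.
		Job job = Job.getInstance();
		FileInputFormat.addInputPath(job, new Path("hdfs:///path/to/input"));

		HadoopInputFormat<LongWritable, Text> hadoopIF =
				new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, job);

		DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(hadoopIF);
		lines.first(10).print();
	}
}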

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
deleted file mode 100644
index 06205e9..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapreduce;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
-import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
-import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Base class shared between the Java and Scala APIs of Flink.
- */
-@Internal
-public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCommonBase<T, HadoopInputSplit> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class);
-
-	// Mutexes to avoid concurrent operations on Hadoop InputFormats.
-	// Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
-	// In contrast, Flink parallelizes using Threads, so multiple Hadoop InputFormat instances
-	// might be used in the same JVM.
-	private static final Object OPEN_MUTEX = new Object();
-	private static final Object CONFIGURE_MUTEX = new Object();
-	private static final Object CLOSE_MUTEX = new Object();
-
-	// NOTE: this class is using a custom serialization logic, without a defaultWriteObject() method.
-	// Hence, all fields here are "transient".
-	private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
-	protected Class<K> keyClass;
-	protected Class<V> valueClass;
-	private org.apache.hadoop.conf.Configuration configuration;
-
-	protected transient RecordReader<K, V> recordReader;
-	protected boolean fetched = false;
-	protected boolean hasNext;
-
-	public HadoopInputFormatBase(org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
-		super(Preconditions.checkNotNull(job, "Job can not be null").getCredentials());
-		this.mapreduceInputFormat = Preconditions.checkNotNull(mapreduceInputFormat);
-		this.keyClass = Preconditions.checkNotNull(key);
-		this.valueClass = Preconditions.checkNotNull(value);
-		this.configuration = job.getConfiguration();
-		HadoopUtils.mergeHadoopConf(configuration);
-	}
-
-	public org.apache.hadoop.conf.Configuration getConfiguration() {
-		return this.configuration;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  InputFormat
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void configure(Configuration parameters) {
-
-		// enforce sequential configuration() calls
-		synchronized (CONFIGURE_MUTEX) {
-			if (mapreduceInputFormat instanceof Configurable) {
-				((Configurable) mapreduceInputFormat).setConf(configuration);
-			}
-		}
-	}
-
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
-		// only gather base statistics for FileInputFormats
-		if (!(mapreduceInputFormat instanceof FileInputFormat)) {
-			return null;
-		}
-
-		JobContext jobContext = new JobContextImpl(configuration, null);
-
-		final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
-				(FileBaseStatistics) cachedStats : null;
-
-		try {
-			final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext);
-			return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
-		} catch (IOException ioex) {
-			if (LOG.isWarnEnabled()) {
-				LOG.warn("Could not determine statistics due to an io error: "
-						+ ioex.getMessage());
-			}
-		} catch (Throwable t) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Unexpected problem while getting the file statistics: "
-						+ t.getMessage(), t);
-			}
-		}
-
-		// no statistics available
-		return null;
-	}
-
-	@Override
-	public HadoopInputSplit[] createInputSplits(int minNumSplits)
-			throws IOException {
-		configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
-
-		JobContext jobContext = new JobContextImpl(configuration, new JobID());
-
-		jobContext.getCredentials().addAll(this.credentials);
-		Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
-		if (currentUserCreds != null) {
-			jobContext.getCredentials().addAll(currentUserCreds);
-		}
-
-		List<org.apache.hadoop.mapreduce.InputSplit> splits;
-		try {
-			splits = this.mapreduceInputFormat.getSplits(jobContext);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not get Splits.", e);
-		}
-		HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
-
-		for (int i = 0; i < hadoopInputSplits.length; i++) {
-			hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
-		}
-		return hadoopInputSplits;
-	}
-
-	@Override
-	public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
-		return new LocatableInputSplitAssigner(inputSplits);
-	}
-
-	@Override
-	public void open(HadoopInputSplit split) throws IOException {
-
-		// enforce sequential open() calls
-		synchronized (OPEN_MUTEX) {
-
-			TaskAttemptContext context = new TaskAttemptContextImpl(configuration, new TaskAttemptID());
-
-			try {
-				this.recordReader = this.mapreduceInputFormat
-						.createRecordReader(split.getHadoopInputSplit(), context);
-				this.recordReader.initialize(split.getHadoopInputSplit(), context);
-			} catch (InterruptedException e) {
-				throw new IOException("Could not create RecordReader.", e);
-			} finally {
-				this.fetched = false;
-			}
-		}
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		if (!this.fetched) {
-			fetchNext();
-		}
-		return !this.hasNext;
-	}
-
-	protected void fetchNext() throws IOException {
-		try {
-			this.hasNext = this.recordReader.nextKeyValue();
-		} catch (InterruptedException e) {
-			throw new IOException("Could not fetch next KeyValue pair.", e);
-		} finally {
-			this.fetched = true;
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-		if (this.recordReader != null) {
-
-			// enforce sequential close() calls
-			synchronized (CLOSE_MUTEX) {
-				this.recordReader.close();
-			}
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Helper methods
-	// --------------------------------------------------------------------------------------------
-
-	private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
-											ArrayList<FileStatus> files) throws IOException {
-
-		long latestModTime = 0L;
-
-		// get the file info and check whether the cached statistics are still valid.
-		for (org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
-
-			final Path filePath = new Path(hadoopPath.toUri());
-			final FileSystem fs = FileSystem.get(filePath.toUri());
-
-			final FileStatus file = fs.getFileStatus(filePath);
-			latestModTime = Math.max(latestModTime, file.getModificationTime());
-
-			// enumerate all files and check their modification time stamp.
-			if (file.isDir()) {
-				FileStatus[] fss = fs.listStatus(filePath);
-				files.ensureCapacity(files.size() + fss.length);
-
-				for (FileStatus s : fss) {
-					if (!s.isDir()) {
-						files.add(s);
-						latestModTime = Math.max(s.getModificationTime(), latestModTime);
-					}
-				}
-			} else {
-				files.add(file);
-			}
-		}
-
-		// check whether the cached statistics are still valid, if we have any
-		if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
-			return cachedStats;
-		}
-
-		// calculate the whole length
-		long len = 0;
-		for (FileStatus s : files) {
-			len += s.getLen();
-		}
-
-		// sanity check
-		if (len <= 0) {
-			len = BaseStatistics.SIZE_UNKNOWN;
-		}
-
-		return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Custom serialization methods
-	// --------------------------------------------------------------------------------------------
-
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		super.write(out);
-		out.writeUTF(this.mapreduceInputFormat.getClass().getName());
-		out.writeUTF(this.keyClass.getName());
-		out.writeUTF(this.valueClass.getName());
-		this.configuration.write(out);
-	}
-
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		super.read(in);
-		String hadoopInputFormatClassName = in.readUTF();
-		String keyClassName = in.readUTF();
-		String valueClassName = in.readUTF();
-
-		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
-		configuration.readFields(in);
-
-		if (this.configuration == null) {
-			this.configuration = configuration;
-		}
-
-		try {
-			this.mapreduceInputFormat = (org.apache.hadoop.mapreduce.InputFormat<K, V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate the hadoop input format", e);
-		}
-		try {
-			this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader());
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to find key class.", e);
-		}
-		try {
-			this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader());
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to find value class.", e);
-		}
-	}
-}
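
For context on what this removal takes away, the sketch below shows how the mapreduce-side input wrapper was typically wired into a DataSet program. It is a minimal, hypothetical example: it assumes the HadoopInputFormat subclass removed alongside this base class, the pre-removal ExecutionEnvironment.createInput() entry point, Hadoop Writable type handling being available to the type extractor, and a placeholder HDFS path.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HadoopReadSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// The Job carries the Hadoop configuration and credentials picked up by the wrapper.
		Job job = Job.getInstance();
		TextInputFormat.addInputPath(job, new Path("hdfs:///input"));   // placeholder path

		// Wrap the Hadoop (mapreduce) InputFormat; records arrive as Tuple2<key, value>.
		HadoopInputFormat<LongWritable, Text> hadoopIF =
				new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, job);

		DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(hadoopIF);
		lines.print();
	}
}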

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java
deleted file mode 100644
index 15414e2..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapreduce;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import org.apache.hadoop.mapreduce.Job;
-
-import java.io.IOException;
-
-/**
- * OutputFormat implementation that allows using Hadoop (mapreduce) OutputFormats with Flink.
- *
- * @param <K> Key Type
- * @param <V> Value Type
- */
-@Public
-public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
-
-	private static final long serialVersionUID = 1L;
-
-	public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat, Job job) {
-		super(mapreduceOutputFormat, job);
-	}
-
-	@Override
-	public void writeRecord(Tuple2<K, V> record) throws IOException {
-		try {
-			this.recordWriter.write(record.f0, record.f1);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not write Record.", e);
-		}
-	}
-}
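
The write path is symmetric. A minimal, hypothetical sketch of emitting a DataSet through this wrapper, assuming a Tuple2<Text, LongWritable> result, Hadoop Writable type handling on the classpath, and placeholder output settings:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HadoopWriteSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Placeholder data set; in practice this is the result of a job.
		DataSet<Tuple2<Text, LongWritable>> result =
				env.fromElements(Tuple2.of(new Text("word"), new LongWritable(1L)));

		// The Job supplies the Hadoop configuration used by the wrapper's open()/close().
		Job job = Job.getInstance();
		TextOutputFormat.setOutputPath(job, new Path("hdfs:///output"));   // placeholder path

		HadoopOutputFormat<Text, LongWritable> hadoopOF =
				new HadoopOutputFormat<>(new TextOutputFormat<Text, LongWritable>(), job);

		result.output(hadoopOF);
		env.execute("write via Hadoop OutputFormat");
	}
}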

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
deleted file mode 100644
index 165455e..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapreduce;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.io.FinalizeOnMaster;
-import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
-import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
-import org.apache.flink.configuration.Configuration;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import static org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI;
-
-/**
- * Base class shared between the Java and Scala APIs of Flink.
- */
-@Internal
-public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormatCommonBase<T> implements FinalizeOnMaster {
-
-	private static final long serialVersionUID = 1L;
-
-	// Mutexes to avoid concurrent operations on Hadoop OutputFormats.
-	// Hadoop parallelizes tasks across JVMs, which is why OutputFormats might rely on this JVM isolation.
-	// In contrast, Flink parallelizes using Threads, so multiple Hadoop OutputFormat instances
-	// might be used in the same JVM.
-	protected static final Object OPEN_MUTEX = new Object();
-	protected static final Object CONFIGURE_MUTEX = new Object();
-	protected static final Object CLOSE_MUTEX = new Object();
-
-	protected org.apache.hadoop.conf.Configuration configuration;
-	protected org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat;
-	protected transient RecordWriter<K, V> recordWriter;
-	protected transient OutputCommitter outputCommitter;
-	protected transient TaskAttemptContext context;
-	protected transient int taskNumber;
-
-	public HadoopOutputFormatBase(org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat, Job job) {
-		super(job.getCredentials());
-		this.mapreduceOutputFormat = mapreduceOutputFormat;
-		this.configuration = job.getConfiguration();
-		HadoopUtils.mergeHadoopConf(configuration);
-	}
-
-	public org.apache.hadoop.conf.Configuration getConfiguration() {
-		return this.configuration;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  OutputFormat
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void configure(Configuration parameters) {
-
-		// enforce sequential configure() calls
-		synchronized (CONFIGURE_MUTEX) {
-			if (this.mapreduceOutputFormat instanceof Configurable) {
-				((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
-			}
-		}
-	}
-
-	/**
-	 * Create the temporary output file for the Hadoop RecordWriter.
-	 * @param taskNumber The number of the parallel instance.
-	 * @param numTasks The number of parallel tasks.
-	 * @throws java.io.IOException
-	 */
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-
-		// enforce sequential open() calls
-		synchronized (OPEN_MUTEX) {
-			if (Integer.toString(taskNumber + 1).length() > 6) {
-				throw new IOException("Task id too large.");
-			}
-
-			this.taskNumber = taskNumber + 1;
-
-			// for hadoop 2.2
-			this.configuration.set("mapreduce.output.basename", "tmp");
-
-			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
-					+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
-					+ Integer.toString(taskNumber + 1)
-					+ "_0");
-
-			this.configuration.set("mapred.task.id", taskAttemptID.toString());
-			this.configuration.setInt("mapred.task.partition", taskNumber + 1);
-			// for hadoop 2.2
-			this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
-			this.configuration.setInt("mapreduce.task.partition", taskNumber + 1);
-
-			try {
-				this.context = new TaskAttemptContextImpl(this.configuration, taskAttemptID);
-				this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(this.context);
-				this.outputCommitter.setupJob(new JobContextImpl(this.configuration, new JobID()));
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
-
-			this.context.getCredentials().addAll(this.credentials);
-			Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
-			if (currentUserCreds != null) {
-				this.context.getCredentials().addAll(currentUserCreds);
-			}
-
-			// for compatibility with Hadoop 2.2.0: the temporary output directory differs from that of Hadoop 1.2.1
-			if (outputCommitter instanceof FileOutputCommitter) {
-				this.configuration.set("mapreduce.task.output.dir", ((FileOutputCommitter) this.outputCommitter).getWorkPath().toString());
-			}
-
-			try {
-				this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
-			} catch (InterruptedException e) {
-				throw new IOException("Could not create RecordWriter.", e);
-			}
-		}
-	}
-
-	/**
-	 * Commit the task by moving the output file out of the temporary directory.
-	 * @throws java.io.IOException
-	 */
-	@Override
-	public void close() throws IOException {
-
-		// enforce sequential close() calls
-		synchronized (CLOSE_MUTEX) {
-			try {
-				this.recordWriter.close(this.context);
-			} catch (InterruptedException e) {
-				throw new IOException("Could not close RecordWriter.", e);
-			}
-
-			if (this.outputCommitter.needsTaskCommit(this.context)) {
-				this.outputCommitter.commitTask(this.context);
-			}
-
-			Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
-
-			// rename tmp-file to final name
-			FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
-
-			String taskNumberStr = Integer.toString(this.taskNumber);
-			String tmpFileTemplate = "tmp-r-00000";
-			String tmpFile = tmpFileTemplate.substring(0, 11 - taskNumberStr.length()) + taskNumberStr;
-
-			if (fs.exists(new Path(outputPath.toString() + "/" + tmpFile))) {
-				fs.rename(new Path(outputPath.toString() + "/" + tmpFile), new Path(outputPath.toString() + "/" + taskNumberStr));
-			}
-		}
-	}
-
-	@Override
-	public void finalizeGlobal(int parallelism) throws IOException {
-
-		JobContext jobContext;
-		TaskAttemptContext taskContext;
-		try {
-			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
-					+ String.format("%" + (6 - Integer.toString(1).length()) + "s", " ").replace(" ", "0")
-					+ Integer.toString(1)
-					+ "_0");
-
-			jobContext = new JobContextImpl(this.configuration, new JobID());
-			taskContext = new TaskAttemptContextImpl(this.configuration, taskAttemptID);
-			this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(taskContext);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-
-		jobContext.getCredentials().addAll(this.credentials);
-		Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
-		if (currentUserCreds != null) {
-			jobContext.getCredentials().addAll(currentUserCreds);
-		}
-
-		// finalize HDFS output format
-		if (this.outputCommitter != null) {
-			this.outputCommitter.commitJob(jobContext);
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Custom serialization methods
-	// --------------------------------------------------------------------------------------------
-
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		super.write(out);
-		out.writeUTF(this.mapreduceOutputFormat.getClass().getName());
-		this.configuration.write(out);
-	}
-
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		super.read(in);
-		String hadoopOutputFormatClassName = in.readUTF();
-
-		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
-		configuration.readFields(in);
-
-		if (this.configuration == null) {
-			this.configuration = configuration;
-		}
-
-		try {
-			this.mapreduceOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat<K, V>) Class.forName(hadoopOutputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate the hadoop output format", e);
-		}
-	}
-}
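
The attempt ID built in open() and finalizeGlobal() above is the 1-based task number left-padded with zeros to six digits, embedded in an attempt__0000_r_..._0 string. A small standalone sketch of that formatting (values are illustrative; the %06d form is shown only as an equivalent for comparison):

public class AttemptIdSketch {

	public static void main(String[] args) {
		int taskNumber = 6;   // 0-based subtask index, as passed to open(taskNumber, numTasks)

		// Padding as in the deleted open(): left-pad (taskNumber + 1) with zeros to six digits.
		String padded = String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ")
				.replace(" ", "0") + Integer.toString(taskNumber + 1);

		// Equivalent result for task numbers below six digits.
		String simpler = String.format("%06d", taskNumber + 1);

		System.out.println("attempt__0000_r_" + padded + "_0");   // attempt__0000_r_000007_0
		System.out.println(padded.equals(simpler));               // true
	}
}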

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
deleted file mode 100644
index 3dc0c77..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapreduce.utils;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.GlobalConfiguration;
-
-import org.apache.hadoop.conf.Configuration;
-
-import java.util.Map;
-
-/**
- * Utility class to work with the next generation of Apache Hadoop MapReduce classes.
- */
-@Internal
-public final class HadoopUtils {
-
-	/**
-	 * Merge the Hadoop configuration derived from the Flink global configuration into the given Configuration. This is necessary for the HDFS configuration.
-	 */
-	public static void mergeHadoopConf(Configuration hadoopConfig) {
-
-		// we have to load the global configuration here, because the HadoopInputFormatBase does not
-		// have access to a Flink configuration object
-		org.apache.flink.configuration.Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
-
-		Configuration hadoopConf =
-			org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration(flinkConfiguration);
-
-		for (Map.Entry<String, String> e : hadoopConf) {
-			if (hadoopConfig.get(e.getKey()) == null) {
-				hadoopConfig.set(e.getKey(), e.getValue());
-			}
-		}
-	}
-
-	/**
-	 * Private constructor to prevent instantiation.
-	 */
-	private HadoopUtils() {
-		throw new RuntimeException();
-	}
-}
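
A standalone sketch of the merge semantics implemented above: keys already present in the job's Hadoop configuration win, and only missing keys are copied in. The helper and the configuration values here are illustrative, not Flink API.

import org.apache.hadoop.conf.Configuration;

import java.util.Map;

public class MergeSketch {

	/** Same precedence as mergeHadoopConf above: existing keys are kept, missing keys are filled in. */
	static void merge(Configuration target, Configuration defaults) {
		for (Map.Entry<String, String> e : defaults) {
			if (target.get(e.getKey()) == null) {
				target.set(e.getKey(), e.getValue());
			}
		}
	}

	public static void main(String[] args) {
		Configuration jobConf = new Configuration(false);            // false: skip loading *-site.xml defaults
		jobConf.set("fs.defaultFS", "hdfs://job-specific:8020");     // placeholder values

		Configuration clusterConf = new Configuration(false);
		clusterConf.set("fs.defaultFS", "hdfs://cluster-wide:8020");
		clusterConf.set("io.file.buffer.size", "131072");

		merge(jobConf, clusterConf);

		System.out.println(jobConf.get("fs.defaultFS"));        // hdfs://job-specific:8020 (kept)
		System.out.println(jobConf.get("io.file.buffer.size")); // 131072 (filled in)
	}
}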

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
deleted file mode 100644
index 8f3d13c..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapreduce.wrapper;
-
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.LocatableInputSplit;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.mapreduce.JobContext;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-/**
- * A wrapper that represents an input split from the Hadoop mapreduce API as
- * a Flink {@link InputSplit}.
- */
-public class HadoopInputSplit extends LocatableInputSplit {
-
-	private static final long serialVersionUID = 6119153593707857235L;
-
-	private final Class<? extends org.apache.hadoop.mapreduce.InputSplit> splitType;
-
-	private transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit;
-
-	public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) {
-		super(splitNumber, (String) null);
-
-		if (mapreduceInputSplit == null) {
-			throw new NullPointerException("Hadoop input split must not be null");
-		}
-		if (!(mapreduceInputSplit instanceof Writable)) {
-			throw new IllegalArgumentException("InputSplit must implement Writable interface.");
-		}
-		this.splitType = mapreduceInputSplit.getClass();
-		this.mapreduceInputSplit = mapreduceInputSplit;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Properties
-	// ------------------------------------------------------------------------
-
-	public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() {
-		return mapreduceInputSplit;
-	}
-
-	@Override
-	public String[] getHostnames() {
-		try {
-			return mapreduceInputSplit.getLocations();
-		}
-		catch (Exception e) {
-			return new String[0];
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Serialization
-	// ------------------------------------------------------------------------
-
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		// serialize the parent fields and the final fields
-		out.defaultWriteObject();
-
-		// write the input split
-		((Writable) mapreduceInputSplit).write(out);
-	}
-
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		// read the parent fields and the final fields
-		in.defaultReadObject();
-
-		try {
-			Class<? extends Writable> writableSplit = splitType.asSubclass(Writable.class);
-			mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(writableSplit);
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate the Hadoop InputSplit", e);
-		}
-
-		((Writable) mapreduceInputSplit).readFields(in);
-	}
-}
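
The wrapper above persists its Hadoop split by writing only the Writable payload and re-creating the instance via WritableFactories on deserialization. A self-contained sketch of that round trip, using a mapreduce FileSplit with placeholder values:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class SplitRoundTripSketch {

	public static void main(String[] args) throws Exception {
		FileSplit original = new FileSplit(new Path("hdfs:///data/part-0"), 0L, 1024L, new String[]{"host1"});

		// Write: only the Writable payload goes into the stream, as in HadoopInputSplit.writeObject().
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
			((Writable) original).write(out);
		}

		// Read: re-instantiate via WritableFactories and restore the fields, as in readObject().
		FileSplit copy = (FileSplit) WritableFactories.newInstance(FileSplit.class);
		try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
			((Writable) copy).readFields(in);
		}

		System.out.println(copy.getPath() + " / " + copy.getLength());
	}
}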

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
deleted file mode 100644
index bfee273..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
+++ /dev/null
@@ -1,307 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.mapred;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.anyLong;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link HadoopInputFormat}.
- */
-public class HadoopInputFormatTest {
-
-	@Test
-	public void testConfigureWithConfigurableInstance() {
-		ConfigurableDummyInputFormat inputFormat = mock(ConfigurableDummyInputFormat.class);
-
-		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
-		verify(inputFormat, times(1)).setConf(any(JobConf.class));
-
-		hadoopInputFormat.configure(new org.apache.flink.configuration.Configuration());
-		verify(inputFormat, times(2)).setConf(any(JobConf.class));
-	}
-
-	@Test
-	public void testConfigureWithJobConfigurableInstance() {
-		JobConfigurableDummyInputFormat inputFormat = mock(JobConfigurableDummyInputFormat.class);
-
-		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
-		verify(inputFormat, times(1)).configure(any(JobConf.class));
-
-		hadoopInputFormat.configure(new org.apache.flink.configuration.Configuration());
-		verify(inputFormat, times(2)).configure(any(JobConf.class));
-	}
-
-	@Test
-	public void testOpenClose() throws Exception {
-		DummyRecordReader recordReader = mock(DummyRecordReader.class);
-		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
-		when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader);
-
-		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
-		hadoopInputFormat.open(getHadoopInputSplit());
-
-		verify(inputFormat, times(1)).getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class));
-		verify(recordReader, times(1)).createKey();
-		verify(recordReader, times(1)).createValue();
-
-		assertThat(hadoopInputFormat.fetched, is(false));
-
-		hadoopInputFormat.close();
-		verify(recordReader, times(1)).close();
-	}
-
-	@Test
-	public void testOpenWithConfigurableReader() throws Exception {
-		ConfigurableDummyRecordReader recordReader = mock(ConfigurableDummyRecordReader.class);
-		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
-		when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader);
-
-		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
-		hadoopInputFormat.open(getHadoopInputSplit());
-
-		verify(inputFormat, times(1)).getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class));
-		verify(recordReader, times(1)).setConf(any(JobConf.class));
-		verify(recordReader, times(1)).createKey();
-		verify(recordReader, times(1)).createValue();
-
-		assertThat(hadoopInputFormat.fetched, is(false));
-
-	}
-
-	@Test
-	public void testCreateInputSplits() throws Exception {
-
-		FileSplit[] result = new FileSplit[1];
-		result[0] = getFileSplit();
-		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
-		when(inputFormat.getSplits(any(JobConf.class), anyInt())).thenReturn(result);
-
-		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
-		hadoopInputFormat.createInputSplits(2);
-
-		verify(inputFormat, times(1)).getSplits(any(JobConf.class), anyInt());
-	}
-
-	@Test
-	public void testReachedEndWithElementsRemaining() throws IOException {
-
-		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(new DummyInputFormat(), String.class, Long.class, new JobConf());
-		hadoopInputFormat.fetched = true;
-		hadoopInputFormat.hasNext = true;
-
-		assertThat(hadoopInputFormat.reachedEnd(), is(false));
-	}
-
-	@Test
-	public void testReachedEndWithNoElementsRemaining() throws IOException {
-		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(new DummyInputFormat(), String.class, Long.class, new JobConf());
-		hadoopInputFormat.fetched = true;
-		hadoopInputFormat.hasNext = false;
-
-		assertThat(hadoopInputFormat.reachedEnd(), is(true));
-	}
-
-	@Test
-	public void testFetchNext() throws IOException {
-		DummyRecordReader recordReader = mock(DummyRecordReader.class);
-		when(recordReader.next(anyString(), anyLong())).thenReturn(true);
-
-		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
-		when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader);
-
-		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
-		hadoopInputFormat.open(getHadoopInputSplit());
-		hadoopInputFormat.fetchNext();
-
-		verify(recordReader, times(1)).next(anyString(), anyLong());
-		assertThat(hadoopInputFormat.hasNext, is(true));
-		assertThat(hadoopInputFormat.fetched, is(true));
-	}
-
-	@Test
-	public void checkTypeInformation() throws Exception {
-		HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<>(
-				new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, new JobConf());
-
-		TypeInformation<Tuple2<Void, Long>> tupleType = hadoopInputFormat.getProducedType();
-		TypeInformation<Tuple2<Void, Long>> expectedType = new TupleTypeInfo<>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
-
-		assertThat(tupleType.isTupleType(), is(true));
-		assertThat(tupleType, is(equalTo(expectedType)));
-	}
-
-	@Test
-	public void testCloseWithoutOpen() throws Exception {
-		HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<>(
-			new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, new JobConf());
-		hadoopInputFormat.close();
-	}
-
-	private HadoopInputSplit getHadoopInputSplit() {
-		return new HadoopInputSplit(1, getFileSplit(), new JobConf());
-	}
-
-	private FileSplit getFileSplit() {
-		return new FileSplit(new Path("path"), 1, 2, new String[]{});
-	}
-
-	private class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
-
-		public DummyVoidKeyInputFormat() {}
-
-		@Override
-		public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(org.apache.hadoop.mapred.InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
-			return null;
-		}
-	}
-
-	private class DummyRecordReader implements RecordReader<String, Long> {
-
-		@Override
-		public float getProgress() throws IOException {
-			return 0;
-		}
-
-		@Override
-		public boolean next(String s, Long aLong) throws IOException {
-			return false;
-		}
-
-		@Override
-		public String createKey() {
-			return null;
-		}
-
-		@Override
-		public Long createValue() {
-			return null;
-		}
-
-		@Override
-		public long getPos() throws IOException {
-			return 0;
-		}
-
-		@Override
-		public void close() throws IOException {
-
-		}
-	}
-
-	private class ConfigurableDummyRecordReader implements RecordReader<String, Long>, Configurable {
-
-		@Override
-		public void setConf(Configuration configuration) {}
-
-		@Override
-		public Configuration getConf() {
-			return null;
-		}
-
-		@Override
-		public boolean next(String s, Long aLong) throws IOException {
-			return false;
-		}
-
-		@Override
-		public String createKey() {
-			return null;
-		}
-
-		@Override
-		public Long createValue() {
-			return null;
-		}
-
-		@Override
-		public long getPos() throws IOException {
-			return 0;
-		}
-
-		@Override
-		public void close() throws IOException {
-
-		}
-
-		@Override
-		public float getProgress() throws IOException {
-			return 0;
-		}
-	}
-
-	private class DummyInputFormat implements InputFormat<String, Long> {
-
-		@Override
-		public InputSplit[] getSplits(JobConf jobConf, int i) throws IOException {
-			return new InputSplit[0];
-		}
-
-		@Override
-		public RecordReader<String, Long> getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
-			return null;
-		}
-	}
-
-	private class ConfigurableDummyInputFormat extends DummyInputFormat implements Configurable {
-		@Override
-		public void setConf(Configuration configuration) {}
-
-		@Override
-		public Configuration getConf() {
-			return null;
-		}
-	}
-
-	private class JobConfigurableDummyInputFormat extends DummyInputFormat implements JobConfigurable {
-
-		@Override
-		public void configure(JobConf jobConf) {}
-	}
-}

