flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [3/4] git commit: The Hadoop Compatibility has been refactored and extended to support the new Java API.
Date Sat, 21 Jun 2014 09:45:27 GMT
The Hadoop Compatibility has been refactored and extended to support the new Java API.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a65b7591
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a65b7591
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a65b7591

Branch: refs/heads/master
Commit: a65b7591e43072742b9c8d36880f46a27d9822c9
Parents: 191dd1d
Author: twalthr <info@twalthr.com>
Authored: Wed May 7 02:54:29 2014 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Sat Jun 21 11:43:40 2014 +0200

----------------------------------------------------------------------
 .../DummyHadoopProgressable.java                |  27 --
 .../DummyHadoopReporter.java                    |  65 ----
 .../FileOutputCommitterWrapper.java             | 191 -----------
 .../HadoopConfiguration.java                    |  34 --
 .../hadoopcompatibility/HadoopDataSink.java     | 102 ------
 .../hadoopcompatibility/HadoopDataSource.java   |  79 -----
 .../HadoopInputFormatWrapper.java               | 164 ---------
 .../HadoopInputSplitWrapper.java                |  87 -----
 .../HadoopOutputFormatWrapper.java              | 147 --------
 .../datatypes/DefaultHadoopTypeConverter.java   |  78 -----
 .../DefaultStratosphereTypeConverter.java       |  91 -----
 .../datatypes/HadoopTypeConverter.java          |  36 --
 .../datatypes/StratosphereTypeConverter.java    |  37 --
 .../datatypes/WritableComparableWrapper.java    |  35 --
 .../datatypes/WritableWrapper.java              |  66 ----
 .../datatypes/WritableWrapperConverter.java     |  40 ---
 .../hadoopcompatibility/example/WordCount.java  | 172 ----------
 .../WordCountWithHadoopOutputFormat.java        | 161 ---------
 .../mapred/HadoopInputFormat.java               | 287 ++++++++++++++++
 .../mapred/HadoopOutputFormat.java              | 164 +++++++++
 .../mapred/example/WordCount.java               | 115 +++++++
 .../mapred/record/HadoopDataSink.java           | 102 ++++++
 .../mapred/record/HadoopDataSource.java         |  81 +++++
 .../mapred/record/HadoopRecordInputFormat.java  | 167 +++++++++
 .../mapred/record/HadoopRecordOutputFormat.java | 151 +++++++++
 .../datatypes/DefaultHadoopTypeConverter.java   |  78 +++++
 .../DefaultStratosphereTypeConverter.java       |  91 +++++
 .../datatypes/HadoopFileOutputCommitter.java    | 191 +++++++++++
 .../record/datatypes/HadoopTypeConverter.java   |  36 ++
 .../datatypes/StratosphereTypeConverter.java    |  37 ++
 .../datatypes/WritableComparableWrapper.java    |  35 ++
 .../record/datatypes/WritableWrapper.java       |  66 ++++
 .../datatypes/WritableWrapperConverter.java     |  40 +++
 .../mapred/record/example/WordCount.java        | 179 ++++++++++
 .../example/WordCountWithOutputFormat.java      | 168 +++++++++
 .../mapred/utils/HadoopUtils.java               |  84 +++++
 .../mapred/wrapper/HadoopDummyProgressable.java |  27 ++
 .../mapred/wrapper/HadoopDummyReporter.java     |  65 ++++
 .../mapred/wrapper/HadoopInputSplit.java        |  87 +++++
 .../mapreduce/HadoopInputFormat.java            | 337 +++++++++++++++++++
 .../mapreduce/HadoopOutputFormat.java           | 204 +++++++++++
 .../mapreduce/example/WordCount.java            | 114 +++++++
 .../mapreduce/utils/HadoopUtils.java            |  80 +++++
 .../mapreduce/wrapper/HadoopInputSplit.java     |  86 +++++
 .../HadoopInputOutputTest.java                  |  48 ---
 .../mapred/HadoopInputOutputITCase.java         |  40 +++
 .../record/HadoopRecordInputOutputITCase.java   |  48 +++
 .../mapreduce/HadoopInputOutputITCase.java      |  40 +++
 48 files changed, 3200 insertions(+), 1660 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/DummyHadoopProgressable.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/DummyHadoopProgressable.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/DummyHadoopProgressable.java
deleted file mode 100644
index 11c8606..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/DummyHadoopProgressable.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility;
-
-import org.apache.hadoop.util.Progressable;
-
-/**
- * This is a dummy progress callback that does nothing.
- *
- */
-public class DummyHadoopProgressable implements Progressable {
-	@Override
-	public void progress() {
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/DummyHadoopReporter.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/DummyHadoopReporter.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/DummyHadoopReporter.java
deleted file mode 100644
index 823217b..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/DummyHadoopReporter.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility;
-
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This is a dummy progress monitor / reporter
- *
- */
-public class DummyHadoopReporter implements Reporter {
-
-	@Override
-	public void progress() {
-	}
-
-	@Override
-	public void setStatus(String status) {
-
-	}
-
-	@Override
-	public Counter getCounter(Enum<?> name) {
-		return null;
-	}
-
-	@Override
-	public Counter getCounter(String group, String name) {
-		return null;
-	}
-
-	@Override
-	public void incrCounter(Enum<?> key, long amount) {
-
-	}
-
-	@Override
-	public void incrCounter(String group, String counter, long amount) {
-
-	}
-
-	@Override
-	public InputSplit getInputSplit() throws UnsupportedOperationException {
-		return null;
-	}
-
-	@Override
-	public float getProgress() {
-		return 0;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/FileOutputCommitterWrapper.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/FileOutputCommitterWrapper.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/FileOutputCommitterWrapper.java
deleted file mode 100644
index 23a4605..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/FileOutputCommitterWrapper.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileOutputCommitter;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * Hadoop 1.2.1 {@link org.apache.hadoop.mapred.FileOutputCommitter} takes a {@link org.apache.hadoop.mapred.JobContext}
- * as input parameter. However, the JobContext class is package-private there, whereas in Hadoop 2.2.0 it is public.
- * This class takes a {@link org.apache.hadoop.mapred.JobConf} as input instead of a JobContext in order to set up and commit tasks.
- */
-public class FileOutputCommitterWrapper extends FileOutputCommitter implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	
-	static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
-		"mapreduce.fileoutputcommitter.marksuccessfuljobs";
-
-	public void setupJob(JobConf conf) throws IOException {
-		Path outputPath = FileOutputFormat.getOutputPath(conf);
-		if (outputPath != null) {
-			Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
-			FileSystem fileSys = tmpDir.getFileSystem(conf);
-			if (!fileSys.mkdirs(tmpDir)) {
-				LOG.error("Mkdirs failed to create " + tmpDir.toString());
-			}
-		}
-	}
-
-	private static boolean getOutputDirMarking(JobConf conf) {
-		return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
-			true);
-	}
-
-	private void markSuccessfulOutputDir(JobConf conf)
-		throws IOException {
-		Path outputPath = FileOutputFormat.getOutputPath(conf);
-		if (outputPath != null) {
-			FileSystem fileSys = outputPath.getFileSystem(conf);
-			// create a file in the folder to mark it
-			if (fileSys.exists(outputPath)) {
-				Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
-				fileSys.create(filePath).close();
-			}
-		}
-	}
-
-	private Path getFinalPath(Path jobOutputDir, Path taskOutput,
-							Path taskOutputPath) throws IOException {
-		URI taskOutputUri = taskOutput.toUri();
-		URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
-		if (taskOutputUri == relativePath) {//taskOutputPath is not a parent of taskOutput
-			throw new IOException("Can not get the relative path: base = " +
-				taskOutputPath + " child = " + taskOutput);
-		}
-		if (relativePath.getPath().length() > 0) {
-			return new Path(jobOutputDir, relativePath.getPath());
-		} else {
-			return jobOutputDir;
-		}
-	}
-	private void moveTaskOutputs(JobConf conf, TaskAttemptID taskAttemptID,
-								FileSystem fs,
-								Path jobOutputDir,
-								Path taskOutput)
-		throws IOException {
-		if (fs.isFile(taskOutput)) {
-			Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
-				getTempTaskOutputPath(conf, taskAttemptID));
-			if (!fs.rename(taskOutput, finalOutputPath)) {
-				if (!fs.delete(finalOutputPath, true)) {
-					throw new IOException("Failed to delete earlier output of task: " +
-						taskAttemptID);
-				}
-				if (!fs.rename(taskOutput, finalOutputPath)) {
-					throw new IOException("Failed to save output of task: " +
-						taskAttemptID);
-				}
-			}
-			LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
-		} else if(fs.getFileStatus(taskOutput).isDir()) {
-			FileStatus[] paths = fs.listStatus(taskOutput);
-			Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
-				getTempTaskOutputPath(conf, taskAttemptID));
-			fs.mkdirs(finalOutputPath);
-			if (paths != null) {
-				for (FileStatus path : paths) {
-					moveTaskOutputs(conf,taskAttemptID, fs, jobOutputDir, path.getPath());
-				}
-			}
-		}
-	}
-
-	public void commitTask(JobConf conf, TaskAttemptID taskAttemptID)
-		throws IOException {
-		Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID);
-		if (taskOutputPath != null) {
-			FileSystem fs = taskOutputPath.getFileSystem(conf);
-			if (fs.exists(taskOutputPath)) {
-				Path jobOutputPath = taskOutputPath.getParent().getParent();
-				// Move the task outputs to their final place
-				moveTaskOutputs(conf,taskAttemptID, fs, jobOutputPath, taskOutputPath);
-				// Delete the temporary task-specific output directory
-				if (!fs.delete(taskOutputPath, true)) {
-					LOG.info("Failed to delete the temporary output" +
-						" directory of task: " + taskAttemptID + " - " + taskOutputPath);
-				}
-				LOG.info("Saved output of task '" + taskAttemptID + "' to " +
-					jobOutputPath);
-			}
-		}
-	}
-	public boolean needsTaskCommit(JobConf conf, TaskAttemptID taskAttemptID)
-		throws IOException {
-		try {
-			Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID);
-			if (taskOutputPath != null) {
-				// Get the file-system for the task output directory
-				FileSystem fs = taskOutputPath.getFileSystem(conf);
-				// since task output path is created on demand,
-				// if it exists, task needs a commit
-				if (fs.exists(taskOutputPath)) {
-					return true;
-				}
-			}
-		} catch (IOException  ioe) {
-			throw ioe;
-		}
-		return false;
-	}
-
-	public Path getTempTaskOutputPath(JobConf conf, TaskAttemptID taskAttemptID) {
-		Path outputPath = FileOutputFormat.getOutputPath(conf);
-		if (outputPath != null) {
-			Path p = new Path(outputPath,
-				(FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-					"_" + taskAttemptID.toString()));
-			try {
-				FileSystem fs = p.getFileSystem(conf);
-				return p.makeQualified(fs);
-			} catch (IOException ie) {
-				LOG.warn(StringUtils.stringifyException(ie));
-				return p;
-			}
-		}
-		return null;
-	}
-	public void cleanupJob(JobConf conf) throws IOException {
-		// do the clean up of temporary directory
-		Path outputPath = FileOutputFormat.getOutputPath(conf);
-		if (outputPath != null) {
-			Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
-			FileSystem fileSys = tmpDir.getFileSystem(conf);
-			if (fileSys.exists(tmpDir)) {
-				fileSys.delete(tmpDir, true);
-			}
-		} else {
-			LOG.warn("Output path is null in cleanup");
-		}
-	}
-
-	public void commitJob(JobConf conf) throws IOException {
-		cleanupJob(conf);
-		if (getOutputDirMarking(conf)) {
-			markSuccessfulOutputDir(conf);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopConfiguration.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopConfiguration.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopConfiguration.java
deleted file mode 100644
index 350871d..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopConfiguration.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility;
-
-import java.util.Map;
-
-import org.apache.hadoop.mapred.JobConf;
-
-import eu.stratosphere.runtime.fs.hdfs.DistributedFileSystem;
-
-/**
- * Merges the Hadoop configuration into the jobConf. This is necessary for the HDFS configuration.
-
- */
-
-public class HadoopConfiguration {
-	public static void mergeHadoopConf(JobConf jobConf) {
-		org.apache.hadoop.conf.Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration();
-		for (Map.Entry<String, String> e : hadoopConf) {
-			jobConf.set(e.getKey(), e.getValue());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopDataSink.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopDataSink.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopDataSink.java
deleted file mode 100644
index 62a7eba..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopDataSink.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility;
-
-import java.util.List;
-
-import eu.stratosphere.types.Record;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputFormat;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import eu.stratosphere.api.java.record.operators.GenericDataSink;
-import eu.stratosphere.api.common.operators.Operator;
-import eu.stratosphere.compiler.contextcheck.Validatable;
-import eu.stratosphere.hadoopcompatibility.datatypes.DefaultStratosphereTypeConverter;
-import eu.stratosphere.hadoopcompatibility.datatypes.StratosphereTypeConverter;
-
-/**
- * The HadoopDataSink is a generic wrapper for all Hadoop OutputFormats.
- *
- * Example usage:
- * <pre>
- * 		HadoopDataSink out = new HadoopDataSink(new org.apache.hadoop.mapred.TextOutputFormat<Text, IntWritable>(), new JobConf(), "Hadoop TextOutputFormat",reducer, Text.class,IntWritable.class);
- *		org.apache.hadoop.mapred.TextOutputFormat.setOutputPath(out.getJobConf(), new Path(output));
- * </pre>
- *
- * Note that it is possible to provide a custom data type converter.
- *
- * The HadoopDataSink provides a default converter: {@link eu.stratosphere.hadoopcompatibility.datatypes.DefaultStratosphereTypeConverter}
- **/
-public class HadoopDataSink<K,V> extends GenericDataSink implements Validatable {
-
-	private static String DEFAULT_NAME = "<Unnamed Hadoop Data Sink>";
-
-	private JobConf jobConf;
-
-	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator<Record> input, StratosphereTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) {
-		this(hadoopFormat, jobConf, name, ImmutableList.<Operator<Record>>of(input), conv, keyClass, valueClass);
-	}
-
-	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) {
-		this(hadoopFormat, jobConf, name, input, new DefaultStratosphereTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-	}
-
-	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) {
-		this(hadoopFormat, jobConf, DEFAULT_NAME, input, new DefaultStratosphereTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-	}
-
-	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) {
-		this(hadoopFormat, new JobConf(), DEFAULT_NAME, input, new DefaultStratosphereTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-	}
-
-
-
-	@SuppressWarnings("deprecation")
-	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, List<Operator<Record>> input, StratosphereTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) {
-		super(new HadoopOutputFormatWrapper<K,V>(hadoopFormat, jobConf, conv),input, name);
-		Preconditions.checkNotNull(hadoopFormat);
-		Preconditions.checkNotNull(jobConf);
-		this.name = name;
-		this.jobConf = jobConf;
-		jobConf.setOutputKeyClass(keyClass);
-		jobConf.setOutputValueClass(valueClass);
-	}
-
-	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) {
-		this(hadoopFormat, jobConf, name, input, new DefaultStratosphereTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-	}
-
-	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) {
-		this(hadoopFormat, jobConf, DEFAULT_NAME, input, new DefaultStratosphereTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-	}
-
-	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) {
-		this(hadoopFormat, new JobConf(), DEFAULT_NAME, input, new DefaultStratosphereTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-	}
-
-	public JobConf getJobConf() {
-		return this.jobConf;
-	}
-
-	@Override
-	public void check() {
-		// for more details see https://github.com/stratosphere/stratosphere/pull/531
-		Preconditions.checkNotNull(FileOutputFormat.getOutputPath(jobConf), "The HadoopDataSink currently expects a correct outputPath.");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopDataSource.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopDataSource.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopDataSource.java
deleted file mode 100644
index fe1f365..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopDataSource.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility;
-
-
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
-
-import com.google.common.base.Preconditions;
-
-import eu.stratosphere.api.java.record.operators.GenericDataSource;
-import eu.stratosphere.hadoopcompatibility.datatypes.DefaultHadoopTypeConverter;
-import eu.stratosphere.hadoopcompatibility.datatypes.HadoopTypeConverter;
-
-/**
- * The HadoopDataSource is a generic wrapper for all Hadoop InputFormats.
- * 
- * Example usage:
- * <pre>
- * 		HadoopDataSource source = new HadoopDataSource(new org.apache.hadoop.mapred.TextInputFormat(), new JobConf(), "Input Lines");
- *		org.apache.hadoop.mapred.TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
- * </pre>
- * 
- * Note that it is possible to provide a custom data type converter.
- * 
- * The HadoopDataSource provides two different standard converters:
- * * WritableWrapperConverter: Converts Hadoop Types to a record that contains a WritableComparableWrapper (key) and a WritableWrapper
- * * DefaultHadoopTypeConverter: Converts the standard Hadoop types (LongWritable, Text) to Stratosphere's standard types.
- *
- */
-public class HadoopDataSource<K,V> extends GenericDataSource<HadoopInputFormatWrapper<K,V>> {
-
-	private static String DEFAULT_NAME = "<Unnamed Hadoop Data Source>";
-	
-	private JobConf jobConf;
-	
-	/**
-	 * 
-	 * @param hadoopFormat Implementation of a Hadoop input format
-	 * @param jobConf JobConf object (Hadoop)
-	 * @param name Name of the DataSource
-	 * @param conv Custom type converter; the other constructors default to {@link DefaultHadoopTypeConverter}.
-	 */
-	public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf, String name, HadoopTypeConverter<K,V> conv) {
-		super(new HadoopInputFormatWrapper<K,V>(hadoopFormat, jobConf, conv),name);
-		Preconditions.checkNotNull(hadoopFormat);
-		Preconditions.checkNotNull(jobConf);
-		Preconditions.checkNotNull(conv);
-		this.name = name;
-		this.jobConf = jobConf;
-	}
-	
-	public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf, String name) {
-		this(hadoopFormat, jobConf, name, new DefaultHadoopTypeConverter<K,V>() );
-	}
-	public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf) {
-		this(hadoopFormat, jobConf, DEFAULT_NAME);
-	}
-	
-	public HadoopDataSource(InputFormat<K,V> hadoopFormat) {
-		this(hadoopFormat, new JobConf(), DEFAULT_NAME);
-	}
-
-	public JobConf getJobConf() {
-		return this.jobConf;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputFormatWrapper.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputFormatWrapper.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputFormatWrapper.java
deleted file mode 100644
index 968b746..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputFormatWrapper.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import eu.stratosphere.api.common.io.InputFormat;
-import eu.stratosphere.api.common.io.statistics.BaseStatistics;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.hadoopcompatibility.datatypes.HadoopTypeConverter;
-import eu.stratosphere.types.Record;
-
-public class HadoopInputFormatWrapper<K, V> implements InputFormat<Record, HadoopInputSplitWrapper> {
-
-	private static final long serialVersionUID = 1L;
-
-	public org.apache.hadoop.mapred.InputFormat<K, V> hadoopInputFormat;
-	public HadoopTypeConverter<K,V> converter;
-	private String hadoopInputFormatName;
-	public JobConf jobConf;
-	public transient K key;
-	public transient V value;
-	public RecordReader<K, V> recordReader;
-	private boolean fetched = false;
-	private boolean hasNext;
-		
-	public HadoopInputFormatWrapper() {
-		super();
-	}
-	
-	public HadoopInputFormatWrapper(org.apache.hadoop.mapred.InputFormat<K,V> hadoopInputFormat, JobConf job, HadoopTypeConverter<K,V> conv) {
-		super();
-		this.hadoopInputFormat = hadoopInputFormat;
-		this.hadoopInputFormatName = hadoopInputFormat.getClass().getName();
-		this.converter = conv;
-		HadoopConfiguration.mergeHadoopConf(job);
-		this.jobConf = job;
-	}
-
-	@Override
-	public void configure(Configuration parameters) {
-		
-	}
-
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
-		return null;
-	}
-
-	@Override
-	public HadoopInputSplitWrapper[] createInputSplits(int minNumSplits)
-			throws IOException {
-		org.apache.hadoop.mapred.InputSplit[] splitArray = hadoopInputFormat.getSplits(jobConf, minNumSplits);
-		HadoopInputSplitWrapper[] hiSplit = new HadoopInputSplitWrapper[splitArray.length];
-		for(int i=0;i<splitArray.length;i++){
-			hiSplit[i] = new HadoopInputSplitWrapper(splitArray[i], jobConf);
-		}
-		return hiSplit;
-	}
-
-	@Override
-	public Class<? extends HadoopInputSplitWrapper> getInputSplitType() {
-		return HadoopInputSplitWrapper.class;
-	}
-
-	@Override
-	public void open(HadoopInputSplitWrapper split) throws IOException {
-		this.recordReader = this.hadoopInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new DummyHadoopReporter());
-		key = this.recordReader.createKey();
-		value = this.recordReader.createValue();
-		this.fetched = false;
-	}
-
-	private void fetchNext() throws IOException {
-		hasNext = this.recordReader.next(key, value);
-		fetched = true;
-	}
-	
-	@Override
-	public boolean reachedEnd() throws IOException {
-		if(!fetched) {
-			fetchNext();
-		}
-		return !hasNext;
-	}
-
-	@Override
-	public Record nextRecord(Record record) throws IOException {
-		if(!fetched) {
-			fetchNext();
-		}
-		if(!hasNext) {
-			return null;
-		}
-		converter.convert(record, key, value);
-		fetched = false;
-		return record;
-	}
-
-	@Override
-	public void close() throws IOException {
-		this.recordReader.close();
-	}
-	
-	/**
-	 * Custom serialization methods.
-	 *  @see http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html
-	 */
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		out.writeUTF(hadoopInputFormatName);
-		jobConf.write(out);
-		out.writeObject(converter);
-	}
-
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		hadoopInputFormatName = in.readUTF();
-		if(jobConf == null) {
-			jobConf = new JobConf();
-		}
-		jobConf.readFields(in);
-		try {
-			this.hadoopInputFormat = (org.apache.hadoop.mapred.InputFormat<K,V>) Class.forName(this.hadoopInputFormatName).newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate the hadoop input format", e);
-		}
-		ReflectionUtils.setConf(hadoopInputFormat, jobConf);
-		converter = (HadoopTypeConverter<K,V>) in.readObject();
-	}
-	
-	public void setJobConf(JobConf job) {
-		this.jobConf = job;
-	}
-		
-
-	public org.apache.hadoop.mapred.InputFormat<K,V> getHadoopInputFormat() {
-		return hadoopInputFormat;
-	}
-	
-	public void setHadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> hadoopInputFormat) {
-		this.hadoopInputFormat = hadoopInputFormat;
-	}
-	
-	public JobConf getJobConf() {
-		return jobConf;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputSplitWrapper.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputSplitWrapper.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputSplitWrapper.java
deleted file mode 100644
index 936d5c8..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputSplitWrapper.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.mapred.JobConf;
-
-import eu.stratosphere.core.io.InputSplit;
-
-
-public class HadoopInputSplitWrapper implements InputSplit {
-
-	public transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit;
-	public JobConf jobConf;
-	private int splitNumber;
-	private String hadoopInputSplitTypeName;
-	
-	
-	public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() {
-		return hadoopInputSplit;
-	}
-	
-	
-	public HadoopInputSplitWrapper() {
-		super();
-	}
-	
-	
-	public HadoopInputSplitWrapper(org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
-		this.hadoopInputSplit = hInputSplit;
-		this.hadoopInputSplitTypeName = hInputSplit.getClass().getName();
-		this.jobConf=jobconf;
-	}
-	
-	@Override
-	public void write(DataOutput out) throws IOException {
-		out.writeInt(splitNumber);
-		out.writeUTF(hadoopInputSplitTypeName);
-		hadoopInputSplit.write(out);
-	}
-
-	@Override
-	public void read(DataInput in) throws IOException {
-		this.splitNumber=in.readInt();
-		this.hadoopInputSplitTypeName = in.readUTF();
-		if(hadoopInputSplit == null) {
-			try {
-				Class<? extends org.apache.hadoop.io.Writable> inputSplit = 
-						Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
-				this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit );
-			}
-			catch (Exception e) {
-				throw new RuntimeException("Unable to create InputSplit", e);
-			}
-		}
-		this.hadoopInputSplit.readFields(in);
-	}
-
-	@Override
-	public int getSplitNumber() {
-		return this.splitNumber;
-	}
-
-	public void setSplitNumber(int splitNumber) {
-		this.splitNumber = splitNumber;
-	}
-	
-	public void setHadoopInputSplit(
-			org.apache.hadoop.mapred.InputSplit hadoopInputSplit) {
-		this.hadoopInputSplit = hadoopInputSplit;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopOutputFormatWrapper.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopOutputFormatWrapper.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopOutputFormatWrapper.java
deleted file mode 100644
index 93a203f..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopOutputFormatWrapper.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import eu.stratosphere.api.common.io.OutputFormat;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.hadoopcompatibility.datatypes.StratosphereTypeConverter;
-import eu.stratosphere.types.Record;
-
-
-public class HadoopOutputFormatWrapper<K,V> implements OutputFormat<Record> {
-
-	private static final long serialVersionUID = 1L;
-
-	public JobConf jobConf;
-
-	public org.apache.hadoop.mapred.OutputFormat<K,V> hadoopOutputFormat;
-
-	private String hadoopOutputFormatName;
-
-	public RecordWriter<K,V> recordWriter;
-
-	public StratosphereTypeConverter<K,V> converter;
-
-	public FileOutputCommitterWrapper fileOutputCommitterWrapper;
-
-	public HadoopOutputFormatWrapper(org.apache.hadoop.mapred.OutputFormat<K,V> hadoopFormat, JobConf job, StratosphereTypeConverter<K,V> conv) {
-		super();
-		this.hadoopOutputFormat = hadoopFormat;
-		this.hadoopOutputFormatName = hadoopFormat.getClass().getName();
-		this.converter = conv;
-		this.fileOutputCommitterWrapper = new FileOutputCommitterWrapper();
-		HadoopConfiguration.mergeHadoopConf(job);
-		this.jobConf = job;
-	}
-
-	@Override
-	public void configure(Configuration parameters) {
-	}
-
-	/**
-	 * create the temporary output file for hadoop RecordWriter.
-	 * @param taskNumber The number of the parallel instance.
-	 * @param numTasks The number of parallel tasks.
-	 * @throws IOException
-	 */
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		this.fileOutputCommitterWrapper.setupJob(this.jobConf);
-		if (Integer.toString(taskNumber + 1).length() <= 6) {
-			this.jobConf.set("mapred.task.id", "attempt__0000_r_" + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") + Integer.toString(taskNumber + 1) + "_0");
-			//compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
-			this.jobConf.set("mapreduce.task.output.dir", this.fileOutputCommitterWrapper.getTempTaskOutputPath(this.jobConf,TaskAttemptID.forName(this.jobConf.get("mapred.task.id"))).toString());
-		} else {
-			throw new IOException("task id too large");
-		}
-		this.recordWriter = this.hadoopOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new DummyHadoopProgressable());
-	}
-
-
-	@Override
-	public void writeRecord(Record record) throws IOException {
-		K key = this.converter.convertKey(record);
-		V value = this.converter.convertValue(record);
-		this.recordWriter.write(key, value);
-	}
-
-	/**
-	 * commit the task by moving the output file out from the temporary directory.
-	 * @throws IOException
-	 */
-	@Override
-	public void close() throws IOException {
-		this.recordWriter.close(new DummyHadoopReporter());
-		if (this.fileOutputCommitterWrapper.needsTaskCommit(this.jobConf, TaskAttemptID.forName(this.jobConf.get("mapred.task.id")))) {
-			this.fileOutputCommitterWrapper.commitTask(this.jobConf, TaskAttemptID.forName(this.jobConf.get("mapred.task.id")));
-		}
-	//TODO: commitjob when all the tasks are finished
-	}
-
-
-	/**
-	 * Custom serialization methods.
-	 *  @see http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html
-	 */
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		out.writeUTF(hadoopOutputFormatName);
-		jobConf.write(out);
-		out.writeObject(converter);
-		out.writeObject(fileOutputCommitterWrapper);
-	}
-
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		hadoopOutputFormatName = in.readUTF();
-		if(jobConf == null) {
-			jobConf = new JobConf();
-		}
-		jobConf.readFields(in);
-		try {
-			this.hadoopOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K,V>) Class.forName(this.hadoopOutputFormatName).newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate the hadoop output format", e);
-		}
-		ReflectionUtils.setConf(hadoopOutputFormat, jobConf);
-		converter = (StratosphereTypeConverter<K,V>) in.readObject();
-		fileOutputCommitterWrapper = (FileOutputCommitterWrapper) in.readObject();
-	}
-
-
-	public void setJobConf(JobConf job) {
-		this.jobConf = job;
-	}
-
-	public JobConf getJobConf() {
-		return jobConf;
-	}
-
-	public org.apache.hadoop.mapred.OutputFormat<K,V> getHadoopOutputFormat() {
-		return hadoopOutputFormat;
-	}
-
-	public void setHadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> hadoopOutputFormat) {
-		this.hadoopOutputFormat = hadoopOutputFormat;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultHadoopTypeConverter.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultHadoopTypeConverter.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultHadoopTypeConverter.java
deleted file mode 100644
index 42f81de..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultHadoopTypeConverter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility.datatypes;
-
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-
-import eu.stratosphere.types.BooleanValue;
-import eu.stratosphere.types.ByteValue;
-import eu.stratosphere.types.DoubleValue;
-import eu.stratosphere.types.FloatValue;
-import eu.stratosphere.types.IntValue;
-import eu.stratosphere.types.LongValue;
-import eu.stratosphere.types.NullValue;
-import eu.stratosphere.types.Record;
-import eu.stratosphere.types.StringValue;
-import eu.stratosphere.types.Value;
-
-
-/**
- * Converter for the default hadoop writables.
- * Key will be in field 0, Value in field 1 of a Stratosphere Record.
- */
-public class DefaultHadoopTypeConverter<K, V> implements HadoopTypeConverter<K, V> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void convert(Record stratosphereRecord, K hadoopKey, V hadoopValue) {
-		stratosphereRecord.setField(0, convert(hadoopKey));
-		stratosphereRecord.setField(1, convert(hadoopValue));
-	}
-	
-	protected Value convert(Object hadoopType) {
-		if(hadoopType instanceof org.apache.hadoop.io.LongWritable ) {
-			return new LongValue(((LongWritable)hadoopType).get());
-		}
-		if(hadoopType instanceof org.apache.hadoop.io.Text) {
-			return new StringValue(((Text)hadoopType).toString());
-		}
-		if(hadoopType instanceof org.apache.hadoop.io.IntWritable) {
-			return new IntValue(((IntWritable)hadoopType).get());
-		}
-		if(hadoopType instanceof org.apache.hadoop.io.FloatWritable) {
-			return new FloatValue(((FloatWritable)hadoopType).get());
-		}
-		if(hadoopType instanceof org.apache.hadoop.io.DoubleWritable) {
-			return new DoubleValue(((DoubleWritable)hadoopType).get());
-		}
-		if(hadoopType instanceof org.apache.hadoop.io.BooleanWritable) {
-			return new BooleanValue(((BooleanWritable)hadoopType).get());
-		}
-		if(hadoopType instanceof org.apache.hadoop.io.ByteWritable) {
-			return new ByteValue(((ByteWritable)hadoopType).get());
-		}
-		if (hadoopType instanceof NullWritable) {
-			return NullValue.getInstance();
-		}
-		
-		throw new RuntimeException("Unable to convert Hadoop type ("+hadoopType.getClass().getCanonicalName()+") to Stratosphere.");
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultStratosphereTypeConverter.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultStratosphereTypeConverter.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultStratosphereTypeConverter.java
deleted file mode 100644
index 9dbe318..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultStratosphereTypeConverter.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility.datatypes;
-
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-import eu.stratosphere.types.BooleanValue;
-import eu.stratosphere.types.ByteValue;
-import eu.stratosphere.types.DoubleValue;
-import eu.stratosphere.types.FloatValue;
-import eu.stratosphere.types.IntValue;
-import eu.stratosphere.types.LongValue;
-import eu.stratosphere.types.Record;
-import eu.stratosphere.types.StringValue;
-
-/**
- * Converter Stratosphere Record into the default hadoop writables.
- *
- */
-public class DefaultStratosphereTypeConverter<K,V> implements StratosphereTypeConverter<K,V> {
-	private static final long serialVersionUID = 1L;
-
-	private Class<K> keyClass;
-	private Class<V> valueClass;
-
-	public DefaultStratosphereTypeConverter(Class<K> keyClass, Class<V> valueClass) {
-		this.keyClass= keyClass;
-		this.valueClass = valueClass;
-	}
-	@Override
-	public K convertKey(Record stratosphereRecord) {
-		if(stratosphereRecord.getNumFields() > 0) {
-			return convert(stratosphereRecord, 0, this.keyClass);
-		} else {
-			return null;
-		}
-	}
-
-	@Override
-	public V convertValue(Record stratosphereRecord) {
-		if(stratosphereRecord.getNumFields() > 1) {
-			return convert(stratosphereRecord, 1, this.valueClass);
-		} else {
-			return null;
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	private<T> T convert(Record stratosphereType, int pos, Class<T> hadoopType) {
-		if(hadoopType == LongWritable.class ) {
-			return (T) new LongWritable((stratosphereType.getField(pos, LongValue.class)).getValue());
-		}
-		if(hadoopType == org.apache.hadoop.io.Text.class) {
-			return (T) new Text((stratosphereType.getField(pos, StringValue.class)).getValue());
-		}
-		if(hadoopType == org.apache.hadoop.io.IntWritable.class) {
-			return (T) new IntWritable((stratosphereType.getField(pos, IntValue.class)).getValue());
-		}
-		if(hadoopType == org.apache.hadoop.io.FloatWritable.class) {
-			return (T) new FloatWritable((stratosphereType.getField(pos, FloatValue.class)).getValue());
-		}
-		if(hadoopType == org.apache.hadoop.io.DoubleWritable.class) {
-			return (T) new DoubleWritable((stratosphereType.getField(pos, DoubleValue.class)).getValue());
-		}
-		if(hadoopType == org.apache.hadoop.io.BooleanWritable.class) {
-			return (T) new BooleanWritable((stratosphereType.getField(pos, BooleanValue.class)).getValue());
-		}
-		if(hadoopType == org.apache.hadoop.io.ByteWritable.class) {
-			return (T) new ByteWritable((stratosphereType.getField(pos, ByteValue.class)).getValue());
-		}
-
-		throw new RuntimeException("Unable to convert Stratosphere type ("+stratosphereType.getClass().getCanonicalName()+") to Hadoop.");
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/HadoopTypeConverter.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/HadoopTypeConverter.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/HadoopTypeConverter.java
deleted file mode 100644
index d279417..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/HadoopTypeConverter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility.datatypes;
-
-import java.io.Serializable;
-
-import eu.stratosphere.types.Record;
-
-
-/**
- * An interface describing a class that is able to 
- * convert Hadoop types into Stratosphere's Record model.
- * 
- * The converter must be Serializable.
- * 
- * Stratosphere provides a DefaultHadoopTypeConverter. Custom implementations should
- * chain the type converters.
- */
-public interface HadoopTypeConverter<K, V> extends Serializable {
-	
-	/**
-	 * Convert a Hadoop type to a Stratosphere type.
-	 */
-	public void convert(Record stratosphereRecord, K hadoopKey, V hadoopValue);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/StratosphereTypeConverter.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/StratosphereTypeConverter.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/StratosphereTypeConverter.java
deleted file mode 100644
index 1800248..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/StratosphereTypeConverter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility.datatypes;
-
-import java.io.Serializable;
-
-import eu.stratosphere.types.Record;
-
-/**
- * An interface describing a class that is able to
- * convert Stratosphere's Record into Hadoop types model.
- *
- * The converter must be Serializable.
- *
- * Stratosphere provides a DefaultStratosphereTypeConverter. Custom implementations should
- * chain the type converters.
- */
-public interface StratosphereTypeConverter<K,V> extends Serializable {
-
-	/**
-	 * Convert a Stratosphere type to a Hadoop type.
-	 */
-	public K convertKey(Record stratosphereRecord);
-
-	public V convertValue(Record stratosphereRecord);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableComparableWrapper.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableComparableWrapper.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableComparableWrapper.java
deleted file mode 100644
index 90787a4..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableComparableWrapper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility.datatypes;
-
-import org.apache.hadoop.io.WritableComparable;
-
-import eu.stratosphere.types.Key;
-
-public class WritableComparableWrapper<T extends WritableComparable<T>> extends WritableWrapper<T> implements Key<WritableComparableWrapper<T>> {
-	private static final long serialVersionUID = 1L;
-	
-	public WritableComparableWrapper() {
-		super();
-	}
-	
-	public WritableComparableWrapper(T toWrap) {
-		super(toWrap);
-	}
-
-	@Override
-	public int compareTo(WritableComparableWrapper<T> o) {
-		return super.value().compareTo(o.value());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableWrapper.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableWrapper.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableWrapper.java
deleted file mode 100644
index f84d9c0..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableWrapper.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility.datatypes;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-import eu.stratosphere.types.Value;
-import eu.stratosphere.util.InstantiationUtil;
-
-public class WritableWrapper<T extends Writable> implements Value {
-	private static final long serialVersionUID = 2L;
-	
-	private T wrapped;
-	private String wrappedType;
-	private ClassLoader cl;
-	
-	public WritableWrapper() {
-	}
-	
-	public WritableWrapper(T toWrap) {
-		wrapped = toWrap;
-		wrappedType = toWrap.getClass().getCanonicalName();
-	}
-
-	public T value() {
-		return wrapped;
-	}
-	
-	@Override
-	public void write(DataOutput out) throws IOException {
-		out.writeUTF(wrappedType);
-		wrapped.write(out);
-	}
-
-	@Override
-	public void read(DataInput in) throws IOException {
-		if(cl == null) {
-			cl = Thread.currentThread().getContextClassLoader();
-		}
-		wrappedType = in.readUTF();
-		try {
-			@SuppressWarnings("unchecked")
-			Class<T> wrClass = (Class<T>) Class.forName(wrappedType, true, cl).asSubclass(Writable.class);
-			wrapped = InstantiationUtil.instantiate(wrClass, Writable.class);
-		} catch (ClassNotFoundException e) {
-			throw new RuntimeException("Error creating the WritableWrapper", e);
-		}
-		wrapped.readFields(in);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableWrapperConverter.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableWrapperConverter.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableWrapperConverter.java
deleted file mode 100644
index 0831a8d..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableWrapperConverter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility.datatypes;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import eu.stratosphere.types.Record;
-import eu.stratosphere.types.Value;
-
-@SuppressWarnings("rawtypes")
-public class WritableWrapperConverter<K extends WritableComparable, V extends Writable> implements HadoopTypeConverter<K,V> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void convert(Record stratosphereRecord, K hadoopKey, V hadoopValue) {
-		stratosphereRecord.setField(0, convertKey(hadoopKey));
-		stratosphereRecord.setField(1, convertValue(hadoopValue));
-	}
-	
-	@SuppressWarnings("unchecked")
-	private final Value convertKey(K in) {
-		return new WritableComparableWrapper(in);
-	}
-	
-	private final Value convertValue(V in) {
-		return new WritableWrapper<V>(in);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/example/WordCount.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/example/WordCount.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/example/WordCount.java
deleted file mode 100644
index 204a80b..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/example/WordCount.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility.example;
-
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-import eu.stratosphere.api.common.Plan;
-import eu.stratosphere.api.common.Program;
-import eu.stratosphere.api.common.ProgramDescription;
-import eu.stratosphere.api.java.record.operators.FileDataSink;
-import eu.stratosphere.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import eu.stratosphere.api.java.record.functions.MapFunction;
-import eu.stratosphere.api.java.record.functions.ReduceFunction;
-import eu.stratosphere.api.java.record.io.CsvOutputFormat;
-import eu.stratosphere.api.java.record.operators.MapOperator;
-import eu.stratosphere.api.java.record.operators.ReduceOperator;
-import eu.stratosphere.api.java.record.operators.ReduceOperator.Combinable;
-import eu.stratosphere.client.LocalExecutor;
-import eu.stratosphere.hadoopcompatibility.HadoopDataSource;
-import eu.stratosphere.hadoopcompatibility.datatypes.WritableWrapperConverter;
-import eu.stratosphere.types.IntValue;
-import eu.stratosphere.types.Record;
-import eu.stratosphere.types.StringValue;
-import eu.stratosphere.util.Collector;
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * the occurrences of each word in the file.
- */
-public class WordCount implements Program, ProgramDescription {
-	
-	private static final long serialVersionUID = 1L;
-
-
-	/**
-	 * Converts a Record containing one string in to multiple string/integer pairs.
-	 * The string is tokenized by whitespaces. For each token a new record is emitted,
-	 * where the token is the first field and an Integer(1) is the second field.
-	 */
-	public static class TokenizeLine extends MapFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void map(Record record, Collector<Record> collector) {
-			// get the first field (as type StringValue) from the record
-			String line = record.getField(1, StringValue.class).getValue();
-			// normalize the line
-			line = line.replaceAll("\\W+", " ").toLowerCase();
-			
-			// tokenize the line
-			StringTokenizer tokenizer = new StringTokenizer(line);
-			while (tokenizer.hasMoreTokens()) {
-				String word = tokenizer.nextToken();
-				
-				// we emit a (word, 1) pair 
-				collector.collect(new Record(new StringValue(word), new IntValue(1)));
-			}
-		}
-	}
-
-	/**
-	 * Sums up the counts for a certain given key. The counts are assumed to be at position <code>1</code>
-	 * in the record. The other fields are not modified.
-	 */
-	@Combinable
-	@ConstantFields(0)
-	public static class CountWords extends ReduceFunction implements Serializable {
-		
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-			Record element = null;
-			int sum = 0;
-			while (records.hasNext()) {
-				element = records.next();
-				int cnt = element.getField(1, IntValue.class).getValue();
-				sum += cnt;
-			}
-
-			element.setField(1, new IntValue(sum));
-			out.collect(element);
-		}
-		
-		@Override
-		public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
-			// the logic is the same as in the reduce function, so simply call the reduce method
-			reduce(records, out);
-		}
-	}
-
-
-	@SuppressWarnings({ "rawtypes", "unchecked", "unused" })
-	@Override
-	public Plan getPlan(String... args) {
-		// parse job parameters
-		int numSubTasks   = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-		String dataInput = (args.length > 1 ? args[1] : "");
-		String output    = (args.length > 2 ? args[2] : "");
-		
-		
-		HadoopDataSource source = new HadoopDataSource(new TextInputFormat(), new JobConf(), "Input Lines");
-		TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
-		
-		// Example with Wrapper Converter
-		HadoopDataSource<LongWritable,Text> sourceHadoopType = new HadoopDataSource<LongWritable, Text>(
-				new TextInputFormat(), new JobConf(), "Input Lines", new WritableWrapperConverter<LongWritable, Text>());
-		TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
-		
-		MapOperator mapper = MapOperator.builder(new TokenizeLine())
-			.input(source)
-			.name("Tokenize Lines")
-			.build();
-		ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
-			.input(mapper)
-			.name("Count Words")
-			.build();
-		FileDataSink out = new FileDataSink(new CsvOutputFormat(), output, reducer, "Word Counts");
-		CsvOutputFormat.configureRecordFormat(out)
-			.recordDelimiter('\n')
-			.fieldDelimiter(' ')
-			.field(StringValue.class, 0)
-			.field(IntValue.class, 1);
-		
-		Plan plan = new Plan(out, "WordCount Example");
-		plan.setDefaultParallelism(numSubTasks);
-		return plan;
-	}
-
-
-	@Override
-	public String getDescription() {
-		return "Parameters: [numSubStasks] [input] [output]";
-	}
-
-	
-	public static void main(String[] args) throws Exception {
-		WordCount wc = new WordCount();
-		
-		if (args.length < 3) {
-			System.err.println(wc.getDescription());
-			System.exit(1);
-		}
-		
-		Plan plan = wc.getPlan(args);
-		
-		// This will execute the word-count embedded in a local context. replace this line by the commented
-		// succeeding line to send the job to a local installation or to a cluster for execution
-		LocalExecutor.execute(plan);
-//		PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
-//		ex.executePlan(plan);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a65b7591/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/example/WordCountWithHadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/example/WordCountWithHadoopOutputFormat.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/example/WordCountWithHadoopOutputFormat.java
deleted file mode 100644
index 08a989d..0000000
--- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/example/WordCountWithHadoopOutputFormat.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.hadoopcompatibility.example;
-
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-
-import eu.stratosphere.api.common.Plan;
-import eu.stratosphere.api.common.Program;
-import eu.stratosphere.api.common.ProgramDescription;
-import eu.stratosphere.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import eu.stratosphere.api.java.record.functions.MapFunction;
-import eu.stratosphere.api.java.record.functions.ReduceFunction;
-import eu.stratosphere.api.java.record.operators.MapOperator;
-import eu.stratosphere.api.java.record.operators.ReduceOperator;
-import eu.stratosphere.api.java.record.operators.ReduceOperator.Combinable;
-import eu.stratosphere.client.LocalExecutor;
-import eu.stratosphere.hadoopcompatibility.HadoopDataSink;
-import eu.stratosphere.hadoopcompatibility.HadoopDataSource;
-import eu.stratosphere.types.IntValue;
-import eu.stratosphere.types.Record;
-import eu.stratosphere.types.StringValue;
-import eu.stratosphere.util.Collector;
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * the occurrences of each word in the file.
- */
-@SuppressWarnings("serial")
-public class WordCountWithHadoopOutputFormat implements Program, ProgramDescription {
-
-	/**
-	 * Converts a Record containing one string in to multiple string/integer pairs.
-	 * The string is tokenized by whitespaces. For each token a new record is emitted,
-	 * where the token is the first field and an Integer(1) is the second field.
-	 */
-	public static class TokenizeLine extends MapFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void map(Record record, Collector<Record> collector) {
-			// get the first field (as type StringValue) from the record
-			String line = record.getField(1, StringValue.class).getValue();
-			// normalize the line
-			line = line.replaceAll("\\W+", " ").toLowerCase();
-
-			// tokenize the line
-			StringTokenizer tokenizer = new StringTokenizer(line);
-			while (tokenizer.hasMoreTokens()) {
-				String word = tokenizer.nextToken();
-
-				// we emit a (word, 1) pair 
-				collector.collect(new Record(new StringValue(word), new IntValue(1)));
-			}
-		}
-	}
-
-	/**
-	 * Sums up the counts for a certain given key. The counts are assumed to be at position <code>1</code>
-	 * in the record. The other fields are not modified.
-	 */
-	@Combinable
-	@ConstantFields(0)
-	public static class CountWords extends ReduceFunction implements Serializable {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-			Record element = null;
-			int sum = 0;
-			while (records.hasNext()) {
-				element = records.next();
-				int cnt = element.getField(1, IntValue.class).getValue();
-				sum += cnt;
-			}
-
-			element.setField(1, new IntValue(sum));
-			out.collect(element);
-		}
-
-		@Override
-		public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
-			// the logic is the same as in the reduce function, so simply call the reduce method
-			reduce(records, out);
-		}
-	}
-
-
-	@Override
-	public Plan getPlan(String... args) {
-		// parse job parameters
-		int numSubTasks   = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-		String dataInput = (args.length > 1 ? args[1] : "");
-		String output    = (args.length > 2 ? args[2] : "");
-
-		HadoopDataSource<LongWritable, Text> source = new HadoopDataSource<LongWritable, Text>(
-				new TextInputFormat(), new JobConf(), "Input Lines");
-		TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
-
-
-		MapOperator mapper = MapOperator.builder(new TokenizeLine())
-				.input(source)
-				.name("Tokenize Lines")
-				.build();
-		ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
-				.input(mapper)
-				.name("Count Words")
-				.build();
-		HadoopDataSink<Text, IntWritable> out = new HadoopDataSink<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(),new JobConf(), "Hadoop TextOutputFormat", reducer, Text.class, IntWritable.class);
-		TextOutputFormat.setOutputPath(out.getJobConf(), new Path(output));
-
-		Plan plan = new Plan(out, "Hadoop OutputFormat Example");
-		plan.setDefaultParallelism(numSubTasks);
-		return plan;
-	}
-
-
-	@Override
-	public String getDescription() {
-		return "Parameters: [numSubStasks] [input] [output]";
-	}
-
-
-	public static void main(String[] args) throws Exception {
-		WordCountWithHadoopOutputFormat wc = new WordCountWithHadoopOutputFormat();
-
-		if (args.length < 3) {
-			System.err.println(wc.getDescription());
-			System.exit(1);
-		}
-
-		Plan plan = wc.getPlan(args);
-
-		// This will execute the word-count embedded in a local context. replace this line by the commented
-		// succeeding line to send the job to a local installation or to a cluster for execution
-		LocalExecutor.execute(plan);
-//		PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
-//		ex.executePlan(plan);
-	}
-}


Mime
View raw message