flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [68/92] [abbrv] prefix all projects in addons and quickstarts with flink-
Date Tue, 22 Jul 2014 10:41:08 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
new file mode 100644
index 0000000..da46690
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.mapred.JobConf;
+
+
+/**
+ * Wrapper that adapts a Hadoop mapred {@link org.apache.hadoop.mapred.InputSplit}
+ * to Flink's {@link InputSplit} interface so it can be serialized and shipped to
+ * parallel tasks. The wrapped split is transient and is re-created reflectively
+ * from its class name during deserialization in {@link #read(DataInputView)}.
+ */
+public class HadoopInputSplit implements InputSplit {
+
+	// The wrapped Hadoop split; transient because it is serialized manually in write()/read().
+	private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit;
+	// NOTE(review): jobConf is never written in write() and is therefore null after
+	// deserialization — confirm whether splits that need the JobConf work remotely.
+	private JobConf jobConf;
+	// Index of this split among all splits of the input.
+	private int splitNumber;
+	// Fully qualified class name of the wrapped split, used to re-instantiate it in read().
+	private String hadoopInputSplitTypeName;
+	
+	
+	/** Returns the wrapped Hadoop input split (may be null before read() has run). */
+	public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() {
+		return hadoopInputSplit;
+	}
+	
+	
+	/** Default constructor, required so the split can be deserialized via read(). */
+	public HadoopInputSplit() {
+		super();
+	}
+	
+	
+	/**
+	 * Wraps the given Hadoop split.
+	 *
+	 * @param hInputSplit the Hadoop split to wrap
+	 * @param jobconf the configuration of the enclosing job
+	 */
+	public HadoopInputSplit(org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
+		this.hadoopInputSplit = hInputSplit;
+		this.hadoopInputSplitTypeName = hInputSplit.getClass().getName();
+		this.jobConf = jobconf;
+	}
+	
+	/** Serializes the split number, the split's class name, and the split's own fields. */
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(splitNumber);
+		out.writeUTF(hadoopInputSplitTypeName);
+		hadoopInputSplit.write(out);
+	}
+
+	/**
+	 * Restores the split: reads the split number and class name, re-instantiates
+	 * the Hadoop split reflectively if absent, then delegates to its readFields().
+	 */
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.splitNumber=in.readInt();
+		this.hadoopInputSplitTypeName = in.readUTF();
+		if(hadoopInputSplit == null) {
+			try {
+				Class<? extends org.apache.hadoop.io.Writable> inputSplit = 
+						Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
+				// WritableFactories honors registered factories and falls back to the no-arg constructor.
+				this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit );
+			}
+			catch (Exception e) {
+				throw new RuntimeException("Unable to create InputSplit", e);
+			}
+		}
+		this.hadoopInputSplit.readFields(in);
+	}
+
+	@Override
+	public int getSplitNumber() {
+		return this.splitNumber;
+	}
+
+	public void setSplitNumber(int splitNumber) {
+		this.splitNumber = splitNumber;
+	}
+	
+	public void setHadoopInputSplit(
+			org.apache.hadoop.mapred.InputSplit hadoopInputSplit) {
+		this.hadoopInputSplit = hadoopInputSplit;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
new file mode 100644
index 0000000..cf12cae
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapreduce;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
+import org.apache.flink.hadoopcompatibility.mapreduce.wrapper.HadoopInputSplit;
+import org.apache.flink.types.TypeInformation;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Flink {@link InputFormat} that wraps a Hadoop new-API ("mapreduce") input format
+ * and exposes its records as {@code Tuple2<K, V>} pairs.
+ *
+ * Splitting, statistics, and record reading are delegated to the wrapped Hadoop
+ * format; the Hadoop configuration travels along via custom Java serialization.
+ */
+public class HadoopInputFormat<K extends Writable, V extends Writable> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
+	
+	private static final long serialVersionUID = 1L;
+	
+	private static final Log LOG = LogFactory.getLog(HadoopInputFormat.class);
+	
+	private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
+	private Class<K> keyClass;
+	private Class<V> valueClass;
+	private org.apache.hadoop.conf.Configuration configuration;
+	
+	// Reader for the currently open split; re-created in open(), hence transient.
+	private transient RecordReader<K, V> recordReader;
+	// True once nextKeyValue() has been called for the current position; prevents
+	// reachedEnd() and nextRecord() from advancing the reader twice.
+	private boolean fetched = false;
+	// Result of the most recent nextKeyValue() call.
+	private boolean hasNext;
+	
+	/** Default constructor for deserialization. */
+	public HadoopInputFormat() {
+		super();
+	}
+	
+	/**
+	 * Wraps the given Hadoop input format.
+	 *
+	 * @param mapreduceInputFormat the Hadoop (new-API) input format to delegate to
+	 * @param key class of the produced keys
+	 * @param value class of the produced values
+	 * @param job Hadoop job whose configuration is used (cluster HDFS settings are merged in)
+	 */
+	public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
+		super();
+		this.mapreduceInputFormat = mapreduceInputFormat;
+		this.keyClass = key;
+		this.valueClass = value;
+		this.configuration = job.getConfiguration();
+		HadoopUtils.mergeHadoopConf(configuration);
+	}
+	
+	public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) {
+		this.configuration = configuration;
+	}
+	
+	public org.apache.hadoop.mapreduce.InputFormat<K,V> getHadoopInputFormat() {
+		return this.mapreduceInputFormat;
+	}
+	
+	public void setHadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat) {
+		this.mapreduceInputFormat = mapreduceInputFormat;
+	}
+	
+	public org.apache.hadoop.conf.Configuration getConfiguration() {
+		return this.configuration;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  InputFormat
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void configure(Configuration parameters) {
+		// nothing to do
+	}
+	
+	/**
+	 * Returns basic file statistics (total size, latest modification time) if the
+	 * wrapped format is a FileInputFormat; returns null (no statistics available)
+	 * for all other formats or when gathering them fails.
+	 */
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
+		// only gather base statistics for FileInputFormats
+		if(!(mapreduceInputFormat instanceof FileInputFormat)) {
+			return null;
+		}
+		
+		JobContext jobContext = null;
+		try {
+			jobContext = HadoopUtils.instantiateJobContext(configuration, null);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+		
+		// reuse cached statistics only if they are of the file-based flavor
+		final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
+				(FileBaseStatistics) cachedStats : null;
+				
+				try {
+					final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext);
+					return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
+				} catch (IOException ioex) {
+					if (LOG.isWarnEnabled()) {
+						LOG.warn("Could not determine statistics due to an io error: "
+								+ ioex.getMessage());
+					}
+				} catch (Throwable t) {
+					if (LOG.isErrorEnabled()) {
+						LOG.error("Unexpected problem while getting the file statistics: "
+								+ t.getMessage(), t);
+					}
+				}
+				
+				// no statistics available
+				return null;
+	}
+	
+	/**
+	 * Asks the wrapped Hadoop format for its splits and wraps each one so Flink
+	 * can serialize and ship it to the parallel tasks.
+	 */
+	@Override
+	public HadoopInputSplit[] createInputSplits(int minNumSplits)
+			throws IOException {
+		// NOTE(review): minNumSplits is a desired split *count*, but it is written
+		// into the minimum split *size* property (interpreted in bytes) — confirm intended.
+		configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
+		
+		JobContext jobContext = null;
+		try {
+			jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+		
+		List<org.apache.hadoop.mapreduce.InputSplit> splits;
+		try {
+			splits = this.mapreduceInputFormat.getSplits(jobContext);
+		} catch (InterruptedException e) {
+			throw new IOException("Could not get Splits.", e);
+		}
+		HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
+		
+		for(int i = 0; i < hadoopInputSplits.length; i++){
+			hadoopInputSplits[i] = new HadoopInputSplit(splits.get(i), jobContext);
+		}
+		return hadoopInputSplits;
+	}
+	
+	@Override
+	public Class<? extends HadoopInputSplit> getInputSplitType() {
+		return HadoopInputSplit.class;
+	}
+	
+	/** Creates and initializes the Hadoop RecordReader for the given split. */
+	@Override
+	public void open(HadoopInputSplit split) throws IOException {
+		TaskAttemptContext context = null;
+		try {
+			context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
+		} catch(Exception e) {
+			throw new RuntimeException(e);
+		}
+		
+		try {
+			this.recordReader = this.mapreduceInputFormat
+					.createRecordReader(split.getHadoopInputSplit(), context);
+			this.recordReader.initialize(split.getHadoopInputSplit(), context);
+		} catch (InterruptedException e) {
+			throw new IOException("Could not create RecordReader.", e);
+		} finally {
+			// a freshly opened reader has not fetched anything yet
+			this.fetched = false;
+		}
+	}
+	
+	@Override
+	public boolean reachedEnd() throws IOException {
+		// lazily advance the reader at most once per record
+		if(!this.fetched) {
+			fetchNext();
+		}
+		return !this.hasNext;
+	}
+	
+	/** Advances the reader by one record and remembers whether one was available. */
+	private void fetchNext() throws IOException {
+		try {
+			this.hasNext = this.recordReader.nextKeyValue();
+		} catch (InterruptedException e) {
+			throw new IOException("Could not fetch next KeyValue pair.", e);
+		} finally {
+			this.fetched = true;
+		}
+	}
+	
+	/**
+	 * Fills the given tuple with the current key/value pair and marks the record
+	 * as consumed; returns null when the input is exhausted.
+	 */
+	@Override
+	public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
+		if(!this.fetched) {
+			fetchNext();
+		}
+		if(!this.hasNext) {
+			return null;
+		}
+		try {
+			record.f0 = this.recordReader.getCurrentKey();
+			record.f1 = this.recordReader.getCurrentValue();
+		} catch (InterruptedException e) {
+			throw new IOException("Could not get KeyValue pair.", e);
+		}
+		this.fetched = false;
+		
+		return record;
+	}
+	
+	@Override
+	public void close() throws IOException {
+		this.recordReader.close();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Helper methods
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Computes total length and latest modification time over the given paths,
+	 * descending one level into directories. Returns the cached statistics when
+	 * nothing was modified since they were taken.
+	 *
+	 * @param cachedStats previously computed statistics, or null
+	 * @param hadoopFilePaths input paths reported by the Hadoop format
+	 * @param files accumulator for the enumerated file statuses
+	 */
+	private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
+			ArrayList<FileStatus> files) throws IOException {
+		
+		long latestModTime = 0L;
+		
+		// get the file info and check whether the cached statistics are still valid.
+		for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
+			
+			final Path filePath = new Path(hadoopPath.toUri());
+			final FileSystem fs = FileSystem.get(filePath.toUri());
+			
+			final FileStatus file = fs.getFileStatus(filePath);
+			latestModTime = Math.max(latestModTime, file.getModificationTime());
+			
+			// enumerate all files and check their modification time stamp.
+			if (file.isDir()) {
+				FileStatus[] fss = fs.listStatus(filePath);
+				files.ensureCapacity(files.size() + fss.length);
+				
+				for (FileStatus s : fss) {
+					if (!s.isDir()) {
+						files.add(s);
+						latestModTime = Math.max(s.getModificationTime(), latestModTime);
+					}
+				}
+			} else {
+				files.add(file);
+			}
+		}
+		
+		// check whether the cached statistics are still valid, if we have any
+		if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
+			return cachedStats;
+		}
+		
+		// calculate the whole length
+		long len = 0;
+		for (FileStatus s : files) {
+			len += s.getLen();
+		}
+		
+		// sanity check
+		if (len <= 0) {
+			len = BaseStatistics.SIZE_UNKNOWN;
+		}
+		
+		return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Custom serialization methods
+	// --------------------------------------------------------------------------------------------
+	
+	/** Writes the class names of format/key/value plus the Hadoop configuration. */
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeUTF(this.mapreduceInputFormat.getClass().getName());
+		out.writeUTF(this.keyClass.getName());
+		out.writeUTF(this.valueClass.getName());
+		this.configuration.write(out);
+	}
+	
+	/** Restores format/key/value classes reflectively; a pre-set configuration wins over the serialized one. */
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		String hadoopInputFormatClassName = in.readUTF();
+		String keyClassName = in.readUTF();
+		String valueClassName = in.readUTF();
+		
+		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+		configuration.readFields(in);
+		
+		// only adopt the deserialized configuration if none was injected beforehand
+		if(this.configuration == null) {
+			this.configuration = configuration;
+		}
+		
+		try {
+			this.mapreduceInputFormat = (org.apache.hadoop.mapreduce.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to instantiate the hadoop input format", e);
+		}
+		try {
+			this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader());
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to find key class.", e);
+		}
+		try {
+			this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader());
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to find value class.", e);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  ResultTypeQueryable
+	// --------------------------------------------------------------------------------------------
+	
+	/** Produced type is Tuple2 of the configured Writable key and value types. */
+	@Override
+	public TypeInformation<Tuple2<K,V>> getProducedType() {
+		// NOTE(review): the casts below are redundant/unchecked and the method lacks
+		// @SuppressWarnings("unchecked") — expect compiler warnings here.
+		return new TupleTypeInfo<Tuple2<K,V>>(new WritableTypeInfo<K>((Class<K>) keyClass), new WritableTypeInfo<V>((Class<V>) valueClass));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
new file mode 100644
index 0000000..9eabc03
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapreduce;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+
+
+/**
+ * Flink {@link OutputFormat} that wraps a Hadoop new-API ("mapreduce") output
+ * format. Records are written through a Hadoop RecordWriter into a temporary
+ * location and renamed to their final names when the task is closed.
+ */
+public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>> {
+	
+	private static final long serialVersionUID = 1L;
+	
+	private org.apache.hadoop.conf.Configuration configuration;
+	private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat;
+	// Per-task state, re-created in open(), hence transient.
+	private transient RecordWriter<K,V> recordWriter;
+	private transient FileOutputCommitter fileOutputCommitter;
+	private transient TaskAttemptContext context;
+	
+	/**
+	 * @param mapreduceOutputFormat the Hadoop output format to delegate to
+	 * @param job Hadoop job whose configuration is used (cluster HDFS settings are merged in)
+	 */
+	public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat, Job job) {
+		super();
+		this.mapreduceOutputFormat = mapreduceOutputFormat;
+		this.configuration = job.getConfiguration();
+		HadoopUtils.mergeHadoopConf(configuration);
+	}
+	
+	public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) {
+		this.configuration = configuration;
+	}
+	
+	public org.apache.hadoop.conf.Configuration getConfiguration() {
+		return this.configuration;
+	}
+	
+	public org.apache.hadoop.mapreduce.OutputFormat<K,V> getHadoopOutputFormat() {
+		return this.mapreduceOutputFormat;
+	}
+	
+	public void setHadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat) {
+		this.mapreduceOutputFormat = mapreduceOutputFormat;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  OutputFormat
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void configure(Configuration parameters) {
+		// nothing to do
+	}
+	
+	/**
+	 * create the temporary output file for hadoop RecordWriter.
+	 * @param taskNumber The number of the parallel instance.
+	 * @param numTasks The number of parallel tasks.
+	 * @throws IOException
+	 */
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		// the synthetic attempt id below has room for at most six digits
+		if (Integer.toString(taskNumber + 1).length() > 6) {
+			throw new IOException("Task id too large.");
+		}
+		
+		// for hadoop 2.2
+		this.configuration.set("mapreduce.output.basename", "tmp");
+		
+		// build an attempt id of the form attempt__0000_r_NNNNNN_0, where NNNNNN
+		// is the 1-based task number zero-padded to six digits
+		TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" 
+				+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") 
+				+ Integer.toString(taskNumber + 1) 
+				+ "_0");
+		
+		try {
+			this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+		this.configuration.set("mapred.task.id", taskAttemptID.toString());
+		// for hadoop 2.2
+		this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+		
+		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
+		
+		// NOTE(review): setupJob runs in every parallel task instance rather than
+		// once per job — confirm this is safe for the committer in use.
+		try {
+			this.fileOutputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+		
+		// compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
+		this.configuration.set("mapreduce.task.output.dir", this.fileOutputCommitter.getWorkPath().toString());
+		
+		try {
+			this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
+		} catch (InterruptedException e) {
+			throw new IOException("Could not create RecordWriter.", e);
+		}
+	}
+	
+	
+	/** Writes one key/value pair through the Hadoop RecordWriter. */
+	@Override
+	public void writeRecord(Tuple2<K, V> record) throws IOException {
+		try {
+			this.recordWriter.write(record.f0, record.f1);
+		} catch (InterruptedException e) {
+			throw new IOException("Could not write Record.", e);
+		}
+	}
+	
+	/**
+	 * commit the task by moving the output file out from the temporary directory.
+	 * @throws IOException
+	 */
+	@Override
+	public void close() throws IOException {
+		try {
+			this.recordWriter.close(this.context);
+		} catch (InterruptedException e) {
+			// NOTE(review): message says "RecordReader" but a RecordWriter is being closed
+			throw new IOException("Could not close RecordReader.", e);
+		}
+		
+		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
+			this.fileOutputCommitter.commitTask(this.context);
+		}
+		// NOTE(review): commitJob is invoked from every task's close() — confirm
+		// safety when the job runs with parallelism > 1.
+		this.fileOutputCommitter.commitJob(this.context);
+		
+		// rename tmp-* files to final name
+		FileSystem fs = FileSystem.get(this.configuration);
+		
+		Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
+
+		// matches committed files named tmp-<c>-<number>; the number becomes the final name
+		final Pattern p = Pattern.compile("tmp-(.)-([0-9]+)");
+		
+		// isDirectory does not work in hadoop 1
+		if(fs.getFileStatus(outputPath).isDir()) {
+			FileStatus[] files = fs.listStatus(outputPath);
+			
+			for(FileStatus f : files) {
+				Matcher m = p.matcher(f.getPath().getName());
+				if(m.matches()) {
+					int part = Integer.valueOf(m.group(2));
+					fs.rename(f.getPath(), new Path(outputPath.toString()+"/"+part));
+				}
+			}
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Custom serialization methods
+	// --------------------------------------------------------------------------------------------
+	
+	/** Writes the wrapped format's class name plus the Hadoop configuration. */
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeUTF(this.mapreduceOutputFormat.getClass().getName());
+		this.configuration.write(out);
+	}
+	
+	/** Restores the output format reflectively; a pre-set configuration wins over the serialized one. */
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		String hadoopOutputFormatClassName = in.readUTF();
+		
+		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+		configuration.readFields(in);
+		
+		// only adopt the deserialized configuration if none was injected beforehand
+		if(this.configuration == null) {
+			this.configuration = configuration;
+		}
+		
+		// re-create the output format via its no-arg constructor
+		try {
+			this.mapreduceOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat<K,V>) Class.forName(hadoopOutputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to instantiate the hadoop output format", e);
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
new file mode 100644
index 0000000..36ea378
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapreduce.example;
+
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
+import org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat;
+
+/**
+ * Implements a word count which takes the input file and counts the number of
+ * occurrences of each word in the file and writes the result back to disk.
+ * 
+ * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to 
+ * common Java types for better usage in a Flink job and how to use Hadoop Output Formats.
+ */
+@SuppressWarnings("serial")
+public class WordCount {
+	
+	/**
+	 * Runs the word-count pipeline: read via Hadoop TextInputFormat, tokenize,
+	 * sum counts per word, and write via Hadoop TextOutputFormat.
+	 *
+	 * @param args args[0] = input path, args[1] = result path
+	 */
+	public static void main(String[] args) throws Exception {
+		if (args.length < 2) {
+			System.err.println("Usage: WordCount <input path> <result path>");
+			return;
+		}
+		
+		final String inputPath = args[0];
+		final String outputPath = args[1];
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		// NOTE(review): parallelism is pinned to 1, presumably to produce a single
+		// output file — confirm this is intended outside of tests.
+		env.setDegreeOfParallelism(1);
+		
+		// Set up the Hadoop Input Format
+		Job job = Job.getInstance();
+		HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
+		TextInputFormat.addInputPath(job, new Path(inputPath));
+		
+		// Create a Flink job with it
+		DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
+		
+		// Tokenize the line and convert from Writable "Text" to String for better handling
+		DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());
+		
+		// Sum up the words
+		DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);
+		
+		// Convert String back to Writable "Text" for use with Hadoop Output Format
+		DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper());
+		
+		// Set up Hadoop Output Format
+		HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job);
+		hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+		hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", " "); // set the value for both, since this test
+		// is being executed with both types (hadoop1 and hadoop2 profile)
+		TextOutputFormat.setOutputPath(job, new Path(outputPath));
+		
+		// Output & Execute
+		hadoopResult.output(hadoopOutputFormat);
+		env.execute("Word Count");
+	}
+	
+	/**
+	 * Splits a line into words and converts Hadoop Writables into normal Java data types.
+	 */
+	public static final class Tokenizer extends FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
+		
+		@Override
+		public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
+			// normalize and split the line (the key f0 is the byte offset and is ignored)
+			String line = value.f1.toString();
+			String[] tokens = line.toLowerCase().split("\\W+");
+			
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+	
+	/**
+	 * Converts Java data types to Hadoop Writables.
+	 */
+	public static final class HadoopDatatypeMapper extends MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
+		
+		@Override
+		public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception {
+			return new Tuple2<Text, IntWritable>(new Text(value.f0), new IntWritable(value.f1));
+		}
+		
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
new file mode 100644
index 0000000..eadbd0b
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapreduce.utils;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+import org.apache.flink.runtime.fs.hdfs.DistributedFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+public class HadoopUtils {
+	
+	/**
+	 * Merge HadoopConfiguration into Configuration. This is necessary for the HDFS configuration.
+	 */
+	public static void mergeHadoopConf(Configuration configuration) {
+		Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration();
+		
+		for (Map.Entry<String, String> e : hadoopConf) {
+			configuration.set(e.getKey(), e.getValue());
+		}
+	}
+	
+	public static JobContext instantiateJobContext(Configuration configuration, JobID jobId) throws Exception {
+		try {
+			Class<?> clazz;
+			// for Hadoop 1.xx
+			if(JobContext.class.isInterface()) {
+				clazz = Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, Thread.currentThread().getContextClassLoader());
+			}
+			// for Hadoop 2.xx
+			else {
+				clazz = Class.forName("org.apache.hadoop.mapreduce.JobContext", true, Thread.currentThread().getContextClassLoader());
+			}
+			Constructor<?> constructor = clazz.getConstructor(Configuration.class, JobID.class);
+			JobContext context = (JobContext) constructor.newInstance(configuration, jobId);
+			
+			return context;
+		} catch(Exception e) {
+			throw new Exception("Could not create instance of JobContext.");
+		}
+	}
+	
+	public static TaskAttemptContext instantiateTaskAttemptContext(Configuration configuration,  TaskAttemptID taskAttemptID) throws Exception {
+		try {
+			Class<?> clazz;
+			// for Hadoop 1.xx
+			if(JobContext.class.isInterface()) {
+				clazz = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
+			}
+			// for Hadoop 2.xx
+			else {
+				clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
+			}
+			Constructor<?> constructor = clazz.getConstructor(Configuration.class, TaskAttemptID.class);
+			TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(configuration, taskAttemptID);
+			
+			return context;
+		} catch(Exception e) {
+			throw new Exception("Could not create instance of TaskAttemptContext.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
new file mode 100644
index 0000000..b53fc9f
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapreduce.wrapper;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.mapreduce.JobContext;
+
+
/**
 * Wraps a Hadoop mapreduce {@link org.apache.hadoop.mapreduce.InputSplit} so it can be
 * used as a Flink {@link InputSplit}. Serialization delegates to the wrapped split's
 * {@link Writable} implementation.
 */
public class HadoopInputSplit implements InputSplit {
	
	// NOTE(review): both fields are transient and public. jobContext is NOT written in
	// write(), so it is null after deserialization on a remote node — confirm callers
	// re-attach it where needed.
	public transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit;
	public transient JobContext jobContext;
	
	private int splitNumber;	
	
	/** Returns the wrapped Hadoop input split. */
	public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() {
		return mapreduceInputSplit;
	}
	
	
	/** Default constructor, needed so the split can be created before deserialization. */
	public HadoopInputSplit() {
		super();
	}
	
	
	/**
	 * Wraps the given Hadoop input split.
	 *
	 * @throws IllegalArgumentException if the split does not implement {@link Writable},
	 *         which serialization relies on
	 */
	public HadoopInputSplit(org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) {
		if(!(mapreduceInputSplit instanceof Writable)) {
			throw new IllegalArgumentException("InputSplit must implement Writable interface.");
		}
		this.mapreduceInputSplit = mapreduceInputSplit;
		this.jobContext = jobContext;
	}
	
	// Serialization order: split number, concrete split class name, then the split's
	// own Writable data. read() below must mirror this exactly.
	@Override
	public void write(DataOutputView out) throws IOException {
		out.writeInt(this.splitNumber);
		out.writeUTF(this.mapreduceInputSplit.getClass().getName());
		Writable w = (Writable) this.mapreduceInputSplit;
		w.write(out);
	}
	
	// Re-instantiates the wrapped split by class name (via WritableFactories) if it does
	// not exist yet, then populates it from the stream.
	@Override
	public void read(DataInputView in) throws IOException {
		this.splitNumber=in.readInt();
		String className = in.readUTF();
		
		if(this.mapreduceInputSplit == null) {
			try {
				Class<? extends org.apache.hadoop.io.Writable> inputSplit = 
						Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
				this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit);
			} catch (Exception e) {
				throw new RuntimeException("Unable to create InputSplit", e);
			}
		}
		((Writable)this.mapreduceInputSplit).readFields(in);
	}
	
	@Override
	public int getSplitNumber() {
		return this.splitNumber;
	}
	
	/** Sets the number of this split, assigned by the input format. */
	public void setSplitNumber(int splitNumber) {
		this.splitNumber = splitNumber;
	}
}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java
new file mode 100644
index 0000000..d13d0f2
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import org.apache.flink.hadoopcompatibility.mapred.example.WordCount;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class HadoopInputOutputITCase extends JavaProgramTestBase {
+	
+	protected String textPath;
+	protected String resultPath;
+	
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+	}
+	
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1");
+	}
+	
+	@Override
+	protected void testProgram() throws Exception {
+		WordCount.main(new String[] { textPath, resultPath });
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
new file mode 100644
index 0000000..547ea60
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.hadoopcompatibility.mapred.record;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.hadoopcompatibility.mapred.record.example.WordCountWithOutputFormat;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.RecordAPITestBase;
+
+/**
+ * test the hadoop inputformat and outputformat
+ */
+public class HadoopRecordInputOutputITCase extends RecordAPITestBase {
+	protected String textPath;
+	protected String resultPath;
+	protected String counts;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+		counts = WordCountData.COUNTS.replaceAll(" ", "\t");
+	}
+
+	@Override
+	protected Plan getTestJob() {
+		//WordCountWithHadoopOutputFormat takes hadoop TextInputFormat as input and output file in hadoop TextOutputFormat
+		WordCountWithOutputFormat wc = new WordCountWithOutputFormat();
+		return wc.getPlan("1", textPath, resultPath);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		// Test results, append /1 to resultPath due to the generated _temproray file.
+		compareResultsByLinesInMemory(counts, resultPath + "/1");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
new file mode 100644
index 0000000..10dab3f
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapreduce;
+
+import org.apache.flink.hadoopcompatibility.mapreduce.example.WordCount;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class HadoopInputOutputITCase extends JavaProgramTestBase {
+	
+	protected String textPath;
+	protected String resultPath;
+	
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+	}
+	
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1");
+	}
+	
+	@Override
+	protected void testProgram() throws Exception {
+		WordCount.main(new String[] { textPath, resultPath });
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/pom.xml b/flink-addons/flink-hbase/pom.xml
new file mode 100644
index 0000000..c973212
--- /dev/null
+++ b/flink-addons/flink-hbase/pom.xml
@@ -0,0 +1,111 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

	<modelVersion>4.0.0</modelVersion>

	<parent>
		<artifactId>flink-addons</artifactId>
		<groupId>org.apache.flink</groupId>
		<version>0.6-incubating-SNAPSHOT</version>
		<relativePath>..</relativePath>
	</parent>
	
	<!-- needed to resolve the CDH-flavoured hbase artifact below -->
	<repositories>
		<repository>
			<id>cloudera-releases</id>
			<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
			<releases>
				<enabled>true</enabled>
			</releases>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</repository>
	</repositories>

	<!-- NOTE(review): hbase.version is only referenced by the commented-out
		hbase-server/hbase-client dependencies below; the active hbase dependency
		pins 0.94.2-cdh4.2.1 instead. Confirm which version is intended. -->
	<properties>
 		<hbase.version>0.96.0-hadoop2</hbase.version>
	</properties>

	<artifactId>flink-hbase</artifactId>
	<name>flink-hbase</name>
	<packaging>jar</packaging>

	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-core</artifactId>
			<version>${project.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${project.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase</artifactId>
			<version>0.94.2-cdh4.2.1</version>
			<exclusions>
				<!-- jruby is used for the hbase shell. -->
				<exclusion>
					<groupId>org.jruby</groupId>
					<artifactId>jruby-complete</artifactId>
					</exclusion>
				</exclusions> 
		</dependency>

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>${hadoop.version}</version>
			<exclusions>
				<exclusion>
					<groupId>asm</groupId>
					<artifactId>asm</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
	</dependencies>
		<!-- <dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-server</artifactId>
			<version>${hbase.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-client</artifactId>
			<version>${hbase.version}</version>
		</dependency>
		 -->

	<!-- hadoop-client is available for yarn and non-yarn, so there is no need 
		to use profiles See ticket https://issues.apache.org/jira/browse/HADOOP-8009 
		for description of hadoop-clients -->

	<reporting>
		<plugins>
		</plugins>
	</reporting>

	<build>
		<plugins>
		</plugins>
	</build>
</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
new file mode 100644
index 0000000..9029030
--- /dev/null
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.addons.hbase;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Record;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+
/**
 * Abstract {@link OutputFormat} that writes {@link Record}s to HBase by handing
 * {@link KeyValue}s to Hadoop's {@link HFileOutputFormat}. Subclasses provide the
 * Hadoop configuration via {@link #getHadoopConfig(Configuration)} and translate
 * records into KeyValues, emitting them through {@link #collectKeyValue(KeyValue)}.
 */
public abstract class GenericTableOutputFormat implements OutputFormat<Record> {

	private static final long serialVersionUID = 1L;

	/** Config key under which the (synthetic) job tracker id is stored. */
	public static final String JT_ID_KEY = "pact.hbase.jtkey";

	/** Config key under which the numeric job id is stored. */
	public static final String JOB_ID_KEY = "pact.job.id";

	// Hadoop writer this format delegates to; created in open(), released in close()
	private RecordWriter<ImmutableBytesWritable, KeyValue> writer;

	private Configuration config;

	private org.apache.hadoop.conf.Configuration hadoopConfig;

	// task attempt context is kept because it is needed again to close the writer
	private TaskAttemptContext context;

	private String jtID;

	private int jobId;


	/**
	 * Reads the job tracker id and job id from the parameters; both are mandatory.
	 */
	@Override
	public void configure(Configuration parameters) {
		this.config = parameters;

		// get the ID parameters
		this.jtID = parameters.getString(JT_ID_KEY, null);
		if (this.jtID == null) {
			throw new RuntimeException("Missing JT_ID entry in hbase config.");
		}
		this.jobId = parameters.getInteger(JOB_ID_KEY, -1);
		if (this.jobId < 0) {
			throw new RuntimeException("Missing or invalid job id in input config.");
		}
	}

	/**
	 * Opens an HFileOutputFormat record writer for this parallel task instance.
	 */
	@Override
	public void open(int taskNumber, int numTasks) throws IOException {
		this.hadoopConfig = getHadoopConfig(this.config);
		
		/**
		 * PLEASE NOTE:
		 * If you are a Eclipse+Maven Integration user and you have two (or more) warnings here, please
		 * close the pact-hbase project OR set the maven profile to hadoop_yarn
		 * 
		 * pact-hbase requires hadoop_yarn, but Eclipse is not able to parse maven profiles properly. Therefore,
		 * it imports the pact-hbase project even if it is not included in the standard profile (hadoop_v1)
		 */
		// taskNumber - 1: Flink task numbers appear to be 1-based here while Hadoop
		// task ids are 0-based — TODO(review) confirm against the caller
		final TaskAttemptID attemptId = new TaskAttemptID(this.jtID, this.jobId, TaskType.MAP, taskNumber - 1, 0);

		this.context = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(this.hadoopConfig, attemptId);
		final HFileOutputFormat outFormat = new HFileOutputFormat();
		try {
			this.writer = outFormat.getRecordWriter(this.context);
		} catch (InterruptedException iex) {
			throw new IOException("Opening the writer was interrupted.", iex);
		}
	}

	/**
	 * Closes the underlying record writer, if one was opened. The field is cleared
	 * before closing so a failed close cannot leave a half-closed writer behind.
	 */
	@Override
	public void close() throws IOException {
		final RecordWriter<ImmutableBytesWritable, KeyValue> writer = this.writer;
		this.writer = null;
		if (writer != null) {
			try {
				writer.close(this.context);
			} catch (InterruptedException iex) {
				throw new IOException("Closing was interrupted.", iex);
			}
		}
	}

	/**
	 * Writes a single KeyValue through the open writer; only valid between
	 * open() and close().
	 */
	public void collectKeyValue(KeyValue kv) throws IOException {
		try {
			this.writer.write(null, kv);
		} catch (InterruptedException iex) {
			throw new IOException("Write request was interrupted.", iex);
		}
	}

	/** Provides the Hadoop configuration used to open the HFile writer. */
	public abstract org.apache.hadoop.conf.Configuration getHadoopConfig(Configuration config);
}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
new file mode 100644
index 0000000..ac41927
--- /dev/null
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.addons.hbase;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.java.record.operators.GenericDataSink;
+
+/**
+ * A sink for writing to HBase
+ */
+public class HBaseDataSink extends GenericDataSink {
+	
+	private static final int IDENTIFYIER_LEN = 16;
+	
+	public HBaseDataSink(GenericTableOutputFormat f, Operator input, String name) {
+		super(f, input, name);
+		
+		// generate a random unique identifier string
+		final Random rnd = new Random();
+		final StringBuilder bld = new StringBuilder();
+		for (int i = 0; i < IDENTIFYIER_LEN; i++) {
+			bld.append((char) (rnd.nextInt(26) + 'a'));
+		}
+		
+		setParameter(GenericTableOutputFormat.JT_ID_KEY, bld.toString());
+		setParameter(GenericTableOutputFormat.JOB_ID_KEY, rnd.nextInt());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
new file mode 100644
index 0000000..9ff5af7
--- /dev/null
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.addons.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.addons.hbase.common.HBaseKey;
+import org.apache.flink.addons.hbase.common.HBaseResult;
+import org.apache.flink.addons.hbase.common.HBaseUtil;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables.
+ */
+public class TableInputFormat implements InputFormat<Record, TableInputSplit> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
+
+	/** A handle on an HBase table */
+	private HTable table;
+
+	/** The scanner that performs the actual access on the table. HBase object */
+	private Scan scan;
+
+	/** Hbase' iterator wrapper */
+	private TableRecordReader tableRecordReader;
+
+	/** helper variable to decide whether the input is exhausted or not */
+	private boolean endReached = false;
+
+	/** Job parameter that specifies the input table. */
+	public static final String INPUT_TABLE = "hbase.inputtable";
+
+	/** Location of the hbase-site.xml. If set, the HBaseAdmin will build inside */
+	public static final String CONFIG_LOCATION = "hbase.config.location";
+
+	/**
+	 * Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
+	 * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
+	 */
+	public static final String SCAN = "hbase.scan";
+
+	/** Column Family to Scan */
+	public static final String SCAN_COLUMN_FAMILY = "hbase.scan.column.family";
+
+	/** Space delimited list of columns to scan. */
+	public static final String SCAN_COLUMNS = "hbase.scan.columns";
+
+	/** The timestamp used to filter columns with a specific timestamp. */
+	public static final String SCAN_TIMESTAMP = "hbase.scan.timestamp";
+
+	/** The starting timestamp used to filter columns with a specific range of versions. */
+	public static final String SCAN_TIMERANGE_START = "hbase.scan.timerange.start";
+
+	/** The ending timestamp used to filter columns with a specific range of versions. */
+	public static final String SCAN_TIMERANGE_END = "hbase.scan.timerange.end";
+
+	/** The maximum number of version to return. */
+	public static final String SCAN_MAXVERSIONS = "hbase.scan.maxversions";
+
+	/** Set to false to disable server-side caching of blocks for this scan. */
+	public static final String SCAN_CACHEBLOCKS = "hbase.scan.cacheblocks";
+
+	/** The number of rows for caching that will be passed to scanners. */
+	public static final String SCAN_CACHEDROWS = "hbase.scan.cachedrows";
+
+	/** mutable objects that are used to avoid recreation of wrapper objects */
+	protected HBaseKey hbaseKey;
+
+	protected HBaseResult hbaseResult;
+
+	private org.apache.hadoop.conf.Configuration hConf;
+
+	@Override
+	public void configure(Configuration parameters) {
+		HTable table = createTable(parameters);
+		setTable(table);
+		Scan scan = createScanner(parameters);
+		setScan(scan);
+	}
+
+	/**
+	 * Read the configuration and creates a {@link Scan} object.
+	 * 
+	 * @param parameters
+	 * @return
+	 */
+	protected Scan createScanner(Configuration parameters) {
+		Scan scan = null;
+		if (parameters.getString(SCAN, null) != null) {
+			try {
+				scan = HBaseUtil.convertStringToScan(parameters.getString(SCAN, null));
+			} catch (IOException e) {
+				LOG.error("An error occurred.", e);
+			}
+		} else {
+			try {
+				scan = new Scan();
+
+				// if (parameters.getString(SCAN_COLUMNS, null) != null) {
+				// scan.addColumns(parameters.getString(SCAN_COLUMNS, null));
+				// }
+
+				if (parameters.getString(SCAN_COLUMN_FAMILY, null) != null) {
+					scan.addFamily(Bytes.toBytes(parameters.getString(SCAN_COLUMN_FAMILY, null)));
+				}
+
+				if (parameters.getString(SCAN_TIMESTAMP, null) != null) {
+					scan.setTimeStamp(Long.parseLong(parameters.getString(SCAN_TIMESTAMP, null)));
+				}
+
+				if (parameters.getString(SCAN_TIMERANGE_START, null) != null
+					&& parameters.getString(SCAN_TIMERANGE_END, null) != null) {
+					scan.setTimeRange(
+						Long.parseLong(parameters.getString(SCAN_TIMERANGE_START, null)),
+						Long.parseLong(parameters.getString(SCAN_TIMERANGE_END, null)));
+				}
+
+				if (parameters.getString(SCAN_MAXVERSIONS, null) != null) {
+					scan.setMaxVersions(Integer.parseInt(parameters.getString(SCAN_MAXVERSIONS, null)));
+				}
+
+				if (parameters.getString(SCAN_CACHEDROWS, null) != null) {
+					scan.setCaching(Integer.parseInt(parameters.getString(SCAN_CACHEDROWS, null)));
+				}
+
+				// false by default, full table scans generate too much BC churn
+				scan.setCacheBlocks((parameters.getBoolean(SCAN_CACHEBLOCKS, false)));
+			} catch (Exception e) {
+				LOG.error(StringUtils.stringifyException(e));
+			}
+		}
+		return scan;
+	}
+
+	/**
+	 * Create an {@link HTable} instance and set it into this format.
+	 * 
+	 * @param parameters
+	 *        a {@link Configuration} that holds at least the table name.
+	 */
+	protected HTable createTable(Configuration parameters) {
+		String configLocation = parameters.getString(TableInputFormat.CONFIG_LOCATION, null);
+		LOG.info("Got config location: " + configLocation);
+		if (configLocation != null)
+		{
+			org.apache.hadoop.conf.Configuration dummyConf = new org.apache.hadoop.conf.Configuration();
+			if(OperatingSystem.isWindows()) {
+				dummyConf.addResource(new Path("file:/" + configLocation));
+			} else {
+				dummyConf.addResource(new Path("file://" + configLocation));
+			}
+			hConf = HBaseConfiguration.create(dummyConf);
+			;
+			// hConf.set("hbase.master", "im1a5.internetmemory.org");
+			LOG.info("hbase master: " + hConf.get("hbase.master"));
+			LOG.info("zookeeper quorum: " + hConf.get("hbase.zookeeper.quorum"));
+
+		}
+		String tableName = parameters.getString(INPUT_TABLE, "");
+		try {
+			return new HTable(this.hConf, tableName);
+		} catch (Exception e) {
+			LOG.error(StringUtils.stringifyException(e));
+		}
+		return null;
+	}
+
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public boolean reachedEnd() throws IOException {
+		return this.endReached;
+	}
+
+	protected boolean nextResult() throws IOException {
+		if (this.tableRecordReader == null)
+		{
+			throw new IOException("No table record reader provided!");
+		}
+
+		try {
+			if (this.tableRecordReader.nextKeyValue())
+			{
+				ImmutableBytesWritable currentKey = this.tableRecordReader.getCurrentKey();
+				Result currentValue = this.tableRecordReader.getCurrentValue();
+
+				hbaseKey.setWritable(currentKey);
+				hbaseResult.setResult(currentValue);
+			} else
+			{
+				this.endReached = true;
+				return false;
+			}
+		} catch (InterruptedException e) {
+			LOG.error("Table reader has been interrupted", e);
+			throw new IOException(e);
+		}
+
+		return true;
+	}
+
+	@Override
+	public Record nextRecord(Record record) throws IOException {
+		if (nextResult()) {
+			mapResultToRecord(record, hbaseKey, hbaseResult);
+			return record;
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Maps the current HBase Result into a Record.
+	 * This implementation simply stores the HBaseKey at position 0, and the HBase Result object at position 1.
+	 * 
+	 * @param record
+	 * @param key
+	 * @param result
+	 */
+	public void mapResultToRecord(Record record, HBaseKey key, HBaseResult result) {
+		record.setField(0, key);
+		record.setField(1, result);
+	}
+
+	@Override
+	public void close() throws IOException {
+		this.tableRecordReader.close();
+	}
+
+	@Override
+	public void open(TableInputSplit split) throws IOException {
+		if (split == null)
+		{
+			throw new IOException("Input split is null!");
+		}
+
+		if (this.table == null)
+		{
+			throw new IOException("No HTable provided!");
+		}
+
+		if (this.scan == null)
+		{
+			throw new IOException("No Scan instance provided");
+		}
+
+		this.tableRecordReader = new TableRecordReader();
+
+		this.tableRecordReader.setHTable(this.table);
+
+		Scan sc = new Scan(this.scan);
+		sc.setStartRow(split.getStartRow());
+		LOG.info("split start row: " + new String(split.getStartRow()));
+		sc.setStopRow(split.getEndRow());
+		LOG.info("split end row: " + new String(split.getEndRow()));
+
+		this.tableRecordReader.setScan(sc);
+		this.tableRecordReader.restart(split.getStartRow());
+
+		this.hbaseKey = new HBaseKey();
+		this.hbaseResult = new HBaseResult();
+
+		endReached = false;
+	}
+
+
+	@Override
+	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
+
+		if (this.table == null) {
+			throw new IOException("No table was provided.");
+		}
+
+		final Pair<byte[][], byte[][]> keys = this.table.getStartEndKeys();
+
+		if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
+
+			throw new IOException("Expecting at least one region.");
+		}
+		int count = 0;
+		final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(keys.getFirst().length);
+		for (int i = 0; i < keys.getFirst().length; i++) {
+
+			if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+				continue;
+			}
+
+			final String regionLocation = this.table.getRegionLocation(keys.getFirst()[i], false).getHostnamePort();
+			final byte[] startRow = this.scan.getStartRow();
+			final byte[] stopRow = this.scan.getStopRow();
+
+			// determine if the given start an stop key fall into the region
+			if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+				Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+				(stopRow.length == 0 ||
+				Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
+
+				final byte[] splitStart = startRow.length == 0 ||
+					Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
+					keys.getFirst()[i] : startRow;
+				final byte[] splitStop = (stopRow.length == 0 ||
+					Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
+					keys.getSecond()[i].length > 0 ?
+					keys.getSecond()[i] : stopRow;
+				final TableInputSplit split = new TableInputSplit(splits.size(), new String[] { regionLocation },
+					this.table.getTableName(), splitStart, splitStop);
+				splits.add(split);
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
+				}
+			}
+		}
+
+		return splits.toArray(new TableInputSplit[0]);
+	}
+
+	/**
+	 * Test if the given region is to be included in the InputSplit while splitting
+	 * the regions of a table.
+	 * <p>
+	 * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job,
+	 * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br>
+	 * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R
+	 * processing, continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due
+	 * to the ordering of the keys. <br>
+	 * <br>
+	 * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region. <br>
+	 * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded(
+	 * i.e. all regions are included).
+	 * 
+	 * @param startKey
+	 *        Start key of the region
+	 * @param endKey
+	 *        End key of the region
+	 * @return true, if this region needs to be included as part of the input (default).
+	 */
+	private static boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
+		return true;
+	}
+
+
+	@Override
+	public Class<TableInputSplit> getInputSplitType() {
+
+		return TableInputSplit.class;
+	}
+
+	public void setTable(HTable table)
+	{
+		this.table = table;
+	}
+
+	public HTable getTable() {
+		return table;
+	}
+
+	public void setScan(Scan scan)
+	{
+		this.scan = scan;
+	}
+
+	public Scan getScan() {
+		return scan;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
new file mode 100644
index 0000000..a77402d
--- /dev/null
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.addons.hbase;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.LocatableInputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * This class implements a input splits for HBase. Each table input split corresponds to a key range (low, high). All
+ * references to row below refer to the key of the row.
+ */
+public class TableInputSplit extends LocatableInputSplit {
+
+	/**
+	 * The name of the table to retrieve data from
+	 */
+	private byte[] tableName;
+
+	/**
+	 * The start row of the split.
+	 */
+	private byte[] startRow;
+
+	/**
+	 * The end row of the split.
+	 */
+	private byte[] endRow;
+
+	/**
+	 * Creates a new table input split
+	 * 
+	 * @param splitNumber
+	 *        the number of the input split
+	 * @param hostnames
+	 *        the names of the hosts storing the data the input split refers to
+	 * @param tableName
+	 *        the name of the table to retrieve data from
+	 * @param startRow
+	 *        the start row of the split
+	 * @param endRow
+	 *        the end row of the split
+	 */
+	TableInputSplit(final int splitNumber, final String[] hostnames, final byte[] tableName, final byte[] startRow,
+			final byte[] endRow) {
+		super(splitNumber, hostnames);
+
+		this.tableName = tableName;
+		this.startRow = startRow;
+		this.endRow = endRow;
+	}
+
+	/**
+	 * Default constructor for serialization/deserialization.
+	 */
+	public TableInputSplit() {
+		super();
+
+		this.tableName = null;
+		this.startRow = null;
+		this.endRow = null;
+	}
+
+	/**
+	 * Returns the table name.
+	 * 
+	 * @return The table name.
+	 */
+	public byte[] getTableName() {
+		return this.tableName;
+	}
+
+	/**
+	 * Returns the start row.
+	 * 
+	 * @return The start row.
+	 */
+	public byte[] getStartRow() {
+		return this.startRow;
+	}
+
+	/**
+	 * Returns the end row.
+	 * 
+	 * @return The end row.
+	 */
+	public byte[] getEndRow() {
+		return this.endRow;
+	}
+
+
+	@Override
+	public void write(final DataOutputView out) throws IOException {
+
+		super.write(out);
+
+		// Write the table name
+		if (this.tableName == null) {
+			out.writeInt(-1);
+		} else {
+			out.writeInt(this.tableName.length);
+			out.write(this.tableName);
+		}
+
+		// Write the start row
+		if (this.startRow == null) {
+			out.writeInt(-1);
+		} else {
+			out.writeInt(this.startRow.length);
+			out.write(this.startRow);
+		}
+
+		// Write the end row
+		if (this.endRow == null) {
+			out.writeInt(-1);
+		} else {
+			out.writeInt(this.endRow.length);
+			out.write(this.endRow);
+		}
+	}
+
+
+	@Override
+	public void read(final DataInputView in) throws IOException {
+
+		super.read(in);
+
+		// Read the table name
+		int len = in.readInt();
+		if (len >= 0) {
+			this.tableName = new byte[len];
+			in.readFully(this.tableName);
+		}
+
+		// Read the start row
+		len = in.readInt();
+		if (len >= 0) {
+			this.startRow = new byte[len];
+			in.readFully(this.startRow);
+		}
+
+		// Read the end row
+		len = in.readInt();
+		if (len >= 0) {
+			this.endRow = new byte[len];
+			in.readFully(this.endRow);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
new file mode 100644
index 0000000..44d64de
--- /dev/null
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.addons.hbase.common;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Key;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+/**
+ * Simple wrapper to encapsulate an HBase h{@link ImmutableBytesWritable} as a Key
+ */
+public class HBaseKey implements Key<HBaseKey> {
+
+	private static final long serialVersionUID = 1L;
+
+	private ImmutableBytesWritable writable;
+	
+
+	public HBaseKey() {
+		this.writable = new ImmutableBytesWritable();
+	}
+	
+
+	public HBaseKey(ImmutableBytesWritable writable) {
+		this.writable = writable;
+	}
+	
+	
+	public ImmutableBytesWritable getWritable() {
+		return writable;
+	}
+
+	public void setWritable(ImmutableBytesWritable writable) {
+		this.writable = writable;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		this.writable.write(out);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.writable.readFields(in);
+	}
+
+	@Override
+	public int hashCode() {
+		return this.writable.hashCode();
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj.getClass() == HBaseKey.class) {
+			return this.writable.equals(((HBaseKey) obj).writable);
+		} else {
+			return false;
+		}
+	}
+	
+	@Override
+	public int compareTo(HBaseKey other) {
+		return this.writable.compareTo(other.writable);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
new file mode 100644
index 0000000..d66f59f
--- /dev/null
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.addons.hbase.common;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Value;
+import org.apache.hadoop.hbase.client.Result;
+
+public class HBaseResult implements Value {
+	
+	private static final long serialVersionUID = 1L;
+
+	private Result result;
+	
+	
+	public HBaseResult() {
+		this.result = new Result();
+	}
+	
+	public HBaseResult(Result result) {
+		this.result = result;
+	}
+	
+	
+	public Result getResult() {
+		return this.result;
+	}
+	
+	public void setResult(Result result) {
+		this.result = result;
+	}
+	
+	public String getStringData() {
+		if(this.result != null) {
+			return this.result.toString();
+		}
+		return null;
+	}
+	
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.result.readFields(in);
+	}
+	
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		this.result.write(out);	
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
new file mode 100644
index 0000000..c1911c5
--- /dev/null
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.addons.hbase.common;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Base64;
+
+/**
+ * Utility for {@link TableInputFormat}
+ */
+public class HBaseUtil {
+
+	/**
+	 * Writes the given scan into a Base64 encoded string.
+	 * 
+	 * @param scan
+	 *        The scan to write out.
+	 * @return The scan saved in a Base64 encoded string.
+	 * @throws IOException
+	 *         When writing the scan fails.
+	 */
+	static String convertScanToString(Scan scan) throws IOException {
+		ByteArrayOutputStream out = new ByteArrayOutputStream();
+		DataOutputStream dos = new DataOutputStream(out);
+		scan.write(dos);
+		return Base64.encodeBytes(out.toByteArray());
+	}
+
+	/**
+	 * Converts the given Base64 string back into a Scan instance.
+	 * 
+	 * @param base64
+	 *        The scan details.
+	 * @return The newly created Scan instance.
+	 * @throws IOException
+	 *         When reading the scan instance fails.
+	 */
+	public static Scan convertStringToScan(String base64) throws IOException {
+		ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64));
+		DataInputStream dis = new DataInputStream(bis);
+		Scan scan = new Scan();
+		scan.readFields(dis);
+		return scan;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
new file mode 100644
index 0000000..a7bc2b3
--- /dev/null
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.TableInputFormat;
+import org.apache.flink.addons.hbase.common.HBaseKey;
+import org.apache.flink.addons.hbase.common.HBaseResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.Program;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.java.record.io.CsvOutputFormat;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.GenericDataSource;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+
+/**
+ * Implements a word count which takes the input file and counts the number of
+ * the occurrences of each word in the file.
+ */
+public class HBaseReadExample implements Program, ProgramDescription {
+	
+	public static class MyTableInputFormat extends  TableInputFormat {
+		
+		private static final long serialVersionUID = 1L;
+
+		private final byte[] META_FAMILY = "meta".getBytes();
+		
+		private final byte[] USER_COLUMN = "user".getBytes();
+		
+		private final byte[] TIMESTAMP_COLUMN = "timestamp".getBytes();
+		
+		private final byte[] TEXT_FAMILY = "text".getBytes();
+		
+		private final byte[] TWEET_COLUMN = "tweet".getBytes();
+		
+		public MyTableInputFormat() {
+			super();
+			
+		}
+		
+		@Override
+		protected HTable createTable(Configuration parameters) {
+			return super.createTable(parameters);
+		}
+		
+		@Override
+		protected Scan createScanner(Configuration parameters) {
+			Scan scan = new Scan ();
+			scan.addColumn (META_FAMILY, USER_COLUMN);
+			scan.addColumn (META_FAMILY, TIMESTAMP_COLUMN);
+			scan.addColumn (TEXT_FAMILY, TWEET_COLUMN);
+			return scan;
+		}
+		
+		StringValue row_string = new StringValue();
+		StringValue user_string = new StringValue();
+		StringValue timestamp_string = new StringValue();
+		StringValue tweet_string = new StringValue();
+		
+		@Override
+		public void mapResultToRecord(Record record, HBaseKey key,
+				HBaseResult result) {
+			Result res = result.getResult();
+			res.getRow();
+			record.setField(0, toString(row_string, res.getRow()));
+			record.setField(1, toString (user_string, res.getValue(META_FAMILY, USER_COLUMN)));
+			record.setField(2, toString (timestamp_string, res.getValue(META_FAMILY, TIMESTAMP_COLUMN)));
+			record.setField(3, toString (tweet_string, res.getValue(TEXT_FAMILY, TWEET_COLUMN)));
+		}
+		
+		private final StringValue toString (StringValue string, byte[] bytes) {
+			string.setValueAscii(bytes, 0, bytes.length);
+			return string;
+		}
+		
+	}
+	
+
+	@Override
+	public Plan getPlan(String... args) {
+		// parse job parameters
+		int numSubTasks   = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
+		String output    = (args.length > 1 ? args[1] : "");
+
+		GenericDataSource<TableInputFormat> source = new GenericDataSource<TableInputFormat>(new MyTableInputFormat(), "HBase Input");
+		source.setParameter(TableInputFormat.INPUT_TABLE, "twitter");
+		source.setParameter(TableInputFormat.CONFIG_LOCATION, "/etc/hbase/conf/hbase-site.xml");
+		FileDataSink out = new FileDataSink(new CsvOutputFormat(), output, source, "HBase String dump");
+		CsvOutputFormat.configureRecordFormat(out)
+			.recordDelimiter('\n')
+			.fieldDelimiter(' ')
+			.field(StringValue.class, 0)
+			.field(StringValue.class, 1)
+			.field(StringValue.class, 2)
+			.field(StringValue.class, 3);
+		
+		Plan plan = new Plan(out, "HBase access Example");
+		plan.setDefaultParallelism(numSubTasks);
+		return plan;
+	}
+
+
+	@Override
+	public String getDescription() {
+		return "Parameters: [numSubStasks] [input] [output]";
+	}
+}


Mime
View raw message