flink-commits mailing list archives

From se...@apache.org
Subject [02/15] flink git commit: [FLINK-1266] Generalize DistributedFileSystem implementation to HadoopFileSystem wrapper, which supports all subclasses of org.apache.hadoop.fs.FileSystem. This lets users access any file system for which a Hadoop FileSystem implementation exists.
Date Sat, 10 Jan 2015 18:19:39 GMT
[FLINK-1266] Generalize the DistributedFileSystem implementation
into a HadoopFileSystem wrapper that supports all subclasses of org.apache.hadoop.fs.FileSystem.
This lets users access any file system for which a Hadoop FileSystem implementation exists.
The change has been tested with Tachyon, the Google Cloud Storage Hadoop adapter, and HDFS.

The change also cleans up the Hadoop dependency exclusions.
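
For illustration only, a user program that benefits from this change could look like the following sketch
(the tachyon:// host, port, and paths are placeholders; the scheme must be registered in the Hadoop
configuration as described in the documentation changes below):

~~~java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class HadoopWrapperSketch {
	public static void main(String[] args) throws Exception {
		// Assumes fs.hdfs.hadoopconf in flink-conf.yaml points to a Hadoop configuration
		// directory whose core-site.xml maps fs.tachyon.impl to tachyon.hadoop.TFS.
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Any URI scheme backed by an org.apache.hadoop.fs.FileSystem implementation is now
		// resolved through the HadoopFileSystem wrapper instead of being rejected.
		DataSet<String> lines = env.readTextFile("tachyon://localhost:19998/input");  // placeholder path
		lines.writeAsText("tachyon://localhost:19998/result");                        // placeholder path
		env.execute("Copy through the Hadoop FileSystem wrapper");
	}
}
~~~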

Conflicts:
	flink-addons/flink-hadoop-compatibility/pom.xml


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76343101
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76343101
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76343101

Branch: refs/heads/release-0.8
Commit: 763431013d9da45cd6bfbe6deb06c614f23c50bb
Parents: d533ddc
Author: Robert Metzger <rmetzger@apache.org>
Authored: Fri Dec 12 16:45:02 2014 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Thu Jan 8 13:42:21 2015 +0100

----------------------------------------------------------------------
 docs/example_connectors.md                      |  52 ++-
 docs/hadoop_compatibility.md                    |   3 +-
 flink-addons/flink-hadoop-compatibility/pom.xml |  37 +-
 .../mapred/utils/HadoopUtils.java               |   4 +-
 .../mapreduce/utils/HadoopUtils.java            |   4 +-
 .../flink-streaming-connectors/pom.xml          |  22 +-
 flink-addons/flink-tachyon/pom.xml              | 109 +++++
 .../java/org/apache/flink/tachyon/HDFSTest.java | 120 +++++
 .../tachyon/TachyonFileSystemWrapperTest.java   | 173 +++++++
 .../src/test/resources/log4j.properties         |  23 +
 .../src/test/resources/tachyonHadoopConf.xml    |  28 ++
 flink-addons/flink-yarn/pom.xml                 | 138 +-----
 flink-addons/pom.xml                            |  16 +
 .../flink/core/fs/AbstractHadoopWrapper.java    |  28 ++
 .../org/apache/flink/core/fs/FileSystem.java    | 112 ++++-
 flink-runtime/pom.xml                           | 111 +----
 .../fs/hdfs/DistributedBlockLocation.java       | 139 ------
 .../fs/hdfs/DistributedDataInputStream.java     |  81 ----
 .../fs/hdfs/DistributedDataOutputStream.java    |  50 --
 .../runtime/fs/hdfs/DistributedFileStatus.java  |  94 ----
 .../runtime/fs/hdfs/DistributedFileSystem.java  | 431 -----------------
 .../runtime/fs/hdfs/HadoopBlockLocation.java    | 139 ++++++
 .../runtime/fs/hdfs/HadoopDataInputStream.java  |  81 ++++
 .../runtime/fs/hdfs/HadoopDataOutputStream.java |  50 ++
 .../flink/runtime/fs/hdfs/HadoopFileStatus.java |  94 ++++
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java | 458 +++++++++++++++++++
 .../flink/runtime/fs/maprfs/MapRFileSystem.java |  28 +-
 pom.xml                                         | 178 +++++++
 28 files changed, 1666 insertions(+), 1137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/docs/example_connectors.md
----------------------------------------------------------------------
diff --git a/docs/example_connectors.md b/docs/example_connectors.md
index bef1216..a971b54 100644
--- a/docs/example_connectors.md
+++ b/docs/example_connectors.md
@@ -1,5 +1,5 @@
 ---
-title:  "Example: Connectors"
+title:  "Connecting to other systems"
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -20,14 +20,56 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Apache Flink allows users to access many different systems as data sources or sinks. The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept of so called `InputFormat`s and `OutputFormat`s.
+## Reading from file systems
 
-One implementation of these `InputFormat`s is the `HadoopInputFormat`. This is a wrapper that allows users to use all existing Hadoop input formats with Flink.
+Flink has built-in support for the following file systems:
 
-This page shows some examples for connecting Flink to other systems.
+| Filesystem        | Since           | Scheme  | Notes |
+| ------------- |-------------| -----| ------ |
+| Hadoop Distributed File System (HDFS)  | 0.2 | `hdfs://`| All HDFS versions are supported |
+| Amazon S3    |  0.2 | `s3://` |   |
+| MapR file system      | 0.7-incubating      |  `maprfs://` | The user has to manually place the required jar files in the `lib/` dir |
+| Tachyon   |  0.9 | `tachyon://` | Support through Hadoop file system implementation (see below) |
 
 
-## Access Microsoft Azure Table Storage
+
+### Using Hadoop file systems with Apache Flink
+
+Apache Flink allows users to use any file system implementing the `org.apache.hadoop.fs.FileSystem`
+interface. Hadoop ships adapters for FTP, [Hftp](http://hadoop.apache.org/docs/r1.2.1/hftp.html), and others.
+
+Flink has integrated test cases to validate the integration with [Tachyon](http://tachyon-project.org/).
+We have also tested the integration with the
+[Google Cloud Storage Connector for Hadoop](https://cloud.google.com/hadoop/google-cloud-storage-connector).
+
+In order to use a Hadoop file system with Flink, make sure that the
+`fs.hdfs.hadoopconf` property in `flink-conf.yaml` points to the Hadoop configuration directory.
+In addition, the Hadoop configuration (in that directory) needs an entry for each supported file system.
+For example, for Tachyon support, the `core-site.xml` file must contain the following entry:
+
+~~~xml
+<property>
+  <name>fs.tachyon.impl</name>
+  <value>tachyon.hadoop.TFS</value>
+</property>
+~~~
+
+
+
+## Connecting to other systems using Input / Output Format wrappers for Hadoop
+
+Apache Flink allows users to access many different systems as data sources or sinks.
+The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept
+of so-called `InputFormat`s and `OutputFormat`s.
+
+One implementation of these `InputFormat`s is the `HadoopInputFormat`. This is a wrapper that allows
+users to use all existing Hadoop input formats with Flink.
+
+This section shows some examples for connecting Flink to other systems.
+[Read more about Hadoop compatibility in Flink](hadoop_compatibility.html).
+
+
+### Access Microsoft Azure Table Storage
 
 _Note: This example works starting from Flink 0.6-incubating_
 

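The documentation hunk above mentions the `HadoopInputFormat` wrapper without showing it in action. A minimal
sketch follows, assuming the mapred variant of the wrapper from `flink-hadoop-compatibility` with an
`(inputFormat, keyClass, valueClass, jobConf)` constructor; the HDFS path is a placeholder:

~~~java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class HadoopInputFormatSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		JobConf jobConf = new JobConf();
		FileInputFormat.addInputPath(jobConf, new Path("hdfs://namenode:9000/path/to/input")); // placeholder

		// Wrap an existing Hadoop TextInputFormat so Flink can use it as a data source.
		HadoopInputFormat<LongWritable, Text> hadoopIF =
				new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, jobConf);

		DataSet<Tuple2<LongWritable, Text>> input = env.createInput(hadoopIF);
		input.print();
		env.execute("Read via a wrapped Hadoop input format");
	}
}
~~~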
http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/docs/hadoop_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/hadoop_compatibility.md b/docs/hadoop_compatibility.md
index 59e8c51..ed8eef4 100644
--- a/docs/hadoop_compatibility.md
+++ b/docs/hadoop_compatibility.md
@@ -33,7 +33,8 @@ You can:
 - use a Hadoop `Mapper` as [FlatMapFunction](dataset_transformations.html#flatmap).
 - use a Hadoop `Reducer` as [GroupReduceFunction](dataset_transformations.html#groupreduce-on-grouped-dataset).
 
-This document shows how to use existing Hadoop MapReduce code with Flink.
+This document shows how to use existing Hadoop MapReduce code with Flink. Please refer to the
+[Connecting to other systems](example_connectors.html) guide for reading from Hadoop-supported file systems.
 
 ### Project Configuration
 

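The hunk above also refers to using a Hadoop `Mapper` as a FlatMapFunction. A loose sketch of that idea is
shown below; the `HadoopMapFunction` wrapper class and its constructor are assumptions here, and `Tokenizer`
is a made-up example mapper:

~~~java
import java.io.IOException;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction; // assumed wrapper class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class HadoopMapperSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<Tuple2<LongWritable, Text>> text = env.fromElements(
				new Tuple2<LongWritable, Text>(new LongWritable(0L), new Text("hello tachyon hello hdfs")));

		// Reuse an unmodified Hadoop Mapper as a Flink FlatMapFunction via the wrapper.
		DataSet<Tuple2<Text, LongWritable>> words = text.flatMap(
				new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()));

		words.print();
		env.execute("Hadoop Mapper as FlatMapFunction");
	}

	// A plain org.apache.hadoop.mapred.Mapper emitting (word, 1) pairs.
	public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> {
		@Override
		public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> out, Reporter reporter)
				throws IOException {
			for (String word : value.toString().toLowerCase().split("\\W+")) {
				if (!word.isEmpty()) {
					out.collect(new Text(word), new LongWritable(1L));
				}
			}
		}

		@Override
		public void configure(JobConf conf) {
		}

		@Override
		public void close() throws IOException {
		}
	}
}
~~~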
http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-addons/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/pom.xml b/flink-addons/flink-hadoop-compatibility/pom.xml
index 27cb0b6..072ae8e 100644
--- a/flink-addons/flink-hadoop-compatibility/pom.xml
+++ b/flink-addons/flink-hadoop-compatibility/pom.xml
@@ -17,8 +17,7 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
-	
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 	
 	<modelVersion>4.0.0</modelVersion>
@@ -74,40 +73,6 @@ under the License.
 				<dependency>
 					<groupId>org.apache.hadoop</groupId>
 					<artifactId>hadoop-mapreduce-client-core</artifactId>
-					<exclusions>
-						<exclusion>
-							<groupId>asm</groupId>
-							<artifactId>asm</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>tomcat</groupId>
-							<artifactId>jasper-compiler</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>tomcat</groupId>
-							<artifactId>jasper-runtime</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.mortbay.jetty</groupId>
-							<artifactId>jetty</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.mortbay.jetty</groupId>
-							<artifactId>jsp-api-2.1</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.mortbay.jetty</groupId>
-							<artifactId>jsp-2.1</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.mortbay.jetty</groupId>
-							<artifactId>jetty-util</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.eclipse.jdt</groupId>
-							<artifactId>core</artifactId>
-						</exclusion>
-					</exclusions>
 				</dependency>
 			</dependencies>
 		</profile>

http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
index 137d741..2d2f518 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
@@ -22,7 +22,7 @@ package org.apache.flink.hadoopcompatibility.mapred.utils;
 import java.lang.reflect.Constructor;
 import java.util.Map;
 
-import org.apache.flink.runtime.fs.hdfs.DistributedFileSystem;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.JobID;
@@ -36,7 +36,7 @@ public class HadoopUtils {
 	 * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
 	 */
 	public static void mergeHadoopConf(JobConf jobConf) {
-		org.apache.hadoop.conf.Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration();
+		org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
 		for (Map.Entry<String, String> e : hadoopConf) {
 			jobConf.set(e.getKey(), e.getValue());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
index 85edf8f..86b730f 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
@@ -21,7 +21,7 @@ package org.apache.flink.hadoopcompatibility.mapreduce.utils;
 import java.lang.reflect.Constructor;
 import java.util.Map;
 
-import org.apache.flink.runtime.fs.hdfs.DistributedFileSystem;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -34,7 +34,7 @@ public class HadoopUtils {
 	 * Merge HadoopConfiguration into Configuration. This is necessary for the HDFS configuration.
 	 */
 	public static void mergeHadoopConf(Configuration configuration) {
-		Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration();
+		Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
 		
 		for (Map.Entry<String, String> e : hadoopConf) {
 			configuration.set(e.getKey(), e.getValue());

http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
index ea7f3c1..a1ab0be 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
@@ -17,8 +17,9 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
 	<modelVersion>4.0.0</modelVersion>
 
@@ -41,7 +42,7 @@ under the License.
 			<artifactId>flink-shaded</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-		
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-core</artifactId>
@@ -224,14 +225,13 @@ under the License.
 				</executions>
 			</plugin>
 			<plugin>
-            			<artifactId>maven-assembly-plugin</artifactId>
-                		<configuration>
-            	    			<descriptorRefs>
-                				<descriptorRef>jar-with-dependencies</descriptorRef>
-                    			</descriptorRefs>
-            			</configuration>
-        		</plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<configuration>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+				</configuration>
+			</plugin>
 		</plugins>
 	</build>
-
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-addons/flink-tachyon/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-tachyon/pom.xml b/flink-addons/flink-tachyon/pom.xml
new file mode 100644
index 0000000..72fcffe
--- /dev/null
+++ b/flink-addons/flink-tachyon/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<artifactId>flink-addons</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>0.8-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-tachyon</artifactId>
+	<name>flink-tachyon</name>
+
+	<packaging>jar</packaging>
+
+	<!--
+		This is a Hadoop2 only flink module.
+	-->
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java-examples</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.tachyonproject</groupId>
+			<artifactId>tachyon</artifactId>
+			<version>0.5.0</version>
+		</dependency>
+		<dependency>
+			<groupId>org.tachyonproject</groupId>
+			<artifactId>tachyon</artifactId>
+			<version>0.5.0</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.eclipse.jetty</groupId>
+			<artifactId>jetty-util</artifactId>
+			<version>7.6.8.v20121106</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<scope>test</scope>
+			<type>test-jar</type>
+			<version>${hadoop.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<scope>test</scope>
+			<type>test-jar</type>
+			<version>${hadoop.version}</version>
+		</dependency>
+	</dependencies>
+	<dependencyManagement>
+		<dependencies>
+			<dependency>
+				<groupId>org.eclipse.jetty</groupId>
+				<artifactId>jetty-server</artifactId>
+				<version>7.6.8.v20121106</version>
+				<scope>test</scope>
+			</dependency>
+			<dependency>
+				<groupId>org.eclipse.jetty</groupId>
+				<artifactId>jetty-servlet</artifactId>
+				<version>7.6.8.v20121106</version>
+				<scope>test</scope>
+			</dependency>
+		</dependencies>
+	</dependencyManagement>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
new file mode 100644
index 0000000..9b4abd4
--- /dev/null
+++ b/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.tachyon;
+
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.examples.java.wordcount.WordCount;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+
+/**
+ * This test should logically be located in the 'flink-runtime' tests. However, this project
+ * already has all required dependencies (flink-java-examples). Also, the DopOneTestEnvironment is here.
+ */
+public class HDFSTest {
+
+	private String hdfsURI;
+	private MiniDFSCluster hdfsCluster;
+	private org.apache.hadoop.fs.Path hdPath;
+	private org.apache.hadoop.fs.FileSystem hdfs;
+
+	@Before
+	public void createHDFS() {
+		try {
+			Configuration hdConf = new Configuration();
+
+			File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
+			FileUtil.fullyDelete(baseDir);
+			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+			hdfsCluster = builder.build();
+
+			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
+
+			hdPath = new org.apache.hadoop.fs.Path("/test");
+			hdfs = hdPath.getFileSystem(hdConf);
+			FSDataOutputStream stream = hdfs.create(hdPath);
+			for(int i = 0; i < 10; i++) {
+				stream.write("Hello HDFS\n".getBytes());
+			}
+			stream.close();
+
+		} catch(Throwable e) {
+			e.printStackTrace();
+			Assert.fail("Test failed " + e.getMessage());
+		}
+	}
+
+	@After
+	public void destroyHDFS() {
+		try {
+			hdfs.delete(hdPath, false);
+			hdfsCluster.shutdown();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+
+	}
+
+	@Test
+	public void testHDFS() {
+
+		Path file = new Path(hdfsURI + hdPath);
+		org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result");
+		try {
+			FileSystem fs = file.getFileSystem();
+			Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem);
+			new TachyonFileSystemWrapperTest.DopOneTestEnvironment();
+			try {
+				WordCount.main(new String[]{file.toString(), result.toString()});
+			} catch(Throwable t) {
+				t.printStackTrace();
+				Assert.fail("Test failed with " + t.getMessage());
+			}
+			Assert.assertTrue("No result file present", hdfs.exists(result));
+			// validate output:
+			org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result);
+			StringWriter writer = new StringWriter();
+			IOUtils.copy(inStream, writer);
+			String resultString = writer.toString();
+
+			Assert.assertEquals("hdfs 10\n" +
+					"hello 10\n", resultString);
+			inStream.close();
+
+		} catch (IOException e) {
+			e.printStackTrace();
+			Assert.fail("Error in test: " + e.getMessage() );
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java b/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
new file mode 100644
index 0000000..582e7e7
--- /dev/null
+++ b/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.tachyon;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.examples.java.wordcount.WordCount;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import tachyon.client.InStream;
+import tachyon.client.OutStream;
+import tachyon.client.ReadType;
+import tachyon.client.TachyonFS;
+import tachyon.client.TachyonFile;
+import tachyon.client.WriteType;
+import tachyon.master.LocalTachyonCluster;
+
+import java.io.File;
+import java.io.StringWriter;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+public class TachyonFileSystemWrapperTest {
+	private static final long TACHYON_WORKER_CAPACITY = 1024 * 1024 * 32;
+	private static final String TACHYON_TEST_IN_FILE_NAME = "tachyontest";
+	private static final String TACHYON_TEST_OUT_FILE_NAME = "result";
+	private static final Path HADOOP_CONFIG_PATH;
+
+	static {
+		URL resource = TachyonFileSystemWrapperTest.class.getResource("/tachyonHadoopConf.xml");
+		File file = null;
+		try {
+			file = new File(resource.toURI());
+		} catch (URISyntaxException e) {
+			throw new RuntimeException("Unable to load req. res", e);
+		}
+		if(!file.exists()) {
+			throw new RuntimeException("Unable to load required resource");
+		}
+		HADOOP_CONFIG_PATH = new Path(file.getAbsolutePath());
+	}
+
+	private LocalTachyonCluster cluster;
+	private TachyonFS client;
+	private String input;
+	private String output;
+
+	@Before
+	public void startTachyon() {
+		try {
+			cluster = new LocalTachyonCluster(TACHYON_WORKER_CAPACITY);
+			cluster.start();
+			client = cluster.getClient();
+			int id = client.createFile("/" + TACHYON_TEST_IN_FILE_NAME, 1024 * 32);
+			Assert.assertNotEquals("Unable to create file", -1, id);
+
+			TachyonFile testFile = client.getFile(id);
+			Assert.assertNotNull(testFile);
+
+
+			OutStream outStream = testFile.getOutStream(WriteType.MUST_CACHE);
+			for(int i = 0; i < 10; i++) {
+				outStream.write("Hello Tachyon\n".getBytes());
+			}
+			outStream.close();
+			final String tachyonBase = "tachyon://" + cluster.getMasterHostname() + ":" + cluster.getMasterPort();
+			input = tachyonBase + "/" + TACHYON_TEST_IN_FILE_NAME;
+			output = tachyonBase + "/" + TACHYON_TEST_OUT_FILE_NAME;
+
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test preparation failed with exception: "+e.getMessage());
+		}
+	}
+
+	@After
+	public void stopTachyon() {
+		try {
+			cluster.stop();
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test teardown failed with exception: "+e.getMessage());
+		}
+	}
+	// Verify that Hadoop's FileSystem can load the TFS (Tachyon File System)
+	@Test
+	public void testHadoopLoadability() {
+		try {
+			Path tPath = new Path(input);
+			Configuration conf = new Configuration();
+			conf.addResource(HADOOP_CONFIG_PATH);
+			Assert.assertEquals("tachyon.hadoop.TFS", conf.get("fs.tachyon.impl", null));
+			FileSystem hfs = tPath.getFileSystem(conf);
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test failed with exception: "+e.getMessage());
+		}
+	}
+
+
+	@Test
+	public void testTachyon() {
+		try {
+			org.apache.flink.configuration.Configuration addHDConfToFlinkConf = new org.apache.flink.configuration.Configuration();
+			addHDConfToFlinkConf.setString(ConfigConstants.HDFS_DEFAULT_CONFIG, HADOOP_CONFIG_PATH.toString());
+			GlobalConfiguration.includeConfiguration(addHDConfToFlinkConf);
+
+			new DopOneTestEnvironment(); // initialize DOP one
+
+			WordCount.main(new String[]{input, output});
+
+//			List<Integer> files = client.listFiles("/", true);
+//			for(Integer file : files) {
+//				TachyonFile f = client.getFile(file);
+//				System.out.println("file = "+file+" f = "+f.getPath());
+//			}
+			// verify result
+			TachyonFile resultFile = client.getFile("/" + TACHYON_TEST_OUT_FILE_NAME);
+			Assert.assertNotNull("Result file has not been created", resultFile);
+			InStream inStream = resultFile.getInStream(ReadType.CACHE);
+			Assert.assertNotNull("Result file has not been created", inStream);
+			StringWriter writer = new StringWriter();
+			IOUtils.copy(inStream, writer);
+			String resultString = writer.toString();
+
+			Assert.assertEquals("hello 10\n" +
+					"tachyon 10\n", resultString);
+
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test failed with exception: "+e.getMessage());
+		}
+	}
+
+	// package visible
+	static final class DopOneTestEnvironment extends LocalEnvironment {
+	 	static {
+    		initializeContextEnvironment(new ExecutionEnvironmentFactory() {
+				@Override
+				public ExecutionEnvironment createExecutionEnvironment() {
+					LocalEnvironment le = new LocalEnvironment();
+					le.setDegreeOfParallelism(1);
+					return le;
+				}
+			});
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-addons/flink-tachyon/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-addons/flink-tachyon/src/test/resources/log4j.properties b/flink-addons/flink-tachyon/src/test/resources/log4j.properties
new file mode 100644
index 0000000..2c9a677
--- /dev/null
+++ b/flink-addons/flink-tachyon/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Tachyon's test-jar dependency adds a log4j.properties file to classpath.
+# Until the issue is resolved (see https://github.com/amplab/tachyon/pull/571)
+# we provide a log4j.properties file ourselves.
+
+log4j.rootLogger=OFF
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-addons/flink-tachyon/src/test/resources/tachyonHadoopConf.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-tachyon/src/test/resources/tachyonHadoopConf.xml b/flink-addons/flink-tachyon/src/test/resources/tachyonHadoopConf.xml
new file mode 100644
index 0000000..0af8190
--- /dev/null
+++ b/flink-addons/flink-tachyon/src/test/resources/tachyonHadoopConf.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+
+<configuration>
+    <property>
+        <name>fs.tachyon.impl</name>
+        <value>tachyon.hadoop.TFS</value>
+    </property>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-addons/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/pom.xml b/flink-addons/flink-yarn/pom.xml
index bddb2cd..54f1bfb 100644
--- a/flink-addons/flink-yarn/pom.xml
+++ b/flink-addons/flink-yarn/pom.xml
@@ -61,157 +61,21 @@ under the License.
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-yarn-client</artifactId>
-			<exclusions>
-				<exclusion>
-					<groupId>asm</groupId>
-					<artifactId>asm</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>tomcat</groupId>
-						<artifactId>jasper-compiler</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>tomcat</groupId>
-					<artifactId>jasper-runtime</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jsp-api-2.1</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jsp-2.1</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty-util</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.eclipse.jdt</groupId>
-					<artifactId>core</artifactId>
-				</exclusion>
-			</exclusions>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-common</artifactId>
-			<exclusions>
-				<exclusion>
-					<groupId>asm</groupId>
-					<artifactId>asm</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>tomcat</groupId>
-						<artifactId>jasper-compiler</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>tomcat</groupId>
-					<artifactId>jasper-runtime</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jsp-api-2.1</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jsp-2.1</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty-util</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.eclipse.jdt</groupId>
-					<artifactId>core</artifactId>
-				</exclusion>
-			</exclusions>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-hdfs</artifactId>
-			<exclusions>
-				<exclusion>
-					<groupId>asm</groupId>
-					<artifactId>asm</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>tomcat</groupId>
-						<artifactId>jasper-compiler</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>tomcat</groupId>
-					<artifactId>jasper-runtime</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jsp-api-2.1</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jsp-2.1</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty-util</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.eclipse.jdt</groupId>
-					<artifactId>core</artifactId>
-				</exclusion>
-			</exclusions>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-mapreduce-client-core</artifactId>
-			<exclusions>
-				<exclusion>
-					<groupId>asm</groupId>
-					<artifactId>asm</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>tomcat</groupId>
-						<artifactId>jasper-compiler</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>tomcat</groupId>
-					<artifactId>jasper-runtime</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jsp-api-2.1</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jsp-2.1</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty-util</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.eclipse.jdt</groupId>
-					<artifactId>core</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency> 
+		</dependency>
 	</dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-addons/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/pom.xml b/flink-addons/pom.xml
index 2cd3788..5c7bdf1 100644
--- a/flink-addons/pom.xml
+++ b/flink-addons/pom.xml
@@ -45,6 +45,22 @@ under the License.
 	<!-- See main pom.xml for explanation of profiles -->
 	<profiles>
 		<profile>
+			<id>hadoop-2</id>
+			<activation>
+				<property>
+					<!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
+					<!--hadoop2--><name>!hadoop.profile</name>
+				</property>
+			</activation>
+			<modules>
+				<!-- Include the Flink-tachyon project only for HD2.
+				 	The HDFS minicluster interfaces changed between the two versions.
+				 -->
+				<module>flink-tachyon</module>
+			</modules>
+		</profile>
+
+		<profile>
 			<id>include-yarn</id>
 			<activation>
 				<property>

http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-core/src/main/java/org/apache/flink/core/fs/AbstractHadoopWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/AbstractHadoopWrapper.java b/flink-core/src/main/java/org/apache/flink/core/fs/AbstractHadoopWrapper.java
new file mode 100644
index 0000000..69f37bc
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/AbstractHadoopWrapper.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.core.fs;
+
+public interface AbstractHadoopWrapper {
+
+	/**
+	 * Test whether the HadoopWrapper can wrap the given file system scheme.
+	 * @param scheme the file system scheme to check for Hadoop support
+	 * @return the Hadoop FileSystem class handling the given scheme, or null if Hadoop has no support for it
+	 */
+	public Class<?> getHadoopWrapperClassNameForFileSystem(String scheme);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index eabe830..7980cba 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -28,6 +28,7 @@ package org.apache.flink.core.fs;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
@@ -36,6 +37,8 @@ import java.util.Map;
 import org.apache.flink.util.ClassUtils;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An abstract base class for a fairly generic file system. It
@@ -43,15 +46,18 @@ import org.apache.flink.util.StringUtils;
  * one that reflects the locally-connected disk.
  */
 public abstract class FileSystem {
+	private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
 
 	private static final String LOCAL_FILESYSTEM_CLASS = "org.apache.flink.core.fs.local.LocalFileSystem";
 	
-	private static final String HADOOP_DISTRIBUTED_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.DistributedFileSystem";
+	private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
 
 	private static final String MAPR_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.maprfs.MapRFileSystem";
 	
 	private static final String S3_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.s3.S3FileSystem";
 
+	private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper";
+
 	
 	/** Object used to protect calls to specific methods.*/
 	private static final Object SYNCHRONIZATION_OBJECT = new Object();
@@ -148,7 +154,7 @@ public abstract class FileSystem {
 	private static final Map<String, String> FSDIRECTORY = new HashMap<String, String>();
 
 	static {
-		FSDIRECTORY.put("hdfs", HADOOP_DISTRIBUTED_FILESYSTEM_CLASS);
+		FSDIRECTORY.put("hdfs", HADOOP_WRAPPER_FILESYSTEM_CLASS);
 		FSDIRECTORY.put("maprfs", MAPR_FILESYSTEM_CLASS);
 		FSDIRECTORY.put("file", LOCAL_FILESYSTEM_CLASS);
 		FSDIRECTORY.put("s3", S3_FILESYSTEM_CLASS);
@@ -188,7 +194,6 @@ public abstract class FileSystem {
 	 *         thrown if a reference to the file system instance could not be obtained
 	 */
 	public static FileSystem get(URI uri) throws IOException {
-
 		FileSystem fs = null;
 
 		synchronized (SYNCHRONIZATION_OBJECT) {
@@ -222,38 +227,97 @@ public abstract class FileSystem {
 			}
 
 			// Try to create a new file system
+
 			if (!FSDIRECTORY.containsKey(uri.getScheme())) {
-				throw new IOException("No file system found with scheme " + uri.getScheme()
-						+ ", referenced in file URI '" + uri.toString() + "'.");
-			}
+				// no built-in support for this file system; fall back to Hadoop's FileSystem implementation.
+				Class<?> wrapperClass = getHadoopWrapperClassNameForFileSystem(uri.getScheme());
+				if(wrapperClass != null) {
+					// hadoop has support for the FileSystem
+					FSKey wrappedKey = new FSKey(HADOOP_WRAPPER_SCHEME + "+" + uri.getScheme(), uri.getAuthority());
+					if (CACHE.containsKey(wrappedKey)) {
+						return CACHE.get(wrappedKey);
+					}
+					// cache didn't contain the file system. instantiate it:
+
+					// by now we know that the HadoopFileSystem wrapper can wrap the file system.
+					fs = instantiateHadoopFileSystemWrapper(wrapperClass);
+					fs.initialize(uri);
+					System.out.println("Initializing new instance of wrapper for "+wrapperClass);
+					CACHE.put(wrappedKey, fs);
+
+				} else {
+					// we can not read from this file system.
+					throw new IOException("No file system found with scheme " + uri.getScheme()
+							+ ", referenced in file URI '" + uri.toString() + "'.");
+				}
+			} else {
+				// we end up here if the file system has built-in Flink support.
+				String fsClass = FSDIRECTORY.get(uri.getScheme());
+				if(fsClass.equals(HADOOP_WRAPPER_FILESYSTEM_CLASS)) {
+					fs = instantiateHadoopFileSystemWrapper(null);
+				} else {
+					fs = instantiateFileSystem(fsClass);
+				}
+				System.out.println("Initializing new instance of native class for "+fsClass);
+				// Initialize new file system object
+				fs.initialize(uri);
 
-			Class<? extends FileSystem> fsClass;
-			try {
-				fsClass = ClassUtils.getFileSystemByName(FSDIRECTORY.get(uri.getScheme()));
-			} catch (ClassNotFoundException e1) {
-				throw new IOException(StringUtils.stringifyException(e1));
+				// Add new file system object to cache
+				CACHE.put(key, fs);
 			}
+		}
 
-			try {
-				fs = fsClass.newInstance();
-			}
-			catch (InstantiationException e) {
-				throw new IOException("Could not instantiate file system class: " + e.getMessage(), e);
-			}
-			catch (IllegalAccessException e) {
-				throw new IOException("Could not instantiate file system class: " + e.getMessage(), e);
-			}
+		return fs;
+	}
 
-			// Initialize new file system object
-			fs.initialize(uri);
+	// The class must implement the Hadoop FileSystem interface. It is not available in 'flink-core'.
+	private static FileSystem instantiateHadoopFileSystemWrapper(Class<?> wrappedFileSystem) throws IOException {
+		FileSystem fs = null;
+		Class<? extends FileSystem> fsClass;
+		try {
+			fsClass = ClassUtils.getFileSystemByName(HADOOP_WRAPPER_FILESYSTEM_CLASS);
+			Constructor<? extends FileSystem> fsClassCtor = fsClass.getConstructor(Class.class);
+			fs = fsClassCtor.newInstance(wrappedFileSystem);
+		} catch (Throwable e) {
+			throw new IOException("Error loading Hadoop FS wrapper", e);
+		}
+		return fs;
+	}
 
-			// Add new file system object to cache
-			CACHE.put(key, fs);
+	private static FileSystem instantiateFileSystem(String className) throws IOException {
+		FileSystem fs = null;
+		Class<? extends FileSystem> fsClass;
+		try {
+			fsClass = ClassUtils.getFileSystemByName(className);
+		} catch (ClassNotFoundException e1) {
+			throw new IOException(StringUtils.stringifyException(e1));
 		}
 
+		try {
+			fs = fsClass.newInstance();
+		}
+		catch (InstantiationException e) {
+			throw new IOException("Could not instantiate file system class: " + e.getMessage(), e);
+		}
+		catch (IllegalAccessException e) {
+			throw new IOException("Could not instantiate file system class: " + e.getMessage(), e);
+		}
 		return fs;
 	}
 
+	private static AbstractHadoopWrapper hadoopWrapper;
+
+	private static Class<?> getHadoopWrapperClassNameForFileSystem(String scheme) {
+		if(hadoopWrapper == null) {
+			try {
+				hadoopWrapper = (AbstractHadoopWrapper) instantiateHadoopFileSystemWrapper(null);
+			} catch (IOException e) {
+				throw new RuntimeException("Error creating new Hadoop wrapper", e);
+			}
+		}
+		return hadoopWrapper.getHadoopWrapperClassNameForFileSystem(scheme);
+	}
+
 	/**
 	 * Returns the path of the file system's current working directory.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 960fee6..d242406 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -70,6 +70,7 @@ under the License.
 			<version>1.8.1</version>
 		</dependency>
 
+
 		<dependency>
 			<groupId>io.netty</groupId>
 			<artifactId>netty-all</artifactId>
@@ -186,40 +187,6 @@ under the License.
 				<dependency>
 					<groupId>org.apache.hadoop</groupId>
 					<artifactId>hadoop-core</artifactId>
-					<exclusions>
-						<exclusion>
-							<groupId>asm</groupId>
-							<artifactId>asm</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>tomcat</groupId>
-							<artifactId>jasper-compiler</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>tomcat</groupId>
-							<artifactId>jasper-runtime</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.mortbay.jetty</groupId>
-							<artifactId>jetty</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.mortbay.jetty</groupId>
-							<artifactId>jsp-api-2.1</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.mortbay.jetty</groupId>
-							<artifactId>jsp-2.1</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.mortbay.jetty</groupId>
-							<artifactId>jetty-util</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.eclipse.jdt</groupId>
-							<artifactId>core</artifactId>
-						</exclusion>
-					</exclusions>
 				</dependency>
 			</dependencies>
 		</profile>
@@ -235,86 +202,10 @@ under the License.
 				<dependency>
 					<groupId>org.apache.hadoop</groupId>
 					<artifactId>hadoop-common</artifactId>
-					<exclusions>
-						<exclusion>
-							<groupId>asm</groupId>
-							<artifactId>asm</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>tomcat</groupId>
-								<artifactId>jasper-compiler</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>tomcat</groupId>
-							<artifactId>jasper-runtime</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.mortbay.jetty</groupId>
-							<artifactId>jetty</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.mortbay.jetty</groupId>
-							<artifactId>jsp-api-2.1</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.mortbay.jetty</groupId>
-							<artifactId>jsp-2.1</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.mortbay.jetty</groupId>
-							<artifactId>jetty-util</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.eclipse.jdt</groupId>
-							<artifactId>core</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>javax.servlet</groupId>
-							<artifactId>servlet-api</artifactId>
-						</exclusion>
-					</exclusions>
 				</dependency>
 				<dependency>
 					<groupId>org.apache.hadoop</groupId>
 					<artifactId>hadoop-hdfs</artifactId>
-					<exclusions>
-						<exclusion>
-							<groupId>asm</groupId>
-							<artifactId>asm</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>tomcat</groupId>
-								<artifactId>jasper-compiler</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>tomcat</groupId>
-							<artifactId>jasper-runtime</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.mortbay.jetty</groupId>
-							<artifactId>jetty</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.mortbay.jetty</groupId>
-							<artifactId>jsp-api-2.1</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.mortbay.jetty</groupId>
-							<artifactId>jsp-2.1</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.mortbay.jetty</groupId>
-							<artifactId>jetty-util</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>org.eclipse.jdt</groupId>
-							<artifactId>core</artifactId>
-						</exclusion>
-						<exclusion>
-							<groupId>javax.servlet</groupId>
-							<artifactId>servlet-api</artifactId>
-						</exclusion>
-					</exclusions>
 				</dependency>
 			</dependencies>
 		</profile>

http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedBlockLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedBlockLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedBlockLocation.java
deleted file mode 100644
index 2040bb4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedBlockLocation.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import java.io.IOException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.flink.core.fs.BlockLocation;
-
-/**
- * Implementation of the {@link BlockLocation} interface for the
- * Hadoop Distributed File System.
- * 
- */
-public final class DistributedBlockLocation implements BlockLocation {
-
-	/**
-	 * Specifies the character separating the hostname from the domain name.
-	 */
-	private static final char DOMAIN_SEPARATOR = '.';
-
-	/**
-	 * Regular expression for an IPv4 address.
-	 */
-	private static final Pattern IPV4_PATTERN = Pattern.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+$");
-
-	/**
-	 * The original Hadoop block location object.
-	 */
-	private final org.apache.hadoop.fs.BlockLocation blockLocation;
-
-	/**
-	 * Stores the hostnames without the domain suffix.
-	 */
-	private String[] hostnames;
-
-	/**
-	 * Creates a new block location
-	 * 
-	 * @param blockLocation
-	 *        the original HDFS block location
-	 */
-	public DistributedBlockLocation(final org.apache.hadoop.fs.BlockLocation blockLocation) {
-
-		this.blockLocation = blockLocation;
-	}
-
-
-	@Override
-	public String[] getHosts() throws IOException {
-
-		/**
-		 * Unfortunately, the Hadoop API is not precise about if the list returned by BlockLocation.getHosts() contains
-		 * the hostnames with their respective domain suffix or not (FQDN or not). We have witnessed both versions,
-		 * depending on the cluster's network configuration. As a workaround, we therefore strip every hostname to make
-		 * sure it does not contain the domain suffix.
-		 */
-		if (this.hostnames == null) {
-
-			final String[] hadoopHostnames = blockLocation.getHosts();
-			this.hostnames = new String[hadoopHostnames.length];
-
-			for (int i = 0; i < hadoopHostnames.length; ++i) {
-				this.hostnames[i] = stripHostname(hadoopHostnames[i]);
-			}
-		}
-
-		return this.hostnames;
-	}
-
-	/**
-	 * Looks for a domain suffix in a FQDN and strips it if present.
-	 * 
-	 * @param originalHostname
-	 *        the original hostname, possibly an FQDN
-	 * @return the stripped hostname without the domain suffix
-	 */
-	private static String stripHostname(final String originalHostname) {
-
-		// Check if the hostname domains the domain separator character
-		final int index = originalHostname.indexOf(DOMAIN_SEPARATOR);
-		if (index == -1) {
-			return originalHostname;
-		}
-
-		// Make sure we are not stripping an IPv4 address
-		final Matcher matcher = IPV4_PATTERN.matcher(originalHostname);
-		if (matcher.matches()) {
-			return originalHostname;
-		}
-
-		if (index == 0) {
-			throw new IllegalStateException("Hostname " + originalHostname + " starts with a " + DOMAIN_SEPARATOR);
-		}
-
-		return originalHostname.substring(0, index);
-	}
-
-
-	@Override
-	public long getLength() {
-
-		return this.blockLocation.getLength();
-	}
-
-
-	@Override
-	public long getOffset() {
-
-		return this.blockLocation.getOffset();
-	}
-
-
-	@Override
-	public int compareTo(final BlockLocation o) {
-
-		final long diff = getOffset() - o.getOffset();
-
-		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedDataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedDataInputStream.java
deleted file mode 100644
index 1fb02c2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedDataInputStream.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import java.io.IOException;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-
-/**
- * Concrete implementation of the {@link FSDataInputStream} for the
- * Hadoop Distributed File System.
- * 
- */
-public final class DistributedDataInputStream extends FSDataInputStream {
-
-	private org.apache.hadoop.fs.FSDataInputStream fsDataInputStream = null;
-
-	/**
-	 * Creates a new data input stream from the given HDFS input stream
-	 * 
-	 * @param fsDataInputStream
-	 *        the HDFS input stream
-	 */
-	public DistributedDataInputStream(org.apache.hadoop.fs.FSDataInputStream fsDataInputStream) {
-		this.fsDataInputStream = fsDataInputStream;
-	}
-
-	@Override
-	public synchronized void seek(long desired) throws IOException {
-
-		fsDataInputStream.seek(desired);
-	}
-
-	@Override
-	public int read() throws IOException {
-
-		return fsDataInputStream.read();
-	}
-
-	@Override
-	public void close() throws IOException {
-
-		fsDataInputStream.close();
-	}
-
-	@Override
-	public int read(byte[] buffer, int offset, int length) throws IOException {
-
-		return fsDataInputStream.read(buffer, offset, length);
-	}
-
-
-	@Override
-	public int available() throws IOException {
-		return fsDataInputStream.available();
-	}
-
-
-	@Override
-	public long skip(long n) throws IOException {
-		return fsDataInputStream.skip(n);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedDataOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedDataOutputStream.java
deleted file mode 100644
index 946a3f3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedDataOutputStream.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import java.io.IOException;
-
-import org.apache.flink.core.fs.FSDataOutputStream;
-
-public final class DistributedDataOutputStream extends FSDataOutputStream {
-
-	private org.apache.hadoop.fs.FSDataOutputStream fdos;
-
-	public DistributedDataOutputStream(org.apache.hadoop.fs.FSDataOutputStream fdos) {
-		this.fdos = fdos;
-	}
-
-	@Override
-	public void write(int b) throws IOException {
-
-		fdos.write(b);
-	}
-
-	@Override
-	public void write(byte[] b, int off, int len) throws IOException {
-		fdos.write(b, off, len);
-	}
-
-	@Override
-	public void close() throws IOException {
-		fdos.close();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileStatus.java
deleted file mode 100644
index 6b819dc..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileStatus.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.Path;
-
-/**
- * Concrete implementation of the {@link FileStatus} interface for the
- * Hadoop Distributed File System.
- * 
- */
-public final class DistributedFileStatus implements FileStatus {
-
-	private org.apache.hadoop.fs.FileStatus fileStatus;
-
-	/**
-	 * Creates a new file status from a HDFS file status.
-	 * 
-	 * @param fileStatus
-	 *        the HDFS file status
-	 */
-	public DistributedFileStatus(org.apache.hadoop.fs.FileStatus fileStatus) {
-		this.fileStatus = fileStatus;
-	}
-
-	@Override
-	public long getLen() {
-
-		return fileStatus.getLen();
-	}
-
-	@Override
-	public long getBlockSize() {
-
-		long blocksize = fileStatus.getBlockSize();
-		if (blocksize > fileStatus.getLen()) {
-			return fileStatus.getLen();
-		}
-
-		return blocksize;
-	}
-
-	@Override
-	public long getAccessTime() {
-
-		return fileStatus.getAccessTime();
-	}
-
-	@Override
-	public long getModificationTime() {
-
-		return fileStatus.getModificationTime();
-	}
-
-	@Override
-	public short getReplication() {
-
-		return fileStatus.getReplication();
-	}
-
-	public org.apache.hadoop.fs.FileStatus getInternalFileStatus() {
-		return this.fileStatus;
-	}
-
-	@Override
-	public Path getPath() {
-
-		return new Path(fileStatus.getPath().toString());
-	}
-
-	@Override
-	public boolean isDir() {
-
-		return fileStatus.isDir();
-	}
-}

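The getBlockSize() override in the removed DistributedFileStatus (and presumably in its renamed successor HadoopFileStatus, which is not shown in this excerpt) caps the reported block size at the file length, so a file shorter than one block reports its own length. A minimal, self-contained sketch of that capping rule, with made-up sizes for illustration only:

    // Illustration, not code from this commit: the block-size capping rule
    // used by DistributedFileStatus.getBlockSize() above, in isolation.
    public final class BlockSizeCapExample {

        static long cappedBlockSize(long blockSize, long fileLength) {
            // a file shorter than one block reports its own length as the block size
            return blockSize > fileLength ? fileLength : blockSize;
        }

        public static void main(String[] args) {
            System.out.println(cappedBlockSize(128L << 20, 5L << 10)); // 5120: file shorter than a block
            System.out.println(cappedBlockSize(128L << 20, 1L << 31)); // 134217728: the full block size
        }
    }
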
http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java
deleted file mode 100644
index 0836651..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java
+++ /dev/null
@@ -1,431 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.URI;
-import java.net.UnknownHostException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.fs.BlockLocation;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Concrete implementation of the {@link FileSystem} base class for the Hadoop Distributed File System. The
- * class is essentially a wrapper class which encapsulates the original Hadoop HDFS API.
- */
-public final class DistributedFileSystem extends FileSystem {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(DistributedFileSystem.class);
-	
-	private static final String DEFAULT_HDFS_CLASS = "org.apache.hadoop.hdfs.DistributedFileSystem";
-	
-	/**
-	 * Configuration value name for the DFS implementation name. Usually not specified in hadoop configurations.
-	 */
-	private static final String HDFS_IMPLEMENTATION_KEY = "fs.hdfs.impl";
-	
-
-	private final org.apache.hadoop.conf.Configuration conf;
-
-	private final org.apache.hadoop.fs.FileSystem fs;
-
-
-	/**
-	 * Creates a new DistributedFileSystem object to access HDFS
-	 * 
-	 * @throws IOException
-	 *         thrown if the required HDFS classes cannot be instantiated
-	 */
-	public DistributedFileSystem() throws IOException {
-
-		// Create new Hadoop configuration object
-		this.conf = getHadoopConfiguration();
-
-		Class<? extends org.apache.hadoop.fs.FileSystem> fsClass = null;
-		
-		// try to get the FileSystem implementation class Hadoop 2.0.0 style
-		{
-			LOG.debug("Trying to load HDFS class Hadoop 2.x style.");
-			
-			Object fsHandle = null;
-			try {
-				Method newApi = org.apache.hadoop.fs.FileSystem.class.getMethod("getFileSystemClass", String.class, org.apache.hadoop.conf.Configuration.class);
-				fsHandle = newApi.invoke(null, "hdfs", conf); 
-			} catch (Exception e) {
-				// if we cannot find the FileSystem class using the new API,
-				// fsClass remains null and we assume we are running on an older Hadoop version
-			}
-			
-			if (fsHandle != null) {
-				if (fsHandle instanceof Class && org.apache.hadoop.fs.FileSystem.class.isAssignableFrom((Class<?>) fsHandle)) {
-					fsClass = ((Class<?>) fsHandle).asSubclass(org.apache.hadoop.fs.FileSystem.class);
-					
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Loaded '" + fsClass.getName() + "' as HDFS class.");
-					}
-				}
-				else {
-					LOG.debug("Unexpected return type from 'org.apache.hadoop.fs.FileSystem.getFileSystemClass(String, Configuration)'.");
-					throw new RuntimeException("The value returned from org.apache.hadoop.fs.FileSystem.getFileSystemClass(String, Configuration) is not a valid subclass of org.apache.hadoop.fs.FileSystem.");
-				}
-			}
-		}
-		
-		// fall back to an older Hadoop version
-		if (fsClass == null)
-		{
-			// first of all, check for a user-defined hdfs class
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Falling back to loading HDFS class old Hadoop style. Looking for HDFS class configuration entry '"
-						+ HDFS_IMPLEMENTATION_KEY + "'.");
-			}
-
-			Class<?> classFromConfig = conf.getClass(HDFS_IMPLEMENTATION_KEY, null);
-			
-			if (classFromConfig != null)
-			{
-				if (org.apache.hadoop.fs.FileSystem.class.isAssignableFrom(classFromConfig)) {
-					fsClass = classFromConfig.asSubclass(org.apache.hadoop.fs.FileSystem.class);
-					
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Loaded HDFS class '" + fsClass.getName() + "' as specified in configuration.");
-					}
-				}
-				else {
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("HDFS class specified by " + HDFS_IMPLEMENTATION_KEY + " is of wrong type.");
-					}
-					
-					throw new IOException("HDFS class specified by " + HDFS_IMPLEMENTATION_KEY +
-						" cannot be cast to a FileSystem type.");
-				}
-			}
-			else {
-				// load the default HDFS class
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Trying to load default HDFS implementation " + DEFAULT_HDFS_CLASS);
-				}
-				
-				try {
-					Class <?> reflectedClass = Class.forName(DEFAULT_HDFS_CLASS);
-					if (org.apache.hadoop.fs.FileSystem.class.isAssignableFrom(reflectedClass)) {
-						fsClass = reflectedClass.asSubclass(org.apache.hadoop.fs.FileSystem.class);
-					} else {
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Default HDFS class is of wrong type.");
-						}
-						
-						throw new IOException("The default HDFS class '" + DEFAULT_HDFS_CLASS + 
-							"' cannot be cast to a FileSystem type.");
-					}
-				}
-				catch (ClassNotFoundException e) {
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Default HDFS class cannot be loaded.");
-					}
-					
-					throw new IOException("No HDFS class has been configured and the default class '" +
-							DEFAULT_HDFS_CLASS + "' cannot be loaded.");
-				}
-			}
-		}
-		
-		this.fs = instantiateFileSystem(fsClass);
-	}
-	
-	/**
-	 * Returns a new Hadoop Configuration object using the path to the hadoop conf configured 
-	 * in the main configuration (flink-conf.yaml).
-	 * This method is public because it is used by the HadoopDataSource.
-	 */
-	public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
-		Configuration retConf = new org.apache.hadoop.conf.Configuration();
-
-		// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
-		// the hdfs configuration
-		// Try to load HDFS configuration from Hadoop's own configuration files
-		// 1. approach: Flink configuration
-		final String hdfsDefaultPath = GlobalConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
-		if (hdfsDefaultPath != null) {
-			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
-		} else {
-			LOG.debug("Cannot find hdfs-default configuration file");
-		}
-
-		final String hdfsSitePath = GlobalConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
-		if (hdfsSitePath != null) {
-			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
-		} else {
-			LOG.debug("Cannot find hdfs-site configuration file");
-		}
-		
-		// 2. approach: environment variables
-		String[] possibleHadoopConfPaths = new String[4]; 
-		possibleHadoopConfPaths[0] = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
-		possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
-		
-		if (System.getenv("HADOOP_HOME") != null) {
-			possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME")+"/conf";
-			possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME")+"/etc/hadoop"; // hadoop 2.2
-		}
-
-		for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
-			if (possibleHadoopConfPath != null) {
-				if (new File(possibleHadoopConfPath).exists()) {
-					if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
-						retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
-
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
-						}
-					}
-					if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
-						retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
-
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
-						}
-					}
-				}
-			}
-		}
-		return retConf;
-	}
-	
-	private org.apache.hadoop.fs.FileSystem instantiateFileSystem(Class<? extends org.apache.hadoop.fs.FileSystem> fsClass)
-		throws IOException
-	{
-		try {
-			return fsClass.newInstance();
-		}
-		catch (ExceptionInInitializerError e) {
-			throw new IOException("The filesystem class '" + fsClass.getName() + "' threw an exception during initialization.", e.getException());
-		}
-		catch (Throwable t) {
-			String errorMessage = InstantiationUtil.checkForInstantiationError(fsClass);
-			if (errorMessage != null) {
-				throw new IOException("The filesystem class '" + fsClass.getName() + "' cannot be instantiated: " + errorMessage);
-			} else {
-				throw new IOException("An error occurred while instantiating the filesystem class '" +
-						fsClass.getName() + "'.", t);
-			}
-		}
-	}
-
-
-	@Override
-	public Path getWorkingDirectory() {
-		return new Path(this.fs.getWorkingDirectory().toUri());
-	}
-
-	@Override
-	public URI getUri() {
-		return fs.getUri();
-	}
-
-	@Override
-	public void initialize(URI path) throws IOException {
-		
-		// For HDFS we have to have an authority
-		if (path.getAuthority() == null) {
-			
-			String configEntry = this.conf.get("fs.default.name", null);
-			if (configEntry == null) {
-				// fs.default.name deprecated as of hadoop 2.2.0 http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
-				configEntry = this.conf.get("fs.defaultFS", null);
-			}
-			
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("fs.defaultFS is set to " + configEntry);
-			}
-			
-			if (configEntry == null) {
-				throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs configuration was registered, " +
-						"or that configuration did not contain an entry for the default hdfs.");
-			} else {
-				try {
-					URI initURI = URI.create(configEntry);
-					
-					if (initURI.getAuthority() == null) {
-						throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs configuration was registered, " +
-								"or the provided configuration contains no valid hdfs namenode address (fs.default.name or fs.defaultFS) describing the hdfs namenode host and port.");
-					} else if (!initURI.getScheme().equalsIgnoreCase("hdfs")) {
-						throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs configuration was registered, " +
-								"or the provided configuration describes a file system with scheme '" + initURI.getScheme() + "' other than the Hadoop Distributed File System (HDFS).");
-					} else {
-						try {
-							this.fs.initialize(initURI, this.conf);
-						}
-						catch (IOException e) {
-							throw new IOException(getMissingAuthorityErrorPrefix(path) +
-									"Could not initialize the file system connection with the given address of the HDFS NameNode: " + e.getMessage(), e);
-						}
-					}
-				}
-				catch (IllegalArgumentException e) {
-					throw new IOException(getMissingAuthorityErrorPrefix(path) +
-							"The configuration contains an invalid hdfs default name (fs.default.name or fs.defaultFS): " + configEntry);
-				}
-			} 
-		}
-		else {
-			// Initialize HDFS
-			try {
-				this.fs.initialize(path, this.conf);
-			}
-			catch (UnknownHostException e) {
-				String message = "The HDFS NameNode host at '" + path.getAuthority()
-						+ "', specified by file path '" + path.toString() + "', cannot be resolved"
-						+ (e.getMessage() != null ? ": " + e.getMessage() : ".");
-				
-				if (path.getPort() == -1) {
-					message += " Hint: Have you forgotten a slash? (correct URI would be 'hdfs:///" + path.getAuthority() + path.getPath() + "' ?)";
-				}
-				
-				throw new IOException(message, e);
-			}
-			catch (Exception e) {
-				throw new IOException("The given file URI (" + path.toString() + ") points to the HDFS NameNode at "
-						+ path.getAuthority() + ", but the File System could not be initialized with that address"
-					+ (e.getMessage() != null ? ": " + e.getMessage() : "."), e);
-			}
-		}
-	}
-	
-	private static String getMissingAuthorityErrorPrefix(URI path) {
-		return "The given HDFS file URI (" + path.toString() + ") did not describe the HDFS NameNode." +
-				" The attempt to use a default HDFS configuration, as specified in the '" + ConfigConstants.HDFS_DEFAULT_CONFIG + "' or '" + 
-				ConfigConstants.HDFS_SITE_CONFIG + "' config parameter failed due to the following problem: ";
-	}
-
-
-	@Override
-	public FileStatus getFileStatus(final Path f) throws IOException {
-		org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(new org.apache.hadoop.fs.Path(f.toString()));
-		return new DistributedFileStatus(status);
-	}
-
-	@Override
-	public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
-	throws IOException
-	{
-		if (!(file instanceof DistributedFileStatus)) {
-			throw new IOException("file is not an instance of DistributedFileStatus");
-		}
-
-		final DistributedFileStatus f = (DistributedFileStatus) file;
-
-		final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
-			start, len);
-
-		// Wrap up HDFS specific block location objects
-		final DistributedBlockLocation[] distBlkLocations = new DistributedBlockLocation[blkLocations.length];
-		for (int i = 0; i < distBlkLocations.length; i++) {
-			distBlkLocations[i] = new DistributedBlockLocation(blkLocations[i]);
-		}
-
-		return distBlkLocations;
-	}
-
-	@Override
-	public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
-
-		final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(new org.apache.hadoop.fs.Path(f.toString()),
-			bufferSize);
-
-		return new DistributedDataInputStream(fdis);
-	}
-
-	@Override
-	public FSDataInputStream open(final Path f) throws IOException {
-		final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(new org.apache.hadoop.fs.Path(f.toString()));
-		return new DistributedDataInputStream(fdis);
-	}
-
-	@Override
-	public FSDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize,
-			final short replication, final long blockSize)
-	throws IOException
-	{
-		final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
-			new org.apache.hadoop.fs.Path(f.toString()), overwrite, bufferSize, replication, blockSize);
-		return new DistributedDataOutputStream(fdos);
-	}
-
-
-	@Override
-	public FSDataOutputStream create(final Path f, final boolean overwrite) throws IOException {
-		final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = this.fs
-			.create(new org.apache.hadoop.fs.Path(f.toString()), overwrite);
-		return new DistributedDataOutputStream(fsDataOutputStream);
-	}
-
-	@Override
-	public boolean delete(final Path f, final boolean recursive) throws IOException {
-		return this.fs.delete(new org.apache.hadoop.fs.Path(f.toString()), recursive);
-	}
-
-	@Override
-	public FileStatus[] listStatus(final Path f) throws IOException {
-		final org.apache.hadoop.fs.FileStatus[] hadoopFiles = this.fs.listStatus(new org.apache.hadoop.fs.Path(f.toString()));
-		final FileStatus[] files = new FileStatus[hadoopFiles.length];
-
-		// Convert types
-		for (int i = 0; i < files.length; i++) {
-			files[i] = new DistributedFileStatus(hadoopFiles[i]);
-		}
-		
-		return files;
-	}
-
-	@Override
-	public boolean mkdirs(final Path f) throws IOException {
-		return this.fs.mkdirs(new org.apache.hadoop.fs.Path(f.toString()));
-	}
-
-	@Override
-	public boolean rename(final Path src, final Path dst) throws IOException {
-		return this.fs.rename(new org.apache.hadoop.fs.Path(src.toString()),
-			new org.apache.hadoop.fs.Path(dst.toString()));
-	}
-
-	@SuppressWarnings("deprecation")
-	@Override
-	public long getDefaultBlockSize() {
-		return this.fs.getDefaultBlockSize();
-	}
-
-	@Override
-	public boolean isDistributedFS() {
-		return true;
-	}
-}

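The constructor removed above resolves the HDFS implementation class in three steps: the Hadoop 2.x FileSystem.getFileSystemClass(String, Configuration) lookup via reflection, then the fs.hdfs.impl configuration key, and finally the hard-coded default org.apache.hadoop.hdfs.DistributedFileSystem. Its replacement, the HadoopFileSystem wrapper, generalizes this so that any subclass of org.apache.hadoop.fs.FileSystem can be used. A small sketch of the Hadoop-side lookup this builds on, assuming Hadoop 2.x on the classpath; the URI and NameNode address are made up and not part of this commit:

    // Sketch, not taken from HadoopFileSystem.java: letting Hadoop itself pick the
    // FileSystem implementation for a scheme. With a suitable fs.<scheme>.impl entry
    // in core-site.xml the same call resolves non-HDFS file systems (Tachyon, Google
    // Cloud Storage, ...) as well.
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HadoopFsLookupExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();               // loads core-site.xml / hdfs-site.xml from the classpath
            URI uri = URI.create("hdfs://namenode:9000/tmp/input"); // hypothetical NameNode address and path
            FileSystem fs = FileSystem.get(uri, conf);              // resolves the implementation for the 'hdfs' scheme
            System.out.println("Using " + fs.getClass().getName() + " for " + uri);
            System.out.println("Exists: " + fs.exists(new Path(uri)));
        }
    }
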
http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
new file mode 100644
index 0000000..a1cc72c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.flink.core.fs.BlockLocation;
+
+/**
+ * Implementation of the {@link BlockLocation} interface for the
+ * Hadoop Distributed File System.
+ * 
+ */
+public final class HadoopBlockLocation implements BlockLocation {
+
+	/**
+	 * Specifies the character separating the hostname from the domain name.
+	 */
+	private static final char DOMAIN_SEPARATOR = '.';
+
+	/**
+	 * Regular expression for an IPv4 address.
+	 */
+	private static final Pattern IPV4_PATTERN = Pattern.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+$");
+
+	/**
+	 * The original Hadoop block location object.
+	 */
+	private final org.apache.hadoop.fs.BlockLocation blockLocation;
+
+	/**
+	 * Stores the hostnames without the domain suffix.
+	 */
+	private String[] hostnames;
+
+	/**
+	 * Creates a new block location
+	 * 
+	 * @param blockLocation
+	 *        the original HDFS block location
+	 */
+	public HadoopBlockLocation(final org.apache.hadoop.fs.BlockLocation blockLocation) {
+
+		this.blockLocation = blockLocation;
+	}
+
+
+	@Override
+	public String[] getHosts() throws IOException {
+
+		/**
+		 * Unfortunately, the Hadoop API is not precise about whether the list returned by BlockLocation.getHosts()
+		 * contains the hostnames with or without their domain suffix (FQDN or not). We have witnessed both versions,
+		 * depending on the cluster's network configuration. As a workaround, we therefore strip every hostname to make
+		 * sure it does not contain the domain suffix.
+		 */
+		if (this.hostnames == null) {
+
+			final String[] hadoopHostnames = blockLocation.getHosts();
+			this.hostnames = new String[hadoopHostnames.length];
+
+			for (int i = 0; i < hadoopHostnames.length; ++i) {
+				this.hostnames[i] = stripHostname(hadoopHostnames[i]);
+			}
+		}
+
+		return this.hostnames;
+	}
+
+	/**
+	 * Looks for a domain suffix in a FQDN and strips it if present.
+	 * 
+	 * @param originalHostname
+	 *        the original hostname, possibly an FQDN
+	 * @return the stripped hostname without the domain suffix
+	 */
+	private static String stripHostname(final String originalHostname) {
+
+		// Check if the hostname contains the domain separator character
+		final int index = originalHostname.indexOf(DOMAIN_SEPARATOR);
+		if (index == -1) {
+			return originalHostname;
+		}
+
+		// Make sure we are not stripping an IPv4 address
+		final Matcher matcher = IPV4_PATTERN.matcher(originalHostname);
+		if (matcher.matches()) {
+			return originalHostname;
+		}
+
+		if (index == 0) {
+			throw new IllegalStateException("Hostname " + originalHostname + " starts with a " + DOMAIN_SEPARATOR);
+		}
+
+		return originalHostname.substring(0, index);
+	}
+
+
+	@Override
+	public long getLength() {
+
+		return this.blockLocation.getLength();
+	}
+
+
+	@Override
+	public long getOffset() {
+
+		return this.blockLocation.getOffset();
+	}
+
+
+	@Override
+	public int compareTo(final BlockLocation o) {
+
+		final long diff = getOffset() - o.getOffset();
+
+		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
+	}
+}

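The hostname handling above strips the domain suffix from each host returned by the Hadoop API, but leaves plain host names and IPv4 addresses untouched. Since stripHostname() is private, the following stand-alone sketch reimplements the same rule for illustration (it returns the input unchanged where the original would throw for a leading dot); the host names are made up:

    // Illustration of the stripping rule used by HadoopBlockLocation.getHosts() above.
    import java.util.regex.Pattern;

    public class HostnameStrippingExample {
        private static final Pattern IPV4 = Pattern.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+$");

        static String strip(String host) {
            int dot = host.indexOf('.');
            if (dot <= 0 || IPV4.matcher(host).matches()) {
                return host;                   // no suffix to strip, or an IPv4 address
            }
            return host.substring(0, dot);     // drop the domain suffix
        }

        public static void main(String[] args) {
            System.out.println(strip("worker-3.example.org")); // worker-3
            System.out.println(strip("worker-3"));             // worker-3
            System.out.println(strip("10.20.30.40"));          // 10.20.30.40
        }
    }
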
http://git-wip-us.apache.org/repos/asf/flink/blob/76343101/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
new file mode 100644
index 0000000..9a606fd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+
+/**
+ * Concrete implementation of the {@link FSDataInputStream} for the
+ * Hadoop Distributed File System.
+ * 
+ */
+public final class HadoopDataInputStream extends FSDataInputStream {
+
+	private org.apache.hadoop.fs.FSDataInputStream fsDataInputStream = null;
+
+	/**
+	 * Creates a new data input stream from the given HDFS input stream
+	 * 
+	 * @param fsDataInputStream
+	 *        the HDFS input stream
+	 */
+	public HadoopDataInputStream(org.apache.hadoop.fs.FSDataInputStream fsDataInputStream) {
+		this.fsDataInputStream = fsDataInputStream;
+	}
+
+	@Override
+	public synchronized void seek(long desired) throws IOException {
+
+		fsDataInputStream.seek(desired);
+	}
+
+	@Override
+	public int read() throws IOException {
+
+		return fsDataInputStream.read();
+	}
+
+	@Override
+	public void close() throws IOException {
+
+		fsDataInputStream.close();
+	}
+
+	@Override
+	public int read(byte[] buffer, int offset, int length) throws IOException {
+
+		return fsDataInputStream.read(buffer, offset, length);
+	}
+
+
+	@Override
+	public int available() throws IOException {
+		return fsDataInputStream.available();
+	}
+
+
+	@Override
+	public long skip(long n) throws IOException {
+		return fsDataInputStream.skip(n);
+	}
+
+}

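HadoopDataInputStream is a thin delegating wrapper, so a raw org.apache.hadoop.fs.FSDataInputStream can be handed to any code written against Flink's FSDataInputStream abstraction. A minimal usage sketch, assuming Hadoop on the classpath; the NameNode address and file path are made up and not part of this commit:

    import java.net.URI;
    import org.apache.flink.core.fs.FSDataInputStream;
    import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HadoopStreamWrapperExample {
        public static void main(String[] args) throws Exception {
            // open a file through the Hadoop API ...
            FileSystem hadoopFs = FileSystem.get(URI.create("hdfs://namenode:9000/"), new Configuration());
            // ... and read it through Flink's stream abstraction via the new wrapper
            FSDataInputStream in = new HadoopDataInputStream(hadoopFs.open(new Path("/tmp/words.txt")));
            try {
                byte[] buf = new byte[4096];
                int read;
                while ((read = in.read(buf, 0, buf.length)) != -1) {
                    System.out.write(buf, 0, read);
                }
                System.out.flush();
            } finally {
                in.close();
            }
        }
    }
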
