flink-commits mailing list archives

From se...@apache.org
Subject [2/3] flink git commit: [FLINK-7768] [core] Load File Systems via Java Service abstraction
Date Fri, 06 Oct 2017 17:54:04 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFreeFsFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFreeFsFactoryTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFreeFsFactoryTest.java
new file mode 100644
index 0000000..04bc917
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFreeFsFactoryTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URLClassLoader;
+
+/**
+ * Tests that validate the behavior of the Hadoop File System Factory in the absence of Hadoop classes.
+ */
+public class HadoopFreeFsFactoryTest extends TestLogger {
+
+	/**
+	 * This test validates that the factory can be instantiated and configured even
+	 * when Hadoop classes are missing from the classpath.
+	 */
+	@Test
+	public void testHadoopFactoryInstantiationWithoutHadoop() throws Exception {
+		// we do reflection magic here to instantiate the test in another class
+		// loader, to make sure no Hadoop classes are in the classpath
+
+		final String testClassName = "org.apache.flink.runtime.fs.hdfs.HadoopFreeTests";
+
+		URLClassLoader parent = (URLClassLoader) getClass().getClassLoader();
+		ClassLoader hadoopFreeClassLoader = new HadoopFreeClassLoader(parent);
+		Class<?> testClass = Class.forName(testClassName, false, hadoopFreeClassLoader);
+		Method m = testClass.getDeclaredMethod("test");
+
+		try {
+			m.invoke(null);
+		}
+		catch (InvocationTargetException e) {
+			ExceptionUtils.rethrowException(e.getTargetException(), "exception in method");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class HadoopFreeClassLoader extends URLClassLoader {
+
+		private final ClassLoader properParent;
+
+		HadoopFreeClassLoader(URLClassLoader parent) {
+			super(parent.getURLs(), null);
+			properParent = parent;
+		}
+
+		@Override
+		public Class<?> loadClass(String name) throws ClassNotFoundException {
+			if (name.startsWith("org.apache.hadoop")) {
+				throw new ClassNotFoundException(name);
+			}
+			else if (name.startsWith("org.apache.log4j")) {
+				return properParent.loadClass(name);
+			}
+			else {
+				return super.loadClass(name);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFreeTests.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFreeTests.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFreeTests.java
new file mode 100644
index 0000000..e78fe8d
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFreeTests.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
+
+import java.net.URI;
+
+import static org.junit.Assert.fail;
+
+/**
+ * A class with tests that need to be run in a Hadoop-free environment, to test
+ * proper error handling when no Hadoop classes are available.
+ *
+ * <p>This class must be dynamically loaded in a Hadoop-free class loader.
+ */
+// this class is only instantiated via reflection
+@SuppressWarnings("unused")
+public class HadoopFreeTests {
+
+	public static void test() throws Exception {
+		// make sure no Hadoop FS classes are in the classpath
+		try {
+			Class.forName("org.apache.hadoop.fs.FileSystem");
+			fail("Cannot run test when Hadoop classes are in the classpath");
+		}
+		catch (ClassNotFoundException ignored) {}
+
+		try {
+			Class.forName("org.apache.hadoop.conf.Configuration");
+			fail("Cannot run test when Hadoop classes are in the classpath");
+		}
+		catch (ClassNotFoundException ignored) {}
+
+		// this method should complete without a linkage error
+		final HadoopFsFactory factory = new HadoopFsFactory();
+
+		// this method should also complete without a linkage error
+		factory.configure(new Configuration());
+
+		try {
+			factory.create(new URI("hdfs://somehost:9000/root/dir"));
+			fail("This statement should fail with an exception");
+		}
+		catch (UnsupportedFileSystemSchemeException e) {
+			// expected
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
new file mode 100644
index 0000000..1f5c932
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that validate the behavior of the Hadoop File System Factory.
+ */
+public class HadoopFsFactoryTest extends TestLogger {
+
+	@Test
+	public void testCreateHadoopFsWithoutConfig() throws Exception {
+		final URI uri = URI.create("hdfs://localhost:12345/");
+
+		HadoopFsFactory factory = new HadoopFsFactory();
+		HadoopFileSystem fs = factory.create(uri);
+
+		assertEquals(uri.getScheme(), fs.getUri().getScheme());
+		assertEquals(uri.getAuthority(), fs.getUri().getAuthority());
+		assertEquals(uri.getPort(), fs.getUri().getPort());
+	}
+
+	@Test
+	public void testCreateHadoopFsWithMissingAuthority() throws Exception {
+		final URI uri = URI.create("hdfs:///my/path");
+
+		HadoopFsFactory factory = new HadoopFsFactory();
+
+		try {
+			factory.create(uri);
+			fail("should have failed with an exception");
+		}
+		catch (IOException e) {
+			assertTrue(e.getMessage().contains("authority"));
+		}
+	}
+}
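
The HadoopFsFactory exercised here ships in a different part of this commit and is not shown in this message. Taken together, HadoopFreeTests and the two tests above pin down its contract. The sketch below is an illustrative outline of that contract only; the class name, message texts, and the internal helper are hypothetical, and a (String, Throwable) constructor is assumed for UnsupportedFileSystemSchemeException:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.FileSystemFactory;
    import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;

    import java.io.IOException;
    import java.net.URI;

    // Hypothetical outline of the behavior the tests require; not the real factory.
    class SketchedHadoopFsFactory implements FileSystemFactory {

        @Override
        public String getScheme() {
            return "hdfs";
        }

        @Override
        public void configure(Configuration config) {
            // may only remember the config; it must not link any org.apache.hadoop
            // class yet, so that this call succeeds in a Hadoop-free environment
        }

        @Override
        public FileSystem create(URI fsUri) throws IOException {
            if (fsUri.getAuthority() == null) {
                // testCreateHadoopFsWithMissingAuthority expects an IOException
                // whose message mentions the missing "authority"
                throw new IOException("The URI " + fsUri + " lacks an authority (host and port).");
            }
            try {
                // first point at which Hadoop classes may be linked
                return createHadoopFileSystem(fsUri);
            }
            catch (LinkageError e) {
                // HadoopFreeTests expects this exception when no Hadoop classes
                // are in the classpath
                throw new UnsupportedFileSystemSchemeException(
                        "Cannot create 'hdfs' file system, Hadoop is not in the classpath.", e);
            }
        }

        private FileSystem createHadoopFileSystem(URI fsUri) throws IOException {
            // would instantiate the org.apache.hadoop.fs.FileSystem for the URI and
            // wrap it, analogous to what MapRFileSystem does below for MapR
            throw new UnsupportedOperationException("sketch only");
        }
    }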

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/test/resources/core-site.xml
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/resources/core-site.xml b/flink-filesystems/flink-hadoop-fs/src/test/resources/core-site.xml
new file mode 100644
index 0000000..56c7d55
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/resources/core-site.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- Values used when running unit tests.  Specify any values in here that
+     should override the default values. -->
+
+<configuration>
+    <property>
+        <name>cp_conf_key</name>
+        <value>oompf!</value>
+    </property>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/resources/log4j-test.properties b/flink-filesystems/flink-hadoop-fs/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2be3589
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-mapr-fs/pom.xml
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-mapr-fs/pom.xml b/flink-filesystems/flink-mapr-fs/pom.xml
new file mode 100644
index 0000000..12bb9bc
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-filesystems</artifactId>
+		<version>1.4-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-mapr-fs</artifactId>
+	<name>flink-mapr-fs</name>
+
+	<packaging>jar</packaging>
+
+	<repositories>
+		<repository>
+			<id>mapr-releases</id>
+			<url>http://repository.mapr.com/maven/</url>
+			<snapshots><enabled>false</enabled></snapshots>
+			<releases><enabled>true</enabled></releases>
+		</repository>
+	</repositories>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-hadoop-fs</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<!-- The MapR dependency is optional, so that others can hard-depend on this module -->
+		<!-- without pulling in the MapR libraries by default -->
+
+		<dependency>
+			<groupId>com.mapr.hadoop</groupId>
+			<artifactId>maprfs</artifactId>
+			<version>5.2.1-mapr</version>
+			<optional>true</optional>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
new file mode 100644
index 0000000..058772c
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.maprfs;
+
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A MapR file system client for Flink.
+ *
+ * <p>Internally, this class wraps the {@link org.apache.hadoop.fs.FileSystem} implementation
+ * of the MapR file system client.
+ */
+public class MapRFileSystem extends HadoopFileSystem {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MapRFileSystem.class);
+
+	/** Name of the environment variable to determine the location of the MapR
+	 * installation. */
+	private static final String MAPR_HOME_ENV = "MAPR_HOME";
+
+	/** The default location of the MapR installation. */
+	private static final String DEFAULT_MAPR_HOME = "/opt/mapr/";
+
+	/** The path, relative to MAPR_HOME, of the file that describes how to access the
+	 * configured clusters. */
+	private static final String MAPR_CLUSTER_CONF_FILE = "/conf/mapr-clusters.conf";
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a MapRFileSystem for the given URI.
+	 *
+	 * @param fsUri The URI describing the file system
+	 * @throws IOException Thrown if the file system could not be initialized.
+	 */
+	public MapRFileSystem(URI fsUri) throws IOException {
+		super(instantiateMapRFileSystem(fsUri));
+	}
+
+	private static org.apache.hadoop.fs.FileSystem instantiateMapRFileSystem(URI fsUri) throws IOException {
+		checkNotNull(fsUri, "fsUri");
+
+		final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+		final com.mapr.fs.MapRFileSystem fs;
+
+		final String authority = fsUri.getAuthority();
+		if (authority == null || authority.isEmpty()) {
+
+			// Use the default constructor to instantiate MapR file system object
+			fs = new com.mapr.fs.MapRFileSystem();
+		}
+		else {
+			// We have an authority, check the MapR cluster configuration to
+			// find the CLDB locations.
+			final String[] cldbLocations = getCLDBLocations(authority);
+			fs = new com.mapr.fs.MapRFileSystem(authority, cldbLocations);
+		}
+
+		// now initialize the Hadoop File System object
+		fs.initialize(fsUri, conf);
+
+		return fs;
+	}
+
+	/**
+	 * Retrieves the CLDB locations for the given MapR cluster name.
+	 *
+	 * @param authority
+	 *            the name of the MapR cluster
+	 * @return a list of CLDB locations
+	 * @throws IOException
+	 *             thrown if the CLDB locations for the given MapR cluster name
+	 *             cannot be determined
+	 */
+	private static String[] getCLDBLocations(String authority) throws IOException {
+
+		// Determine the MapR home
+		String maprHome = System.getenv(MAPR_HOME_ENV);
+		if (maprHome == null) {
+			maprHome = DEFAULT_MAPR_HOME;
+		}
+
+		final File maprClusterConf = new File(maprHome, MAPR_CLUSTER_CONF_FILE);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(String.format(
+					"Trying to retrieve MapR cluster configuration from %s",
+					maprClusterConf));
+		}
+
+		if (!maprClusterConf.exists()) {
+			throw new IOException("Could not find CLDB configuration '" + maprClusterConf.getAbsolutePath() +
+					"', assuming MapR home is '" + maprHome + "'.");
+		}
+
+		// Read the cluster configuration file, format is specified at
+		// http://doc.mapr.com/display/MapR/mapr-clusters.conf
+
+		try (BufferedReader br = new BufferedReader(new FileReader(maprClusterConf))) {
+
+			String line;
+			while ((line = br.readLine()) != null) {
+
+				// Normalize the string
+				line = line.trim();
+				line = line.replace('\t', ' ');
+
+				final String[] fields = line.split(" ");
+				if (fields.length < 1) {
+					continue;
+				}
+
+				final String clusterName = fields[0];
+
+				if (!clusterName.equals(authority)) {
+					continue;
+				}
+
+				final List<String> cldbLocations = new ArrayList<>();
+
+				for (int i = 1; i < fields.length; ++i) {
+
+					// Make sure this is not one of the key-value pairs that MapR
+					// recently introduced in the file format along with its
+					// security features.
+					if (!fields[i].isEmpty() && !fields[i].contains("=")) {
+						cldbLocations.add(fields[i]);
+					}
+				}
+
+				if (cldbLocations.isEmpty()) {
+					throw new IOException(
+							String.format(
+									"%s contains entry for cluster %s but no CLDB locations.",
+									maprClusterConf, authority));
+				}
+
+				return cldbLocations.toArray(new String[cldbLocations.size()]);
+			}
+
+		}
+
+		throw new IOException(String.format(
+				"Unable to find CLDB locations for cluster %s", authority));
+	}
+}
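
For orientation, the mapr-clusters.conf format parsed by getCLDBLocations() above (specified at the doc.mapr.com page referenced in the code) lists one cluster per line: the cluster name first, then optional key=value entries, then the CLDB host:port locations. A hypothetical example with made-up host names:

    # <cluster name> [key=value ...] <CLDB host:port> [<CLDB host:port> ...]
    my.cluster.com secure=true cldb1.example.com:7222 cldb2.example.com:7222

For a URI such as maprfs://my.cluster.com/some/path, the parsing loop above matches the first field against the URI authority, skips secure=true because it contains '=', and returns cldb1.example.com:7222 and cldb2.example.com:7222 as the CLDB locations.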

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
new file mode 100644
index 0000000..e163f63
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.maprfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A factory for the MapR file system.
+ *
+ * <p>This factory instantiates the MapR file system only on demand. It can only be
+ * used when the MapR FS libraries are in the classpath.
+ */
+public class MapRFsFactory implements FileSystemFactory {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MapRFsFactory.class);
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String getScheme() {
+		return "maprfs";
+	}
+
+	@Override
+	public void configure(Configuration config) {
+		// nothing to configure based on the configuration here
+	}
+
+	@Override
+	public FileSystem create(URI fsUri) throws IOException {
+		checkNotNull(fsUri, "fsUri");
+
+		try {
+			LOG.info("Trying to load and instantiate MapR File System");
+
+			return new MapRFileSystem(fsUri);
+		}
+		catch (LinkageError e) {
+			throw new IOException("Could not load MapR file system. " +
+					"Please make sure the MapR file system classes are part of the classpath or dependencies.", e);
+		}
+		catch (IOException e) {
+			throw e;
+		}
+		catch (Throwable t) {
+			throw new IOException("Could not instantiate MapR file system.", t);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-mapr-fs/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-mapr-fs/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-filesystems/flink-mapr-fs/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
new file mode 100644
index 0000000..ffc7bcf
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.fs.maprfs.MapRFsFactory
\ No newline at end of file
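
The registration file above is the "Java Service abstraction" named in the commit title: java.util.ServiceLoader discovers every FileSystemFactory listed in a META-INF/services/org.apache.flink.core.fs.FileSystemFactory entry on the classpath. The discovery side lives in flink-core and is not part of this message; the following minimal sketch shows how such a ServiceLoader lookup works (class and method names are illustrative only):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.FileSystemFactory;

    import java.io.IOException;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.ServiceLoader;

    // Illustrative only; the actual dispatch is implemented in flink-core.
    class FileSystemFactoryLookup {

        static FileSystem createFileSystem(URI uri) throws IOException {
            // collect every factory registered via META-INF/services, keyed by scheme
            Map<String, FileSystemFactory> factories = new HashMap<>();
            for (FileSystemFactory factory : ServiceLoader.load(FileSystemFactory.class)) {
                factories.put(factory.getScheme(), factory);
            }

            FileSystemFactory factory = factories.get(uri.getScheme());
            if (factory == null) {
                throw new IOException(
                        "No file system factory registered for scheme '" + uri.getScheme() + "'");
            }
            factory.configure(new Configuration());
            return factory.create(uri);
        }
    }

With a registration like the one above on the classpath, a lookup for a maprfs:// URI resolves to MapRFsFactory; the FileSystemAccessTest below exercises this path end-to-end via Path#getFileSystem().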

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/FileSystemAccessTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/FileSystemAccessTest.java b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/FileSystemAccessTest.java
new file mode 100644
index 0000000..aee50ba
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/FileSystemAccessTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.maprfs;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This test checks that the file system is properly accessible through the
+ * service loading abstraction.
+ */
+public class FileSystemAccessTest extends TestLogger {
+
+	@Test
+	public void testGetMapRFs() throws Exception {
+		final Path path = new Path("maprfs:///my/path");
+
+		FileSystem fs = path.getFileSystem();
+		assertEquals(path.toUri().getScheme(), fs.getUri().getScheme());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFreeTests.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFreeTests.java b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFreeTests.java
new file mode 100644
index 0000000..110fce3
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFreeTests.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.maprfs;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * A class with tests that need to be run in a MapR/Hadoop-free environment,
+ * to test proper error handling when no Hadoop classes are available.
+ *
+ * <p>This class must be dynamically loaded in a MapR/Hadoop-free class loader.
+ */
+// this class is only instantiated via reflection
+@SuppressWarnings("unused")
+public class MapRFreeTests {
+
+	public static void test() throws Exception {
+		// make sure no MapR or Hadoop FS classes are in the classpath
+		try {
+			Class.forName("com.mapr.fs.MapRFileSystem");
+			fail("Cannot run test when MapR classes are in the classpath");
+		}
+		catch (ClassNotFoundException ignored) {}
+
+		try {
+			Class.forName("org.apache.hadoop.fs.FileSystem");
+			fail("Cannot run test when Hadoop classes are in the classpath");
+		}
+		catch (ClassNotFoundException ignored) {}
+
+		try {
+			Class.forName("org.apache.hadoop.conf.Configuration");
+			fail("Cannot run test when Hadoop classes are in the classpath");
+		}
+		catch (ClassNotFoundException ignored) {}
+
+		// this method should complete without a linkage error
+		final MapRFsFactory factory = new MapRFsFactory();
+
+		// this method should also complete without a linkage error
+		factory.configure(new Configuration());
+
+		try {
+			factory.create(new URI("maprfs://somehost:9000/root/dir"));
+			fail("This statement should fail with an exception");
+		}
+		catch (IOException e) {
+			assertTrue(e.getMessage().contains("MapR"));
+			assertTrue(e.getMessage().contains("classpath"));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactoryTest.java b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactoryTest.java
new file mode 100644
index 0000000..984668f
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactoryTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.maprfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.URLClassLoader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the MapRFsFactory.
+ */
+public class MapRFsFactoryTest extends TestLogger {
+
+	/**
+	 * This test validates that the factory can be instantiated and configured even
+	 * when MapR and Hadoop classes are missing from the classpath.
+	 */
+	@Test
+	public void testInstantiationWithoutMapRClasses() throws Exception {
+		// we do reflection magic here to instantiate the test in another class
+		// loader, to make sure no MapR or Hadoop classes are in the classpath
+
+		final String testClassName = "org.apache.flink.runtime.fs.maprfs.MapRFreeTests";
+
+		URLClassLoader parent = (URLClassLoader) getClass().getClassLoader();
+		ClassLoader maprFreeClassLoader = new MapRFreeClassLoader(parent);
+		Class<?> testClass = Class.forName(testClassName, false, maprFreeClassLoader);
+		Method m = testClass.getDeclaredMethod("test");
+
+		try {
+			m.invoke(null);
+		}
+		catch (InvocationTargetException e) {
+			ExceptionUtils.rethrowException(e.getTargetException(), "exception in method");
+		}
+	}
+
+	@Test
+	public void testCreateFsWithAuthority() throws Exception {
+		final URI uri = URI.create("maprfs://localhost:12345/");
+
+		MapRFsFactory factory = new MapRFsFactory();
+
+		try {
+			factory.create(uri);
+			fail("should have failed with an exception");
+		}
+		catch (IOException e) {
+			// expected, because we have no CLDB config available
+		}
+	}
+
+	@Test
+	public void testCreateFsWithMissingAuthority() throws Exception {
+		final URI uri = URI.create("maprfs:///my/path");
+
+		MapRFsFactory factory = new MapRFsFactory();
+		factory.configure(new Configuration());
+
+		FileSystem fs = factory.create(uri);
+		assertEquals("maprfs", fs.getUri().getScheme());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class MapRFreeClassLoader extends URLClassLoader {
+
+		private final ClassLoader properParent;
+
+		MapRFreeClassLoader(URLClassLoader parent) {
+			super(parent.getURLs(), null);
+			properParent = parent;
+		}
+
+		@Override
+		public Class<?> loadClass(String name) throws ClassNotFoundException {
+			if (name.startsWith("com.mapr") || name.startsWith("org.apache.hadoop")) {
+				throw new ClassNotFoundException(name);
+			}
+			else if (name.startsWith("org.apache.log4j")) {
+				return properParent.loadClass(name);
+			}
+			else {
+				return super.loadClass(name);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-mapr-fs/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-mapr-fs/src/test/resources/log4j-test.properties b/flink-filesystems/flink-mapr-fs/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2be3589
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/pom.xml
----------------------------------------------------------------------
diff --git a/flink-filesystems/pom.xml b/flink-filesystems/pom.xml
new file mode 100644
index 0000000..b0199da
--- /dev/null
+++ b/flink-filesystems/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>1.4-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+
+	<artifactId>flink-filesystems</artifactId>
+	<name>flink-filesystems</name>
+	<packaging>pom</packaging>
+
+	<modules>
+		<module>flink-hadoop-fs</module>
+		<module>flink-mapr-fs</module>
+	</modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 3ed7e70..71ebfb7 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -50,6 +50,17 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
+		<!-- The Hadoop FS support has only an optional dependency on Hadoop and
+			gracefully handles absence of Hadoop classes -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-hadoop-fs</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<!-- optional dependency on Hadoop, so that Hadoop classes are not always pulled in -->
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-shaded-hadoop2</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
deleted file mode 100644
index 1484c95..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import org.apache.flink.core.fs.BlockLocation;
-
-import java.io.IOException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Implementation of the {@link BlockLocation} interface for the
- * Hadoop Distributed File System.
- */
-public final class HadoopBlockLocation implements BlockLocation {
-
-	/**
-	 * Specifies the character separating the hostname from the domain name.
-	 */
-	private static final char DOMAIN_SEPARATOR = '.';
-
-	/**
-	 * Regular expression for an IPv4 address.
-	 */
-	private static final Pattern IPV4_PATTERN = Pattern.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+$");
-
-	/**
-	 * The original Hadoop block location object.
-	 */
-	private final org.apache.hadoop.fs.BlockLocation blockLocation;
-
-	/**
-	 * Stores the hostnames without the domain suffix.
-	 */
-	private String[] hostnames;
-
-	/**
-	 * Creates a new block location.
-	 *
-	 * @param blockLocation
-	 *        the original HDFS block location
-	 */
-	public HadoopBlockLocation(final org.apache.hadoop.fs.BlockLocation blockLocation) {
-
-		this.blockLocation = blockLocation;
-	}
-
-	@Override
-	public String[] getHosts() throws IOException {
-
-		/**
-		 * Unfortunately, the Hadoop API is not precise about whether the list returned by BlockLocation.getHosts() contains
-		 * the hostnames with their respective domain suffix or not (FQDN or not). We have witnessed both versions,
-		 * depending on the cluster's network configuration. As a workaround, we therefore strip every hostname to make
-		 * sure it does not contain the domain suffix.
-		 */
-		if (this.hostnames == null) {
-
-			final String[] hadoopHostnames = blockLocation.getHosts();
-			this.hostnames = new String[hadoopHostnames.length];
-
-			for (int i = 0; i < hadoopHostnames.length; ++i) {
-				this.hostnames[i] = stripHostname(hadoopHostnames[i]);
-			}
-		}
-
-		return this.hostnames;
-	}
-
-	/**
-	 * Looks for a domain suffix in a FQDN and strips it if present.
-	 *
-	 * @param originalHostname
-	 *        the original hostname, possibly an FQDN
-	 * @return the stripped hostname without the domain suffix
-	 */
-	private static String stripHostname(final String originalHostname) {
-
-		// Check if the hostname contains the domain separator character
-		final int index = originalHostname.indexOf(DOMAIN_SEPARATOR);
-		if (index == -1) {
-			return originalHostname;
-		}
-
-		// Make sure we are not stripping an IPv4 address
-		final Matcher matcher = IPV4_PATTERN.matcher(originalHostname);
-		if (matcher.matches()) {
-			return originalHostname;
-		}
-
-		if (index == 0) {
-			throw new IllegalStateException("Hostname " + originalHostname + " starts with a " + DOMAIN_SEPARATOR);
-		}
-
-		return originalHostname.substring(0, index);
-	}
-
-	@Override
-	public long getLength() {
-
-		return this.blockLocation.getLength();
-	}
-
-	@Override
-	public long getOffset() {
-
-		return this.blockLocation.getOffset();
-	}
-
-	@Override
-	public int compareTo(final BlockLocation o) {
-
-		final long diff = getOffset() - o.getOffset();
-
-		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
deleted file mode 100644
index da50c4c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-
-import javax.annotation.Nonnull;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Concrete implementation of the {@link FSDataInputStream} for Hadoop's input streams.
- * This supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
- */
-public final class HadoopDataInputStream extends FSDataInputStream {
-
-	/**
-	 * Minimum amount of bytes to skip forward before we issue a seek instead of discarding read.
-	 *
-	 * <p>The current value is just a magic number. In the long run, this value could become configurable, but for now it
-	 * is a conservative, relatively small value that should bring safe improvements for small skips (e.g. in reading
-	 * meta data), that would hurt the most with frequent seeks.
-	 *
-	 * <p>The optimal value depends on the DFS implementation and configuration plus the underlying filesystem.
-	 * For now, this number is chosen "big enough" to provide improvements for smaller seeks, and "small enough" to
-	 * avoid disadvantages over real seeks. While the minimum should be the page size, a true optimum per system would
-	 * be the amount of bytes that can be consumed sequentially within the seek time. Unfortunately, seek time is not
-	 * constant, and devices, OS, and DFS potentially also use read buffers and read-ahead.
-	 */
-	public static final int MIN_SKIP_BYTES = 1024 * 1024;
-
-	/** The internal stream. */
-	private final org.apache.hadoop.fs.FSDataInputStream fsDataInputStream;
-
-	/**
-	 * Creates a new data input stream from the given Hadoop input stream.
-	 *
-	 * @param fsDataInputStream The Hadoop input stream
-	 */
-	public HadoopDataInputStream(org.apache.hadoop.fs.FSDataInputStream fsDataInputStream) {
-		this.fsDataInputStream = checkNotNull(fsDataInputStream);
-	}
-
-	@Override
-	public void seek(long seekPos) throws IOException {
-		// We do some optimizations to avoid that some implementations of distributed FS perform
-		// expensive seeks when they are actually not needed.
-		long delta = seekPos - getPos();
-
-		if (delta > 0L && delta <= MIN_SKIP_BYTES) {
-			// Instead of a small forward seek, we skip over the gap
-			skipFully(delta);
-		} else if (delta != 0L) {
-			// For larger gaps and backward seeks, we do a real seek
-			forceSeek(seekPos);
-		} // Do nothing if delta is zero.
-	}
-
-	@Override
-	public long getPos() throws IOException {
-		return fsDataInputStream.getPos();
-	}
-
-	@Override
-	public int read() throws IOException {
-		return fsDataInputStream.read();
-	}
-
-	@Override
-	public void close() throws IOException {
-		fsDataInputStream.close();
-	}
-
-	@Override
-	public int read(@Nonnull byte[] buffer, int offset, int length) throws IOException {
-		return fsDataInputStream.read(buffer, offset, length);
-	}
-
-	@Override
-	public int available() throws IOException {
-		return fsDataInputStream.available();
-	}
-
-	@Override
-	public long skip(long n) throws IOException {
-		return fsDataInputStream.skip(n);
-	}
-
-	/**
-	 * Gets the wrapped Hadoop input stream.
-	 * @return The wrapped Hadoop input stream.
-	 */
-	public org.apache.hadoop.fs.FSDataInputStream getHadoopInputStream() {
-		return fsDataInputStream;
-	}
-
-	/**
-	 * Positions the stream to the given location. In contrast to {@link #seek(long)}, this method will
-	 * always issue a "seek" command to the dfs and may not replace it by {@link #skip(long)} for small seeks.
-	 *
-	 * <p>Notice that the underlying DFS implementation can still decide to do skip instead of seek.
-	 *
-	 * @param seekPos the position to seek to.
-	 * @throws IOException
-	 */
-	public void forceSeek(long seekPos) throws IOException {
-		fsDataInputStream.seek(seekPos);
-	}
-
-	/**
-	 * Skips over a given amount of bytes in the stream.
-	 *
-	 * @param bytes the number of bytes to skip.
-	 * @throws IOException
-	 */
-	public void skipFully(long bytes) throws IOException {
-		while (bytes > 0) {
-			bytes -= fsDataInputStream.skip(bytes);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
deleted file mode 100644
index 1b8d1a3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import org.apache.flink.core.fs.FSDataOutputStream;
-
-import java.io.IOException;
-
-/**
- * Concrete implementation of the {@link FSDataOutputStream} for Hadoop's output streams.
- * This supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
- */
-public class HadoopDataOutputStream extends FSDataOutputStream {
-
-	private final org.apache.hadoop.fs.FSDataOutputStream fdos;
-
-	public HadoopDataOutputStream(org.apache.hadoop.fs.FSDataOutputStream fdos) {
-		if (fdos == null) {
-			throw new NullPointerException();
-		}
-		this.fdos = fdos;
-	}
-
-	@Override
-	public void write(int b) throws IOException {
-		fdos.write(b);
-	}
-
-	@Override
-	public void write(byte[] b, int off, int len) throws IOException {
-		fdos.write(b, off, len);
-	}
-
-	@Override
-	public void close() throws IOException {
-		fdos.close();
-	}
-
-	@Override
-	public long getPos() throws IOException {
-		return fdos.getPos();
-	}
-
-	@Override
-	public void flush() throws IOException {
-		fdos.hflush();
-	}
-
-	@Override
-	public void sync() throws IOException {
-		fdos.hsync();
-	}
-
-	/**
-	 * Gets the wrapped Hadoop output stream.
-	 * @return The wrapped Hadoop output stream.
-	 */
-	public org.apache.hadoop.fs.FSDataOutputStream getHadoopOutputStream() {
-		return fdos;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
deleted file mode 100644
index 17bb334..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.Path;
-
-/**
- * Concrete implementation of the {@link FileStatus} interface for the
- * Hadoop Distributed File System.
- */
-public final class HadoopFileStatus implements FileStatus {
-
-	private org.apache.hadoop.fs.FileStatus fileStatus;
-
-	/**
-	 * Creates a new file status from a HDFS file status.
-	 *
-	 * @param fileStatus
-	 *        the HDFS file status
-	 */
-	public HadoopFileStatus(org.apache.hadoop.fs.FileStatus fileStatus) {
-		this.fileStatus = fileStatus;
-	}
-
-	@Override
-	public long getLen() {
-		return fileStatus.getLen();
-	}
-
-	@Override
-	public long getBlockSize() {
-		long blocksize = fileStatus.getBlockSize();
-		if (blocksize > fileStatus.getLen()) {
-			return fileStatus.getLen();
-		}
-
-		return blocksize;
-	}
-
-	@Override
-	public long getAccessTime() {
-		return fileStatus.getAccessTime();
-	}
-
-	@Override
-	public long getModificationTime() {
-		return fileStatus.getModificationTime();
-	}
-
-	@Override
-	public short getReplication() {
-		return fileStatus.getReplication();
-	}
-
-	public org.apache.hadoop.fs.FileStatus getInternalFileStatus() {
-		return this.fileStatus;
-	}
-
-	@Override
-	public Path getPath() {
-		return new Path(fileStatus.getPath().toString());
-	}
-
-	@SuppressWarnings("deprecation")
-	@Override
-	public boolean isDir() {
-		return fileStatus.isDir();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
deleted file mode 100644
index 4ebf4bc..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import org.apache.flink.core.fs.BlockLocation;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-
-import java.io.IOException;
-import java.net.URI;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link FileSystem} that wraps an {@link org.apache.hadoop.fs.FileSystem Hadoop File System}.
- */
-public final class HadoopFileSystem extends FileSystem {
-
-	/** The wrapped Hadoop File System. */
-	private final org.apache.hadoop.fs.FileSystem fs;
-
-	/**
-	 * Wraps the given Hadoop File System object as a Flink File System object.
-	 * The given Hadoop file system object is expected to be initialized already.
-	 *
-	 * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
-	 */
-	public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
-		this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
-	}
-
-	/**
-	 * Gets the underlying Hadoop FileSystem.
-	 * @return The underlying Hadoop FileSystem.
-	 */
-	public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
-		return this.fs;
-	}
-
-	// ------------------------------------------------------------------------
-	//  file system methods
-	// ------------------------------------------------------------------------
-
-	@Override
-	public Path getWorkingDirectory() {
-		return new Path(this.fs.getWorkingDirectory().toUri());
-	}
-
-	public Path getHomeDirectory() {
-		return new Path(this.fs.getHomeDirectory().toUri());
-	}
-
-	@Override
-	public URI getUri() {
-		return fs.getUri();
-	}
-
-	@Override
-	public FileStatus getFileStatus(final Path f) throws IOException {
-		org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(new org.apache.hadoop.fs.Path(f.toString()));
-		return new HadoopFileStatus(status);
-	}
-
-	@Override
-	public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
-			throws IOException {
-		if (!(file instanceof HadoopFileStatus)) {
-			throw new IOException("file is not an instance of HadoopFileStatus");
-		}
-
-		final HadoopFileStatus f = (HadoopFileStatus) file;
-
-		final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
-			start, len);
-
-		// Wrap up HDFS specific block location objects
-		final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
-		for (int i = 0; i < distBlkLocations.length; i++) {
-			distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
-		}
-
-		return distBlkLocations;
-	}
-
-	@Override
-	public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException {
-		final org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(f.toString());
-		final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
-		return new HadoopDataInputStream(fdis);
-	}
-
-	@Override
-	public HadoopDataInputStream open(final Path f) throws IOException {
-		final org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(f.toString());
-		final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(path);
-		return new HadoopDataInputStream(fdis);
-	}
-
-	@Override
-	@SuppressWarnings("deprecation")
-	public HadoopDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize,
-			final short replication, final long blockSize) throws IOException {
-		final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
-			new org.apache.hadoop.fs.Path(f.toString()), overwrite, bufferSize, replication, blockSize);
-		return new HadoopDataOutputStream(fdos);
-	}
-
-	@Override
-	public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
-		final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = this.fs
-			.create(new org.apache.hadoop.fs.Path(f.toString()), overwrite == WriteMode.OVERWRITE);
-		return new HadoopDataOutputStream(fsDataOutputStream);
-	}
-
-	@Override
-	public boolean delete(final Path f, final boolean recursive) throws IOException {
-		return this.fs.delete(new org.apache.hadoop.fs.Path(f.toString()), recursive);
-	}
-
-	@Override
-	public FileStatus[] listStatus(final Path f) throws IOException {
-		final org.apache.hadoop.fs.FileStatus[] hadoopFiles = this.fs.listStatus(new org.apache.hadoop.fs.Path(f.toString()));
-		final FileStatus[] files = new FileStatus[hadoopFiles.length];
-
-		// Convert types
-		for (int i = 0; i < files.length; i++) {
-			files[i] = new HadoopFileStatus(hadoopFiles[i]);
-		}
-
-		return files;
-	}
-
-	@Override
-	public boolean mkdirs(final Path f) throws IOException {
-		return this.fs.mkdirs(new org.apache.hadoop.fs.Path(f.toString()));
-	}
-
-	@Override
-	public boolean rename(final Path src, final Path dst) throws IOException {
-		return this.fs.rename(new org.apache.hadoop.fs.Path(src.toString()),
-			new org.apache.hadoop.fs.Path(dst.toString()));
-	}
-
-	@SuppressWarnings("deprecation")
-	@Override
-	public long getDefaultBlockSize() {
-		return this.fs.getDefaultBlockSize();
-	}
-
-	@Override
-	public boolean isDistributedFS() {
-		return true;
-	}
-}
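
A minimal sketch of how this wrapper was used: every call converts the Flink Path to a Hadoop Path and delegates to the wrapped, already-initialized Hadoop file system. The sketch assumes Hadoop on the classpath and uses the local file system and '/tmp' purely as placeholders.

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;

public class HadoopFileSystemSketch {

	public static void main(String[] args) throws Exception {
		// an already-initialized Hadoop file system (local FS for the sketch)
		org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
		org.apache.hadoop.fs.FileSystem hadoopFs = org.apache.hadoop.fs.FileSystem.getLocal(conf);

		// wrap it; every call converts the Flink Path to a Hadoop Path and delegates
		HadoopFileSystem fs = new HadoopFileSystem(hadoopFs);

		for (FileStatus status : fs.listStatus(new Path("/tmp"))) {
			System.out.println(status.getPath() + (status.isDir() ? " (dir)" : ""));
		}
	}
}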

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
deleted file mode 100644
index d3b1b89..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
-import org.apache.flink.runtime.util.HadoopUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.UnknownHostException;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A file system factory for Hadoop-based file systems.
- *
- * <p>This factory calls Hadoop's mechanism to find a file system implementation for a given file
- * system scheme (a {@link org.apache.hadoop.fs.FileSystem}) and wraps it as a Flink file system
- * (a {@link org.apache.flink.core.fs.FileSystem}).
- */
-public class HadoopFsFactory implements FileSystemFactory {
-
-	private static final Logger LOG = LoggerFactory.getLogger(HadoopFsFactory.class);
-
-	/** Hadoop's configuration for the file systems. */
-	private org.apache.hadoop.conf.Configuration hadoopConfig;
-
-	@Override
-	public void configure(Configuration config) {
-		hadoopConfig = HadoopUtils.getHadoopConfiguration(config);
-	}
-
-	@Override
-	public FileSystem create(URI fsUri) throws IOException {
-		checkNotNull(fsUri, "fsUri");
-
-		final String scheme = fsUri.getScheme();
-		checkArgument(scheme != null, "file system has null scheme");
-
-		// -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath)
-
-		final org.apache.hadoop.conf.Configuration hadoopConfig;
-		if (this.hadoopConfig != null) {
-			hadoopConfig = this.hadoopConfig;
-		}
-		else {
-			LOG.warn("Hadoop configuration has not been explicitly initialized prior to loading a Hadoop file system."
-					+ " Using configuration from the classpath.");
-
-			hadoopConfig = new org.apache.hadoop.conf.Configuration();
-		}
-
-		// -- (2) create the proper URI to initialize the file system
-
-		final URI initUri;
-		if (fsUri.getAuthority() != null) {
-			initUri = fsUri;
-		}
-		else {
-			LOG.debug("URI {} does not specify file system authority, trying to load default authority (fs.defaultFS)", fsUri);
-
-			String configEntry = hadoopConfig.get("fs.defaultFS", null);
-			if (configEntry == null) {
-				// fs.default.name deprecated as of hadoop 2.2.0 - see
-				// http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
-				configEntry = hadoopConfig.get("fs.default.name", null);
-			}
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
-			}
-
-			if (configEntry == null) {
-				throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
-						"Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').");
-			}
-			else {
-				try {
-					initUri = URI.create(configEntry);
-				}
-				catch (IllegalArgumentException e) {
-					throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
-							"The configuration contains an invalid file system default name " +
-							"('fs.default.name' or 'fs.defaultFS'): " + configEntry);
-				}
-
-				if (initUri.getAuthority() == null) {
-					throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
-							"Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') " +
-							"contains no valid authority component (like hdfs namenode, S3 host, etc)");
-				}
-			}
-		}
-
-		// -- (3) get the Hadoop file system class for that scheme
-
-		final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
-		try {
-			fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConfig);
-		}
-		catch (IOException e) {
-			throw new UnsupportedFileSystemSchemeException(
-					"Hadoop File System abstraction does not support scheme '" + scheme + "'. " +
-							"Either no file system implementation exists for that scheme, " +
-							"or the relevant classes are missing from the classpath.", e);
-		}
-
-		LOG.debug("Instantiating Hadoop File System {} for file system scheme {}", fsClass.getName(), scheme);
-
-		// -- (4) instantiate the Hadoop file system
-
-		final org.apache.hadoop.fs.FileSystem hadoopFs;
-		try {
-			hadoopFs = fsClass.newInstance();
-		}
-		catch (Exception e) {
-			throw new IOException("The Hadoop file system class '" + fsClass.getName() + "' cannot be instantiated.", e);
-		}
-
-		// -- (5) configure the Hadoop file system
-
-		try {
-			hadoopFs.initialize(initUri, hadoopConfig);
-		}
-		catch (UnknownHostException e) {
-			String message = "The Hadoop file system's authority (" + initUri.getAuthority() +
-					"), specified by either the file URI or the configuration, cannot be resolved.";
-
-			throw new IOException(message, e);
-		}
-		catch (Exception e) {
-			throw new IOException("Hadoop file system " + fsClass.getName() + " for scheme '" + scheme +
-					"' could not be initialized.", e);
-		}
-
-		return new HadoopFileSystem(hadoopFs);
-	}
-
-	private static String getMissingAuthorityErrorPrefix(URI fsURI) {
-		return "The given file system URI (" + fsURI.toString() + ") did not describe the authority " +
-		return "The given file system URI (" + fsURI.toString() + ") did not describe the authority " +
-				"(for example, the HDFS NameNode address/port or the S3 host). " +
-				"The attempt to use a configured default authority failed: ";
-	}
-}
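
A sketch of the factory's flow, assuming a reachable HDFS cluster: configure() loads the Hadoop configuration via HadoopUtils, and create() resolves the URI scheme to a Hadoop file system class, instantiates it, and initializes it with the URI. The namenode address below is a placeholder.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.fs.hdfs.HadoopFsFactory;

import java.net.URI;

public class HadoopFsFactorySketch {

	public static void main(String[] args) throws Exception {
		HadoopFsFactory factory = new HadoopFsFactory();

		// loads core-site.xml / hdfs-site.xml via HadoopUtils.getHadoopConfiguration()
		factory.configure(new Configuration());

		// this URI carries an explicit authority; without one, the factory
		// falls back to 'fs.defaultFS' from the Hadoop configuration.
		// "namenode:8020" is a placeholder address.
		FileSystem fs = factory.create(URI.create("hdfs://namenode:8020/"));
		System.out.println("created file system for " + fs.getUri());
	}
}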

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
deleted file mode 100644
index 9e12f96..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.maprfs;
-
-import org.apache.flink.core.fs.BlockLocation;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.fs.hdfs.HadoopBlockLocation;
-import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
-import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileStatus;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Concrete implementation of the {@link FileSystem} base class for the MapR
- * file system. The class contains MapR specific code to initialize the
- * connection to the file system. Apart from that, the code mainly reuses the
- * existing HDFS wrapper code.
- */
-@SuppressWarnings("unused") // is only instantiated via reflection
-public final class MapRFileSystem extends FileSystem {
-
-	/**
-	 * The log object used for debugging.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(MapRFileSystem.class);
-
-	/**
-	 * The name of MapR's class containing the implementation of the Hadoop HDFS
-	 * interface.
-	 */
-	private static final String MAPR_FS_IMPL_CLASS = "com.mapr.fs.MapRFileSystem";
-
-	/**
-	 * Name of the environment variable to determine the location of the MapR
-	 * installation.
-	 */
-	private static final String MAPR_HOME_ENV = "MAPR_HOME";
-
-	/**
-	 * The default location of the MapR installation.
-	 */
-	private static final String DEFAULT_MAPR_HOME = "/opt/mapr/";
-
-	/**
-	 * The path relative to the MAPR_HOME where MapR stores how to access the
-	 * configured clusters.
-	 */
-	private static final String MAPR_CLUSTER_CONF_FILE = "/conf/mapr-clusters.conf";
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * The MapR implementation of the Hadoop HDFS interface.
-	 */
-	private final org.apache.hadoop.fs.FileSystem fs;
-
-	/**
-	 * Creates a new MapRFileSystem object to access the MapR file system.
-	 *
-	 * @throws IOException
-	 *             thrown if the required MapR classes cannot be found
-	 */
-	public MapRFileSystem(URI fsURI) throws IOException {
-		checkNotNull(fsURI, "fsURI");
-
-		LOG.debug("Trying to load class {} to access the MapR file system", MAPR_FS_IMPL_CLASS);
-
-		final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
-		try {
-			fsClass = Class.forName(MAPR_FS_IMPL_CLASS).asSubclass(org.apache.hadoop.fs.FileSystem.class);
-		}
-		catch (Exception e) {
-			throw new IOException(
-					String.format("Cannot load MapR File System class '%s'. " +
-							"Please check that the MapR Hadoop libraries are in the classpath.",
-							MAPR_FS_IMPL_CLASS), e);
-		}
-
-		LOG.info("Initializing MapR file system for URI {}", fsURI);
-
-		final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-		final org.apache.hadoop.fs.FileSystem fs;
-
-		final String authority = fsURI.getAuthority();
-		if (authority == null || authority.isEmpty()) {
-
-			// Use the default constructor to instantiate MapR file system object
-
-			try {
-				fs = fsClass.newInstance();
-			}
-			catch (Exception e) {
-				throw new IOException(e);
-			}
-		} else {
-
-			// We have an authority, check the MapR cluster configuration to
-			// find the CLDB locations.
-			final String[] cldbLocations = getCLDBLocations(authority);
-
-			// Find the appropriate constructor
-			try {
-				final Constructor<? extends org.apache.hadoop.fs.FileSystem> constructor =
-						fsClass.getConstructor(String.class, String[].class);
-
-				fs = constructor.newInstance(authority, cldbLocations);
-			}
-			catch (InvocationTargetException e) {
-				if (e.getTargetException() instanceof IOException) {
-					throw (IOException) e.getTargetException();
-				} else {
-					throw new IOException(e.getTargetException());
-				}
-			}
-			catch (Exception e) {
-				throw new IOException(e);
-			}
-		}
-
-		// now initialize the Hadoop File System object
-		fs.initialize(fsURI, conf);
-
-		// all good as it seems
-		this.fs = fs;
-	}
-
-	// ------------------------------------------------------------------------
-	//  file system methods
-	// ------------------------------------------------------------------------
-
-	@Override
-	public Path getWorkingDirectory() {
-		return new Path(this.fs.getWorkingDirectory().toUri());
-	}
-
-	public Path getHomeDirectory() {
-		return new Path(this.fs.getHomeDirectory().toUri());
-	}
-
-	@Override
-	public URI getUri() {
-		return this.fs.getUri();
-	}
-
-	@Override
-	public FileStatus getFileStatus(final Path f) throws IOException {
-
-		final org.apache.hadoop.fs.FileStatus status = this.fs
-				.getFileStatus(new org.apache.hadoop.fs.Path(f.toString()));
-
-		return new HadoopFileStatus(status);
-	}
-
-	@Override
-	public BlockLocation[] getFileBlockLocations(final FileStatus file,
-			final long start, final long len) throws IOException {
-
-		if (!(file instanceof HadoopFileStatus)) {
-			throw new IOException(
-					"file is not an instance of HadoopFileStatus");
-		}
-
-		final HadoopFileStatus f = (HadoopFileStatus) file;
-
-		final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs
-				.getFileBlockLocations(f.getInternalFileStatus(), start, len);
-
-		// Wrap up HDFS specific block location objects
-		final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
-		for (int i = 0; i < distBlkLocations.length; i++) {
-			distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
-		}
-
-		return distBlkLocations;
-	}
-
-	@Override
-	public FSDataInputStream open(final Path f, final int bufferSize)
-			throws IOException {
-		final org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(f.toString());
-		final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
-		return new HadoopDataInputStream(fdis);
-	}
-
-	@Override
-	public FSDataInputStream open(final Path f) throws IOException {
-		final org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(f.toString());
-		final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path);
-
-		return new HadoopDataInputStream(fdis);
-	}
-
-	@SuppressWarnings("deprecation")
-	@Override
-	public FSDataOutputStream create(final Path f, final boolean overwrite,
-			final int bufferSize, final short replication, final long blockSize)
-			throws IOException {
-
-		final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
-				new org.apache.hadoop.fs.Path(f.toString()), overwrite,
-				bufferSize, replication, blockSize);
-
-		return new HadoopDataOutputStream(fdos);
-	}
-
-	@Override
-	public FSDataOutputStream create(final Path f, final WriteMode overwrite)
-			throws IOException {
-
-		final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
-				new org.apache.hadoop.fs.Path(f.toString()), overwrite == WriteMode.OVERWRITE);
-
-		return new HadoopDataOutputStream(fdos);
-	}
-
-	@Override
-	public boolean delete(final Path f, final boolean recursive)
-			throws IOException {
-
-		return this.fs.delete(new org.apache.hadoop.fs.Path(f.toString()),
-				recursive);
-	}
-
-	@Override
-	public FileStatus[] listStatus(final Path f) throws IOException {
-
-		final org.apache.hadoop.fs.FileStatus[] hadoopFiles = this.fs
-				.listStatus(new org.apache.hadoop.fs.Path(f.toString()));
-		final FileStatus[] files = new FileStatus[hadoopFiles.length];
-
-		// Convert types
-		for (int i = 0; i < files.length; i++) {
-			files[i] = new HadoopFileStatus(hadoopFiles[i]);
-		}
-
-		return files;
-	}
-
-	@Override
-	public boolean mkdirs(final Path f) throws IOException {
-
-		return this.fs.mkdirs(new org.apache.hadoop.fs.Path(f.toString()));
-	}
-
-	@Override
-	public boolean rename(final Path src, final Path dst) throws IOException {
-
-		return this.fs.rename(new org.apache.hadoop.fs.Path(src.toString()),
-				new org.apache.hadoop.fs.Path(dst.toString()));
-	}
-
-	@SuppressWarnings("deprecation")
-	@Override
-	public long getDefaultBlockSize() {
-		return this.fs.getDefaultBlockSize();
-	}
-
-	@Override
-	public boolean isDistributedFS() {
-		return true;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Retrieves the CLDB locations for the given MapR cluster name.
-	 *
-	 * @param authority
-	 *            the name of the MapR cluster
-	 * @return a list of CLDB locations
-	 * @throws IOException
-	 *             thrown if the CLDB locations for the given MapR cluster name
-	 *             cannot be determined
-	 */
-	private static String[] getCLDBLocations(final String authority) throws IOException {
-
-		// Determine the MapR home
-		String maprHome = System.getenv(MAPR_HOME_ENV);
-		if (maprHome == null) {
-			maprHome = DEFAULT_MAPR_HOME;
-		}
-
-		final File maprClusterConf = new File(maprHome, MAPR_CLUSTER_CONF_FILE);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug(String.format(
-					"Trying to retrieve MapR cluster configuration from %s",
-					maprClusterConf));
-		}
-
-		// Read the cluster configuration file, format is specified at
-		// http://doc.mapr.com/display/MapR/mapr-clusters.conf
-
-		try (BufferedReader br = new BufferedReader(new FileReader(maprClusterConf))) {
-
-			String line;
-			while ((line = br.readLine()) != null) {
-
-				// Normalize the string
-				line = line.trim();
-				line = line.replace('\t', ' ');
-
-				final String[] fields = line.split(" ");
-				if (fields.length < 1) {
-					continue;
-				}
-
-				final String clusterName = fields[0];
-
-				if (!clusterName.equals(authority)) {
-					continue;
-				}
-
-				final List<String> cldbLocations = new ArrayList<>();
-
-				for (int i = 1; i < fields.length; ++i) {
-
-					// Make sure this is not a key-value pair MapR recently
-					// introduced in the file format along with their security
-					// features.
-					if (!fields[i].isEmpty() && !fields[i].contains("=")) {
-						cldbLocations.add(fields[i]);
-					}
-				}
-
-				if (cldbLocations.isEmpty()) {
-					throw new IOException(
-							String.format(
-									"%s contains entry for cluster %s but no CLDB locations.",
-									maprClusterConf, authority));
-				}
-
-				return cldbLocations.toArray(new String[cldbLocations.size()]);
-			}
-
-		}
-
-		throw new IOException(String.format(
-				"Unable to find CLDB locations for cluster %s", authority));
-	}
-}
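
The CLDB lookup in the constructor above hinges on the mapr-clusters.conf format, roughly '<cluster-name> [key=value ...] host:port ...'. Below is a self-contained sketch of just that parsing step; the helper name is hypothetical.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class CldbLocationsSketch {

	/**
	 * Parses lines of the form "<cluster-name> [key=value ...] host:port ...",
	 * returning the CLDB host:port entries for the named cluster.
	 */
	static String[] getCldbLocations(String confFile, String clusterName) throws IOException {
		try (BufferedReader br = new BufferedReader(new FileReader(confFile))) {
			String line;
			while ((line = br.readLine()) != null) {
				String[] fields = line.trim().replace('\t', ' ').split(" ");
				if (fields.length < 1 || !fields[0].equals(clusterName)) {
					continue;
				}
				List<String> locations = new ArrayList<>();
				for (int i = 1; i < fields.length; i++) {
					// skip the key=value options MapR adds for security settings
					if (!fields[i].isEmpty() && !fields[i].contains("=")) {
						locations.add(fields[i]);
					}
				}
				if (locations.isEmpty()) {
					throw new IOException("entry for cluster " + clusterName + " has no CLDB locations");
				}
				return locations.toArray(new String[0]);
			}
		}
		throw new IOException("no entry for cluster " + clusterName);
	}
}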

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
deleted file mode 100644
index ca0630c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.configuration.ConfigConstants;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Collection;
-
-/**
- * Utility class for working with Hadoop-related classes. This should only be used if Hadoop
- * is on the classpath.
- */
-public class HadoopUtils {
-
-	private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
-
-	private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
-
-	public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) {
-
-		Configuration result = new Configuration();
-		boolean foundHadoopConfiguration = false;
-
-		// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
-		// the hdfs configuration
-		// Try to load HDFS configuration from Hadoop's own configuration files
-		// 1. Approach: Flink configuration
-		final String hdfsDefaultPath =
-			flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
-
-		if (hdfsDefaultPath != null) {
-			result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
-			LOG.debug("Using hdfs-default configuration-file path from Flink config: {}", hdfsDefaultPath);
-			foundHadoopConfiguration = true;
-		} else {
-			LOG.debug("Cannot find hdfs-default configuration-file path in Flink config.");
-		}
-
-		final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
-		if (hdfsSitePath != null) {
-			result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
-			LOG.debug("Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
-			foundHadoopConfiguration = true;
-		} else {
-			LOG.debug("Cannot find hdfs-site configuration-file path in Flink config.");
-		}
-
-		// 2. Approach: environment variables
-		String[] possibleHadoopConfPaths = new String[4];
-		possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
-		possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
-
-		if (System.getenv("HADOOP_HOME") != null) {
-			possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME") + "/conf";
-			possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME") + "/etc/hadoop"; // hadoop 2.2
-		}
-
-		for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
-			if (possibleHadoopConfPath != null) {
-				if (new File(possibleHadoopConfPath).exists()) {
-					if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
-						result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
-						LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
-						foundHadoopConfiguration = true;
-					}
-					if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
-						result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
-						LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
-						foundHadoopConfiguration = true;
-					}
-				}
-			}
-		}
-
-		if (!foundHadoopConfiguration) {
-			LOG.debug("Could not find Hadoop configuration via any of the supported methods " +
-				"(Flink configuration, environment variables).");
-		}
-
-		return result;
-	}
-
-	/**
-	 * Indicates whether the current user has an HDFS delegation token.
-	 */
-	public static boolean hasHDFSDelegationToken() throws Exception {
-		UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
-		Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
-		for (Token<? extends TokenIdentifier> token : usrTok) {
-			if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
-				return true;
-			}
-		}
-		return false;
-	}
-}
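
A short usage sketch for this utility, assuming Hadoop and Flink on the classpath: explicit Flink configuration keys take precedence over the HADOOP_CONF_DIR and HADOOP_HOME lookups. The hdfs-site.xml path below is a placeholder.

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.util.HadoopUtils;

public class HadoopConfigSketch {

	public static void main(String[] args) {
		org.apache.flink.configuration.Configuration flinkConf =
				new org.apache.flink.configuration.Configuration();

		// point Flink at an hdfs-site.xml directly instead of relying on
		// HADOOP_CONF_DIR or HADOOP_HOME; the path is a placeholder
		flinkConf.setString(ConfigConstants.HDFS_SITE_CONFIG, "/etc/hadoop/conf/hdfs-site.xml");

		org.apache.hadoop.conf.Configuration hadoopConf =
				HadoopUtils.getHadoopConfiguration(flinkConf);

		System.out.println("fs.defaultFS = " + hadoopConf.get("fs.defaultFS"));
	}
}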

