flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [05/10] flink git commit: [FLINK-2842] [documentation] Remove Flink S3FileSystem, extend documentation to use Hadoop S3FileSystem.
Date Mon, 19 Oct 2015 16:01:11 GMT
[FLINK-2842] [documentation] Remove Flink S3FileSystem, extend documentation to use Hadoop S3FileSystem.

This closes #1245


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9005779
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9005779
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9005779

Branch: refs/heads/master
Commit: c90057792d887d3ab7ff23ff53149f1887c9ed62
Parents: 640e63b
Author: Fabian Hueske <fhueske@apache.org>
Authored: Fri Oct 9 17:35:00 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon Oct 19 16:17:37 2015 +0200

----------------------------------------------------------------------
 docs/apis/example_connectors.md                 |  68 +-
 .../org/apache/flink/core/fs/FileSystem.java    |   3 -
 flink-runtime/pom.xml                           |  12 +-
 .../flink/runtime/fs/s3/S3BlockLocation.java    |  63 --
 .../flink/runtime/fs/s3/S3BucketObjectPair.java |  93 ---
 .../flink/runtime/fs/s3/S3DataInputStream.java  | 174 ----
 .../flink/runtime/fs/s3/S3DataOutputStream.java | 325 --------
 .../runtime/fs/s3/S3DirectoryStructure.java     |  87 --
 .../flink/runtime/fs/s3/S3FileStatus.java       |  95 ---
 .../flink/runtime/fs/s3/S3FileSystem.java       | 786 -------------------
 .../flink/runtime/fs/s3/S3FileSystemTest.java   | 465 -----------
 11 files changed, 52 insertions(+), 2119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c9005779/docs/apis/example_connectors.md
----------------------------------------------------------------------
diff --git a/docs/apis/example_connectors.md b/docs/apis/example_connectors.md
index 5b3f7c6..ef5e994 100644
--- a/docs/apis/example_connectors.md
+++ b/docs/apis/example_connectors.md
@@ -20,32 +20,64 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## Reading from filesystems.
+## Reading from file systems.
 
 Flink has build-in support for the following file systems:
 
-| Filesystem        | Since           | Scheme  | Notes |
-| ------------- |-------------| -----| ------ |
-| Hadoop Distributed File System (HDFS)  | 0.2 | `hdfs://`| All HDFS versions are supported |
-| Amazon S3    |  0.2 | `s3://` |   |
-| MapR file system      | 0.7-incubating      |  `maprfs://` | The user has to manually place the required jar files in the `lib/` dir |
-| Tachyon   |  0.9 | `tachyon://` | Support through Hadoop file system implementation (see below) |
+| Filesystem                            | Scheme       | Notes  |
+| ------------------------------------- |--------------| ------ |
+| Hadoop Distributed File System (HDFS) &nbsp; | `hdfs://`    | All HDFS versions are supported |
+| Amazon S3                             | `s3://`      | Support through Hadoop file system implementation (see below) | 
+| MapR file system                      | `maprfs://`  | The user has to manually place the required jar files in the `lib/` dir |
+| Tachyon                               | `tachyon://` &nbsp; | Support through Hadoop file system implementation (see below) |
 
 
 
-### Using Hadoop file systems with Apache Flink
+### Using Hadoop file system implementations
 
 Apache Flink allows users to use any file system implementing the `org.apache.hadoop.fs.FileSystem`
-interface. Hadoop ships adapters for FTP, [Hftp](http://hadoop.apache.org/docs/r1.2.1/hftp.html), and others.
+interface. There are Hadoop `FileSystem` implementations for
 
-Flink has integrated testcases to validate the integration with [Tachyon](http://tachyon-project.org/).
-Other file systems we tested the integration is the
-[Google Cloud Storage Connector for Hadoop](https://cloud.google.com/hadoop/google-cloud-storage-connector) and [XtreemFS](http://www.xtreemfs.org/).
+- [S3](https://aws.amazon.com/s3/) (tested)
+- [Google Cloud Storage Connector for Hadoop](https://cloud.google.com/hadoop/google-cloud-storage-connector) (tested)
+- [Tachyon](http://tachyon-project.org/) (tested)
+- [XtreemFS](http://www.xtreemfs.org/) (tested)
+- FTP via [Hftp](http://hadoop.apache.org/docs/r1.2.1/hftp.html) (not tested)
+- and many more.
 
-In order to use a Hadoop file system with Flink, make sure that the `flink-conf.yaml` has set the
-`fs.hdfs.hadoopconf` property set to the Hadoop configuration directory.
-In addition to that, the Hadoop configuration (in that directory) needs to have an entry for each supported file system.
-For example for tachyon support, there must be the following entry in the `core-site.xml` file:
+In order to use a Hadoop file system with Flink, make sure that
+
+- the `flink-conf.yaml` has set the `fs.hdfs.hadoopconf` property set to the Hadoop configuration directory.
+- the Hadoop configuration (in that directory) has an entry for the required file system. Examples for S3 and Tachyon are shown below.
+- the required classes for using the file system are available in the `lib/` folder of the Flink installation (on all machines running Flink). If putting the files into the directory is not possible, Flink is also respecting the `HADOOP_CLASSPATH` environment variable to add Hadoop jar files to the classpath.
+
+#### Amazon S3
+
+For Amazon S3 support add the following entries into the `core-site.xml` file:
+
+~~~xml
+<!-- configure the file system implementation -->
+<property>
+  <name>fs.s3.impl</name>
+  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
+</property>
+
+<!-- set your AWS ID -->
+<property>
+  <name>fs.s3.awsAccessKeyId</name>
+  <value>putKeyHere</value>
+</property>
+
+<!-- set your AWS access key -->
+<property>
+  <name>fs.s3.awsSecretAccessKey</name>
+  <value>putSecretHere</value>
+</property>
+~~~
+
+#### Tachyon
+
+For Tachyon support add the following entry into the `core-site.xml` file:
 
 ~~~xml
 <property>
@@ -54,10 +86,8 @@ For example for tachyon support, there must be the following entry in the `core-
 </property>
 ~~~
 
-Also, the required classes for using the file system need to be placed in the `lib/` folder of the Flink installation (on all machines running Flink). If putting the files into the directory is not possible, Flink is also respecting the `HADOOP_CLASSPATH` environment variable to add Hadoop jar files to the classpath.
-
 
-## Connecting to other systems using Input / Output Format wrappers for Hadoop
+## Connecting to other systems using Input/OutputFormat wrappers for Hadoop
 
 Apache Flink allows users to access many different systems as data sources or sinks.
 The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept

http://git-wip-us.apache.org/repos/asf/flink/blob/c9005779/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 8b4cdba..185b5f2 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -51,8 +51,6 @@ public abstract class FileSystem {
 
 	private static final String MAPR_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.maprfs.MapRFileSystem";
 	
-	private static final String S3_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.s3.S3FileSystem";
-
 	private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper";
 
 	
@@ -156,7 +154,6 @@ public abstract class FileSystem {
 		FSDIRECTORY.put("hdfs", HADOOP_WRAPPER_FILESYSTEM_CLASS);
 		FSDIRECTORY.put("maprfs", MAPR_FILESYSTEM_CLASS);
 		FSDIRECTORY.put("file", LOCAL_FILESYSTEM_CLASS);
-		FSDIRECTORY.put("s3", S3_FILESYSTEM_CLASS);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/c9005779/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index a831eba..f79c5ed 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -76,15 +76,9 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.amazonaws</groupId>
-			<artifactId>aws-java-sdk</artifactId>
-			<version>1.8.1</version>
-			<exclusions>
-				<exclusion>
-					<artifactId>commons-codec</artifactId>
-					<groupId>commons-codec</groupId>
-				</exclusion>
-			</exclusions>
+			<groupId>org.apache.httpcomponents</groupId>
+			<artifactId>httpclient</artifactId>
+			<version>4.2</version>
 		</dependency>
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/c9005779/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3BlockLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3BlockLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3BlockLocation.java
deleted file mode 100644
index 05b716d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3BlockLocation.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.fs.s3;
-
-import java.io.IOException;
-
-import org.apache.flink.core.fs.BlockLocation;
-
-public final class S3BlockLocation implements BlockLocation {
-
-	private final String[] hosts;
-
-	private final long length;
-
-	S3BlockLocation(final String host, final long length) {
-
-		this.hosts = new String[1];
-		this.hosts[0] = host;
-		this.length = length;
-	}
-
-	@Override
-	public int compareTo(final BlockLocation arg0) {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
-	@Override
-	public String[] getHosts() throws IOException {
-
-		return this.hosts;
-	}
-
-	@Override
-	public long getOffset() {
-
-		return 0;
-	}
-
-	@Override
-	public long getLength() {
-
-		return this.length;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9005779/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3BucketObjectPair.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3BucketObjectPair.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3BucketObjectPair.java
deleted file mode 100644
index 1f9e0c4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3BucketObjectPair.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.fs.s3;
-
-/**
- * An S3 bucket-object pair identifies either a bucket in S3 or an object. If the object property is <code>null</code>,
- * this object identifies an S3 bucket. If both the bucket and the object property is <code>null</code>, the object
- * refers to the S3 base directory.
- * <p>
- * This class is thread-safe.
- * 
- */
-public final class S3BucketObjectPair {
-
-	/**
-	 * The name of the S3 bucket this object refers to.
-	 */
-	private final String bucket;
-
-	/**
-	 * The name of the S3 object this object refers to.
-	 */
-	private final String object;
-
-	/**
-	 * Creates a new S3 bucket-object pair.
-	 * 
-	 * @param bucket
-	 *        the name of the S3 bucket this object refers to
-	 * @param object
-	 *        the name of the S3 object this object refers to
-	 */
-	S3BucketObjectPair(final String bucket, final String object) {
-		this.bucket = bucket;
-		this.object = object;
-	}
-
-	/**
-	 * Returns the name of the S3 bucket this object refers to.
-	 * 
-	 * @return the name of the S3 bucket this object refers to or <code>null</code> if this object refers to the S3 base
-	 *         directory
-	 */
-	public String getBucket() {
-		return this.bucket;
-	}
-
-	/**
-	 * Returns the name of the S3 object this object refers to.
-	 * 
-	 * @return the name of the S3 object this object refers to or <code>null</code> if this object refers to an S3
-	 *         bucket
-	 */
-	public String getObject() {
-		return this.object;
-	}
-
-	/**
-	 * Checks whether this object refers to an S3 bucket.
-	 * 
-	 * @return <code>true</code> if this object refers to an S3 bucket, <code>false</code> otherwise
-	 *         directory
-	 */
-	public boolean hasBucket() {
-		return (this.bucket != null);
-	}
-
-	/**
-	 * Checks whether this object refers to an S3 object.
-	 * 
-	 * @return <code>true</code> if this object refers to an S3 object, <code>false</code> otherwise
-	 */
-	public boolean hasObject() {
-		return (this.object != null);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9005779/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataInputStream.java
deleted file mode 100644
index c839640..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataInputStream.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.fs.s3;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.util.StringUtils;
-
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.S3Object;
-
-/**
- * This class implements an {@link FSDataInputStream} that downloads its data from Amazon S3 in the background.
- * Essentially, this class is just a wrapper to the Amazon AWS SDK.
- */
-public class S3DataInputStream extends FSDataInputStream {
-
-	/**
-	 * The input stream which reads the actual S3 object content.
-	 */
-	private final InputStream inputStream;
-
-	/**
-	 * The current position of input stream.
-	 */
-	private long position;
-
-	/**
-	 * The marked position.
-	 */
-	private long marked;
-
-
-	/**
-	 * Constructs a new input stream which reads its data from the specified S3 object.
-	 *
-	 * @param s3Client
-	 *        the S3 client to connect to Amazon S3.
-	 * @param bucket
-	 *        the name of the S3 bucket the object is stored in
-	 * @param object
-	 *        the name of the S3 object whose content shall be read
-	 * @throws IOException
-	 *         thrown if an error occurs while accessing the specified S3 object
-	 */
-	S3DataInputStream(final AmazonS3Client s3Client, final String bucket, final String object) throws IOException {
-
-		S3Object s3o = null;
-		try {
-			s3o = s3Client.getObject(bucket, object);
-		} catch (AmazonServiceException e) {
-			throw new IOException(StringUtils.stringifyException(e));
-		}
-
-		this.inputStream = s3o.getObjectContent();
-		this.position = 0;
-		this.marked = 0;
-	}
-
-
-	@Override
-	public int available() throws IOException {
-
-		return this.inputStream.available();
-	}
-
-
-	@Override
-	public void close() throws IOException {
-
-		this.inputStream.close();
-	}
-
-
-	@Override
-	public void mark(final int readlimit) {
-
-		this.inputStream.mark(readlimit);
-		marked = readlimit;
-	}
-
-
-	@Override
-	public boolean markSupported() {
-
-		return this.inputStream.markSupported();
-	}
-
-
-	@Override
-	public int read() throws IOException {
-
-		int read = this.inputStream.read();
-		if (read != -1) {
-			++position;
-		}
-
-		return read;
-	}
-
-
-	@Override
-	public int read(final byte[] b) throws IOException {
-
-		int read = this.inputStream.read(b);
-		if (read > 0) {
-			position += read;
-		}
-
-		return read;
-	}
-
-
-	@Override
-	public int read(final byte[] b, final int off, final int len) throws IOException {
-
-		int read = this.inputStream.read(b, off, len);
-		if (read > 0) {
-			position += read;
-		}
-
-		return read;
-	}
-
-
-	@Override
-	public void reset() throws IOException {
-
-		this.inputStream.reset();
-		position = marked;
-	}
-
-
-	@Override
-	public void seek(final long desired) throws IOException {
-
-		skip(desired);
-	}
-
-	@Override
-	public long skip(long n) throws IOException {
-		long skipped = this.inputStream.skip(n);
-		if (skipped > 0) {
-			position += skipped;
-		}
-
-		return skipped;
-	}
-
-	@Override
-	public long getPos() throws IOException {
-		return position;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9005779/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataOutputStream.java
deleted file mode 100644
index f830613..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataOutputStream.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.fs.s3;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.util.StringUtils;
-
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PartETag;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.StorageClass;
-import com.amazonaws.services.s3.model.UploadPartRequest;
-import com.amazonaws.services.s3.model.UploadPartResult;
-
-public final class S3DataOutputStream extends FSDataOutputStream {
-
-	private static final int MAX_PART_NUMBER = 10000;
-
-	public static final int MINIMUM_MULTIPART_SIZE = 5 * 1024 * 1024;
-
-	private final AmazonS3Client s3Client;
-
-	private final boolean useRRS;
-
-	private final byte[] buf;
-
-	private final String bucket;
-
-	private final String object;
-
-	private final List<PartETag> partETags = new ArrayList<PartETag>();
-
-	/**
-	 * The ID of a multipart upload in case multipart upload is used, otherwise <code>null</code>.
-	 */
-	private String uploadId = null;
-
-	/**
-	 * The next part number to be used during a multipart upload.
-	 */
-	private int partNumber = 1; // First valid upload part number is 1.
-
-	private int bytesWritten = 0;
-	
-
-	private final class InternalUploadInputStream extends InputStream {
-
-		private final byte[] srcBuf;
-
-		private final int length;
-
-		private int bytesRead = 0;
-
-		private InternalUploadInputStream(final byte[] srcBuf, final int length) {
-			this.srcBuf = buf;
-			this.length = length;
-		}
-
-
-		@Override
-		public int read() throws IOException {
-			if (this.length - this.bytesRead == 0) {
-				return -1;
-			}
-
-			return (int) this.srcBuf[this.bytesRead++];
-		}
-
-
-		@Override
-		public int read(final byte[] buf) throws IOException {
-			return read(buf, 0, buf.length);
-		}
-
-		@Override
-		public int read(final byte[] buf, final int off, final int len) throws IOException {
-			if (this.length - this.bytesRead == 0) {
-				return -1;
-			}
-
-			final int bytesToCopy = Math.min(len, this.length - this.bytesRead);
-			System.arraycopy(srcBuf, this.bytesRead, buf, off, bytesToCopy);
-			this.bytesRead += bytesToCopy;
-
-			return bytesToCopy;
-		}
-
-		@Override
-		public int available() throws IOException {
-
-			return (this.length - bytesRead);
-		}
-
-		@Override
-		public long skip(final long n) throws IOException {
-
-			int bytesToSkip = (int) Math.min(n, Integer.MAX_VALUE);
-			bytesToSkip = Math.min(this.length - this.bytesRead, bytesToSkip);
-
-			this.bytesRead += bytesToSkip;
-
-			return bytesToSkip;
-		}
-	}
-
-	S3DataOutputStream(final AmazonS3Client s3Client, final String bucket, final String object, final byte[] buf,
-			final boolean useRRS) {
-
-		this.s3Client = s3Client;
-		this.bucket = bucket;
-		this.object = object;
-		this.buf = buf;
-		this.useRRS = useRRS;
-	}
-
-
-	@Override
-	public void write(final int b) throws IOException {
-		// Upload buffer to S3
-		if (this.bytesWritten == this.buf.length) {
-			uploadPartAndFlushBuffer();
-		}
-
-		this.buf[this.bytesWritten++] = (byte) b;
-	}
-
-
-	@Override
-	public void write(final byte[] b, final int off, final int len) throws IOException {
-		int nextPos = off;
-
-		while (nextPos < len) {
-			// Upload buffer to S3
-			if (this.bytesWritten == this.buf.length) {
-				uploadPartAndFlushBuffer();
-			}
-
-			final int bytesToCopy = Math.min(this.buf.length - this.bytesWritten, len - nextPos);
-			System.arraycopy(b, nextPos, this.buf, this.bytesWritten, bytesToCopy);
-			this.bytesWritten += bytesToCopy;
-			nextPos += bytesToCopy;
-		}
-	}
-
-
-	@Override
-	public void write(final byte[] b) throws IOException {
-		write(b, 0, b.length);
-	}
-
-
-	@Override
-	public void close() throws IOException {
-		if (this.uploadId == null) {
-			// This is not a multipart upload
-
-			// No data has been written
-			if (this.bytesWritten == 0) {
-				return;
-			}
-
-			final InputStream is = new InternalUploadInputStream(this.buf, this.bytesWritten);
-			final ObjectMetadata om = new ObjectMetadata();
-			om.setContentLength(this.bytesWritten);
-
-			final PutObjectRequest por = new PutObjectRequest(this.bucket, this.object, is, om);
-			if (this.useRRS) {
-				por.setStorageClass(StorageClass.ReducedRedundancy);
-			} else {
-				por.setStorageClass(StorageClass.Standard);
-			}
-
-			try {
-				this.s3Client.putObject(por);
-			} catch (AmazonServiceException e) {
-				throw new IOException(StringUtils.stringifyException(e));
-			}
-
-			this.bytesWritten = 0;
-
-		} else {
-
-			if (this.bytesWritten > 0) {
-				uploadPartAndFlushBuffer();
-			}
-
-			boolean operationSuccessful = false;
-			try {
-				final CompleteMultipartUploadRequest request = new CompleteMultipartUploadRequest(this.bucket,
-					this.object,
-					this.uploadId, this.partETags);
-				this.s3Client.completeMultipartUpload(request);
-
-				operationSuccessful = true;
-
-			} catch (AmazonServiceException e) {
-				throw new IOException(StringUtils.stringifyException(e));
-			} finally {
-				if (!operationSuccessful) {
-					abortUpload();
-				}
-			}
-		}
-	}
-
-	@Override
-	public void sync() throws IOException {
-		// can do nothing here
-	}
-	
-	@Override
-	public void flush() throws IOException {
-		// Flush does nothing in this implementation since we ways have to transfer at least 5 MB in a multipart upload
-	}
-
-	private void uploadPartAndFlushBuffer() throws IOException {
-
-		boolean operationSuccessful = false;
-
-		if (this.uploadId == null) {
-			this.uploadId = initiateMultipartUpload();
-		}
-
-		try {
-			if (this.partNumber >= MAX_PART_NUMBER) {
-				throw new IOException("Cannot upload any more data: maximum part number reached");
-			}
-
-			final InputStream inputStream = new InternalUploadInputStream(this.buf, this.bytesWritten);
-			final UploadPartRequest request = new UploadPartRequest();
-			request.setBucketName(this.bucket);
-			request.setKey(this.object);
-			request.setInputStream(inputStream);
-			request.setUploadId(this.uploadId);
-			request.setPartSize(this.bytesWritten);
-			request.setPartNumber(this.partNumber++);
-
-			final UploadPartResult result = this.s3Client.uploadPart(request);
-			this.partETags.add(result.getPartETag());
-
-			this.bytesWritten = 0;
-			operationSuccessful = true;
-
-		}
-		catch (AmazonServiceException e) {
-			throw new IOException(e.getMessage(), e);
-		}
-		finally {
-			if (!operationSuccessful) {
-				abortUpload();
-			}
-		}
-	}
-
-	private String initiateMultipartUpload() throws IOException {
-
-		boolean operationSuccessful = false;
-		final InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(this.bucket, this.object);
-		if (this.useRRS) {
-			request.setStorageClass(StorageClass.ReducedRedundancy);
-		} else {
-			request.setStorageClass(StorageClass.Standard);
-		}
-
-		try {
-
-			final InitiateMultipartUploadResult result = this.s3Client.initiateMultipartUpload(request);
-			operationSuccessful = true;
-			return result.getUploadId();
-
-		}
-		catch (AmazonServiceException e) {
-			throw new IOException(e.getMessage(), e);
-		}
-		finally {
-			if (!operationSuccessful) {
-				abortUpload();
-			}
-		}
-	}
-
-	private void abortUpload() {
-		if (this.uploadId == null) {
-			// This is not a multipart upload, nothing to do here
-			return;
-		}
-
-		try {
-			final AbortMultipartUploadRequest request = new AbortMultipartUploadRequest(this.bucket, this.object,
-				this.uploadId);
-			this.s3Client.abortMultipartUpload(request);
-		}
-		catch (AmazonServiceException e) {
-			// Ignore exception
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9005779/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DirectoryStructure.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DirectoryStructure.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DirectoryStructure.java
deleted file mode 100644
index aaf5284..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DirectoryStructure.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.fs.s3;
-
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.core.fs.Path;
-
-public final class S3DirectoryStructure {
-
-	private final String basePath;
-
-	private final Map<Path, S3BucketObjectPair> cache = new HashMap<Path, S3BucketObjectPair>();
-
-	S3DirectoryStructure(final String basePath) {
-		this.basePath = basePath;
-	}
-
-	S3BucketObjectPair toBucketObjectPair(final Path path) {
-
-		S3BucketObjectPair bop = this.cache.get(path);
-		if (bop != null) {
-			return bop;
-		}
-
-		final URI uri = path.toUri();
-		String p = uri.getPath();
-		if (!this.basePath.isEmpty() && !p.contains(this.basePath)) {
-			throw new IllegalArgumentException(path + " is not a valid path for the file system");
-		}
-
-		// Extract the base path
-		if (!this.basePath.isEmpty()) {
-			final int pos = p.indexOf(this.basePath);
-			p = p.substring(pos + this.basePath.length());
-		}
-
-		// Remove leading SEPARATOR
-		if (!p.isEmpty()) {
-			if (p.charAt(0) == Path.SEPARATOR_CHAR) {
-				p = p.substring(1);
-			}
-		}
-
-		if (p.isEmpty()) {
-			bop = new S3BucketObjectPair(null, null);
-			this.cache.put(path, bop);
-			return bop;
-		}
-
-		final int objectPos = p.indexOf(Path.SEPARATOR_CHAR);
-		if (objectPos < 0) {
-			bop = new S3BucketObjectPair(p, null);
-		} else {
-			final String bucket = p.substring(0, objectPos);
-			final String object = p.substring(objectPos + 1);
-			if (object.isEmpty()) {
-				bop = new S3BucketObjectPair(bucket, null);
-			} else {
-				bop = new S3BucketObjectPair(bucket, object);
-			}
-		}
-
-		this.cache.put(path, bop);
-
-		return bop;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9005779/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3FileStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3FileStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3FileStatus.java
deleted file mode 100644
index bb0daff..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3FileStatus.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.fs.s3;
-
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.Path;
-
-public final class S3FileStatus implements FileStatus {
-
-	private final Path path;
-
-	private final long length;
-
-	private final boolean isDir;
-
-	private final long modificationTime;
-	
-	private final long accessTime;
-	
-	S3FileStatus(final Path path, final long length, final boolean isDir, final long modificationTime,
-			final long accessTime) {
-		this.path = path;
-		this.length = length;
-		this.isDir = isDir;
-		this.modificationTime = modificationTime;
-		this.accessTime = accessTime;
-	}
-
-
-	@Override
-	public long getLen() {
-
-		return this.length;
-	}
-
-
-	@Override
-	public long getBlockSize() {
-
-		return this.length;
-	}
-
-
-	@Override
-	public short getReplication() {
-
-		return 1;
-	}
-
-
-	@Override
-	public long getModificationTime() {
-		
-		return this.modificationTime;
-	}
-
-
-	@Override
-	public long getAccessTime() {
-		
-		return this.accessTime;
-	}
-
-
-	@Override
-	public boolean isDir() {
-
-		return this.isDir;
-	}
-
-
-	@Override
-	public Path getPath() {
-
-		return this.path;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9005779/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3FileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3FileSystem.java
deleted file mode 100644
index 522d90e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3FileSystem.java
+++ /dev/null
@@ -1,786 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.fs.s3;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.fs.BlockLocation;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.StringUtils;
-
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.Bucket;
-import com.amazonaws.services.s3.model.ObjectListing;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.Owner;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
-
-/**
- * This class provides a {@link FileSystem} implementation which is backed by Amazon's Simple Storage Service (S3). The
- * implementation uses the REST API of Amazon S3 to facilitate the communication and read/write the data.
- * 
- */
-public final class S3FileSystem extends FileSystem {
-
-	/**
-	 * The logging object used for debugging.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(S3FileSystem.class);
-
-	/**
-	 * The configuration key to access the S3 host.
-	 */
-	public static final String S3_HOST_KEY = "fs.s3.host";
-
-	/**
-	 * The configuration key to access the S3 port.
-	 */
-	public static final String S3_PORT_KEY = "fs.s3.port";
-
-	/**
-	 * The configuration key to access the S3 Reduced Redundancy Storage setting.
-	 */
-	public static final String S3_RRS_KEY = "fs.s3.rrs";
-
-	/**
-	 * The configuration key to access the S3 access key.
-	 */
-	public static final String S3_ACCESS_KEY_KEY = "fs.s3.accessKey";
-
-	/**
-	 * The configuration key to access the S3 secret key.
-	 */
-	public static final String S3_SECRET_KEY_KEY = "fs.s3.secretKey";
-
-	/**
-	 * The default host to connect to.
-	 */
-	private static final String DEFAULT_S3_HOST = "s3.amazonaws.com";
-
-	/**
-	 * The default setting whether to use S3 Reduced Redundancy Storage
-	 */
-	private static final boolean DEFAULT_S3_RRS = true;
-
-	/**
-	 * The default port to connect to.
-	 */
-	private static final int DEFAULT_S3_PORT = 80;
-
-	/**
-	 * The prefix of the HTTP protocol.
-	 */
-	private static final String HTTP_PREFIX = "http";
-
-	/**
-	 * The error code for "resource not found" according to the HTTP protocol.
-	 */
-	private static final int HTTP_RESOURCE_NOT_FOUND_CODE = 404;
-
-	/**
-	 * The character which S3 uses internally to indicate an object represents a directory.
-	 */
-	private static final char S3_DIRECTORY_SEPARATOR = '/';
-
-	/**
-	 * The scheme which is used by this file system.
-	 */
-	public static final String S3_SCHEME = "s3";
-
-	/**
-	 * The character set with which the URL is expected to be encoded
-	 */
-	private static final String URL_ENCODE_CHARACTER = "UTF-8";
-
-	/**
-	 * The host to address the REST requests to.
-	 */
-	private String host = null;
-
-	private int port = -1;
-
-	private URI s3Uri = null;
-
-	private AmazonS3Client s3Client = null;
-
-	private S3DirectoryStructure directoryStructure = null;
-
-	private final boolean useRRS;
-
-	public S3FileSystem() {
-
-		this.useRRS = GlobalConfiguration.getBoolean(S3_RRS_KEY, DEFAULT_S3_RRS);
-		LOG.info("Creating new S3 file system binding with Reduced Redundancy Storage "
-			+ (this.useRRS ? "enabled" : "disabled"));
-	}
-
-
-	@Override
-	public Path getWorkingDirectory() {
-
-		return new Path(this.s3Uri);
-	}
-
-	public Path getHomeDirectory() {
-		return new Path(this.s3Uri);
-	}
-
-	@Override
-	public URI getUri() {
-
-		return this.s3Uri;
-	}
-
-
-	@Override
-	public void initialize(URI name) throws IOException {
-
-		this.host = name.getHost();
-		if (this.host == null) {
-			LOG.debug("Provided URI does not provide a host to connect to, using configuration...");
-			this.host = GlobalConfiguration.getString(S3_HOST_KEY, DEFAULT_S3_HOST);
-		}
-
-		this.port = name.getPort();
-		if (this.port == -1) {
-			LOG.debug("Provided URI does not provide a port to connect to, using configuration...");
-			this.port = GlobalConfiguration.getInteger(S3_PORT_KEY, DEFAULT_S3_PORT);
-		}
-
-		final String userInfo = name.getUserInfo();
-
-		String awsAccessKey = null;
-		String awsSecretKey = null;
-
-		if (userInfo != null) {
-
-			final String[] splits = userInfo.split(":");
-			if (splits.length > 1) {
-				awsAccessKey = URLDecoder.decode(splits[0], URL_ENCODE_CHARACTER);
-				awsSecretKey = URLDecoder.decode(splits[1], URL_ENCODE_CHARACTER);
-			}
-		}
-
-		if (awsAccessKey == null) {
-			LOG.debug("Provided URI does not provide an access key to Amazon S3, using configuration...");
-			awsAccessKey = GlobalConfiguration.getString(S3_ACCESS_KEY_KEY, null);
-			if (awsAccessKey == null) {
-				throw new IOException("Cannot determine access key to Amazon S3. Please make " +
-						"sure to configure it by setting the configuration key '"
-						+ S3_ACCESS_KEY_KEY + "'.");
-			}
-		}
-
-		if (awsSecretKey == null) {
-			LOG.debug("Provided URI does not provide a secret key to Amazon S3, using configuration...");
-			awsSecretKey = GlobalConfiguration.getString(S3_SECRET_KEY_KEY, null);
-			if (awsSecretKey == null) {
-				throw new IOException("Cannot determine secret key to Amazon S3. Please make " +
-						"sure to configure it by setting the configuration key '"
-						+ S3_SECRET_KEY_KEY + "'.");
-			}
-		}
-
-		final AWSCredentials credentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey);
-		this.s3Client = new AmazonS3Client(credentials);
-
-		initializeDirectoryStructure(name);
-	}
-
-	private void initializeDirectoryStructure(final URI name) throws IOException {
-
-		String basePath = name.getPath();
-		while (true) {
-
-			try {
-				final String endpoint = new URL(HTTP_PREFIX, this.host, this.port, basePath).toString();
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Trying S3 endpoint " + endpoint);
-				}
-
-				this.s3Client.setEndpoint(endpoint);
-				final Owner owner = this.s3Client.getS3AccountOwner();
-				LOG.info("Successfully established connection to Amazon S3 using the endpoint " + endpoint);
-				LOG.info("Amazon S3 user is " + owner.getDisplayName());
-
-				break;
-			} catch (MalformedURLException e) {
-				throw new IOException(StringUtils.stringifyException(e));
-			} catch (AmazonClientException e) {
-
-				// Truncate path
-				if (basePath.isEmpty()) {
-					throw new IOException("Cannot establish connection to Amazon S3: "
-						+ StringUtils.stringifyException(e));
-				} else {
-					final int pos = basePath.lastIndexOf(Path.SEPARATOR);
-					if (pos < 0) {
-						basePath = "";
-					} else {
-						basePath = basePath.substring(0, pos);
-					}
-				}
-			}
-		}
-
-		// Set the S3 URI
-		try {
-			this.s3Uri = new URI(S3_SCHEME, (String) null, this.host, this.port, basePath, null, null);
-		} catch (URISyntaxException e) {
-			throw new IOException(StringUtils.stringifyException(e));
-		}
-
-		// Finally, create directory structure object
-		this.directoryStructure = new S3DirectoryStructure(basePath);
-	}
-
-
-	@Override
-	public FileStatus getFileStatus(final Path f) throws IOException {
-
-		final S3BucketObjectPair bop = this.directoryStructure.toBucketObjectPair(f);
-
-		// This is the S3:/// base directory
-		if (!bop.hasBucket() && !bop.hasObject()) {
-			return new S3FileStatus(f, 0L, true, 0L, 0L);
-		}
-
-		try {
-			if (bop.hasBucket() && !bop.hasObject()) {
-
-				final List<Bucket> buckets = this.s3Client.listBuckets();
-				final Iterator<Bucket> it = buckets.iterator();
-
-				// Iterator throw list of buckets to find out creation date
-				while (it.hasNext()) {
-
-					final Bucket bucket = it.next();
-					if (bop.getBucket().equals(bucket.getName())) {
-
-						final long creationDate = dateToLong(bucket.getCreationDate());
-						// S3 does not track access times, so this implementation always sets it to 0
-						return new S3FileStatus(f, 0L, true, creationDate, 0L);
-					}
-				}
-
-				throw new FileNotFoundException("Cannot find " + f.toUri());
-			}
-
-			try {
-				final ObjectMetadata om = this.s3Client.getObjectMetadata(bop.getBucket(), bop.getObject());
-				final long modificationDate = dateToLong(om.getLastModified());
-				// S3 does not track access times, so this implementation always sets it to 0
-				if (objectRepresentsDirectory(bop.getObject(), om.getContentLength())) {
-					return new S3FileStatus(f, 0L, true, modificationDate, 0L);
-				} else {
-					return new S3FileStatus(f, om.getContentLength(), false, modificationDate, 0L);
-				}
-
-			} catch (AmazonServiceException e) {
-				if (e.getStatusCode() == HTTP_RESOURCE_NOT_FOUND_CODE) {
-					throw new FileNotFoundException("Cannot find " + f.toUri());
-				} else {
-					throw e;
-				}
-			}
-		} catch (AmazonClientException e) {
-			throw new IOException(StringUtils.stringifyException(e));
-		}
-	}
-
-	private static long dateToLong(final Date date) {
-
-		if (date == null) {
-			return 0L;
-		}
-
-		return date.getTime();
-	}
-
-
-	@Override
-	public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
-			throws IOException {
-
-		if ((start + len) > file.getLen()) {
-			return null;
-		}
-
-		final S3BlockLocation bl = new S3BlockLocation(this.host, file.getLen());
-
-		return new BlockLocation[] { bl };
-	}
-
-
-	@Override
-	public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
-
-		return open(f); // Ignore bufferSize
-	}
-
-
-	@Override
-	public FSDataInputStream open(final Path f) throws IOException {
-
-		final FileStatus fileStatus = getFileStatus(f); // Will throw FileNotFoundException if f does not exist
-
-		// Make sure f is not a directory
-		if (fileStatus.isDir()) {
-			throw new IOException("Cannot open " + f.toUri() + " because it is a directory");
-		}
-
-		final S3BucketObjectPair bop = this.directoryStructure.toBucketObjectPair(f);
-		if (!bop.hasBucket() || !bop.hasObject()) {
-			throw new IOException(f.toUri() + " cannot be opened");
-		}
-
-		return new S3DataInputStream(this.s3Client, bop.getBucket(), bop.getObject());
-	}
-
-
-	@Override
-	public FileStatus[] listStatus(final Path f) throws IOException {
-
-		final S3BucketObjectPair bop = this.directoryStructure.toBucketObjectPair(f);
-
-		try {
-
-			if (!bop.hasBucket()) {
-
-				final List<Bucket> list = this.s3Client.listBuckets();
-				final S3FileStatus[] array = new S3FileStatus[list.size()];
-				final Iterator<Bucket> it = list.iterator();
-				int i = 0;
-				while (it.hasNext()) {
-					final Bucket bucket = it.next();
-					final long creationDate = dateToLong(bucket.getCreationDate());
-					// S3 does not track access times, so this implementation always sets it to 0
-					final S3FileStatus status = new S3FileStatus(extendPath(f, bucket.getName()
-						+ S3_DIRECTORY_SEPARATOR), 0, true, creationDate, 0L);
-					array[i++] = status;
-				}
-
-				return array;
-			}
-
-			if (bop.hasBucket() && !bop.hasObject()) {
-
-				// Check if the bucket really exists
-				if (!this.s3Client.doesBucketExist(bop.getBucket())) {
-					throw new FileNotFoundException("Cannot find " + f.toUri());
-				}
-
-				return listBucketContent(f, bop);
-
-			} else {
-
-				final ObjectMetadata omd = this.s3Client.getObjectMetadata(bop.getBucket(), bop.getObject());
-				if (objectRepresentsDirectory(bop.getObject(), omd.getContentLength())) {
-
-					return listBucketContent(f, bop);
-
-				} else {
-					final S3FileStatus fileStatus = new S3FileStatus(f, omd.getContentLength(), false,
-						dateToLong(omd.getLastModified()), 0L);
-
-					return new FileStatus[] { fileStatus };
-				}
-
-			}
-
-		} catch (AmazonClientException e) {
-			throw new IOException(StringUtils.stringifyException(e));
-		}
-	}
-
-	private S3FileStatus[] listBucketContent(final Path f, final S3BucketObjectPair bop) throws IOException {
-
-		ObjectListing listing = null;
-		final List<S3FileStatus> resultList = new ArrayList<S3FileStatus>();
-
-		final int depth = (bop.hasObject() ? getDepth(bop.getObject()) + 1 : 0);
-
-		while (true) {
-
-			if (listing == null) {
-				if (bop.hasObject()) {
-					listing = this.s3Client.listObjects(bop.getBucket(), bop.getObject());
-				} else {
-					listing = this.s3Client.listObjects(bop.getBucket());
-				}
-			} else {
-				listing = this.s3Client.listNextBatchOfObjects(listing);
-			}
-
-			final List<S3ObjectSummary> list = listing.getObjectSummaries();
-			final Iterator<S3ObjectSummary> it = list.iterator();
-			while (it.hasNext()) {
-
-				final S3ObjectSummary os = it.next();
-				String key = os.getKey();
-
-				final int childDepth = getDepth(os.getKey());
-
-				if (childDepth != depth) {
-					continue;
-				}
-
-				// Remove the prefix
-				if (bop.hasObject()) {
-					if (key.startsWith(bop.getObject())) {
-						key = key.substring(bop.getObject().length());
-					}
-
-					// This has been the prefix itself
-					if (key.isEmpty()) {
-						continue;
-					}
-				}
-
-				final long modificationDate = dateToLong(os.getLastModified());
-
-				S3FileStatus fileStatus;
-				if (objectRepresentsDirectory(os)) {
-					fileStatus = new S3FileStatus(extendPath(f, key), 0, true, modificationDate, 0L);
-				} else {
-					fileStatus = new S3FileStatus(extendPath(f, key), os.getSize(), false, modificationDate, 0L);
-				}
-
-				resultList.add(fileStatus);
-			}
-
-			if (!listing.isTruncated()) {
-				break;
-			}
-		}
-
-		/*
-		 * System.out.println("---- RETURN CONTENT ----");
-		 * for (final FileStatus entry : resultList) {
-		 * System.out.println(entry.getPath());
-		 * }
-		 * System.out.println("------------------------");
-		 */
-
-		return resultList.toArray(new S3FileStatus[0]);
-
-	}
-
-	private static int getDepth(final String key) {
-
-		int depth = 0;
-		int nextStartPos = 0;
-
-		final int length = key.length();
-
-		while (nextStartPos < length) {
-
-			final int sepPos = key.indexOf(S3_DIRECTORY_SEPARATOR, nextStartPos);
-			if (sepPos < 0) {
-				break;
-			} else {
-				++depth;
-				nextStartPos = sepPos + 1;
-			}
-		}
-
-		if (length > 0) {
-			if (key.charAt(length - 1) == S3_DIRECTORY_SEPARATOR) {
-				--depth;
-			}
-		}
-
-		return depth;
-	}
-
-
-	@Override
-	public boolean delete(Path f, boolean recursive) throws IOException {
-
-		try {
-			final FileStatus fileStatus = getFileStatus(f); // Will throw a FileNotFoundException if f is invalid
-			final S3BucketObjectPair bop = this.directoryStructure.toBucketObjectPair(f);
-
-			if (fileStatus.isDir()) {
-
-				boolean retVal = false;
-				final FileStatus[] dirContent = listStatus(f);
-				if (dirContent.length > 0) {
-					// Directory is not empty
-					if (!recursive) {
-						throw new IOException("Found non-empty directory " + f
-							+ " while performing non-recursive delete");
-					}
-
-					for (final FileStatus entry : dirContent) {
-
-						if (delete(entry.getPath(), true)) {
-							retVal = true;
-						}
-					}
-				}
-
-				// Now the directory is empty
-
-				if (!bop.hasBucket()) {
-					// This is the root directory, do not delete this
-					return retVal;
-				}
-
-				if (!bop.hasObject()) {
-					// This is a real bucket
-					this.s3Client.deleteBucket(bop.getBucket());
-				} else {
-					// This directory is actually represented by an object in S3
-					this.s3Client.deleteObject(bop.getBucket(), bop.getObject());
-				}
-			} else {
-				// This is a file
-				this.s3Client.deleteObject(bop.getBucket(), bop.getObject());
-			}
-		} catch (AmazonClientException e) {
-			throw new IOException(StringUtils.stringifyException(e));
-		}
-
-		return true;
-	}
-
-
-	@Override
-	public boolean mkdirs(final Path f) throws IOException {
-
-		final S3BucketObjectPair bop = this.directoryStructure.toBucketObjectPair(f);
-		if (!bop.hasBucket() && !bop.hasObject()) {
-			// Ignore this call
-			return false;
-		}
-
-		boolean retCode = false;
-
-		try {
-
-			// Make sure the bucket exists
-			if (bop.hasBucket()) {
-				if (this.s3Client.doesBucketExist(bop.getBucket())) {
-				} else {
-					this.s3Client.createBucket(bop.getBucket());
-					retCode = true;
-				}
-			}
-
-			if (bop.hasObject()) {
-
-				// Make sure object name ends with a directory separator character
-				String object = bop.getObject();
-				if (!object.isEmpty()) {
-					if (object.charAt(object.length() - 1) != S3_DIRECTORY_SEPARATOR) {
-						object = object.concat(Character.toString(S3_DIRECTORY_SEPARATOR));
-					}
-				}
-
-				while (true) {
-
-					try {
-						this.s3Client.getObjectMetadata(bop.getBucket(), object);
-					} catch (AmazonServiceException e) {
-						if (e.getStatusCode() == HTTP_RESOURCE_NOT_FOUND_CODE) {
-							createEmptyObject(bop.getBucket(), object);
-
-							if (object.length() > 1) {
-								final int nextPos = object.lastIndexOf(S3_DIRECTORY_SEPARATOR, object.length() - 2);
-								if (nextPos >= 0) {
-									object = object.substring(0, nextPos + 1);
-									continue;
-								}
-							}
-
-						} else {
-							// Rethrow the exception
-							throw e;
-						}
-					}
-
-					// Object already exists, exit
-					break;
-				}
-			}
-		} catch (AmazonClientException e) {
-			throw new IOException(StringUtils.stringifyException(e));
-		}
-
-		return retCode;
-	}
-
-	private void createEmptyObject(final String bucketName, final String objectName) {
-
-		final InputStream im = new InputStream() {
-
-			@Override
-			public int read() throws IOException {
-
-				return -1;
-			}
-		};
-
-		final ObjectMetadata om = new ObjectMetadata();
-		om.setContentLength(0L);
-
-		this.s3Client.putObject(bucketName, objectName, im, om);
-	}
-
-
-	@Override
-	public FSDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize,
-			final short replication, final long blockSize)
-			throws IOException {
-
-		if (!overwrite && exists(f)) {
-			throw new IOException(f.toUri() + " already exists");
-		}
-
-		final S3BucketObjectPair bop = this.directoryStructure.toBucketObjectPair(f);
-		if (!bop.hasBucket() || !bop.hasObject()) {
-			throw new IOException(f.toUri() + " is not a valid path to create a new file");
-		}
-
-		if (bufferSize < S3DataOutputStream.MINIMUM_MULTIPART_SIZE) {
-			throw new IOException("Provided buffer must be at least " + S3DataOutputStream.MINIMUM_MULTIPART_SIZE
-				+ " bytes");
-		}
-
-		final byte[] buf = new byte[bufferSize]; // TODO: Use memory manager to allocate larger pages
-
-		return new S3DataOutputStream(this.s3Client, bop.getBucket(), bop.getObject(), buf, this.useRRS);
-	}
-
-
-	@Override
-	public FSDataOutputStream create(final Path f, final boolean overwrite) throws IOException {
-
-		return create(f, overwrite, S3DataOutputStream.MINIMUM_MULTIPART_SIZE, (short) 1, 1024L);
-	}
-
-	private boolean objectRepresentsDirectory(final S3ObjectSummary os) {
-
-		return objectRepresentsDirectory(os.getKey(), os.getSize());
-	}
-
-	private boolean objectRepresentsDirectory(final String name, final long size) {
-
-		if (name.isEmpty()) {
-			return false;
-		}
-
-		if (name.charAt(name.length() - 1) == S3_DIRECTORY_SEPARATOR && size == 0L) {
-			return true;
-		}
-
-		return false;
-	}
-
-	static Path extendPath(final Path parent, final String extension) throws IOException {
-
-		final URI parentUri = parent.toUri();
-
-		if (extension.isEmpty()) {
-			return parent;
-		}
-
-		final String path = parentUri.getPath();
-		String extendedPath;
-		if (path.isEmpty()) {
-			if (extension.charAt(0) == Path.SEPARATOR_CHAR) {
-				extendedPath = extension;
-			} else {
-				extendedPath = Path.SEPARATOR + extension;
-			}
-		} else {
-			if (path.charAt(path.length() - 1) == Path.SEPARATOR_CHAR) {
-				if (extension.charAt(0) == Path.SEPARATOR_CHAR) {
-					if (extension.length() > 1) {
-						extendedPath = path + extension.substring(1);
-					} else {
-						extendedPath = path;
-					}
-				} else {
-					extendedPath = path + extension;
-				}
-			} else {
-				if (extension.charAt(0) == Path.SEPARATOR_CHAR) {
-					extendedPath = path + extension;
-				} else {
-					extendedPath = path + Path.SEPARATOR + extension;
-				}
-			}
-		}
-
-		try {
-			final URI extendedUri = new URI(parentUri.getScheme(),
-				((parentUri.getAuthority() != null) ? parentUri.getAuthority() : ""), extendedPath,
-				parentUri.getQuery(), parentUri.getFragment());
-			return new Path(extendedUri);
-		} catch (URISyntaxException e) {
-			throw new IOException(StringUtils.stringifyException(e));
-		}
-	}
-
-
-	@Override
-	public boolean rename(final Path src, final Path dst) throws IOException {
-
-		throw new UnsupportedOperationException("This method is not yet implemented");
-	}
-
-	@Override
-	public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
-		if (createDirectory) {
-			// make sure that the path is terminated with a slash, S3 is very particular about this
-			outPath = outPath.suffix("/");
-		}
-		return super.initOutPathDistFS(outPath, writeMode, createDirectory);
-	}
-	
-	@Override
-	public boolean isDistributedFS() {
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9005779/flink-runtime/src/test/java/org/apache/flink/runtime/fs/s3/S3FileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/fs/s3/S3FileSystemTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/fs/s3/S3FileSystemTest.java
deleted file mode 100644
index b8cd99d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/fs/s3/S3FileSystemTest.java
+++ /dev/null
@@ -1,465 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.fs.s3;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.fs.BlockLocation;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * This test checks the S3 implementation of the {@link FileSystem} interface.
- * 
- */
-public class S3FileSystemTest {
-
-	/**
-	 * The length of the bucket/object names used in this test.
-	 */
-	private static final int NAME_LENGTH = 32;
-
-	/**
-	 * The alphabet to generate the random bucket/object names from.
-	 */
-	private static final char[] ALPHABET = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o',
-		'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9' };
-
-	/**
-	 * The size of the byte buffer used during the tests in bytes.
-	 */
-	private static final int TEST_BUFFER_SIZE = 128;
-
-	/**
-	 * The size of the small test file in bytes.
-	 */
-	private static final int SMALL_FILE_SIZE = 512;
-
-	/**
-	 * The size of the large test file in bytes.
-	 */
-	private static final int LARGE_FILE_SIZE = 1024 * 1024 * 12; // 12 MB
-
-	/**
-	 * The modulus to be used when generating the test data. Must not be larger than 128.
-	 */
-	private static final int MODULUS = 128;
-
-	private static final String S3_BASE_URI = "s3:///";
-
-	/**
-	 * Tries to read the AWS access key and the AWS secret key from the environments variables. If accessing these keys
-	 * fails, all tests will be skipped and marked as successful.
-	 */
-	@Before
-	public void initKeys() {
-		final String accessKey = System.getenv("AK");
-		final String secretKey = System.getenv("SK");
-		
-		if (accessKey != null || secretKey != null) {
-			Configuration conf = new Configuration();
-			if (accessKey != null) {
-				conf.setString(S3FileSystem.S3_ACCESS_KEY_KEY, accessKey);
-			}
-			if (secretKey != null) {
-				conf.setString(S3FileSystem.S3_SECRET_KEY_KEY, secretKey);
-			}
-			GlobalConfiguration.includeConfiguration(conf);
-		}
-	}
-
-	/**
-	 * This test creates and deletes a bucket inside S3 and checks it is correctly displayed inside the directory
-	 * listing.
-	 */
-	@Test
-	public void createAndDeleteBucketTest() {
-
-		if (!testActivated()) {
-			return;
-		}
-
-		final String bucketName = getRandomName();
-		final Path bucketPath = new Path(S3_BASE_URI + bucketName + Path.SEPARATOR);
-
-		try {
-
-			final FileSystem fs = bucketPath.getFileSystem();
-
-			// Create directory
-			fs.mkdirs(bucketPath);
-
-			// Check if directory is correctly displayed in file system hierarchy
-			final FileStatus[] content = fs.listStatus(new Path(S3_BASE_URI));
-			boolean entryFound = false;
-			for (final FileStatus entry : content) {
-				if (bucketPath.equals(entry.getPath())) {
-					entryFound = true;
-					break;
-				}
-			}
-
-			if (!entryFound) {
-				fail("Cannot find entry " + bucketName + " in directory " + S3_BASE_URI);
-			}
-
-			// Check the concrete directory file status
-			try {
-				final FileStatus directoryFileStatus = fs.getFileStatus(bucketPath);
-				assertTrue(directoryFileStatus.isDir());
-				assertEquals(0L, directoryFileStatus.getAccessTime());
-				assertTrue(directoryFileStatus.getModificationTime() > 0L);
-
-			} catch (FileNotFoundException e) {
-				fail(e.getMessage());
-			}
-
-			// Delete the bucket
-			fs.delete(bucketPath, true);
-
-			// Make sure the bucket no longer exists
-			try {
-				fs.getFileStatus(bucketPath);
-				fail("Expected FileNotFoundException for " + bucketPath.toUri());
-			} catch (FileNotFoundException e) {
-				// This is an expected exception
-			}
-
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		}
-	}
-
-	/**
-	 * Creates and reads the a larger test file in S3. The test file is generated according to a specific pattern.
-	 * During the read phase the incoming data stream is also checked against this pattern.
-	 */
-	@Test
-	public void createAndReadLargeFileTest() {
-
-		try {
-			createAndReadFileTest(LARGE_FILE_SIZE);
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		}
-	}
-
-	/**
-	 * Creates and reads the a small test file in S3. The test file is generated according to a specific pattern.
-	 * During the read phase the incoming data stream is also checked against this pattern.
-	 */
-	@Test
-	public void createAndReadSmallFileTest() {
-
-		try {
-			createAndReadFileTest(SMALL_FILE_SIZE);
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		}
-	}
-
-	/**
-	 * The tests checks the mapping of the file system directory structure to the underlying bucket/object model of
-	 * Amazon S3.
-	 */
-	@Test
-	public void multiLevelDirectoryTest() {
-
-		if (!testActivated()) {
-			return;
-		}
-
-		final String dirName = getRandomName();
-		final String subdirName = getRandomName();
-		final String subsubdirName = getRandomName();
-		final String fileName = getRandomName();
-		final Path dir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR);
-		final Path subdir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + subdirName + Path.SEPARATOR);
-		final Path subsubdir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + subdirName + Path.SEPARATOR
-			+ subsubdirName + Path.SEPARATOR);
-		final Path file = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + subdirName + Path.SEPARATOR + fileName);
-
-		try {
-
-			final FileSystem fs = dir.getFileSystem();
-
-			fs.mkdirs(subsubdir);
-
-			final OutputStream os = fs.create(file, true);
-			generateTestData(os, SMALL_FILE_SIZE);
-			os.close();
-
-			// On this directory levels there should only be one subdirectory
-			FileStatus[] list = fs.listStatus(dir);
-			int numberOfDirs = 0;
-			int numberOfFiles = 0;
-			for (final FileStatus entry : list) {
-
-				if (entry.isDir()) {
-					++numberOfDirs;
-					assertEquals(subdir, entry.getPath());
-				} else {
-					fail(entry.getPath() + " is a file which must not appear on this directory level");
-				}
-			}
-
-			assertEquals(1, numberOfDirs);
-			assertEquals(0, numberOfFiles);
-
-			list = fs.listStatus(subdir);
-			numberOfDirs = 0;
-
-			for (final FileStatus entry : list) {
-				if (entry.isDir()) {
-					assertEquals(subsubdir, entry.getPath());
-					++numberOfDirs;
-				} else {
-					assertEquals(file, entry.getPath());
-					++numberOfFiles;
-				}
-			}
-
-			assertEquals(1, numberOfDirs);
-			assertEquals(1, numberOfFiles);
-
-			fs.delete(dir, true);
-
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		}
-	}
-
-	/**
-	 * This test checks the S3 implementation of the file system method to retrieve the block locations of a file.
-	 */
-	@Test
-	public void blockLocationTest() {
-
-		if (!testActivated()) {
-			return;
-		}
-
-		final String dirName = getRandomName();
-		final String fileName = getRandomName();
-		final Path dir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR);
-		final Path file = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + fileName);
-
-		try {
-
-			final FileSystem fs = dir.getFileSystem();
-
-			fs.mkdirs(dir);
-
-			final OutputStream os = fs.create(file, true);
-			generateTestData(os, SMALL_FILE_SIZE);
-			os.close();
-
-			final FileStatus fileStatus = fs.getFileStatus(file);
-			assertNotNull(fileStatus);
-
-			BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, 0, SMALL_FILE_SIZE + 1);
-			assertNull(blockLocations);
-
-			blockLocations = fs.getFileBlockLocations(fileStatus, 0, SMALL_FILE_SIZE);
-			assertEquals(1, blockLocations.length);
-
-			final BlockLocation bl = blockLocations[0];
-			assertNotNull(bl.getHosts());
-			assertEquals(1, bl.getHosts().length);
-			assertEquals(SMALL_FILE_SIZE, bl.getLength());
-			assertEquals(0, bl.getOffset());
-			final URI s3Uri = fs.getUri();
-			assertNotNull(s3Uri);
-			assertEquals(s3Uri.getHost(), bl.getHosts()[0]);
-
-			fs.delete(dir, true);
-
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		}
-	}
-
-	/**
-	 * Creates and reads a file with the given size in S3. The test file is generated according to a specific pattern.
-	 * During the read phase the incoming data stream is also checked against this pattern.
-	 * 
-	 * @param fileSize
-	 *        the size of the file to be generated in bytes
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while writing or reading the test file
-	 */
-	private void createAndReadFileTest(final int fileSize) throws IOException {
-
-		if (!testActivated()) {
-			return;
-		}
-
-		final String bucketName = getRandomName();
-		final String objectName = getRandomName();
-		final Path bucketPath = new Path(S3_BASE_URI + bucketName + Path.SEPARATOR);
-		final Path objectPath = new Path(S3_BASE_URI + bucketName + Path.SEPARATOR + objectName);
-
-		FileSystem fs = bucketPath.getFileSystem();
-
-		// Create test bucket
-		fs.mkdirs(bucketPath);
-
-		// Write test file to S3
-		final FSDataOutputStream outputStream = fs.create(objectPath, false);
-		generateTestData(outputStream, fileSize);
-		outputStream.close();
-
-		// Now read the same file back from S3
-		final FSDataInputStream inputStream = fs.open(objectPath);
-		testReceivedData(inputStream, fileSize);
-		inputStream.close();
-
-		// Delete test bucket
-		fs.delete(bucketPath, true);
-	}
-
-	/**
-	 * Receives test data from the given input stream and checks the size of the data as well as the pattern inside the
-	 * received data.
-	 * 
-	 * @param inputStream
-	 *        the input stream to read the test data from
-	 * @param expectedSize
-	 *        the expected size of the data to be read from the input stream in bytes
-	 * @throws IOException
-	 *         thrown if an error occurs while reading the data
-	 */
-	private void testReceivedData(final InputStream inputStream, final int expectedSize) throws IOException {
-
-		final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];
-
-		int totalBytesRead = 0;
-		int nextExpectedNumber = 0;
-		while (true) {
-
-			final int bytesRead = inputStream.read(testBuffer);
-			if (bytesRead < 0) {
-				break;
-			}
-
-			totalBytesRead += bytesRead;
-
-			for (int i = 0; i < bytesRead; ++i) {
-				if (testBuffer[i] != nextExpectedNumber) {
-					throw new IOException("Read number " + testBuffer[i] + " but expected " + nextExpectedNumber);
-				}
-
-				++nextExpectedNumber;
-
-				if (nextExpectedNumber == MODULUS) {
-					nextExpectedNumber = 0;
-				}
-			}
-		}
-
-		if (totalBytesRead != expectedSize) {
-			throw new IOException("Expected to read " + expectedSize + " bytes but only received " + totalBytesRead);
-		}
-	}
-
-	/**
-	 * Generates test data of the given size according to some specific pattern and writes it to the provided output
-	 * stream.
-	 * 
-	 * @param outputStream
-	 *        the output stream to write the data to
-	 * @param size
-	 *        the size of the test data to be generated in bytes
-	 * @throws IOException
-	 *         thrown if an error occurs while writing the data
-	 */
-	private void generateTestData(final OutputStream outputStream, final int size) throws IOException {
-
-		final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];
-		for (int i = 0; i < testBuffer.length; ++i) {
-			testBuffer[i] = (byte) (i % MODULUS);
-		}
-
-		int bytesWritten = 0;
-		while (bytesWritten < size) {
-
-			final int diff = size - bytesWritten;
-			if (diff < testBuffer.length) {
-				outputStream.write(testBuffer, 0, diff);
-				bytesWritten += diff;
-			} else {
-				outputStream.write(testBuffer);
-				bytesWritten += testBuffer.length;
-			}
-		}
-	}
-
-	/**
-	 * Generates a random name.
-	 * 
-	 * @return a random name
-	 */
-	private String getRandomName() {
-
-		final StringBuilder stringBuilder = new StringBuilder();
-		for (int i = 0; i < NAME_LENGTH; ++i) {
-			final char c = ALPHABET[(int) (Math.random() * (double) ALPHABET.length)];
-			stringBuilder.append(c);
-		}
-
-		return stringBuilder.toString();
-	}
-
-	/**
-	 * Checks whether the AWS access key and the AWS secret keys have been successfully loaded from the configuration
-	 * and whether the S3 tests shall be performed.
-	 * 
-	 * @return <code>true</code> if the tests shall be performed, <code>false</code> if the tests shall be skipped
-	 *         because at least one AWS key is missing
-	 */
-	private boolean testActivated() {
-
-		final String accessKey = GlobalConfiguration.getString(S3FileSystem.S3_ACCESS_KEY_KEY, null);
-		final String secretKey = GlobalConfiguration.getString(S3FileSystem.S3_SECRET_KEY_KEY, null);
-
-		if (accessKey != null && secretKey != null) {
-			return true;
-		}
-
-		return false;
-	}
-}


Mime
View raw message