hadoop-common-commits mailing list archives

From: szets...@apache.org
Subject: [13/20] HADOOP-11074. Move s3-related FS connector code to hadoop-aws. (David S. Wang via Colin Patrick McCabe)
Date: Thu, 11 Sep 2014 07:24:53 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java
new file mode 100644
index 0000000..312bf65
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3;
+
+import java.net.URI;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * <p>
+ * Extracts AWS credentials from the filesystem URI or configuration.
+ * </p>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class S3Credentials {
+  
+  private String accessKey;
+  private String secretAccessKey; 
+
+  /**
+   * @param uri the filesystem URI, whose user info may carry the credentials
+   * @param conf the configuration consulted when the URI does not
+   * @throws IllegalArgumentException if credentials for S3 cannot be
+   * determined.
+   */
+  public void initialize(URI uri, Configuration conf) {
+    if (uri.getHost() == null) {
+      throw new IllegalArgumentException("Invalid hostname in URI " + uri);
+    }
+    
+    String userInfo = uri.getUserInfo();
+    if (userInfo != null) {
+      int index = userInfo.indexOf(':');
+      if (index != -1) {
+        accessKey = userInfo.substring(0, index);
+        secretAccessKey = userInfo.substring(index + 1);
+      } else {
+        accessKey = userInfo;
+      }
+    }
+    
+    String scheme = uri.getScheme();
+    String accessKeyProperty = String.format("fs.%s.awsAccessKeyId", scheme);
+    String secretAccessKeyProperty =
+      String.format("fs.%s.awsSecretAccessKey", scheme);
+    if (accessKey == null) {
+      accessKey = conf.get(accessKeyProperty);
+    }
+    if (secretAccessKey == null) {
+      secretAccessKey = conf.get(secretAccessKeyProperty);
+    }
+    if (accessKey == null && secretAccessKey == null) {
+      throw new IllegalArgumentException("AWS " +
+                                         "Access Key ID and Secret Access " +
+                                         "Key must be specified as the " +
+                                         "username or password " +
+                                         "(respectively) of a " + scheme +
+                                         " URL, or by setting the " +
+                                         accessKeyProperty + " or " +
+                                         secretAccessKeyProperty +
+                                         " properties (respectively).");
+    } else if (accessKey == null) {
+      throw new IllegalArgumentException("AWS " +
+                                         "Access Key ID must be specified " +
+                                         "as the username of a " + scheme +
+                                         " URL, or by setting the " +
+                                         accessKeyProperty + " property.");
+    } else if (secretAccessKey == null) {
+      throw new IllegalArgumentException("AWS " +
+                                         "Secret Access Key must be " +
+                                         "specified as the password of a " +
+                                         scheme + " URL, or by setting the " +
+                                         secretAccessKeyProperty +
+                                         " property.");       
+    }
+
+  }
+  
+  public String getAccessKey() {
+    return accessKey;
+  }
+  
+  public String getSecretAccessKey() {
+    return secretAccessKey;
+  }
+}
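
For context, a minimal usage sketch (illustrative only: the bucket name and key values are made up, while the class and the fs.s3.* property names are the ones defined above; note real secret keys often contain '/' or '+' characters, which makes the URI form awkward):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3.S3Credentials;

public class S3CredentialsSketch {
  public static void main(String[] args) {
    // Option 1: credentials embedded in the URI as user:password.
    S3Credentials fromUri = new S3Credentials();
    fromUri.initialize(URI.create("s3://ID:SECRET@mybucket"),
                       new Configuration());

    // Option 2: a bare URI plus the per-scheme configuration properties.
    Configuration conf = new Configuration();
    conf.set("fs.s3.awsAccessKeyId", "ID");
    conf.set("fs.s3.awsSecretAccessKey", "SECRET");
    S3Credentials fromConf = new S3Credentials();
    fromConf.initialize(URI.create("s3://mybucket"), conf);
    System.out.println(fromConf.getAccessKey()); // prints: ID
  }
}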

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Exception.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Exception.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Exception.java
new file mode 100644
index 0000000..4f07c4e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Exception.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Thrown if there is a problem communicating with Amazon S3.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class S3Exception extends IOException {
+
+  private static final long serialVersionUID = 1L;
+
+  public S3Exception(Throwable t) {
+    super(t);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java
new file mode 100644
index 0000000..dda3cf6
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * <p>
+ * A block-based {@link FileSystem} backed by
+ * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
+ * </p>
+ * @see NativeS3FileSystem
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class S3FileSystem extends FileSystem {
+
+  private URI uri;
+
+  private FileSystemStore store;
+
+  private Path workingDir;
+
+  public S3FileSystem() {
+    // set store in initialize()
+  }
+  
+  public S3FileSystem(FileSystemStore store) {
+    this.store = store;
+  }
+
+  /**
+   * Return the protocol scheme for the FileSystem.
+   * <p/>
+   *
+   * @return <code>s3</code>
+   */
+  @Override
+  public String getScheme() {
+    return "s3";
+  }
+
+  @Override
+  public URI getUri() {
+    return uri;
+  }
+
+  @Override
+  public void initialize(URI uri, Configuration conf) throws IOException {
+    super.initialize(uri, conf);
+    if (store == null) {
+      store = createDefaultStore(conf);
+    }
+    store.initialize(uri, conf);
+    setConf(conf);
+    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());    
+    this.workingDir =
+      new Path("/user", System.getProperty("user.name")).makeQualified(this);
+  }  
+
+  private static FileSystemStore createDefaultStore(Configuration conf) {
+    FileSystemStore store = new Jets3tFileSystemStore();
+    
+    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+        conf.getInt("fs.s3.maxRetries", 4),
+        conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
+    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
+      new HashMap<Class<? extends Exception>, RetryPolicy>();
+    exceptionToPolicyMap.put(IOException.class, basePolicy);
+    exceptionToPolicyMap.put(S3Exception.class, basePolicy);
+
+    RetryPolicy methodPolicy = RetryPolicies.retryByException(
+        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+    Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
+    methodNameToPolicyMap.put("storeBlock", methodPolicy);
+    methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
+    
+    return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
+                                               store, methodNameToPolicyMap);
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+
+  @Override
+  public void setWorkingDirectory(Path dir) {
+    workingDir = makeAbsolute(dir);
+  }
+
+  private Path makeAbsolute(Path path) {
+    if (path.isAbsolute()) {
+      return path;
+    }
+    return new Path(workingDir, path);
+  }
+
+  /**
+   * @param permission Currently ignored.
+   */
+  @Override
+  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
+    Path absolutePath = makeAbsolute(path);
+    List<Path> paths = new ArrayList<Path>();
+    do {
+      paths.add(0, absolutePath);
+      absolutePath = absolutePath.getParent();
+    } while (absolutePath != null);
+    
+    boolean result = true;
+    for (Path p : paths) {
+      result &= mkdir(p);
+    }
+    return result;
+  }
+  
+  private boolean mkdir(Path path) throws IOException {
+    Path absolutePath = makeAbsolute(path);
+    INode inode = store.retrieveINode(absolutePath);
+    if (inode == null) {
+      store.storeINode(absolutePath, INode.DIRECTORY_INODE);
+    } else if (inode.isFile()) {
+      throw new IOException(String.format(
+          "Can't make directory for path %s since it is a file.",
+          absolutePath));
+    }
+    return true;
+  }
+
+  @Override
+  public boolean isFile(Path path) throws IOException {
+    INode inode = store.retrieveINode(makeAbsolute(path));
+    if (inode == null) {
+      return false;
+    }
+    return inode.isFile();
+  }
+
+  private INode checkFile(Path path) throws IOException {
+    INode inode = store.retrieveINode(makeAbsolute(path));
+    if (inode == null) {
+      throw new IOException("No such file.");
+    }
+    if (inode.isDirectory()) {
+      throw new IOException("Path " + path + " is a directory.");
+    }
+    return inode;
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path f) throws IOException {
+    Path absolutePath = makeAbsolute(f);
+    INode inode = store.retrieveINode(absolutePath);
+    if (inode == null) {
+      throw new FileNotFoundException("File " + f + " does not exist.");
+    }
+    if (inode.isFile()) {
+      return new FileStatus[] {
+        new S3FileStatus(f.makeQualified(this), inode)
+      };
+    }
+    ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
+    for (Path p : store.listSubPaths(absolutePath)) {
+      ret.add(getFileStatus(p.makeQualified(this)));
+    }
+    return ret.toArray(new FileStatus[0]);
+  }
+
+  /** This optional operation is not yet supported. */
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize,
+      Progressable progress) throws IOException {
+    throw new IOException("Not supported");
+  }
+
+  /**
+   * @param permission Currently ignored.
+   */
+  @Override
+  public FSDataOutputStream create(Path file, FsPermission permission,
+      boolean overwrite, int bufferSize,
+      short replication, long blockSize, Progressable progress)
+    throws IOException {
+
+    INode inode = store.retrieveINode(makeAbsolute(file));
+    if (inode != null) {
+      if (overwrite) {
+        delete(file, true);
+      } else {
+        throw new FileAlreadyExistsException("File already exists: " + file);
+      }
+    } else {
+      Path parent = file.getParent();
+      if (parent != null) {
+        if (!mkdirs(parent)) {
+          throw new IOException("Mkdirs failed to create " + parent.toString());
+        }
+      }      
+    }
+    return new FSDataOutputStream(
+        new S3OutputStream(getConf(), store, makeAbsolute(file),
+                           blockSize, progress, bufferSize),
+        statistics);
+  }
+
+  @Override
+  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+    INode inode = checkFile(path);
+    return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
+                                                   statistics));
+  }
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    Path absoluteSrc = makeAbsolute(src);
+    final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
+    INode srcINode = store.retrieveINode(absoluteSrc);
+    boolean debugEnabled = LOG.isDebugEnabled();
+    if (srcINode == null) {
+      // src path doesn't exist
+      if (debugEnabled) {
+        LOG.debug(debugPreamble + "returning false as src does not exist");
+      }
+      return false; 
+    }
+
+    Path absoluteDst = makeAbsolute(dst);
+
+    //validate the parent dir of the destination
+    Path dstParent = absoluteDst.getParent();
+    if (dstParent != null) {
+      //if the dst parent is not root, make sure it exists
+      INode dstParentINode = store.retrieveINode(dstParent);
+      if (dstParentINode == null) {
+        // dst parent doesn't exist
+        if (debugEnabled) {
+          LOG.debug(debugPreamble +
+                    "returning false as dst parent does not exist");
+        }
+        return false;
+      }
+      if (dstParentINode.isFile()) {
+        // dst parent exists but is a file
+        if (debugEnabled) {
+          LOG.debug(debugPreamble +
+                    "returning false as dst parent exists and is a file");
+        }
+        return false;
+      }
+    }
+
+    //get status of source
+    boolean srcIsFile = srcINode.isFile();
+
+    INode dstINode = store.retrieveINode(absoluteDst);
+    boolean destExists = dstINode != null;
+    boolean destIsDir = destExists && !dstINode.isFile();
+    if (srcIsFile) {
+
+      //source is a simple file
+      if (destExists) {
+        if (destIsDir) {
+          //outcome #1 dest exists and is dir -filename to subdir of dest
+          if (debugEnabled) {
+            LOG.debug(debugPreamble +
+                      "copying src file under dest dir to " + absoluteDst);
+          }
+          absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
+        } else {
+          //outcome #2 dest is a file: fail iff different from src
+          boolean renamingOnToSelf = absoluteSrc.equals(absoluteDst);
+          if (debugEnabled) {
+            LOG.debug(debugPreamble +
+                      "copying file onto file, outcome is " + renamingOnToSelf);
+          }
+          return renamingOnToSelf;
+        }
+      } else {
+        // #3 dest does not exist: use dest as path for rename
+        if (debugEnabled) {
+          LOG.debug(debugPreamble +
+                    "copying src file to a dest path that does not yet exist");
+        }
+      }
+    } else {
+      //here the source exists and is a directory
+      // outcomes (given we know the parent dir exists if we get this far)
+      // #1 destination is a file: fail
+      // #2 destination is a directory: create a new dir under that one
+      // #3 destination doesn't exist: create a new dir with that name
+      // #2 and #3 are only allowed if the dest path is not == or under src
+
+      if (destExists) {
+        if (!destIsDir) {
+          // #1 destination is a file: fail
+          if (debugEnabled) {
+            LOG.debug(debugPreamble +
+                      "returning false as src is a directory, but not dest");
+          }
+          return false;
+        } else {
+          // the destination dir exists
+          // destination for rename becomes a subdir of the target name
+          absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
+          if (debugEnabled) {
+            LOG.debug(debugPreamble +
+                      "copying src dir under dest dir to " + absoluteDst);
+          }
+        }
+      }
+      //the final destination directory is now known, so validate it for
+      //illegal moves
+
+      if (absoluteSrc.equals(absoluteDst)) {
+        //you can't rename a directory onto itself
+        if (debugEnabled) {
+          LOG.debug(debugPreamble +
+                    "Dest==source && isDir -failing");
+        }
+        return false;
+      }
+      if (absoluteDst.toString().startsWith(absoluteSrc.toString() + "/")) {
+        //you can't move a directory under itself
+        if (debugEnabled) {
+          LOG.debug(debugPreamble +
+                    "dst is equal to or under src dir -failing");
+        }
+        return false;
+      }
+    }
+    //here the dest path is set up -so rename
+    return renameRecursive(absoluteSrc, absoluteDst);
+  }
+
+  private boolean renameRecursive(Path src, Path dst) throws IOException {
+    INode srcINode = store.retrieveINode(src);
+    store.storeINode(dst, srcINode);
+    store.deleteINode(src);
+    if (srcINode.isDirectory()) {
+      for (Path oldSrc : store.listDeepSubPaths(src)) {
+        INode inode = store.retrieveINode(oldSrc);
+        if (inode == null) {
+          return false;
+        }
+        String oldSrcPath = oldSrc.toUri().getPath();
+        String srcPath = src.toUri().getPath();
+        String dstPath = dst.toUri().getPath();
+        // plain prefix swap rather than replaceFirst(): srcPath may contain
+        // regex metacharacters
+        Path newDst = new Path(dstPath + oldSrcPath.substring(srcPath.length()));
+        store.storeINode(newDst, inode);
+        store.deleteINode(oldSrc);
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean delete(Path path, boolean recursive) throws IOException {
+   Path absolutePath = makeAbsolute(path);
+   INode inode = store.retrieveINode(absolutePath);
+   if (inode == null) {
+     return false;
+   }
+   if (inode.isFile()) {
+     store.deleteINode(absolutePath);
+     for (Block block: inode.getBlocks()) {
+       store.deleteBlock(block);
+     }
+   } else {
+     FileStatus[] contents = null; 
+     try {
+       contents = listStatus(absolutePath);
+     } catch(FileNotFoundException fnfe) {
+       return false;
+     }
+
+     if ((contents.length !=0) && (!recursive)) {
+       throw new IOException("Directory " + path.toString() 
+           + " is not empty.");
+     }
+     for (FileStatus p:contents) {
+       if (!delete(p.getPath(), recursive)) {
+         return false;
+       }
+     }
+     store.deleteINode(absolutePath);
+   }
+   return true;
+  }
+  
+  /**
+   * Return the FileStatus of a path in this S3 file system.
+   */
+  @Override
+  public FileStatus getFileStatus(Path f)  throws IOException {
+    INode inode = store.retrieveINode(makeAbsolute(f));
+    if (inode == null) {
+      throw new FileNotFoundException(f + ": No such file or directory.");
+    }
+    return new S3FileStatus(f.makeQualified(this), inode);
+  }
+  
+  @Override
+  public long getDefaultBlockSize() {
+    return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
+  }
+
+  @Override
+  public String getCanonicalServiceName() {
+    // Does not support Token
+    return null;
+  }
+
+  // diagnostic methods
+
+  void dump() throws IOException {
+    store.dump();
+  }
+
+  void purge() throws IOException {
+    store.purge();
+  }
+
+  private static class S3FileStatus extends FileStatus {
+
+    S3FileStatus(Path f, INode inode) throws IOException {
+      super(findLength(inode), inode.isDirectory(), 1,
+            findBlocksize(inode), 0, f);
+    }
+
+    private static long findLength(INode inode) {
+      if (!inode.isDirectory()) {
+        long length = 0L;
+        for (Block block : inode.getBlocks()) {
+          length += block.getLength();
+        }
+        return length;
+      }
+      return 0;
+    }
+
+    private static long findBlocksize(INode inode) {
+      final Block[] ret = inode.getBlocks();
+      return ret == null ? 0L : ret[0].getLength();
+    }
+  }
+}
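
For context, a sketch of driving this filesystem through the generic FileSystem API (illustrative: the bucket and credentials are made up, and after this move the hadoop-aws jar must be on the classpath for the s3 scheme to resolve):

import java.net.URI;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3FileSystemSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.s3.awsAccessKeyId", "ID");        // illustrative, not real keys
    conf.set("fs.s3.awsSecretAccessKey", "SECRET");
    FileSystem fs = FileSystem.get(URI.create("s3://mybucket/"), conf);

    Path p = new Path("/dir1/file1");
    try (FSDataOutputStream out = fs.create(p)) {
      // data is buffered to a local block file and pushed to S3,
      // with storeBlock/retrieveBlock retried per createDefaultStore() above
      out.write("hello".getBytes(StandardCharsets.UTF_8));
    }
    System.out.println(fs.getFileStatus(p).getLen()); // 5
  }
}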

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystemConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystemConfigKeys.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystemConfigKeys.java
new file mode 100644
index 0000000..8172a46
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystemConfigKeys.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+
+/** 
+ * This class contains constants for configuration keys used
+ * in the s3 file system. 
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class S3FileSystemConfigKeys extends CommonConfigurationKeys {
+  public static final String  S3_BLOCK_SIZE_KEY = "s3.blocksize";
+  public static final long    S3_BLOCK_SIZE_DEFAULT = 64*1024*1024;
+  public static final String  S3_REPLICATION_KEY = "s3.replication";
+  public static final short   S3_REPLICATION_DEFAULT = 1;
+  public static final String  S3_STREAM_BUFFER_SIZE_KEY = 
+                                                    "s3.stream-buffer-size";
+  public static final int     S3_STREAM_BUFFER_SIZE_DEFAULT = 4096;
+  public static final String  S3_BYTES_PER_CHECKSUM_KEY = 
+                                                    "s3.bytes-per-checksum";
+  public static final int     S3_BYTES_PER_CHECKSUM_DEFAULT = 512;
+  public static final String  S3_CLIENT_WRITE_PACKET_SIZE_KEY =
+                                                    "s3.client-write-packet-size";
+  public static final int     S3_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
+}
+  
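
These keys are read like any other Configuration entries; a small sketch, printing the defaults defined above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3.S3FileSystemConfigKeys;

public class S3ConfigKeysSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    long blockSize = conf.getLong(S3FileSystemConfigKeys.S3_BLOCK_SIZE_KEY,
        S3FileSystemConfigKeys.S3_BLOCK_SIZE_DEFAULT);
    int bufferSize = conf.getInt(S3FileSystemConfigKeys.S3_STREAM_BUFFER_SIZE_KEY,
        S3FileSystemConfigKeys.S3_STREAM_BUFFER_SIZE_DEFAULT);
    System.out.println(blockSize + " / " + bufferSize); // 67108864 / 4096
  }
}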

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystemException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystemException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystemException.java
new file mode 100644
index 0000000..cc1b463
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystemException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Thrown when there is a fatal exception while using {@link S3FileSystem}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class S3FileSystemException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  public S3FileSystemException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3InputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3InputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3InputStream.java
new file mode 100644
index 0000000..5af57e6
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3InputStream.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class S3InputStream extends FSInputStream {
+
+  private FileSystemStore store;
+
+  private Block[] blocks;
+
+  private boolean closed;
+
+  private long fileLength;
+
+  private long pos = 0;
+
+  private File blockFile;
+  
+  private DataInputStream blockStream;
+
+  private long blockEnd = -1;
+  
+  private FileSystem.Statistics stats;
+  
+  private static final Log LOG = 
+    LogFactory.getLog(S3InputStream.class.getName());
+
+
+  @Deprecated
+  public S3InputStream(Configuration conf, FileSystemStore store,
+                       INode inode) {
+    this(conf, store, inode, null);
+  }
+
+  public S3InputStream(Configuration conf, FileSystemStore store,
+                       INode inode, FileSystem.Statistics stats) {
+    
+    this.store = store;
+    this.stats = stats;
+    this.blocks = inode.getBlocks();
+    for (Block block : blocks) {
+      this.fileLength += block.getLength();
+    }
+  }
+
+  @Override
+  public synchronized long getPos() throws IOException {
+    return pos;
+  }
+
+  @Override
+  public synchronized int available() throws IOException {
+    return (int) (fileLength - pos);
+  }
+
+  @Override
+  public synchronized void seek(long targetPos) throws IOException {
+    if (targetPos > fileLength) {
+      throw new IOException("Cannot seek after EOF");
+    }
+    pos = targetPos;
+    blockEnd = -1;
+  }
+
+  @Override
+  public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    int result = -1;
+    if (pos < fileLength) {
+      if (pos > blockEnd) {
+        blockSeekTo(pos);
+      }
+      result = blockStream.read();
+      if (result >= 0) {
+        pos++;
+      }
+    }
+    if (stats != null && result >= 0) {
+      stats.incrementBytesRead(1);
+    }
+    return result;
+  }
+
+  @Override
+  public synchronized int read(byte buf[], int off, int len) throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    if (pos < fileLength) {
+      if (pos > blockEnd) {
+        blockSeekTo(pos);
+      }
+      int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L));
+      int result = blockStream.read(buf, off, realLen);
+      if (result >= 0) {
+        pos += result;
+      }
+      if (stats != null && result > 0) {
+        stats.incrementBytesRead(result);
+      }
+      return result;
+    }
+    return -1;
+  }
+
+  private synchronized void blockSeekTo(long target) throws IOException {
+    //
+    // Compute desired block
+    //
+    int targetBlock = -1;
+    long targetBlockStart = 0;
+    long targetBlockEnd = 0;
+    for (int i = 0; i < blocks.length; i++) {
+      long blockLength = blocks[i].getLength();
+      targetBlockEnd = targetBlockStart + blockLength - 1;
+
+      if (target >= targetBlockStart && target <= targetBlockEnd) {
+        targetBlock = i;
+        break;
+      } else {
+        targetBlockStart = targetBlockEnd + 1;
+      }
+    }
+    if (targetBlock < 0) {
+      throw new IOException(
+                            "Impossible situation: could not find target position " + target);
+    }
+    long offsetIntoBlock = target - targetBlockStart;
+
+    // read block blocks[targetBlock] from position offsetIntoBlock
+
+    this.blockFile = store.retrieveBlock(blocks[targetBlock], offsetIntoBlock);
+
+    this.pos = target;
+    this.blockEnd = targetBlockEnd;
+    this.blockStream = new DataInputStream(new FileInputStream(blockFile));
+
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    if (blockStream != null) {
+      blockStream.close();
+      blockStream = null;
+    }
+    if (blockFile != null) {
+      boolean b = blockFile.delete();
+      if (!b) {
+        LOG.warn("Ignoring failed delete");
+      }
+    }
+    super.close();
+    closed = true;
+  }
+
+  /**
+   * We don't support marks.
+   */
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+
+  @Override
+  public void mark(int readLimit) {
+    // Do nothing
+  }
+
+  @Override
+  public void reset() throws IOException {
+    throw new IOException("Mark not supported");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3OutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3OutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3OutputStream.java
new file mode 100644
index 0000000..761f2ce
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3OutputStream.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3.INode.FileType;
+import org.apache.hadoop.util.Progressable;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class S3OutputStream extends OutputStream {
+
+  private Configuration conf;
+  
+  private int bufferSize;
+
+  private FileSystemStore store;
+
+  private Path path;
+
+  private long blockSize;
+
+  private File backupFile;
+
+  private OutputStream backupStream;
+
+  private Random r = new Random();
+
+  private boolean closed;
+
+  private int pos = 0;
+
+  private long filePos = 0;
+
+  private int bytesWrittenToBlock = 0;
+
+  private byte[] outBuf;
+
+  private List<Block> blocks = new ArrayList<Block>();
+
+  private Block nextBlock;
+  
+  private static final Log LOG = 
+    LogFactory.getLog(S3OutputStream.class.getName());
+
+
+  public S3OutputStream(Configuration conf, FileSystemStore store,
+                        Path path, long blockSize, Progressable progress,
+                        int buffersize) throws IOException {
+    
+    this.conf = conf;
+    this.store = store;
+    this.path = path;
+    this.blockSize = blockSize;
+    this.backupFile = newBackupFile();
+    this.backupStream = new FileOutputStream(backupFile);
+    this.bufferSize = buffersize;
+    this.outBuf = new byte[bufferSize];
+
+  }
+
+  private File newBackupFile() throws IOException {
+    File dir = new File(conf.get("fs.s3.buffer.dir"));
+    if (!dir.exists() && !dir.mkdirs()) {
+      throw new IOException("Cannot create S3 buffer directory: " + dir);
+    }
+    File result = File.createTempFile("output-", ".tmp", dir);
+    result.deleteOnExit();
+    return result;
+  }
+
+  public long getPos() throws IOException {
+    return filePos;
+  }
+
+  @Override
+  public synchronized void write(int b) throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+
+    if ((bytesWrittenToBlock + pos == blockSize) || (pos >= bufferSize)) {
+      flush();
+    }
+    outBuf[pos++] = (byte) b;
+    filePos++;
+  }
+
+  @Override
+  public synchronized void write(byte b[], int off, int len) throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    while (len > 0) {
+      int remaining = bufferSize - pos;
+      int toWrite = Math.min(remaining, len);
+      System.arraycopy(b, off, outBuf, pos, toWrite);
+      pos += toWrite;
+      off += toWrite;
+      len -= toWrite;
+      filePos += toWrite;
+
+      if ((bytesWrittenToBlock + pos >= blockSize) || (pos == bufferSize)) {
+        flush();
+      }
+    }
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+
+    if (bytesWrittenToBlock + pos >= blockSize) {
+      flushData((int) blockSize - bytesWrittenToBlock);
+    }
+    if (bytesWrittenToBlock == blockSize) {
+      endBlock();
+    }
+    flushData(pos);
+  }
+
+  private synchronized void flushData(int maxPos) throws IOException {
+    int workingPos = Math.min(pos, maxPos);
+
+    if (workingPos > 0) {
+      //
+      // To the local block backup, write just the bytes
+      //
+      backupStream.write(outBuf, 0, workingPos);
+
+      //
+      // Track position
+      //
+      bytesWrittenToBlock += workingPos;
+      System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
+      pos -= workingPos;
+    }
+  }
+
+  private synchronized void endBlock() throws IOException {
+    //
+    // Done with local copy
+    //
+    backupStream.close();
+
+    //
+    // Send it to S3
+    //
+    // TODO: Use passed in Progressable to report progress.
+    nextBlockOutputStream();
+    store.storeBlock(nextBlock, backupFile);
+    internalClose();
+
+    //
+    // Delete local backup, start new one
+    //
+    boolean b = backupFile.delete();
+    if (!b) {
+      LOG.warn("Ignoring failed delete");
+    }
+    backupFile = newBackupFile();
+    backupStream = new FileOutputStream(backupFile);
+    bytesWrittenToBlock = 0;
+  }
+
+  private synchronized void nextBlockOutputStream() throws IOException {
+    long blockId = r.nextLong();
+    while (store.blockExists(blockId)) {
+      blockId = r.nextLong();
+    }
+    nextBlock = new Block(blockId, bytesWrittenToBlock);
+    blocks.add(nextBlock);
+    bytesWrittenToBlock = 0;
+  }
+
+  private synchronized void internalClose() throws IOException {
+    INode inode = new INode(FileType.FILE,
+        blocks.toArray(new Block[blocks.size()]));
+    store.storeINode(path, inode);
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+
+    flush();
+    if (filePos == 0 || bytesWrittenToBlock != 0) {
+      endBlock();
+    }
+
+    backupStream.close();
+    boolean b = backupFile.delete();
+    if (!b) {
+      LOG.warn("Ignoring failed delete");
+    }
+
+    super.close();
+
+    closed = true;
+  }
+
+}
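
The stream above rolls over to a new block whenever bytesWrittenToBlock reaches blockSize, and close() stores a final (possibly empty) block; a toy model of the resulting block count, under those assumptions:

public class BlockCountSketch {
  static long blockCount(long fileLength, long blockSize) {
    if (fileLength == 0) {
      return 1; // close() still ends one (empty) block for a zero-length file
    }
    return (fileLength + blockSize - 1) / blockSize; // ceiling division
  }

  public static void main(String[] args) {
    long blockSize = 64L * 1024 * 1024;
    System.out.println(blockCount(0, blockSize));             // 1
    System.out.println(blockCount(blockSize, blockSize));     // 1
    System.out.println(blockCount(blockSize + 1, blockSize)); // 2
  }
}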

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/VersionMismatchException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/VersionMismatchException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/VersionMismatchException.java
new file mode 100644
index 0000000..ccc8969
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/VersionMismatchException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Thrown when Hadoop cannot read the version of the data stored
+ * in {@link S3FileSystem}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class VersionMismatchException extends S3FileSystemException {
+  private static final long serialVersionUID = 1L;
+
+  public VersionMismatchException(String clientVersion, String dataVersion) {
+    super("Version mismatch: client expects version " + clientVersion +
+        ", but data has version " +
+        (dataVersion == null ? "[unversioned]" : dataVersion));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/package.html
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/package.html b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/package.html
new file mode 100644
index 0000000..dd601e1
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/package.html
@@ -0,0 +1,55 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+
+<p>A distributed, block-based implementation of {@link
+org.apache.hadoop.fs.FileSystem} that uses <a href="http://aws.amazon.com/s3">Amazon S3</a>
+as a backing store.</p>
+
+<p>
+Files are stored in S3 as blocks (represented by 
+{@link org.apache.hadoop.fs.s3.Block}), which have an ID and a length.
+Block metadata is stored in S3 as a small record (represented by 
+{@link org.apache.hadoop.fs.s3.INode}) using the URL-encoded
+path string as a key. Inodes record the file type (regular file or directory) and the list of blocks.
+This design makes it easy to seek to any given position in a file by reading the inode data to compute
+which block to access, then using S3's support for 
+<a href="http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35.2">HTTP Range</a> headers
+to start streaming from the correct position.
+Renames are also efficient since only the inode is moved (by a DELETE followed by a PUT since 
+S3 does not support renames).
+</p>
+<p>
+For a single file <i>/dir1/file1</i> which takes two blocks of storage, the file structure in S3
+would be something like this:
+</p>
+<pre>
+/
+/dir1
+/dir1/file1
+block-6415776850131549260
+block-3026438247347758425
+</pre>
+<p>
+Inodes start with a leading <code>/</code>, while blocks are prefixed with <code>block-</code>.
+</p>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/FileMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/FileMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/FileMetadata.java
new file mode 100644
index 0000000..2746af4
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/FileMetadata.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3native;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * <p>
+ * Holds basic metadata for a file stored in a {@link NativeFileSystemStore}.
+ * </p>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class FileMetadata {
+  private final String key;
+  private final long length;
+  private final long lastModified;
+  
+  public FileMetadata(String key, long length, long lastModified) {
+    this.key = key;
+    this.length = length;
+    this.lastModified = lastModified;
+  }
+  
+  public String getKey() {
+    return key;
+  }
+  
+  public long getLength() {
+    return length;
+  }
+
+  public long getLastModified() {
+    return lastModified;
+  }
+  
+  @Override
+  public String toString() {
+    return "FileMetadata[" + key + ", " + length + ", " + lastModified + "]";
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
new file mode 100644
index 0000000..a10d6f2
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3native;
+
+import static org.apache.hadoop.fs.s3native.NativeS3FileSystem.PATH_DELIMITER;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.s3.S3Credentials;
+import org.apache.hadoop.fs.s3.S3Exception;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.jets3t.service.S3Service;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.ServiceException;
+import org.jets3t.service.StorageObjectsChunk;
+import org.jets3t.service.impl.rest.HttpException;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.MultipartPart;
+import org.jets3t.service.model.MultipartUpload;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+import org.jets3t.service.model.StorageObject;
+import org.jets3t.service.security.AWSCredentials;
+import org.jets3t.service.utils.MultipartUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
+  
+  private S3Service s3Service;
+  private S3Bucket bucket;
+
+  private long multipartBlockSize;
+  private boolean multipartEnabled;
+  private long multipartCopyBlockSize;
+  static final long MAX_PART_SIZE = (long)5 * 1024 * 1024 * 1024;
+
+  private String serverSideEncryptionAlgorithm;
+  
+  public static final Logger LOG =
+      LoggerFactory.getLogger(Jets3tNativeFileSystemStore.class);
+
+  @Override
+  public void initialize(URI uri, Configuration conf) throws IOException {
+    S3Credentials s3Credentials = new S3Credentials();
+    s3Credentials.initialize(uri, conf);
+    try {
+      AWSCredentials awsCredentials =
+        new AWSCredentials(s3Credentials.getAccessKey(),
+            s3Credentials.getSecretAccessKey());
+      this.s3Service = new RestS3Service(awsCredentials);
+    } catch (S3ServiceException e) {
+      handleException(e);
+    }
+    multipartEnabled =
+        conf.getBoolean("fs.s3n.multipart.uploads.enabled", false);
+    multipartBlockSize = Math.min(
+        conf.getLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024),
+        MAX_PART_SIZE);
+    multipartCopyBlockSize = Math.min(
+        conf.getLong("fs.s3n.multipart.copy.block.size", MAX_PART_SIZE),
+        MAX_PART_SIZE);
+    serverSideEncryptionAlgorithm = conf.get("fs.s3n.server-side-encryption-algorithm");
+
+    bucket = new S3Bucket(uri.getHost());
+  }
+  
+  @Override
+  public void storeFile(String key, File file, byte[] md5Hash)
+    throws IOException {
+
+    if (multipartEnabled && file.length() >= multipartBlockSize) {
+      storeLargeFile(key, file, md5Hash);
+      return;
+    }
+
+    BufferedInputStream in = null;
+    try {
+      in = new BufferedInputStream(new FileInputStream(file));
+      S3Object object = new S3Object(key);
+      object.setDataInputStream(in);
+      object.setContentType("binary/octet-stream");
+      object.setContentLength(file.length());
+      object.setServerSideEncryptionAlgorithm(serverSideEncryptionAlgorithm);
+      if (md5Hash != null) {
+        object.setMd5Hash(md5Hash);
+      }
+      s3Service.putObject(bucket, object);
+    } catch (ServiceException e) {
+      handleException(e, key);
+    } finally {
+      IOUtils.closeStream(in);
+    }
+  }
+
+  public void storeLargeFile(String key, File file, byte[] md5Hash)
+      throws IOException {
+    S3Object object = new S3Object(key);
+    object.setDataInputFile(file);
+    object.setContentType("binary/octet-stream");
+    object.setContentLength(file.length());
+    object.setServerSideEncryptionAlgorithm(serverSideEncryptionAlgorithm);
+    if (md5Hash != null) {
+      object.setMd5Hash(md5Hash);
+    }
+
+    List<StorageObject> objectsToUploadAsMultipart =
+        new ArrayList<StorageObject>();
+    objectsToUploadAsMultipart.add(object);
+    MultipartUtils mpUtils = new MultipartUtils(multipartBlockSize);
+
+    try {
+      mpUtils.uploadObjects(bucket.getName(), s3Service,
+                            objectsToUploadAsMultipart, null);
+    } catch (Exception e) {
+      handleException(e, key);
+    }
+  }
+  
+  @Override
+  public void storeEmptyFile(String key) throws IOException {
+    try {
+      S3Object object = new S3Object(key);
+      object.setDataInputStream(new ByteArrayInputStream(new byte[0]));
+      object.setContentType("binary/octet-stream");
+      object.setContentLength(0);
+      object.setServerSideEncryptionAlgorithm(serverSideEncryptionAlgorithm);
+      s3Service.putObject(bucket, object);
+    } catch (ServiceException e) {
+      handleException(e, key);
+    }
+  }
+
+  @Override
+  public FileMetadata retrieveMetadata(String key) throws IOException {
+    StorageObject object = null;
+    try {
+      LOG.debug("Getting metadata for key: {} from bucket: {}",
+          key, bucket.getName());
+      object = s3Service.getObjectDetails(bucket.getName(), key);
+      return new FileMetadata(key, object.getContentLength(),
+          object.getLastModifiedDate().getTime());
+
+    } catch (ServiceException e) {
+      try {
+        // let handleException() translate and rethrow anything fatal...
+        handleException(e, key);
+        return null;
+      } catch (FileNotFoundException fnfe) {
+        // ...and downgrade a missing key to a null result
+        return null;
+      }
+    } finally {
+      if (object != null) {
+        object.closeDataInputStream();
+      }
+    }
+  }
+
+  /**
+   * @param key the object name being retrieved from the S3 bucket
+   * @return an input stream for the data, or null if the key is not found
+   * @throws IOException on a failure talking to S3
+   */
+
+  @Override
+  public InputStream retrieve(String key) throws IOException {
+    try {
+      LOG.debug("Getting key: {} from bucket: {}",
+          key, bucket.getName());
+      S3Object object = s3Service.getObject(bucket.getName(), key);
+      return object.getDataInputStream();
+    } catch (ServiceException e) {
+      handleException(e, key);
+      return null; //return null if key not found
+    }
+  }
+
+  /**
+   * @param key the object name being retrieved from the S3 bucket
+   * @param byteRangeStart the byte offset at which to start reading
+   * @return an input stream for the data, or null if the key is not found
+   * @throws IOException on a failure talking to S3
+   */
+
+  @Override
+  public InputStream retrieve(String key, long byteRangeStart)
+          throws IOException {
+    try {
+      LOG.debug("Getting key: {} from bucket: {} with byteRangeStart: {}",
+          key, bucket.getName(), byteRangeStart);
+      S3Object object = s3Service.getObject(bucket, key, null, null, null,
+                                            null, byteRangeStart, null);
+      return object.getDataInputStream();
+    } catch (ServiceException e) {
+      handleException(e, key);
+      return null; // never reached: handleException always throws
+    }
+  }
+
+  @Override
+  public PartialListing list(String prefix, int maxListingLength)
+          throws IOException {
+    return list(prefix, maxListingLength, null, false);
+  }
+  
+  @Override
+  public PartialListing list(String prefix, int maxListingLength, String priorLastKey,
+      boolean recurse) throws IOException {
+
+    return list(prefix, recurse ? null : PATH_DELIMITER, maxListingLength, priorLastKey);
+  }
+
+  /**
+   * List objects under a prefix.
+   * @param prefix key prefix to list under
+   * @param delimiter delimiter to apply, or null to list recursively
+   * @param maxListingLength maximum number of entries to return
+   * @param priorLastKey last key returned by a previous listing, or null
+   * @return a partial listing of matches
+   * @throws IOException on any reported failure
+   */
+  private PartialListing list(String prefix, String delimiter,
+      int maxListingLength, String priorLastKey) throws IOException {
+    try {
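+      // treat a non-empty prefix as a directory by ensuring it ends with
+      // the path delimiter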
+      if (!prefix.isEmpty() && !prefix.endsWith(PATH_DELIMITER)) {
+        prefix += PATH_DELIMITER;
+      }
+      StorageObjectsChunk chunk = s3Service.listObjectsChunked(bucket.getName(),
+          prefix, delimiter, maxListingLength, priorLastKey);
+      
+      FileMetadata[] fileMetadata =
+        new FileMetadata[chunk.getObjects().length];
+      for (int i = 0; i < fileMetadata.length; i++) {
+        StorageObject object = chunk.getObjects()[i];
+        fileMetadata[i] = new FileMetadata(object.getKey(),
+            object.getContentLength(), object.getLastModifiedDate().getTime());
+      }
+      return new PartialListing(chunk.getPriorLastKey(), fileMetadata,
+          chunk.getCommonPrefixes());
+    } catch (ServiceException e) {
+      handleException(e, prefix);
+      return null; // never returned - keep compiler happy
+    }
+  }
+
+  @Override
+  public void delete(String key) throws IOException {
+    try {
+      LOG.debug("Deleting key: {} from bucket: {}",
+          key, bucket.getName());
+      s3Service.deleteObject(bucket, key);
+    } catch (ServiceException e) {
+      handleException(e, key);
+    }
+  }
+
+  public void rename(String srcKey, String dstKey) throws IOException {
+    try {
+      s3Service.renameObject(bucket.getName(), srcKey, new S3Object(dstKey));
+    } catch (ServiceException e) {
+      handleException(e, srcKey);
+    }
+  }
+  
+  @Override
+  public void copy(String srcKey, String dstKey) throws IOException {
+    try {
+      LOG.debug("Copying srcKey: {} to dstKey: {} in bucket: {}",
+          srcKey, dstKey, bucket.getName());
+      if (multipartEnabled) {
+        S3Object object = s3Service.getObjectDetails(bucket, srcKey, null,
+                                                     null, null, null);
+        if (multipartCopyBlockSize > 0 &&
+            object.getContentLength() > multipartCopyBlockSize) {
+          copyLargeFile(object, dstKey);
+          return;
+        }
+      }
+
+      S3Object dstObject = new S3Object(dstKey);
+      dstObject.setServerSideEncryptionAlgorithm(serverSideEncryptionAlgorithm);
+      s3Service.copyObject(bucket.getName(), srcKey, bucket.getName(),
+          dstObject, false);
+    } catch (ServiceException e) {
+      handleException(e, srcKey);
+    }
+  }
+
+  public void copyLargeFile(S3Object srcObject, String dstKey) throws IOException {
+    try {
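+      // number of parts: ceil(contentLength / multipartCopyBlockSize) in
+      // integer arithmetic; e.g. a 100 MB object with a 64 MB block size
+      // yields two parts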
+      long partCount = srcObject.getContentLength() / multipartCopyBlockSize +
+          (srcObject.getContentLength() % multipartCopyBlockSize > 0 ? 1 : 0);
+
+      MultipartUpload multipartUpload = s3Service.multipartStartUpload
+          (bucket.getName(), dstKey, srcObject.getMetadataMap());
+
+      List<MultipartPart> listedParts = new ArrayList<MultipartPart>();
+      for (int i = 0; i < partCount; i++) {
+        long byteRangeStart = i * multipartCopyBlockSize;
+        long byteLength;
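+        // every part is a full block except possibly the last, which
+        // takes the remainder (or a full block when the length divides
+        // evenly)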
+        if (i < partCount - 1) {
+          byteLength = multipartCopyBlockSize;
+        } else {
+          byteLength = srcObject.getContentLength() % multipartCopyBlockSize;
+          if (byteLength == 0) {
+            byteLength = multipartCopyBlockSize;
+          }
+        }
+
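+        // the copied byte range is inclusive at both ends, hence the -1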
+        MultipartPart copiedPart = s3Service.multipartUploadPartCopy
+            (multipartUpload, i + 1, bucket.getName(), srcObject.getKey(),
+             null, null, null, null, byteRangeStart,
+             byteRangeStart + byteLength - 1, null);
+        listedParts.add(copiedPart);
+      }
+      
+      Collections.reverse(listedParts);
+      s3Service.multipartCompleteUpload(multipartUpload, listedParts);
+    } catch (ServiceException e) {
+      handleException(e, srcObject.getKey());
+    }
+  }
+
+  @Override
+  public void purge(String prefix) throws IOException {
+    String key = "";
+    try {
+      S3Object[] objects =
+          s3Service.listObjects(bucket.getName(), prefix, null);
+      for (S3Object object : objects) {
+        key = object.getKey();
+        s3Service.deleteObject(bucket, key);
+      }
+    } catch (S3ServiceException e) {
+      handleException(e, key);
+    }
+  }
+
+  @Override
+  public void dump() throws IOException {
+    StringBuilder sb = new StringBuilder("S3 Native Filesystem, ");
+    sb.append(bucket.getName()).append("\n");
+    try {
+      S3Object[] objects = s3Service.listObjects(bucket.getName());
+      for (S3Object object : objects) {
+        sb.append(object.getKey()).append("\n");
+      }
+    } catch (S3ServiceException e) {
+      handleException(e);
+    }
+    System.out.println(sb);
+  }
+
+  /**
+   * Handle any service exception by translating it into an IOException.
+   * @param e exception to translate
+   * @throws IOException the translated exception; always thrown
+   */
+  private void handleException(Exception e) throws IOException {
+    throw processException(e, e, "");
+  }
+
+  /**
+   * Handle any service exception by translating it into an IOException.
+   * @param e exception to translate
+   * @param key key sought from object store
+   * @throws IOException the translated exception; always thrown
+   */
+  private void handleException(Exception e, String key) throws IOException {
+    throw processException(e, e, key);
+  }
+
+  /**
+   * Translate any service exception into an IOException; for example, a
+   * 404 response for a missing key becomes a FileNotFoundException.
+   * @param thrown exception to translate
+   * @param original original exception, thrown if no other translation
+   * could be made
+   * @param key key sought from object store, or "" for undefined
+   * @return an IOException to throw; never null
+   */
+  private IOException processException(Throwable thrown, Throwable original,
+      String key) {
+    IOException result;
+    if (thrown.getCause() != null) {
+      // recurse down
+      result = processException(thrown.getCause(), original, key);
+    } else if (thrown instanceof HttpException) {
+      // nested HttpException - examine error code and react
+      HttpException httpException = (HttpException) thrown;
+      String responseMessage = httpException.getResponseMessage();
+      int responseCode = httpException.getResponseCode();
+      String bucketName = "s3n://" + bucket.getName();
+      String text = String.format("%s : %03d : %s",
+          bucketName,
+          responseCode,
+          responseMessage);
+      String filename = !key.isEmpty() ? (bucketName + "/" + key) : text;
+      IOException ioe;
+      switch (responseCode) {
+        case 404:
+          result = new FileNotFoundException(filename);
+          break;
+        case 416: // invalid range
+          result = new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF
+                                    +": " + filename);
+          break;
+        case 403: //forbidden
+          result = new AccessControlException("Permission denied"
+                                    +": " + filename);
+          break;
+        default:
+          result = new IOException(text);
+      }
+      result.initCause(thrown);
+    } else if (thrown instanceof S3ServiceException) {
+      S3ServiceException se = (S3ServiceException) thrown;
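+      // the trailing duplicate argument is treated by SLF4J as a
+      // Throwable, so the stack trace is logged at debug level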
+      LOG.debug(
+          "S3ServiceException: {}: {} : {}",
+          se.getS3ErrorCode(), se.getS3ErrorMessage(), se, se);
+      if ("InvalidRange".equals(se.getS3ErrorCode())) {
+        result = new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+      } else {
+        result = new S3Exception(se);
+      }
+    } else if (thrown instanceof ServiceException) {
+      ServiceException se = (ServiceException) thrown;
+      LOG.debug("S3ServiceException: {}: {} : {}",
+          se.getErrorCode(), se.toString(), se, se);
+      result = new S3Exception(se);
+    } else if (thrown instanceof IOException) {
+      result = (IOException) thrown;
+    } else {
+      // here there is no exception derived yet.
+      // this means no inner cause, and no translation made yet.
+      // convert the original to an IOException -rather than just the
+      // exception at the base of the tree
+      result = new S3Exception(original);
+    }
+
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java
new file mode 100644
index 0000000..f26cdac
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3native;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * <p>
+ * An abstraction for a key-based {@link File} store.
+ * </p>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+interface NativeFileSystemStore {
+  
+  void initialize(URI uri, Configuration conf) throws IOException;
+  
+  void storeFile(String key, File file, byte[] md5Hash) throws IOException;
+  void storeEmptyFile(String key) throws IOException;
+  
+  FileMetadata retrieveMetadata(String key) throws IOException;
+  InputStream retrieve(String key) throws IOException;
+  InputStream retrieve(String key, long byteRangeStart) throws IOException;
+  
+  PartialListing list(String prefix, int maxListingLength) throws IOException;
+  PartialListing list(String prefix, int maxListingLength, String priorLastKey, boolean recursive)
+    throws IOException;
+  
+  void delete(String key) throws IOException;
+
+  void copy(String srcKey, String dstKey) throws IOException;
+  
+  /**
+   * Delete all keys with the given prefix. Used for testing.
+   * @throws IOException on any failure
+   */
+  void purge(String prefix) throws IOException;
+  
+  /**
+   * Diagnostic method to dump state to the console.
+   * @throws IOException on any failure
+   */
+  void dump() throws IOException;
+}
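
For orientation, a minimal sketch of how the NativeFileSystemStore contract
above might be exercised directly. The store class name, bucket, and keys
are illustrative assumptions, not part of this patch; since both the
interface and the Jets3t-backed implementation are package-private, the
sketch lives in the same package.

    package org.apache.hadoop.fs.s3native;

    import java.io.InputStream;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;

    public class StoreSketch {
      public static void main(String[] args) throws Exception {
        // credentials are assumed to be present as fs.s3n.awsAccessKeyId
        // and fs.s3n.awsSecretAccessKey in the configuration
        Configuration conf = new Configuration();
        NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
        store.initialize(URI.create("s3n://example-bucket/"), conf);

        store.storeEmptyFile("dir/marker");              // zero-byte object
        FileMetadata meta = store.retrieveMetadata("dir/marker");
        PartialListing page = store.list("dir/", 1000);  // first page only
        try (InputStream in = store.retrieve("dir/marker")) {
          // consume the object's data here
        }
        store.delete("dir/marker");
      }
    }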

