Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 437AC114B3 for ; Fri, 12 Sep 2014 18:46:53 +0000 (UTC) Received: (qmail 53825 invoked by uid 500); 12 Sep 2014 18:46:46 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 53715 invoked by uid 500); 12 Sep 2014 18:46:45 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 53545 invoked by uid 99); 12 Sep 2014 18:46:45 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Sep 2014 18:46:45 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id A6B49A11F81; Fri, 12 Sep 2014 18:46:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: arp@apache.org To: common-commits@hadoop.apache.org Date: Fri, 12 Sep 2014 18:46:57 -0000 Message-Id: <075d6ce2a26f42eebbd968206bcbe230@git.apache.org> In-Reply-To: <30645674436a4925adf47a74fed60e1d@git.apache.org> References: <30645674436a4925adf47a74fed60e1d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [14/41] HADOOP-11074. Move s3-related FS connector code to hadoop-aws. (David S. Wang via Colin Patrick McCabe) http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java new file mode 100644 index 0000000..312bf65 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3; + +import java.net.URI; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + +/** + *
<p>
+ * Extracts AWS credentials from the filesystem URI or configuration. + *
</p>
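+ * A minimal illustrative sketch of the two ways credentials can be supplied (the bucket
+ * name and key values below are placeholders; shown for the s3 scheme, s3n uses the
+ * corresponding fs.s3n.* keys):
+ * <pre>
+ *   // embedded in the filesystem URI:
+ *   //   s3://ID:SECRET@mybucket
+ *   // or set in the configuration:
+ *   conf.set("fs.s3.awsAccessKeyId", "ID");
+ *   conf.set("fs.s3.awsSecretAccessKey", "SECRET");
+ * </pre>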
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class S3Credentials { + + private String accessKey; + private String secretAccessKey; + + /** + * @throws IllegalArgumentException if credentials for S3 cannot be + * determined. + */ + public void initialize(URI uri, Configuration conf) { + if (uri.getHost() == null) { + throw new IllegalArgumentException("Invalid hostname in URI " + uri); + } + + String userInfo = uri.getUserInfo(); + if (userInfo != null) { + int index = userInfo.indexOf(':'); + if (index != -1) { + accessKey = userInfo.substring(0, index); + secretAccessKey = userInfo.substring(index + 1); + } else { + accessKey = userInfo; + } + } + + String scheme = uri.getScheme(); + String accessKeyProperty = String.format("fs.%s.awsAccessKeyId", scheme); + String secretAccessKeyProperty = + String.format("fs.%s.awsSecretAccessKey", scheme); + if (accessKey == null) { + accessKey = conf.get(accessKeyProperty); + } + if (secretAccessKey == null) { + secretAccessKey = conf.get(secretAccessKeyProperty); + } + if (accessKey == null && secretAccessKey == null) { + throw new IllegalArgumentException("AWS " + + "Access Key ID and Secret Access " + + "Key must be specified as the " + + "username or password " + + "(respectively) of a " + scheme + + " URL, or by setting the " + + accessKeyProperty + " or " + + secretAccessKeyProperty + + " properties (respectively)."); + } else if (accessKey == null) { + throw new IllegalArgumentException("AWS " + + "Access Key ID must be specified " + + "as the username of a " + scheme + + " URL, or by setting the " + + accessKeyProperty + " property."); + } else if (secretAccessKey == null) { + throw new IllegalArgumentException("AWS " + + "Secret Access Key must be " + + "specified as the password of a " + + scheme + " URL, or by setting the " + + secretAccessKeyProperty + + " property."); + } + + } + + public String getAccessKey() { + return accessKey; + } + + public String getSecretAccessKey() { + return secretAccessKey; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Exception.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Exception.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Exception.java new file mode 100644 index 0000000..4f07c4e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Exception.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Thrown if there is a problem communicating with Amazon S3. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class S3Exception extends IOException { + + private static final long serialVersionUID = 1L; + + public S3Exception(Throwable t) { + super(t); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java new file mode 100644 index 0000000..dda3cf6 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java @@ -0,0 +1,486 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.s3native.NativeS3FileSystem; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.util.Progressable; + +/** + *
<p>
+ * A block-based {@link FileSystem} backed by + * Amazon S3. + *
</p>
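+ * A minimal usage sketch (the bucket name is a placeholder):
+ * <pre>
+ *   Configuration conf = new Configuration();
+ *   FileSystem fs = FileSystem.get(URI.create("s3://mybucket"), conf);
+ *   FSDataInputStream in = fs.open(new Path("/dir1/file1"));
+ * </pre>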
+ * @see NativeS3FileSystem + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class S3FileSystem extends FileSystem { + + private URI uri; + + private FileSystemStore store; + + private Path workingDir; + + public S3FileSystem() { + // set store in initialize() + } + + public S3FileSystem(FileSystemStore store) { + this.store = store; + } + + /** + * Return the protocol scheme for the FileSystem. + *
<p/>
+ * + * @return s3 + */ + @Override + public String getScheme() { + return "s3"; + } + + @Override + public URI getUri() { + return uri; + } + + @Override + public void initialize(URI uri, Configuration conf) throws IOException { + super.initialize(uri, conf); + if (store == null) { + store = createDefaultStore(conf); + } + store.initialize(uri, conf); + setConf(conf); + this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); + this.workingDir = + new Path("/user", System.getProperty("user.name")).makeQualified(this); + } + + private static FileSystemStore createDefaultStore(Configuration conf) { + FileSystemStore store = new Jets3tFileSystemStore(); + + RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( + conf.getInt("fs.s3.maxRetries", 4), + conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS); + Map,RetryPolicy> exceptionToPolicyMap = + new HashMap, RetryPolicy>(); + exceptionToPolicyMap.put(IOException.class, basePolicy); + exceptionToPolicyMap.put(S3Exception.class, basePolicy); + + RetryPolicy methodPolicy = RetryPolicies.retryByException( + RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); + Map methodNameToPolicyMap = new HashMap(); + methodNameToPolicyMap.put("storeBlock", methodPolicy); + methodNameToPolicyMap.put("retrieveBlock", methodPolicy); + + return (FileSystemStore) RetryProxy.create(FileSystemStore.class, + store, methodNameToPolicyMap); + } + + @Override + public Path getWorkingDirectory() { + return workingDir; + } + + @Override + public void setWorkingDirectory(Path dir) { + workingDir = makeAbsolute(dir); + } + + private Path makeAbsolute(Path path) { + if (path.isAbsolute()) { + return path; + } + return new Path(workingDir, path); + } + + /** + * @param permission Currently ignored. 
+ */ + @Override + public boolean mkdirs(Path path, FsPermission permission) throws IOException { + Path absolutePath = makeAbsolute(path); + List paths = new ArrayList(); + do { + paths.add(0, absolutePath); + absolutePath = absolutePath.getParent(); + } while (absolutePath != null); + + boolean result = true; + for (Path p : paths) { + result &= mkdir(p); + } + return result; + } + + private boolean mkdir(Path path) throws IOException { + Path absolutePath = makeAbsolute(path); + INode inode = store.retrieveINode(absolutePath); + if (inode == null) { + store.storeINode(absolutePath, INode.DIRECTORY_INODE); + } else if (inode.isFile()) { + throw new IOException(String.format( + "Can't make directory for path %s since it is a file.", + absolutePath)); + } + return true; + } + + @Override + public boolean isFile(Path path) throws IOException { + INode inode = store.retrieveINode(makeAbsolute(path)); + if (inode == null) { + return false; + } + return inode.isFile(); + } + + private INode checkFile(Path path) throws IOException { + INode inode = store.retrieveINode(makeAbsolute(path)); + if (inode == null) { + throw new IOException("No such file."); + } + if (inode.isDirectory()) { + throw new IOException("Path " + path + " is a directory."); + } + return inode; + } + + @Override + public FileStatus[] listStatus(Path f) throws IOException { + Path absolutePath = makeAbsolute(f); + INode inode = store.retrieveINode(absolutePath); + if (inode == null) { + throw new FileNotFoundException("File " + f + " does not exist."); + } + if (inode.isFile()) { + return new FileStatus[] { + new S3FileStatus(f.makeQualified(this), inode) + }; + } + ArrayList ret = new ArrayList(); + for (Path p : store.listSubPaths(absolutePath)) { + ret.add(getFileStatus(p.makeQualified(this))); + } + return ret.toArray(new FileStatus[0]); + } + + /** This optional operation is not yet supported. */ + @Override + public FSDataOutputStream append(Path f, int bufferSize, + Progressable progress) throws IOException { + throw new IOException("Not supported"); + } + + /** + * @param permission Currently ignored. 
+ */ + @Override + public FSDataOutputStream create(Path file, FsPermission permission, + boolean overwrite, int bufferSize, + short replication, long blockSize, Progressable progress) + throws IOException { + + INode inode = store.retrieveINode(makeAbsolute(file)); + if (inode != null) { + if (overwrite) { + delete(file, true); + } else { + throw new FileAlreadyExistsException("File already exists: " + file); + } + } else { + Path parent = file.getParent(); + if (parent != null) { + if (!mkdirs(parent)) { + throw new IOException("Mkdirs failed to create " + parent.toString()); + } + } + } + return new FSDataOutputStream + (new S3OutputStream(getConf(), store, makeAbsolute(file), + blockSize, progress, bufferSize), + statistics); + } + + @Override + public FSDataInputStream open(Path path, int bufferSize) throws IOException { + INode inode = checkFile(path); + return new FSDataInputStream(new S3InputStream(getConf(), store, inode, + statistics)); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + Path absoluteSrc = makeAbsolute(src); + final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - "; + INode srcINode = store.retrieveINode(absoluteSrc); + boolean debugEnabled = LOG.isDebugEnabled(); + if (srcINode == null) { + // src path doesn't exist + if (debugEnabled) { + LOG.debug(debugPreamble + "returning false as src does not exist"); + } + return false; + } + + Path absoluteDst = makeAbsolute(dst); + + //validate the parent dir of the destination + Path dstParent = absoluteDst.getParent(); + if (dstParent != null) { + //if the dst parent is not root, make sure it exists + INode dstParentINode = store.retrieveINode(dstParent); + if (dstParentINode == null) { + // dst parent doesn't exist + if (debugEnabled) { + LOG.debug(debugPreamble + + "returning false as dst parent does not exist"); + } + return false; + } + if (dstParentINode.isFile()) { + // dst parent exists but is a file + if (debugEnabled) { + LOG.debug(debugPreamble + + "returning false as dst parent exists and is a file"); + } + return false; + } + } + + //get status of source + boolean srcIsFile = srcINode.isFile(); + + INode dstINode = store.retrieveINode(absoluteDst); + boolean destExists = dstINode != null; + boolean destIsDir = destExists && !dstINode.isFile(); + if (srcIsFile) { + + //source is a simple file + if (destExists) { + if (destIsDir) { + //outcome #1 dest exists and is dir -filename to subdir of dest + if (debugEnabled) { + LOG.debug(debugPreamble + + "copying src file under dest dir to " + absoluteDst); + } + absoluteDst = new Path(absoluteDst, absoluteSrc.getName()); + } else { + //outcome #2 dest it's a file: fail iff different from src + boolean renamingOnToSelf = absoluteSrc.equals(absoluteDst); + if (debugEnabled) { + LOG.debug(debugPreamble + + "copying file onto file, outcome is " + renamingOnToSelf); + } + return renamingOnToSelf; + } + } else { + // #3 dest does not exist: use dest as path for rename + if (debugEnabled) { + LOG.debug(debugPreamble + + "copying file onto file"); + } + } + } else { + //here the source exists and is a directory + // outcomes (given we know the parent dir exists if we get this far) + // #1 destination is a file: fail + // #2 destination is a directory: create a new dir under that one + // #3 destination doesn't exist: create a new dir with that name + // #3 and #4 are only allowed if the dest path is not == or under src + + if (destExists) { + if (!destIsDir) { + // #1 destination is a file: fail + if (debugEnabled) { + 
LOG.debug(debugPreamble + + "returning false as src is a directory, but not dest"); + } + return false; + } else { + // the destination dir exists + // destination for rename becomes a subdir of the target name + absoluteDst = new Path(absoluteDst, absoluteSrc.getName()); + if (debugEnabled) { + LOG.debug(debugPreamble + + "copying src dir under dest dir to " + absoluteDst); + } + } + } + //the final destination directory is now know, so validate it for + //illegal moves + + if (absoluteSrc.equals(absoluteDst)) { + //you can't rename a directory onto itself + if (debugEnabled) { + LOG.debug(debugPreamble + + "Dest==source && isDir -failing"); + } + return false; + } + if (absoluteDst.toString().startsWith(absoluteSrc.toString() + "/")) { + //you can't move a directory under itself + if (debugEnabled) { + LOG.debug(debugPreamble + + "dst is equal to or under src dir -failing"); + } + return false; + } + } + //here the dest path is set up -so rename + return renameRecursive(absoluteSrc, absoluteDst); + } + + private boolean renameRecursive(Path src, Path dst) throws IOException { + INode srcINode = store.retrieveINode(src); + store.storeINode(dst, srcINode); + store.deleteINode(src); + if (srcINode.isDirectory()) { + for (Path oldSrc : store.listDeepSubPaths(src)) { + INode inode = store.retrieveINode(oldSrc); + if (inode == null) { + return false; + } + String oldSrcPath = oldSrc.toUri().getPath(); + String srcPath = src.toUri().getPath(); + String dstPath = dst.toUri().getPath(); + Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath)); + store.storeINode(newDst, inode); + store.deleteINode(oldSrc); + } + } + return true; + } + + @Override + public boolean delete(Path path, boolean recursive) throws IOException { + Path absolutePath = makeAbsolute(path); + INode inode = store.retrieveINode(absolutePath); + if (inode == null) { + return false; + } + if (inode.isFile()) { + store.deleteINode(absolutePath); + for (Block block: inode.getBlocks()) { + store.deleteBlock(block); + } + } else { + FileStatus[] contents = null; + try { + contents = listStatus(absolutePath); + } catch(FileNotFoundException fnfe) { + return false; + } + + if ((contents.length !=0) && (!recursive)) { + throw new IOException("Directory " + path.toString() + + " is not empty."); + } + for (FileStatus p:contents) { + if (!delete(p.getPath(), recursive)) { + return false; + } + } + store.deleteINode(absolutePath); + } + return true; + } + + /** + * FileStatus for S3 file systems. 
+ */ + @Override + public FileStatus getFileStatus(Path f) throws IOException { + INode inode = store.retrieveINode(makeAbsolute(f)); + if (inode == null) { + throw new FileNotFoundException(f + ": No such file or directory."); + } + return new S3FileStatus(f.makeQualified(this), inode); + } + + @Override + public long getDefaultBlockSize() { + return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024); + } + + @Override + public String getCanonicalServiceName() { + // Does not support Token + return null; + } + + // diagnostic methods + + void dump() throws IOException { + store.dump(); + } + + void purge() throws IOException { + store.purge(); + } + + private static class S3FileStatus extends FileStatus { + + S3FileStatus(Path f, INode inode) throws IOException { + super(findLength(inode), inode.isDirectory(), 1, + findBlocksize(inode), 0, f); + } + + private static long findLength(INode inode) { + if (!inode.isDirectory()) { + long length = 0L; + for (Block block : inode.getBlocks()) { + length += block.getLength(); + } + return length; + } + return 0; + } + + private static long findBlocksize(INode inode) { + final Block[] ret = inode.getBlocks(); + return ret == null ? 0L : ret[0].getLength(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystemConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystemConfigKeys.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystemConfigKeys.java new file mode 100644 index 0000000..8172a46 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystemConfigKeys.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.CommonConfigurationKeys; + +/** + * This class contains constants for configuration keys used + * in the s3 file system. 
+ * + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class S3FileSystemConfigKeys extends CommonConfigurationKeys { + public static final String S3_BLOCK_SIZE_KEY = "s3.blocksize"; + public static final long S3_BLOCK_SIZE_DEFAULT = 64*1024*1024; + public static final String S3_REPLICATION_KEY = "s3.replication"; + public static final short S3_REPLICATION_DEFAULT = 1; + public static final String S3_STREAM_BUFFER_SIZE_KEY = + "s3.stream-buffer-size"; + public static final int S3_STREAM_BUFFER_SIZE_DEFAULT = 4096; + public static final String S3_BYTES_PER_CHECKSUM_KEY = + "s3.bytes-per-checksum"; + public static final int S3_BYTES_PER_CHECKSUM_DEFAULT = 512; + public static final String S3_CLIENT_WRITE_PACKET_SIZE_KEY = + "s3.client-write-packet-size"; + public static final int S3_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024; +} + http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystemException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystemException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystemException.java new file mode 100644 index 0000000..cc1b463 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3FileSystemException.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.s3; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Thrown when there is a fatal exception while using {@link S3FileSystem}. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class S3FileSystemException extends IOException { + private static final long serialVersionUID = 1L; + + public S3FileSystemException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3InputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3InputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3InputStream.java new file mode 100644 index 0000000..5af57e6 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3InputStream.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3; + +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileSystem; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +class S3InputStream extends FSInputStream { + + private FileSystemStore store; + + private Block[] blocks; + + private boolean closed; + + private long fileLength; + + private long pos = 0; + + private File blockFile; + + private DataInputStream blockStream; + + private long blockEnd = -1; + + private FileSystem.Statistics stats; + + private static final Log LOG = + LogFactory.getLog(S3InputStream.class.getName()); + + + @Deprecated + public S3InputStream(Configuration conf, FileSystemStore store, + INode inode) { + this(conf, store, inode, null); + } + + public S3InputStream(Configuration conf, FileSystemStore store, + INode inode, FileSystem.Statistics stats) { + + this.store = store; + this.stats = stats; + this.blocks = inode.getBlocks(); + for (Block block : blocks) { + this.fileLength += block.getLength(); + } + } + + @Override + public synchronized long getPos() throws IOException { + return pos; + } + + @Override + public synchronized int available() throws IOException { + return (int) (fileLength - pos); + } + + @Override + public synchronized void seek(long targetPos) throws IOException { + if (targetPos > fileLength) { + throw new IOException("Cannot seek after EOF"); + } + pos = targetPos; + blockEnd = -1; + } + + @Override + public synchronized boolean seekToNewSource(long targetPos) throws IOException { + return false; + } + + @Override + public synchronized int read() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + int result = -1; + if (pos < fileLength) { + if (pos > blockEnd) { + blockSeekTo(pos); + } + result = blockStream.read(); + if (result >= 0) { + pos++; + } + } + if (stats != null && result >= 0) { + stats.incrementBytesRead(1); + } + return result; + } + + @Override + public synchronized int read(byte buf[], int off, int len) throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + if (pos < fileLength) { + if (pos > blockEnd) { + blockSeekTo(pos); + } + int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L)); + int result = blockStream.read(buf, off, realLen); + if (result >= 0) { + pos += result; + } + if (stats != null && result > 0) { + stats.incrementBytesRead(result); + } + return result; + } + return -1; + } + + private synchronized void blockSeekTo(long target) throws IOException { + // + // Compute desired block + // + int targetBlock = -1; + long 
targetBlockStart = 0; + long targetBlockEnd = 0; + for (int i = 0; i < blocks.length; i++) { + long blockLength = blocks[i].getLength(); + targetBlockEnd = targetBlockStart + blockLength - 1; + + if (target >= targetBlockStart && target <= targetBlockEnd) { + targetBlock = i; + break; + } else { + targetBlockStart = targetBlockEnd + 1; + } + } + if (targetBlock < 0) { + throw new IOException( + "Impossible situation: could not find target position " + target); + } + long offsetIntoBlock = target - targetBlockStart; + + // read block blocks[targetBlock] from position offsetIntoBlock + + this.blockFile = store.retrieveBlock(blocks[targetBlock], offsetIntoBlock); + + this.pos = target; + this.blockEnd = targetBlockEnd; + this.blockStream = new DataInputStream(new FileInputStream(blockFile)); + + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + if (blockStream != null) { + blockStream.close(); + blockStream = null; + } + if (blockFile != null) { + boolean b = blockFile.delete(); + if (!b) { + LOG.warn("Ignoring failed delete"); + } + } + super.close(); + closed = true; + } + + /** + * We don't support marks. + */ + @Override + public boolean markSupported() { + return false; + } + + @Override + public void mark(int readLimit) { + // Do nothing + } + + @Override + public void reset() throws IOException { + throw new IOException("Mark not supported"); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3OutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3OutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3OutputStream.java new file mode 100644 index 0000000..761f2ce --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3OutputStream.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3.INode.FileType; +import org.apache.hadoop.util.Progressable; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +class S3OutputStream extends OutputStream { + + private Configuration conf; + + private int bufferSize; + + private FileSystemStore store; + + private Path path; + + private long blockSize; + + private File backupFile; + + private OutputStream backupStream; + + private Random r = new Random(); + + private boolean closed; + + private int pos = 0; + + private long filePos = 0; + + private int bytesWrittenToBlock = 0; + + private byte[] outBuf; + + private List blocks = new ArrayList(); + + private Block nextBlock; + + private static final Log LOG = + LogFactory.getLog(S3OutputStream.class.getName()); + + + public S3OutputStream(Configuration conf, FileSystemStore store, + Path path, long blockSize, Progressable progress, + int buffersize) throws IOException { + + this.conf = conf; + this.store = store; + this.path = path; + this.blockSize = blockSize; + this.backupFile = newBackupFile(); + this.backupStream = new FileOutputStream(backupFile); + this.bufferSize = buffersize; + this.outBuf = new byte[bufferSize]; + + } + + private File newBackupFile() throws IOException { + File dir = new File(conf.get("fs.s3.buffer.dir")); + if (!dir.exists() && !dir.mkdirs()) { + throw new IOException("Cannot create S3 buffer directory: " + dir); + } + File result = File.createTempFile("output-", ".tmp", dir); + result.deleteOnExit(); + return result; + } + + public long getPos() throws IOException { + return filePos; + } + + @Override + public synchronized void write(int b) throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + + if ((bytesWrittenToBlock + pos == blockSize) || (pos >= bufferSize)) { + flush(); + } + outBuf[pos++] = (byte) b; + filePos++; + } + + @Override + public synchronized void write(byte b[], int off, int len) throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + while (len > 0) { + int remaining = bufferSize - pos; + int toWrite = Math.min(remaining, len); + System.arraycopy(b, off, outBuf, pos, toWrite); + pos += toWrite; + off += toWrite; + len -= toWrite; + filePos += toWrite; + + if ((bytesWrittenToBlock + pos >= blockSize) || (pos == bufferSize)) { + flush(); + } + } + } + + @Override + public synchronized void flush() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + + if (bytesWrittenToBlock + pos >= blockSize) { + flushData((int) blockSize - bytesWrittenToBlock); + } + if (bytesWrittenToBlock == blockSize) { + endBlock(); + } + flushData(pos); + } + + private synchronized void flushData(int maxPos) throws IOException { + int workingPos = Math.min(pos, maxPos); + + if (workingPos > 0) { + // + // To the local block backup, write just the bytes + // + backupStream.write(outBuf, 0, workingPos); + + // + // Track position + // + bytesWrittenToBlock += workingPos; + System.arraycopy(outBuf, workingPos, outBuf, 0, pos - 
workingPos); + pos -= workingPos; + } + } + + private synchronized void endBlock() throws IOException { + // + // Done with local copy + // + backupStream.close(); + + // + // Send it to S3 + // + // TODO: Use passed in Progressable to report progress. + nextBlockOutputStream(); + store.storeBlock(nextBlock, backupFile); + internalClose(); + + // + // Delete local backup, start new one + // + boolean b = backupFile.delete(); + if (!b) { + LOG.warn("Ignoring failed delete"); + } + backupFile = newBackupFile(); + backupStream = new FileOutputStream(backupFile); + bytesWrittenToBlock = 0; + } + + private synchronized void nextBlockOutputStream() throws IOException { + long blockId = r.nextLong(); + while (store.blockExists(blockId)) { + blockId = r.nextLong(); + } + nextBlock = new Block(blockId, bytesWrittenToBlock); + blocks.add(nextBlock); + bytesWrittenToBlock = 0; + } + + private synchronized void internalClose() throws IOException { + INode inode = new INode(FileType.FILE, blocks.toArray(new Block[blocks + .size()])); + store.storeINode(path, inode); + } + + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + + flush(); + if (filePos == 0 || bytesWrittenToBlock != 0) { + endBlock(); + } + + backupStream.close(); + boolean b = backupFile.delete(); + if (!b) { + LOG.warn("Ignoring failed delete"); + } + + super.close(); + + closed = true; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/VersionMismatchException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/VersionMismatchException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/VersionMismatchException.java new file mode 100644 index 0000000..ccc8969 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/VersionMismatchException.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.s3; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Thrown when Hadoop cannot read the version of the data stored + * in {@link S3FileSystem}. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class VersionMismatchException extends S3FileSystemException { + private static final long serialVersionUID = 1L; + + public VersionMismatchException(String clientVersion, String dataVersion) { + super("Version mismatch: client expects version " + clientVersion + + ", but data has version " + + (dataVersion == null ? 
"[unversioned]" : dataVersion)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/package.html ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/package.html b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/package.html new file mode 100644 index 0000000..dd601e1 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/package.html @@ -0,0 +1,55 @@ + + + + + + +
+<p>A distributed, block-based implementation of {@link
+org.apache.hadoop.fs.FileSystem} that uses Amazon S3
+as a backing store.</p>
+
+<p>
+Files are stored in S3 as blocks (represented by
+{@link org.apache.hadoop.fs.s3.Block}), which have an ID and a length.
+Block metadata is stored in S3 as a small record (represented by
+{@link org.apache.hadoop.fs.s3.INode}) using the URL-encoded
+path string as a key. Inodes record the file type (regular file or directory) and the list of blocks.
+This design makes it easy to seek to any given position in a file by reading the inode data to compute
+which block to access, then using S3's support for
+HTTP Range headers
+to start streaming from the correct position.
+Renames are also efficient since only the inode is moved (by a DELETE followed by a PUT since
+S3 does not support renames).
+</p>
+<p>
+For a single file /dir1/file1 which takes two blocks of storage, the file structure in S3
+would be something like this:
+</p>
+<pre>
+/
+/dir1
+/dir1/file1
+block-6415776850131549260
+block-3026438247347758425
+</pre>
+<p>
+Inodes start with a leading /, while blocks are prefixed with block-.
+</p>
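+<p>
+To make the seek behaviour concrete, the block lookup is conceptually a linear scan over the
+inode's block list; the following is a simplified sketch of what S3InputStream does when
+positioning the stream at a requested byte offset <i>target</i> (variable names are illustrative):
+</p>
+<pre>
+  long start = 0;
+  for (Block block : inode.getBlocks()) {
+    long end = start + block.getLength() - 1;
+    if (target <= end) {
+      // retrieve this block and skip (target - start) bytes into it
+      break;
+    }
+    start = end + 1;
+  }
+</pre>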
+ + + http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/FileMetadata.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/FileMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/FileMetadata.java new file mode 100644 index 0000000..2746af4 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/FileMetadata.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3native; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + *
<p>
+ * Holds basic metadata for a file stored in a {@link NativeFileSystemStore}. + *
</p>
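+ * For example (the key and values below are illustrative):
+ * <pre>
+ *   FileMetadata md = new FileMetadata("user/tom/data.txt", 1024L, System.currentTimeMillis());
+ *   md.getKey();    // "user/tom/data.txt"
+ *   md.getLength(); // 1024
+ * </pre>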
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +class FileMetadata { + private final String key; + private final long length; + private final long lastModified; + + public FileMetadata(String key, long length, long lastModified) { + this.key = key; + this.length = length; + this.lastModified = lastModified; + } + + public String getKey() { + return key; + } + + public long getLength() { + return length; + } + + public long getLastModified() { + return lastModified; + } + + @Override + public String toString() { + return "FileMetadata[" + key + ", " + length + ", " + lastModified + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java new file mode 100644 index 0000000..a10d6f2 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java @@ -0,0 +1,483 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3native; + +import static org.apache.hadoop.fs.s3native.NativeS3FileSystem.PATH_DELIMITER; + +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.s3.S3Credentials; +import org.apache.hadoop.fs.s3.S3Exception; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.AccessControlException; +import org.jets3t.service.S3Service; +import org.jets3t.service.S3ServiceException; +import org.jets3t.service.ServiceException; +import org.jets3t.service.StorageObjectsChunk; +import org.jets3t.service.impl.rest.HttpException; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import org.jets3t.service.model.MultipartPart; +import org.jets3t.service.model.MultipartUpload; +import org.jets3t.service.model.S3Bucket; +import org.jets3t.service.model.S3Object; +import org.jets3t.service.model.StorageObject; +import org.jets3t.service.security.AWSCredentials; +import org.jets3t.service.utils.MultipartUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +class Jets3tNativeFileSystemStore implements NativeFileSystemStore { + + private S3Service s3Service; + private S3Bucket bucket; + + private long multipartBlockSize; + private boolean multipartEnabled; + private long multipartCopyBlockSize; + static final long MAX_PART_SIZE = (long)5 * 1024 * 1024 * 1024; + + private String serverSideEncryptionAlgorithm; + + public static final Logger LOG = + LoggerFactory.getLogger(Jets3tNativeFileSystemStore.class); + + @Override + public void initialize(URI uri, Configuration conf) throws IOException { + S3Credentials s3Credentials = new S3Credentials(); + s3Credentials.initialize(uri, conf); + try { + AWSCredentials awsCredentials = + new AWSCredentials(s3Credentials.getAccessKey(), + s3Credentials.getSecretAccessKey()); + this.s3Service = new RestS3Service(awsCredentials); + } catch (S3ServiceException e) { + handleException(e); + } + multipartEnabled = + conf.getBoolean("fs.s3n.multipart.uploads.enabled", false); + multipartBlockSize = Math.min( + conf.getLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024), + MAX_PART_SIZE); + multipartCopyBlockSize = Math.min( + conf.getLong("fs.s3n.multipart.copy.block.size", MAX_PART_SIZE), + MAX_PART_SIZE); + serverSideEncryptionAlgorithm = conf.get("fs.s3n.server-side-encryption-algorithm"); + + bucket = new S3Bucket(uri.getHost()); + } + + @Override + public void storeFile(String key, File file, byte[] md5Hash) + throws IOException { + + if (multipartEnabled && file.length() >= multipartBlockSize) { + storeLargeFile(key, file, md5Hash); + return; + } + + BufferedInputStream in = null; + try { + in = new BufferedInputStream(new FileInputStream(file)); + S3Object object = new S3Object(key); + object.setDataInputStream(in); + object.setContentType("binary/octet-stream"); + object.setContentLength(file.length()); + object.setServerSideEncryptionAlgorithm(serverSideEncryptionAlgorithm); + 
if (md5Hash != null) { + object.setMd5Hash(md5Hash); + } + s3Service.putObject(bucket, object); + } catch (ServiceException e) { + handleException(e, key); + } finally { + IOUtils.closeStream(in); + } + } + + public void storeLargeFile(String key, File file, byte[] md5Hash) + throws IOException { + S3Object object = new S3Object(key); + object.setDataInputFile(file); + object.setContentType("binary/octet-stream"); + object.setContentLength(file.length()); + object.setServerSideEncryptionAlgorithm(serverSideEncryptionAlgorithm); + if (md5Hash != null) { + object.setMd5Hash(md5Hash); + } + + List objectsToUploadAsMultipart = + new ArrayList(); + objectsToUploadAsMultipart.add(object); + MultipartUtils mpUtils = new MultipartUtils(multipartBlockSize); + + try { + mpUtils.uploadObjects(bucket.getName(), s3Service, + objectsToUploadAsMultipart, null); + } catch (Exception e) { + handleException(e, key); + } + } + + @Override + public void storeEmptyFile(String key) throws IOException { + try { + S3Object object = new S3Object(key); + object.setDataInputStream(new ByteArrayInputStream(new byte[0])); + object.setContentType("binary/octet-stream"); + object.setContentLength(0); + object.setServerSideEncryptionAlgorithm(serverSideEncryptionAlgorithm); + s3Service.putObject(bucket, object); + } catch (ServiceException e) { + handleException(e, key); + } + } + + @Override + public FileMetadata retrieveMetadata(String key) throws IOException { + StorageObject object = null; + try { + LOG.debug("Getting metadata for key: {} from bucket: {}", + key, bucket.getName()); + object = s3Service.getObjectDetails(bucket.getName(), key); + return new FileMetadata(key, object.getContentLength(), + object.getLastModifiedDate().getTime()); + + } catch (ServiceException e) { + try { + // process + handleException(e, key); + return null; + } catch (FileNotFoundException fnfe) { + // and downgrade missing files + return null; + } + } finally { + if (object != null) { + object.closeDataInputStream(); + } + } + } + + /** + * @param key + * The key is the object name that is being retrieved from the S3 bucket + * @return + * This method returns null if the key is not found + * @throws IOException + */ + + @Override + public InputStream retrieve(String key) throws IOException { + try { + LOG.debug("Getting key: {} from bucket: {}", + key, bucket.getName()); + S3Object object = s3Service.getObject(bucket.getName(), key); + return object.getDataInputStream(); + } catch (ServiceException e) { + handleException(e, key); + return null; //return null if key not found + } + } + + /** + * + * @param key + * The key is the object name that is being retrieved from the S3 bucket + * @return + * This method returns null if the key is not found + * @throws IOException + */ + + @Override + public InputStream retrieve(String key, long byteRangeStart) + throws IOException { + try { + LOG.debug("Getting key: {} from bucket: {} with byteRangeStart: {}", + key, bucket.getName(), byteRangeStart); + S3Object object = s3Service.getObject(bucket, key, null, null, null, + null, byteRangeStart, null); + return object.getDataInputStream(); + } catch (ServiceException e) { + handleException(e, key); + return null; + } + } + + @Override + public PartialListing list(String prefix, int maxListingLength) + throws IOException { + return list(prefix, maxListingLength, null, false); + } + + @Override + public PartialListing list(String prefix, int maxListingLength, String priorLastKey, + boolean recurse) throws IOException { + + return list(prefix, 
recurse ? null : PATH_DELIMITER, maxListingLength, priorLastKey); + } + + /** + * list objects + * @param prefix prefix + * @param delimiter delimiter + * @param maxListingLength max no. of entries + * @param priorLastKey last key in any previous search + * @return a list of matches + * @throws IOException on any reported failure + */ + + private PartialListing list(String prefix, String delimiter, + int maxListingLength, String priorLastKey) throws IOException { + try { + if (!prefix.isEmpty() && !prefix.endsWith(PATH_DELIMITER)) { + prefix += PATH_DELIMITER; + } + StorageObjectsChunk chunk = s3Service.listObjectsChunked(bucket.getName(), + prefix, delimiter, maxListingLength, priorLastKey); + + FileMetadata[] fileMetadata = + new FileMetadata[chunk.getObjects().length]; + for (int i = 0; i < fileMetadata.length; i++) { + StorageObject object = chunk.getObjects()[i]; + fileMetadata[i] = new FileMetadata(object.getKey(), + object.getContentLength(), object.getLastModifiedDate().getTime()); + } + return new PartialListing(chunk.getPriorLastKey(), fileMetadata, + chunk.getCommonPrefixes()); + } catch (ServiceException e) { + handleException(e, prefix); + return null; // never returned - keep compiler happy + } + } + + @Override + public void delete(String key) throws IOException { + try { + LOG.debug("Deleting key: {} from bucket: {}", + key, bucket.getName()); + s3Service.deleteObject(bucket, key); + } catch (ServiceException e) { + handleException(e, key); + } + } + + public void rename(String srcKey, String dstKey) throws IOException { + try { + s3Service.renameObject(bucket.getName(), srcKey, new S3Object(dstKey)); + } catch (ServiceException e) { + handleException(e, srcKey); + } + } + + @Override + public void copy(String srcKey, String dstKey) throws IOException { + try { + if(LOG.isDebugEnabled()) { + LOG.debug("Copying srcKey: " + srcKey + "to dstKey: " + dstKey + "in bucket: " + bucket.getName()); + } + if (multipartEnabled) { + S3Object object = s3Service.getObjectDetails(bucket, srcKey, null, + null, null, null); + if (multipartCopyBlockSize > 0 && + object.getContentLength() > multipartCopyBlockSize) { + copyLargeFile(object, dstKey); + return; + } + } + + S3Object dstObject = new S3Object(dstKey); + dstObject.setServerSideEncryptionAlgorithm(serverSideEncryptionAlgorithm); + s3Service.copyObject(bucket.getName(), srcKey, bucket.getName(), + dstObject, false); + } catch (ServiceException e) { + handleException(e, srcKey); + } + } + + public void copyLargeFile(S3Object srcObject, String dstKey) throws IOException { + try { + long partCount = srcObject.getContentLength() / multipartCopyBlockSize + + (srcObject.getContentLength() % multipartCopyBlockSize > 0 ? 
1 : 0); + + MultipartUpload multipartUpload = s3Service.multipartStartUpload + (bucket.getName(), dstKey, srcObject.getMetadataMap()); + + List listedParts = new ArrayList(); + for (int i = 0; i < partCount; i++) { + long byteRangeStart = i * multipartCopyBlockSize; + long byteLength; + if (i < partCount - 1) { + byteLength = multipartCopyBlockSize; + } else { + byteLength = srcObject.getContentLength() % multipartCopyBlockSize; + if (byteLength == 0) { + byteLength = multipartCopyBlockSize; + } + } + + MultipartPart copiedPart = s3Service.multipartUploadPartCopy + (multipartUpload, i + 1, bucket.getName(), srcObject.getKey(), + null, null, null, null, byteRangeStart, + byteRangeStart + byteLength - 1, null); + listedParts.add(copiedPart); + } + + Collections.reverse(listedParts); + s3Service.multipartCompleteUpload(multipartUpload, listedParts); + } catch (ServiceException e) { + handleException(e, srcObject.getKey()); + } + } + + @Override + public void purge(String prefix) throws IOException { + String key = ""; + try { + S3Object[] objects = + s3Service.listObjects(bucket.getName(), prefix, null); + for (S3Object object : objects) { + key = object.getKey(); + s3Service.deleteObject(bucket, key); + } + } catch (S3ServiceException e) { + handleException(e, key); + } + } + + @Override + public void dump() throws IOException { + StringBuilder sb = new StringBuilder("S3 Native Filesystem, "); + sb.append(bucket.getName()).append("\n"); + try { + S3Object[] objects = s3Service.listObjects(bucket.getName()); + for (S3Object object : objects) { + sb.append(object.getKey()).append("\n"); + } + } catch (S3ServiceException e) { + handleException(e); + } + System.out.println(sb); + } + + /** + * Handle any service exception by translating it into an IOException + * @param e exception + * @throws IOException exception -always + */ + private void handleException(Exception e) throws IOException { + throw processException(e, e, ""); + } + /** + * Handle any service exception by translating it into an IOException + * @param e exception + * @param key key sought from object store + + * @throws IOException exception -always + */ + private void handleException(Exception e, String key) throws IOException { + throw processException(e, e, key); + } + + /** + * Handle any service exception by translating it into an IOException + * @param thrown exception + * @param original original exception -thrown if no other translation could + * be made + * @param key key sought from object store or "" for undefined + * @return an exception to throw. If isProcessingCause==true this may be null. + */ + private IOException processException(Throwable thrown, Throwable original, + String key) { + IOException result; + if (thrown.getCause() != null) { + // recurse down + result = processException(thrown.getCause(), original, key); + } else if (thrown instanceof HttpException) { + // nested HttpException - examine error code and react + HttpException httpException = (HttpException) thrown; + String responseMessage = httpException.getResponseMessage(); + int responseCode = httpException.getResponseCode(); + String bucketName = "s3n://" + bucket.getName(); + String text = String.format("%s : %03d : %s", + bucketName, + responseCode, + responseMessage); + String filename = !key.isEmpty() ? 
(bucketName + "/" + key) : text; + IOException ioe; + switch (responseCode) { + case 404: + result = new FileNotFoundException(filename); + break; + case 416: // invalid range + result = new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + +": " + filename); + break; + case 403: //forbidden + result = new AccessControlException("Permission denied" + +": " + filename); + break; + default: + result = new IOException(text); + } + result.initCause(thrown); + } else if (thrown instanceof S3ServiceException) { + S3ServiceException se = (S3ServiceException) thrown; + LOG.debug( + "S3ServiceException: {}: {} : {}", + se.getS3ErrorCode(), se.getS3ErrorMessage(), se, se); + if ("InvalidRange".equals(se.getS3ErrorCode())) { + result = new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); + } else { + result = new S3Exception(se); + } + } else if (thrown instanceof ServiceException) { + ServiceException se = (ServiceException) thrown; + LOG.debug("S3ServiceException: {}: {} : {}", + se.getErrorCode(), se.toString(), se, se); + result = new S3Exception(se); + } else if (thrown instanceof IOException) { + result = (IOException) thrown; + } else { + // here there is no exception derived yet. + // this means no inner cause, and no translation made yet. + // convert the original to an IOException -rather than just the + // exception at the base of the tree + result = new S3Exception(original); + } + + return result; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java new file mode 100644 index 0000000..f26cdac --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3native; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + +/** + *
<p>
+ * An abstraction for a key-based {@link File} store. + *
</p>
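+ * A minimal sketch of how a caller might use an implementation of this store (the key is a
+ * placeholder and localFile is any local java.io.File):
+ * <pre>
+ *   store.storeFile("user/tom/part-0000", localFile, null);
+ *   FileMetadata meta = store.retrieveMetadata("user/tom/part-0000");
+ *   InputStream in = store.retrieve("user/tom/part-0000");
+ * </pre>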
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +interface NativeFileSystemStore { + + void initialize(URI uri, Configuration conf) throws IOException; + + void storeFile(String key, File file, byte[] md5Hash) throws IOException; + void storeEmptyFile(String key) throws IOException; + + FileMetadata retrieveMetadata(String key) throws IOException; + InputStream retrieve(String key) throws IOException; + InputStream retrieve(String key, long byteRangeStart) throws IOException; + + PartialListing list(String prefix, int maxListingLength) throws IOException; + PartialListing list(String prefix, int maxListingLength, String priorLastKey, boolean recursive) + throws IOException; + + void delete(String key) throws IOException; + + void copy(String srcKey, String dstKey) throws IOException; + + /** + * Delete all keys with the given prefix. Used for testing. + * @throws IOException + */ + void purge(String prefix) throws IOException; + + /** + * Diagnostic method to dump state to the console. + * @throws IOException + */ + void dump() throws IOException; +}
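For illustration, a caller such as NativeS3FileSystem can page through a large listing with the
paged list() overload above; a minimal sketch (the prefix is a placeholder, and it assumes
PartialListing exposes the returned files and the prior-last-key marker):

  String priorLastKey = null;
  do {
    PartialListing listing = store.list("user/tom/", 1000, priorLastKey, false);
    for (FileMetadata file : listing.getFiles()) {
      // process file.getKey() ...
    }
    priorLastKey = listing.getPriorLastKey();
  } while (priorLastKey != null);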