From: cnauroth@apache.org
To: common-commits@hadoop.apache.org
Date: Wed, 17 Dec 2014 22:59:50 -0000
Message-Id: <4feb455005db483588281bddfc40add4@git.apache.org>
Subject: [04/24] hadoop git commit: HADOOP-9629. Support Windows Azure Storage
 - Blob as a file system in Hadoop. Contributed by Dexter Bradshaw, Mostafa
 Elhemali, Xi Fang, Johannes Klein, David Lao, Mike Liddell, Chuan Liu,
 Lengning Liu, Ivan Mitic, Michael Rys,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/FileMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/FileMetadata.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/FileMetadata.java
new file mode 100644
index 0000000..5085a0f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/FileMetadata.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+
+/**
+ * <p>
+ * Holds basic metadata for a file stored in a {@link NativeFileSystemStore}.
+ * </p>
+ */
+@InterfaceAudience.Private
+class FileMetadata {
+  private final String key;
+  private final long length;
+  private final long lastModified;
+  private final boolean isDir;
+  private final PermissionStatus permissionStatus;
+  private final BlobMaterialization blobMaterialization;
+
+  /**
+   * Constructs a FileMetadata object for a file.
+   *
+   * @param key
+   *          The key (path) to the file.
+   * @param length
+   *          The length in bytes of the file.
+   * @param lastModified
+   *          The last modified date (milliseconds since January 1, 1970 UTC.)
+   * @param permissionStatus
+   *          The permission for the file.
+   */
+  public FileMetadata(String key, long length, long lastModified,
+      PermissionStatus permissionStatus) {
+    this.key = key;
+    this.length = length;
+    this.lastModified = lastModified;
+    this.isDir = false;
+    this.permissionStatus = permissionStatus;
+    // Files are never implicit.
+    this.blobMaterialization = BlobMaterialization.Explicit;
+  }
+
+  /**
+   * Constructs a FileMetadata object for a directory.
+   *
+   * @param key
+   *          The key (path) to the directory.
+   * @param lastModified
+   *          The last modified date (milliseconds since January 1, 1970 UTC.)
+   * @param permissionStatus
+   *          The permission for the directory.
+   * @param blobMaterialization
+   *          Whether this is an implicit (no real blob backing it) or explicit
+   *          directory.
+   */
+  public FileMetadata(String key, long lastModified,
+      PermissionStatus permissionStatus, BlobMaterialization blobMaterialization) {
+    this.key = key;
+    this.isDir = true;
+    this.length = 0;
+    this.lastModified = lastModified;
+    this.permissionStatus = permissionStatus;
+    this.blobMaterialization = blobMaterialization;
+  }
+
+  public boolean isDir() {
+    return isDir;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public long getLength() {
+    return length;
+  }
+
+  public long getLastModified() {
+    return lastModified;
+  }
+
+  public PermissionStatus getPermissionStatus() {
+    return permissionStatus;
+  }
+
+  /**
+   * Indicates whether this is an implicit directory (no real blob backing it)
+   * or an explicit one.
+   *
+   * @return Implicit if this is an implicit directory, or Explicit if it's an
+   *         explicit directory or a file.
+   */
+  public BlobMaterialization getBlobMaterialization() {
+    return blobMaterialization;
+  }
+
+  @Override
+  public String toString() {
+    return "FileMetadata[" + key + ", " + length + ", " + lastModified + ", "
+        + permissionStatus + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/KeyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/KeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/KeyProvider.java
new file mode 100644
index 0000000..4c3a369
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/KeyProvider.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * The interface that every Azure file system key provider must implement.
+ */
+@InterfaceAudience.Private
+public interface KeyProvider {
+  /**
+   * Key providers must implement this method. Given a list of configuration
+   * parameters for the specified Azure storage account, retrieve the plaintext
+   * storage account key.
+   *
+   * @param accountName
+   *          the storage account name
+   * @param conf
+   *          Hadoop configuration parameters
+   * @return the plaintext storage account key
+   * @throws KeyProviderException
+   */
+  String getStorageAccountKey(String accountName, Configuration conf)
+      throws KeyProviderException;
+}
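A minimal sketch of what an implementation of this interface can look like
(illustrative only: the class name is invented, and it assumes a per-account
configuration property of the form fs.azure.account.key.<account name>):

    package org.apache.hadoop.fs.azure;

    import org.apache.hadoop.conf.Configuration;

    /** Illustrative provider that reads the account key from configuration. */
    public class ExampleConfigKeyProvider implements KeyProvider {
      @Override
      public String getStorageAccountKey(String accountName, Configuration conf)
          throws KeyProviderException {
        // Look the key up under the assumed per-account property name.
        String key = conf.get("fs.azure.account.key." + accountName);
        if (key == null) {
          throw new KeyProviderException(
              "No storage account key configured for " + accountName);
        }
        return key;
      }
    }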
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/KeyProviderException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/KeyProviderException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/KeyProviderException.java
new file mode 100644
index 0000000..b65b2e6
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/KeyProviderException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Thrown if there is a problem instantiating a KeyProvider or retrieving a key
+ * using a KeyProvider object.
+ */
+@InterfaceAudience.Private
+public class KeyProviderException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  public KeyProviderException(String message) {
+    super(message);
+  }
+
+  public KeyProviderException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public KeyProviderException(Throwable t) {
+    super(t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
new file mode 100644
index 0000000..30e6b30
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -0,0 +1,1465 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Progressable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.microsoft.windowsazure.storage.core.Utility;
+
+/**
+ * <p>
+ * A {@link FileSystem} for reading and writing files stored on Windows
+ * Azure. This implementation is blob-based and stores files on Azure in
+ * their native form so they can be read by other Azure tools.
+ * </p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class NativeAzureFileSystem extends FileSystem {
+
+  @Override
+  public String getScheme() {
+    return "wasb";
+  }
+
+  /**
+   * <p>
+   * A {@link FileSystem} for reading and writing files stored on Windows
+   * Azure. This implementation is blob-based and stores files on Azure in
+   * their native form so they can be read by other Azure tools. This
+   * implementation uses HTTPS for secure network communication.
+   * </p>
+ */ + public static class Secure extends NativeAzureFileSystem { + @Override + public String getScheme() { + return "wasbs"; + } + } + + public static final Log LOG = LogFactory.getLog(NativeAzureFileSystem.class); + + static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size"; + /** + * The time span in seconds before which we consider a temp blob to be + * dangling (not being actively uploaded to) and up for reclamation. + * + * So e.g. if this is 60, then any temporary blobs more than a minute old + * would be considered dangling. + */ + static final String AZURE_TEMP_EXPIRY_PROPERTY_NAME = "fs.azure.fsck.temp.expiry.seconds"; + private static final int AZURE_TEMP_EXPIRY_DEFAULT = 3600; + static final String PATH_DELIMITER = Path.SEPARATOR; + static final String AZURE_TEMP_FOLDER = "_$azuretmpfolder$"; + + private static final int AZURE_LIST_ALL = -1; + private static final int AZURE_UNBOUNDED_DEPTH = -1; + + private static final long MAX_AZURE_BLOCK_SIZE = 512 * 1024 * 1024L; + + /** + * The configuration property that determines which group owns files created + * in WASB. + */ + private static final String AZURE_DEFAULT_GROUP_PROPERTY_NAME = "fs.azure.permissions.supergroup"; + /** + * The default value for fs.azure.permissions.supergroup. Chosen as the same + * default as DFS. + */ + static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup"; + + static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost"; + private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost"; + + private class NativeAzureFsInputStream extends FSInputStream { + private InputStream in; + private final String key; + private long pos = 0; + + public NativeAzureFsInputStream(DataInputStream in, String key) { + this.in = in; + this.key = key; + } + + /* + * Reads the next byte of data from the input stream. The value byte is + * returned as an integer in the range 0 to 255. If no byte is available + * because the end of the stream has been reached, the value -1 is returned. + * This method blocks until input data is available, the end of the stream + * is detected, or an exception is thrown. + * + * @returns int An integer corresponding to the byte read. + */ + @Override + public synchronized int read() throws IOException { + int result = 0; + result = in.read(); + if (result != -1) { + pos++; + if (statistics != null) { + statistics.incrementBytesRead(1); + } + } + + // Return to the caller with the result. + // + return result; + } + + /* + * Reads up to len bytes of data from the input stream into an array of + * bytes. An attempt is made to read as many as len bytes, but a smaller + * number may be read. The number of bytes actually read is returned as an + * integer. This method blocks until input data is available, end of file is + * detected, or an exception is thrown. If len is zero, then no bytes are + * read and 0 is returned; otherwise, there is an attempt to read at least + * one byte. If no byte is available because the stream is at end of file, + * the value -1 is returned; otherwise, at least one byte is read and stored + * into b. + * + * @param b -- the buffer into which data is read + * + * @param off -- the start offset in the array b at which data is written + * + * @param len -- the maximum number of bytes read + * + * @ returns int The total number of byes read into the buffer, or -1 if + * there is no more data because the end of stream is reached. 
+ */ + @Override + public synchronized int read(byte[] b, int off, int len) throws IOException { + int result = 0; + result = in.read(b, off, len); + if (result > 0) { + pos += result; + } + + if (null != statistics) { + statistics.incrementBytesRead(result); + } + + // Return to the caller with the result. + return result; + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public synchronized void seek(long pos) throws IOException { + in.close(); + in = store.retrieve(key, pos); + this.pos = pos; + } + + @Override + public synchronized long getPos() throws IOException { + return pos; + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + return false; + } + } + + private class NativeAzureFsOutputStream extends OutputStream { + // We should not override flush() to actually close current block and flush + // to DFS, this will break applications that assume flush() is a no-op. + // Applications are advised to use Syncable.hflush() for that purpose. + // NativeAzureFsOutputStream needs to implement Syncable if needed. + private String key; + private String keyEncoded; + private OutputStream out; + + public NativeAzureFsOutputStream(OutputStream out, String aKey, + String anEncodedKey) throws IOException { + // Check input arguments. The output stream should be non-null and the + // keys + // should be valid strings. + if (null == out) { + throw new IllegalArgumentException( + "Illegal argument: the output stream is null."); + } + + if (null == aKey || 0 == aKey.length()) { + throw new IllegalArgumentException( + "Illegal argument the key string is null or empty"); + } + + if (null == anEncodedKey || 0 == anEncodedKey.length()) { + throw new IllegalArgumentException( + "Illegal argument the encoded key string is null or empty"); + } + + // Initialize the member variables with the incoming parameters. + this.out = out; + + setKey(aKey); + setEncodedKey(anEncodedKey); + } + + @Override + public synchronized void close() throws IOException { + if (out != null) { + // Close the output stream and decode the key for the output stream + // before returning to the caller. + // + out.close(); + restoreKey(); + out = null; + } + } + + /** + * Writes the specified byte to this output stream. The general contract for + * write is that one byte is written to the output stream. The byte to be + * written is the eight low-order bits of the argument b. The 24 high-order + * bits of b are ignored. + * + * @param b + * 32-bit integer of block of 4 bytes + */ + @Override + public void write(int b) throws IOException { + out.write(b); + } + + /** + * Writes b.length bytes from the specified byte array to this output + * stream. The general contract for write(b) is that it should have exactly + * the same effect as the call write(b, 0, b.length). + * + * @param b + * Block of bytes to be written to the output stream. + */ + @Override + public void write(byte[] b) throws IOException { + out.write(b); + } + + /** + * Writes len from the specified byte array starting at offset + * off to the output stream. The general contract for write(b, + * off, len) is that some of the bytes in the array + * b are written to the output stream in order; element + * b[off] is the first byte written and + * b[off+len-1] is the last byte written by this operation. + * + * @param b + * Byte array to be written. + * @param off + * Write this offset in stream. + * @param len + * Number of bytes to be written. 
+ */ + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + } + + /** + * Get the blob name. + * + * @return String Blob name. + */ + public String getKey() { + return key; + } + + /** + * Set the blob name. + * + * @param key + * Blob name. + */ + public void setKey(String key) { + this.key = key; + } + + /** + * Get the blob name. + * + * @return String Blob name. + */ + public String getEncodedKey() { + return keyEncoded; + } + + /** + * Set the blob name. + * + * @param anEncodedKey + * Blob name. + */ + public void setEncodedKey(String anEncodedKey) { + this.keyEncoded = anEncodedKey; + } + + /** + * Restore the original key name from the m_key member variable. Note: The + * output file stream is created with an encoded blob store key to guarantee + * load balancing on the front end of the Azure storage partition servers. + * The create also includes the name of the original key value which is + * stored in the m_key member variable. This method should only be called + * when the stream is closed. + * + * @param anEncodedKey + * Encoding of the original key stored in m_key member. + */ + private void restoreKey() throws IOException { + store.rename(getEncodedKey(), getKey()); + } + } + + private URI uri; + private NativeFileSystemStore store; + private AzureNativeFileSystemStore actualStore; + private Path workingDir; + private long blockSize = MAX_AZURE_BLOCK_SIZE; + private static boolean suppressRetryPolicy = false; + + public NativeAzureFileSystem() { + // set store in initialize() + } + + public NativeAzureFileSystem(NativeFileSystemStore store) { + this.store = store; + } + + /** + * Suppress the default retry policy for the Storage, useful in unit tests to + * test negative cases without waiting forever. + */ + @VisibleForTesting + static void suppressRetryPolicy() { + suppressRetryPolicy = true; + } + + /** + * Undo the effect of suppressRetryPolicy. + */ + @VisibleForTesting + static void resumeRetryPolicy() { + suppressRetryPolicy = false; + } + + /** + * Checks if the given URI scheme is a scheme that's affiliated with the Azure + * File System. + * + * @param scheme + * The URI scheme. + * @return true iff it's an Azure File System URI scheme. + */ + private static boolean isWasbScheme(String scheme) { + // The valid schemes are: asv (old name), asvs (old name over HTTPS), + // wasb (new name), wasbs (new name over HTTPS). + return scheme != null + && (scheme.equalsIgnoreCase("asv") || scheme.equalsIgnoreCase("asvs") + || scheme.equalsIgnoreCase("wasb") || scheme + .equalsIgnoreCase("wasbs")); + } + + /** + * Puts in the authority of the default file system if it is a WASB file + * system and the given URI's authority is null. + * + * @return The URI with reconstructed authority if necessary and possible. + */ + private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) { + if (null == uri.getAuthority()) { + // If WASB is the default file system, get the authority from there + URI defaultUri = FileSystem.getDefaultUri(conf); + if (defaultUri != null && isWasbScheme(defaultUri.getScheme())) { + try { + // Reconstruct the URI with the authority from the default URI. + return new URI(uri.getScheme(), defaultUri.getAuthority(), + uri.getPath(), uri.getQuery(), uri.getFragment()); + } catch (URISyntaxException e) { + // This should never happen. 
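+ // (Illustration with invented names: if fs.defaultFS is set to
+ // wasb://data@example.blob.core.windows.net, an authority-less path
+ // such as wasb:///user/alice is reconstructed above as
+ // wasb://data@example.blob.core.windows.net/user/alice.)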
+ throw new Error("Bad URI construction", e); + } + } + } + return uri; + } + + @Override + protected void checkPath(Path path) { + // Make sure to reconstruct the path's authority if needed + super.checkPath(new Path(reconstructAuthorityIfNeeded(path.toUri(), + getConf()))); + } + + @Override + public void initialize(URI uri, Configuration conf) throws IOException { + // Check authority for the URI to guarantee that it is non-null. + uri = reconstructAuthorityIfNeeded(uri, conf); + if (null == uri.getAuthority()) { + final String errMsg = String + .format("Cannot initialize WASB file system, URI authority not recognized."); + throw new IllegalArgumentException(errMsg); + } + super.initialize(uri, conf); + + if (store == null) { + store = createDefaultStore(conf); + } + + store.initialize(uri, conf); + setConf(conf); + this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); + this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser() + .getShortUserName()).makeQualified(getUri(), getWorkingDirectory()); + this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, + MAX_AZURE_BLOCK_SIZE); + + if (LOG.isDebugEnabled()) { + LOG.debug("NativeAzureFileSystem. Initializing."); + LOG.debug(" blockSize = " + + conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE)); + } + + } + + private NativeFileSystemStore createDefaultStore(Configuration conf) { + actualStore = new AzureNativeFileSystemStore(); + + if (suppressRetryPolicy) { + actualStore.suppressRetryPolicy(); + } + return actualStore; + } + + // Note: The logic for this method is confusing as to whether it strips the + // last slash or not (it adds it in the beginning, then strips it at the end). + // We should revisit that. + private String pathToKey(Path path) { + // Convert the path to a URI to parse the scheme, the authority, and the + // path from the path object. + URI tmpUri = path.toUri(); + String pathUri = tmpUri.getPath(); + + // The scheme and authority is valid. If the path does not exist add a "/" + // separator to list the root of the container. + Path newPath = path; + if ("".equals(pathUri)) { + newPath = new Path(tmpUri.toString() + Path.SEPARATOR); + } + + // Verify path is absolute if the path refers to a windows drive scheme. + if (!newPath.isAbsolute()) { + throw new IllegalArgumentException("Path must be absolute: " + path); + } + + String key = null; + key = newPath.toUri().getPath(); + if (key.length() == 1) { + return key; + } else { + return key.substring(1); // remove initial slash + } + } + + private static Path keyToPath(String key) { + if (key.equals("/")) { + return new Path("/"); // container + } + return new Path("/" + key); + } + + private Path makeAbsolute(Path path) { + if (path.isAbsolute()) { + return path; + } + return new Path(workingDir, path); + } + + /** + * For unit test purposes, retrieves the AzureNativeFileSystemStore store + * backing this file system. + * + * @return The store object. + */ + @VisibleForTesting + AzureNativeFileSystemStore getStore() { + return actualStore; + } + + /** This optional operation is not yet supported. 
*/ + @Override + public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) + throws IOException { + throw new IOException("Not supported"); + } + + @Override + public FSDataOutputStream create(Path f, FsPermission permission, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + + if (LOG.isDebugEnabled()) { + LOG.debug("Creating file: " + f.toString()); + } + + if (containsColon(f)) { + throw new IOException("Cannot create file " + f + + " through WASB that has colons in the name"); + } + + Path absolutePath = makeAbsolute(f); + String key = pathToKey(absolutePath); + + FileMetadata existingMetadata = store.retrieveMetadata(key); + if (existingMetadata != null) { + if (existingMetadata.isDir()) { + throw new IOException("Cannot create file " + f + + "; already exists as a directory."); + } + if (!overwrite) { + throw new IOException("File already exists:" + f); + } + } + + Path parentFolder = absolutePath.getParent(); + if (parentFolder != null && parentFolder.getParent() != null) { // skip root + // Update the parent folder last modified time if the parent folder + // already exists. + String parentKey = pathToKey(parentFolder); + FileMetadata parentMetadata = store.retrieveMetadata(parentKey); + if (parentMetadata != null + && parentMetadata.isDir() + && parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) { + store.updateFolderLastModifiedTime(parentKey); + } else { + // Make sure that the parent folder exists. + mkdirs(parentFolder, permission); + } + } + + // Open the output blob stream based on the encoded key. + String keyEncoded = encodeKey(key); + + // Mask the permission first (with the default permission mask as well). + FsPermission masked = applyUMask(permission, UMaskApplyMode.NewFile); + PermissionStatus permissionStatus = createPermissionStatus(masked); + + // First create a blob at the real key, pointing back to the temporary file + // This accomplishes a few things: + // 1. Makes sure we can create a file there. + // 2. Makes it visible to other concurrent threads/processes/nodes what + // we're + // doing. + // 3. Makes it easier to restore/cleanup data in the event of us crashing. + store.storeEmptyLinkFile(key, keyEncoded, permissionStatus); + + // The key is encoded to point to a common container at the storage server. + // This reduces the number of splits on the server side when load balancing. + // Ingress to Azure storage can take advantage of earlier splits. We remove + // the root path to the key and prefix a random GUID to the tail (or leaf + // filename) of the key. Keys are thus broadly and randomly distributed over + // a single container to ease load balancing on the storage server. When the + // blob is committed it is renamed to its earlier key. Uncommitted blocks + // are not cleaned up and we leave it to Azure storage to garbage collect + // these + // blocks. + OutputStream bufOutStream = new NativeAzureFsOutputStream(store.storefile( + keyEncoded, permissionStatus), key, keyEncoded); + + // Construct the data output stream from the buffered output stream. + FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics); + + // Return data output stream to caller. 
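+ // (Illustration of the link-blob scheme with a hypothetical path:
+ // creating "user/alice/data.txt" first stores an empty link blob at
+ // that key pointing at "_$azuretmpfolder$/<random GUID>data.txt"; the
+ // data goes to the temporary blob, which is renamed back to the real
+ // key when the stream is closed.)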
+ return fsOut; + } + + @Override + @Deprecated + public boolean delete(Path path) throws IOException { + return delete(path, true); + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting file: " + f.toString()); + } + + Path absolutePath = makeAbsolute(f); + String key = pathToKey(absolutePath); + + // Capture the metadata for the path. + // + FileMetadata metaFile = store.retrieveMetadata(key); + + if (null == metaFile) { + // The path to be deleted does not exist. + return false; + } + + // The path exists, determine if it is a folder containing objects, + // an empty folder, or a simple file and take the appropriate actions. + if (!metaFile.isDir()) { + // The path specifies a file. We need to check the parent path + // to make sure it's a proper materialized directory before we + // delete the file. Otherwise we may get into a situation where + // the file we were deleting was the last one in an implicit directory + // (e.g. the blob store only contains the blob a/b and there's no + // corresponding directory blob a) and that would implicitly delete + // the directory as well, which is not correct. + Path parentPath = absolutePath.getParent(); + if (parentPath.getParent() != null) {// Not root + String parentKey = pathToKey(parentPath); + FileMetadata parentMetadata = store.retrieveMetadata(parentKey); + if (!parentMetadata.isDir()) { + // Invalid state: the parent path is actually a file. Throw. + throw new AzureException("File " + f + " has a parent directory " + + parentPath + " which is also a file. Can't resolve."); + } + if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found an implicit parent directory while trying to" + + " delete the file " + f + ". Creating the directory blob for" + + " it in " + parentKey + "."); + } + store.storeEmptyFolder(parentKey, + createPermissionStatus(FsPermission.getDefault())); + } else { + store.updateFolderLastModifiedTime(parentKey); + } + } + store.delete(key); + } else { + // The path specifies a folder. Recursively delete all entries under the + // folder. + Path parentPath = absolutePath.getParent(); + if (parentPath.getParent() != null) { + String parentKey = pathToKey(parentPath); + FileMetadata parentMetadata = store.retrieveMetadata(parentKey); + + if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found an implicit parent directory while trying to" + + " delete the directory " + f + + ". Creating the directory blob for" + " it in " + parentKey + + "."); + } + store.storeEmptyFolder(parentKey, + createPermissionStatus(FsPermission.getDefault())); + } + } + + // List all the blobs in the current folder. + String priorLastKey = null; + PartialListing listing = store.listAll(key, AZURE_LIST_ALL, 1, + priorLastKey); + FileMetadata[] contents = listing.getFiles(); + if (!recursive && contents.length > 0) { + // The folder is non-empty and recursive delete was not specified. + // Throw an exception indicating that a non-recursive delete was + // specified for a non-empty folder. + throw new IOException("Non-recursive delete of non-empty directory " + + f.toString()); + } + + // Delete all the files in the folder. + for (FileMetadata p : contents) { + // Tag on the directory name found as the suffix of the suffix of the + // parent directory to get the new absolute path. 
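+ // (For example: deleting the folder key "a/b" gives a contained blob
+ // "a/b/c" the suffix "/c", so the file is deleted as "a/b/c", while a
+ // sub-folder "a/b/d" recurses as delete("/a/b/d", true).)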
+ String suffix = p.getKey().substring( + p.getKey().lastIndexOf(PATH_DELIMITER)); + if (!p.isDir()) { + store.delete(key + suffix); + } else { + // Recursively delete contents of the sub-folders. Notice this also + // deletes the blob for the directory. + if (!delete(new Path(f.toString() + suffix), true)) { + return false; + } + } + } + store.delete(key); + + // Update parent directory last modified time + Path parent = absolutePath.getParent(); + if (parent != null && parent.getParent() != null) { // not root + String parentKey = pathToKey(parent); + store.updateFolderLastModifiedTime(parentKey); + } + } + + // File or directory was successfully deleted. + return true; + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + + if (LOG.isDebugEnabled()) { + LOG.debug("Getting the file status for " + f.toString()); + } + + // Capture the absolute path and the path to key. + Path absolutePath = makeAbsolute(f); + String key = pathToKey(absolutePath); + if (key.length() == 0) { // root always exists + return newDirectory(null, absolutePath); + } + + // The path is either a folder or a file. Retrieve metadata to + // determine if it is a directory or file. + FileMetadata meta = store.retrieveMetadata(key); + if (meta != null) { + if (meta.isDir()) { + // The path is a folder with files in it. + // + if (LOG.isDebugEnabled()) { + LOG.debug("Path " + f.toString() + "is a folder."); + } + + // Return reference to the directory object. + return newDirectory(meta, absolutePath); + } + + // The path is a file. + if (LOG.isDebugEnabled()) { + LOG.debug("Found the path: " + f.toString() + " as a file."); + } + + // Return with reference to a file object. + return newFile(meta, absolutePath); + } + + // File not found. Throw exception no such file or directory. + // Note: Should never get to this point since the root always exists. + throw new FileNotFoundException(absolutePath + + ": No such file or directory."); + } + + @Override + public URI getUri() { + return uri; + } + + /** + * Retrieve the status of a given path if it is a file, or of all the + * contained files if it is a directory. + */ + @Override + public FileStatus[] listStatus(Path f) throws IOException { + + if (LOG.isDebugEnabled()) { + LOG.debug("Listing status for " + f.toString()); + } + + Path absolutePath = makeAbsolute(f); + String key = pathToKey(absolutePath); + Set status = new TreeSet(); + FileMetadata meta = store.retrieveMetadata(key); + + if (meta != null) { + if (!meta.isDir()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found path as a file"); + } + return new FileStatus[] { newFile(meta, absolutePath) }; + } + String partialKey = null; + PartialListing listing = store.list(key, AZURE_LIST_ALL, 1, partialKey); + for (FileMetadata fileMetadata : listing.getFiles()) { + Path subpath = keyToPath(fileMetadata.getKey()); + + // Test whether the metadata represents a file or directory and + // add the appropriate metadata object. + // + // Note: There was a very old bug here where directories were added + // to the status set as files flattening out recursive listings + // using "-lsr" down the file system hierarchy. + if (fileMetadata.isDir()) { + // Make sure we hide the temp upload folder + if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) { + // Don't expose that. 
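+ // (The _$azuretmpfolder$ folder only holds in-flight uploads for the
+ // create() link-blob scheme, so it is filtered out of listings.)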
+ continue; + } + status.add(newDirectory(fileMetadata, subpath)); + } else { + status.add(newFile(fileMetadata, subpath)); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Found path as a directory with " + status.size() + + " files in it."); + } + } else { + // There is no metadata found for the path. + if (LOG.isDebugEnabled()) { + LOG.debug("Did not find any metadata for path: " + key); + } + + throw new FileNotFoundException("File" + f + " does not exist."); + } + + return status.toArray(new FileStatus[0]); + } + + private FileStatus newFile(FileMetadata meta, Path path) { + return new FileStatus(meta.getLength(), false, 1, blockSize, + meta.getLastModified(), 0, meta.getPermissionStatus().getPermission(), + meta.getPermissionStatus().getUserName(), meta.getPermissionStatus() + .getGroupName(), + path.makeQualified(getUri(), getWorkingDirectory())); + } + + private FileStatus newDirectory(FileMetadata meta, Path path) { + return new FileStatus(0, true, 1, blockSize, meta == null ? 0 + : meta.getLastModified(), 0, meta == null ? FsPermission.getDefault() + : meta.getPermissionStatus().getPermission(), meta == null ? "" : meta + .getPermissionStatus().getUserName(), meta == null ? "" : meta + .getPermissionStatus().getGroupName(), path.makeQualified(getUri(), + getWorkingDirectory())); + } + + private static enum UMaskApplyMode { + NewFile, NewDirectory, ChangeExistingFile, ChangeExistingDirectory, + } + + /** + * Applies the applicable UMASK's on the given permission. + * + * @param permission + * The permission to mask. + * @param applyDefaultUmask + * Whether to also apply the default umask. + * @return The masked persmission. + */ + private FsPermission applyUMask(final FsPermission permission, + final UMaskApplyMode applyMode) { + FsPermission newPermission = new FsPermission(permission); + // Apply the default umask - this applies for new files or directories. + if (applyMode == UMaskApplyMode.NewFile + || applyMode == UMaskApplyMode.NewDirectory) { + newPermission = newPermission + .applyUMask(FsPermission.getUMask(getConf())); + } + return newPermission; + } + + /** + * Creates the PermissionStatus object to use for the given permission, based + * on the current user in context. + * + * @param permission + * The permission for the file. + * @return The permission status object to use. + * @throws IOException + * If login fails in getCurrentUser + */ + private PermissionStatus createPermissionStatus(FsPermission permission) + throws IOException { + // Create the permission status for this file based on current user + return new PermissionStatus(UserGroupInformation.getCurrentUser() + .getShortUserName(), getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME, + AZURE_DEFAULT_GROUP_DEFAULT), permission); + } + + @Override + public boolean mkdirs(Path f, FsPermission permission) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating directory: " + f.toString()); + } + + if (containsColon(f)) { + throw new IOException("Cannot create directory " + f + + " through WASB that has colons in the name"); + } + + Path absolutePath = makeAbsolute(f); + PermissionStatus permissionStatus = createPermissionStatus(applyUMask( + permission, UMaskApplyMode.NewDirectory)); + + ArrayList keysToCreateAsFolder = new ArrayList(); + ArrayList keysToUpdateAsFolder = new ArrayList(); + boolean childCreated = false; + // Check that there is no file in the parent chain of the given path. 
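+ // (For example: mkdirs("/a/b/c") inspects c, then b, then a; an
+ // existing file anywhere in that chain fails the call, and missing or
+ // implicit entries are queued for creation as explicit folder blobs.)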
+ // Stop when you get to the root + for (Path current = absolutePath, parent = current.getParent(); parent != null; current = parent, parent = current + .getParent()) { + String currentKey = pathToKey(current); + FileMetadata currentMetadata = store.retrieveMetadata(currentKey); + if (currentMetadata != null && !currentMetadata.isDir()) { + throw new IOException("Cannot create directory " + f + " because " + + current + " is an existing file."); + } else if (currentMetadata == null + || (currentMetadata.isDir() && currentMetadata + .getBlobMaterialization() == BlobMaterialization.Implicit)) { + keysToCreateAsFolder.add(currentKey); + childCreated = true; + } else { + // The directory already exists. Its last modified time need to be + // updated if there is a child directory created under it. + if (childCreated) { + keysToUpdateAsFolder.add(currentKey); + } + childCreated = false; + } + } + + for (String currentKey : keysToCreateAsFolder) { + store.storeEmptyFolder(currentKey, permissionStatus); + } + + // Take the time after finishing mkdirs as the modified time, and update all + // the existing directories' modified time to it uniformly. + final Calendar lastModifiedCalendar = Calendar + .getInstance(Utility.LOCALE_US); + lastModifiedCalendar.setTimeZone(Utility.UTC_ZONE); + Date lastModified = lastModifiedCalendar.getTime(); + for (String key : keysToUpdateAsFolder) { + store.updateFolderLastModifiedTime(key, lastModified); + } + + // otherwise throws exception + return true; + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Opening file: " + f.toString()); + } + + Path absolutePath = makeAbsolute(f); + String key = pathToKey(absolutePath); + FileMetadata meta = store.retrieveMetadata(key); + if (meta == null) { + throw new FileNotFoundException(f.toString()); + } + if (meta.isDir()) { + throw new FileNotFoundException(f.toString() + + " is a directory not a file."); + } + + return new FSDataInputStream(new BufferedFSInputStream( + new NativeAzureFsInputStream(store.retrieve(key), key), bufferSize)); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + + if (LOG.isDebugEnabled()) { + LOG.debug("Moving " + src + " to " + dst); + } + + if (containsColon(dst)) { + throw new IOException("Cannot rename to file " + dst + + " through WASB that has colons in the name"); + } + + String srcKey = pathToKey(makeAbsolute(src)); + + if (srcKey.length() == 0) { + // Cannot rename root of file system + return false; + } + + FileMetadata srcMetadata = store.retrieveMetadata(srcKey); + if (srcMetadata == null) { + // Source doesn't exist + if (LOG.isDebugEnabled()) { + LOG.debug("Source " + src + " doesn't exist, failing the rename."); + } + return false; + } + + // Figure out the final destination + Path absoluteDst = makeAbsolute(dst); + String dstKey = pathToKey(absoluteDst); + FileMetadata dstMetadata = store.retrieveMetadata(dstKey); + + // directory rename validations + if (srcMetadata.isDir()) { + + // rename dir to self is an error + if (srcKey.equals(dstKey)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Renaming directory to itself is disallowed. path=" + src); + } + return false; + } + + // rename dir to (sub-)child of self is an error. see + // FileSystemContractBaseTest.testRenameChildDirForbidden + if (dstKey.startsWith(srcKey + PATH_DELIMITER)) { + + if (LOG.isDebugEnabled()) { + LOG.debug("Renaming directory to a itself is disallowed. 
src=" + src + + " dest=" + dst); + } + return false; + } + } + + // file rename early checks + if (!srcMetadata.isDir()) { + if (srcKey.equals(dstKey)) { + // rename file to self is OK + if (LOG.isDebugEnabled()) { + LOG.debug("Renaming file to itself. This is allowed and is treated as no-op. path=" + + src); + } + return true; + } + } + + // More validations.. + // If target is dir but target already exists, alter the dst to be a + // subfolder. + // eg move("/a/file.txt", "/b") where "/b" already exists causes the target + // to be "/c/file.txt + if (dstMetadata != null && dstMetadata.isDir()) { + dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName()))); + // Best would be to update dstMetadata, but it is not used further, so set + // it to null and skip the additional cost + dstMetadata = null; + // dstMetadata = store.retrieveMetadata(dstKey); + if (LOG.isDebugEnabled()) { + LOG.debug("Destination " + dst + + " is a directory, adjusted the destination to be " + dstKey); + } + + // rename dir to self is an error + if (srcKey.equals(dstKey)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Renaming directory to itself is disallowed. path=" + src); + } + return false; + } + + } else if (dstMetadata != null) { + // Otherwise, attempting to overwrite a file is error + if (LOG.isDebugEnabled()) { + LOG.debug("Destination " + dst + + " is an already existing file, failing the rename."); + } + return false; + } else { + // Either dir or file and target doesn't exist.. Check that the parent + // directory exists. + FileMetadata parentOfDestMetadata = store + .retrieveMetadata(pathToKey(absoluteDst.getParent())); + if (parentOfDestMetadata == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Parent of the destination " + dst + + " doesn't exist, failing the rename."); + } + return false; + } else if (!parentOfDestMetadata.isDir()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Parent of the destination " + dst + + " is a file, failing the rename."); + } + return false; + } + } + + // Validations complete, do the move. + if (!srcMetadata.isDir()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Source " + src + " found as a file, renaming."); + } + store.rename(srcKey, dstKey); + } else { + // Move everything inside the folder. + String priorLastKey = null; + + // Calculate the index of the part of the string to be moved. That + // is everything on the path up to the folder name. + do { + // List all blobs rooted at the source folder. + PartialListing listing = store.listAll(srcKey, AZURE_LIST_ALL, + AZURE_UNBOUNDED_DEPTH, priorLastKey); + + // Rename all the files in the folder. + for (FileMetadata file : listing.getFiles()) { + // Rename all materialized entries under the folder to point to the + // final destination. + if (file.getBlobMaterialization() == BlobMaterialization.Explicit) { + String srcName = file.getKey(); + String suffix = srcName.substring(srcKey.length()); + String dstName = dstKey + suffix; + store.rename(srcName, dstName); + } + } + priorLastKey = listing.getPriorLastKey(); + } while (priorLastKey != null); + // Rename the top level empty blob for the folder. + if (srcMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) { + store.rename(srcKey, dstKey); + } + } + + // Update both source and destination parent folder last modified time. 
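+ // (For example: rename("/a/b", "/x/y") refreshes the folders "/a" and
+ // "/x" below, materializing either parent first if it only exists
+ // implicitly.)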
+ Path srcParent = makeAbsolute(keyToPath(srcKey)).getParent(); + if (srcParent != null && srcParent.getParent() != null) { // not root + String srcParentKey = pathToKey(srcParent); + + // ensure the srcParent is a materialized folder + FileMetadata srcParentMetadata = store.retrieveMetadata(srcParentKey); + if (srcParentMetadata.isDir() + && srcParentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { + store.storeEmptyFolder(srcParentKey, + createPermissionStatus(FsPermission.getDefault())); + } + + store.updateFolderLastModifiedTime(srcParentKey); + } + + Path destParent = makeAbsolute(keyToPath(dstKey)).getParent(); + if (destParent != null && destParent.getParent() != null) { // not root + String dstParentKey = pathToKey(destParent); + + // ensure the dstParent is a materialized folder + FileMetadata dstParentMetadata = store.retrieveMetadata(dstParentKey); + if (dstParentMetadata.isDir() + && dstParentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { + store.storeEmptyFolder(dstParentKey, + createPermissionStatus(FsPermission.getDefault())); + } + + store.updateFolderLastModifiedTime(dstParentKey); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Renamed " + src + " to " + dst + " successfully."); + } + return true; + } + + /** + * Return an array containing hostnames, offset and size of portions of the + * given file. For WASB we'll just lie and give fake hosts to make sure we get + * many splits in MR jobs. + */ + @Override + public BlockLocation[] getFileBlockLocations(FileStatus file, long start, + long len) throws IOException { + if (file == null) { + return null; + } + + if ((start < 0) || (len < 0)) { + throw new IllegalArgumentException("Invalid start or len parameter"); + } + + if (file.getLen() < start) { + return new BlockLocation[0]; + } + final String blobLocationHost = getConf().get( + AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME, + AZURE_BLOCK_LOCATION_HOST_DEFAULT); + final String[] name = { blobLocationHost }; + final String[] host = { blobLocationHost }; + long blockSize = file.getBlockSize(); + if (blockSize <= 0) { + throw new IllegalArgumentException( + "The block size for the given file is not a positive number: " + + blockSize); + } + int numberOfLocations = (int) (len / blockSize) + + ((len % blockSize == 0) ? 0 : 1); + BlockLocation[] locations = new BlockLocation[numberOfLocations]; + for (int i = 0; i < locations.length; i++) { + long currentOffset = start + (i * blockSize); + long currentLength = Math.min(blockSize, start + len - currentOffset); + locations[i] = new BlockLocation(name, host, currentOffset, currentLength); + } + return locations; + } + + /** + * Set the working directory to the given directory. + */ + @Override + public void setWorkingDirectory(Path newDir) { + workingDir = makeAbsolute(newDir); + } + + @Override + public Path getWorkingDirectory() { + return workingDir; + } + + @Override + public void setPermission(Path p, FsPermission permission) throws IOException { + Path absolutePath = makeAbsolute(p); + String key = pathToKey(absolutePath); + FileMetadata metadata = store.retrieveMetadata(key); + if (metadata == null) { + throw new FileNotFoundException("File doesn't exist: " + p); + } + permission = applyUMask(permission, + metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory + : UMaskApplyMode.ChangeExistingFile); + if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) { + // It's an implicit folder, need to materialize it. 
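+ // (An implicit folder has no blob of its own to carry metadata, so the
+ // permission is persisted by creating the directory blob first.)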
+ store.storeEmptyFolder(key, createPermissionStatus(permission)); + } else if (!metadata.getPermissionStatus().getPermission() + .equals(permission)) { + store.changePermissionStatus(key, new PermissionStatus(metadata + .getPermissionStatus().getUserName(), metadata.getPermissionStatus() + .getGroupName(), permission)); + } + } + + @Override + public void setOwner(Path p, String username, String groupname) + throws IOException { + Path absolutePath = makeAbsolute(p); + String key = pathToKey(absolutePath); + FileMetadata metadata = store.retrieveMetadata(key); + if (metadata == null) { + throw new FileNotFoundException("File doesn't exist: " + p); + } + PermissionStatus newPermissionStatus = new PermissionStatus( + username == null ? metadata.getPermissionStatus().getUserName() + : username, groupname == null ? metadata.getPermissionStatus() + .getGroupName() : groupname, metadata.getPermissionStatus() + .getPermission()); + if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) { + // It's an implicit folder, need to materialize it. + store.storeEmptyFolder(key, newPermissionStatus); + } else { + store.changePermissionStatus(key, newPermissionStatus); + } + } + + @Override + public void close() throws IOException { + // Call the base close() to close any resources there. + super.close(); + // Close the store + store.close(); + } + + /** + * A handler that defines what to do with blobs whose upload was interrupted. + */ + private abstract class DanglingFileHandler { + abstract void handleFile(FileMetadata file, FileMetadata tempFile) + throws IOException; + } + + /** + * Handler implementation for just deleting dangling files and cleaning them + * up. + */ + private class DanglingFileDeleter extends DanglingFileHandler { + @Override + void handleFile(FileMetadata file, FileMetadata tempFile) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting dangling file " + file.getKey()); + } + store.delete(file.getKey()); + store.delete(tempFile.getKey()); + } + } + + /** + * Handler implementation for just moving dangling files to recovery location + * (/lost+found). + */ + private class DanglingFileRecoverer extends DanglingFileHandler { + private final Path destination; + + DanglingFileRecoverer(Path destination) { + this.destination = destination; + } + + @Override + void handleFile(FileMetadata file, FileMetadata tempFile) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering " + file.getKey()); + } + // Move to the final destination + String finalDestinationKey = pathToKey(new Path(destination, + file.getKey())); + store.rename(tempFile.getKey(), finalDestinationKey); + if (!finalDestinationKey.equals(file.getKey())) { + // Delete the empty link file now that we've restored it. + store.delete(file.getKey()); + } + } + } + + /** + * Check if a path has colons in its name + */ + private boolean containsColon(Path p) { + return p.toUri().getPath().toString().contains(":"); + } + + /** + * Implements recover and delete (-move and -delete) behaviors for handling + * dangling files (blobs whose upload was interrupted). + * + * @param root + * The root path to check from. + * @param handler + * The handler that deals with dangling files. + */ + private void handleFilesWithDanglingTempData(Path root, + DanglingFileHandler handler) throws IOException { + // Calculate the cut-off for when to consider a blob to be dangling. 
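+ // (Worked example: with the default fs.azure.fsck.temp.expiry.seconds
+ // of 3600, the cut-off is the wall-clock time one hour ago, expressed
+ // in milliseconds since the epoch.)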
+ long cutoffForDangling = new Date().getTime() + - getConf().getInt(AZURE_TEMP_EXPIRY_PROPERTY_NAME, + AZURE_TEMP_EXPIRY_DEFAULT) * 1000; + // Go over all the blobs under the given root and look for blobs to + // recover. + String priorLastKey = null; + do { + PartialListing listing = store.listAll(pathToKey(root), AZURE_LIST_ALL, + AZURE_UNBOUNDED_DEPTH, priorLastKey); + + for (FileMetadata file : listing.getFiles()) { + if (!file.isDir()) { // We don't recover directory blobs + // See if this blob has a link in it (meaning it's a place-holder + // blob for when the upload to the temp blob is complete). + String link = store.getLinkInFileMetadata(file.getKey()); + if (link != null) { + // It has a link, see if the temp blob it is pointing to is + // existent and old enough to be considered dangling. + FileMetadata linkMetadata = store.retrieveMetadata(link); + if (linkMetadata != null + && linkMetadata.getLastModified() >= cutoffForDangling) { + // Found one! + handler.handleFile(file, linkMetadata); + } + } + } + } + priorLastKey = listing.getPriorLastKey(); + } while (priorLastKey != null); + } + + /** + * Looks under the given root path for any blob that are left "dangling", + * meaning that they are place-holder blobs that we created while we upload + * the data to a temporary blob, but for some reason we crashed in the middle + * of the upload and left them there. If any are found, we move them to the + * destination given. + * + * @param root + * The root path to consider. + * @param destination + * The destination path to move any recovered files to. + * @throws IOException + */ + public void recoverFilesWithDanglingTempData(Path root, Path destination) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering files with dangling temp data in " + root); + } + handleFilesWithDanglingTempData(root, + new DanglingFileRecoverer(destination)); + } + + /** + * Looks under the given root path for any blob that are left "dangling", + * meaning that they are place-holder blobs that we created while we upload + * the data to a temporary blob, but for some reason we crashed in the middle + * of the upload and left them there. If any are found, we delete them. + * + * @param root + * The root path to consider. + * @throws IOException + */ + public void deleteFilesWithDanglingTempData(Path root) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting files with dangling temp data in " + root); + } + handleFilesWithDanglingTempData(root, new DanglingFileDeleter()); + } + + /** + * Encode the key with a random prefix for load balancing in Azure storage. + * Upload data to a random temporary file then do storage side renaming to + * recover the original key. + * + * @param aKey + * @param numBuckets + * @return Encoded version of the original key. + */ + private static String encodeKey(String aKey) { + // Get the tail end of the key name. + // + String fileName = aKey.substring(aKey.lastIndexOf(Path.SEPARATOR) + 1, + aKey.length()); + + // Construct the randomized prefix of the file name. The prefix ensures the + // file always drops into the same folder but with a varying tail key name. + String filePrefix = AZURE_TEMP_FOLDER + Path.SEPARATOR + + UUID.randomUUID().toString(); + + // Concatenate the randomized prefix with the tail of the key name. + String randomizedKey = filePrefix + fileName; + + // Return to the caller with the randomized key. 
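+ // (Illustration with an invented UUID: encodeKey("user/alice/part-0000")
+ // yields "_$azuretmpfolder$/f81d4fae-7dec-11d0-a765-00a0c91e6bf6part-0000";
+ // one shared temp folder, random prefix on the leaf name.)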
+ return randomizedKey; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java new file mode 100644 index 0000000..0fb3c22 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.URI; +import java.util.Date; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.PermissionStatus; + +import com.google.common.annotations.VisibleForTesting; + +/** + *
<p>
+ * An abstraction for a key-based {@link File} store. + *
</p>
+ */ +@InterfaceAudience.Private +interface NativeFileSystemStore { + + void initialize(URI uri, Configuration conf) throws IOException; + + void storeEmptyFolder(String key, PermissionStatus permissionStatus) + throws AzureException; + + FileMetadata retrieveMetadata(String key) throws IOException; + + DataInputStream retrieve(String key) throws IOException; + + DataInputStream retrieve(String key, long byteRangeStart) throws IOException; + + DataOutputStream storefile(String key, PermissionStatus permissionStatus) + throws AzureException; + + void storeEmptyLinkFile(String key, String tempBlobKey, + PermissionStatus permissionStatus) throws AzureException; + + String getLinkInFileMetadata(String key) throws AzureException; + + PartialListing list(String prefix, final int maxListingCount, + final int maxListingDepth) throws IOException; + + PartialListing list(String prefix, final int maxListingCount, + final int maxListingDepth, String priorLastKey) throws IOException; + + PartialListing listAll(String prefix, final int maxListingCount, + final int maxListingDepth, String priorLastKey) throws IOException; + + void changePermissionStatus(String key, PermissionStatus newPermission) + throws AzureException; + + void delete(String key) throws IOException; + + void rename(String srcKey, String dstKey) throws IOException; + + /** + * Delete all keys with the given prefix. Used for testing. + * + * @throws IOException + */ + @VisibleForTesting + void purge(String prefix) throws IOException; + + /** + * Diagnostic method to dump state to the console. + * + * @throws IOException + */ + void dump() throws IOException; + + void close(); + + void updateFolderLastModifiedTime(String key) throws AzureException; + + void updateFolderLastModifiedTime(String key, Date lastModified) + throws AzureException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java new file mode 100644 index 0000000..9e49de8 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + *
<p>
+ * Holds information on a directory listing for a {@link NativeFileSystemStore}. + * This includes the {@link FileMetadata files} and directories (their names) + * contained in a directory. + *
</p>
+ *
<p>
+ * This listing may be returned in chunks, so a <code>priorLastKey</code> is
+ * provided so that the next chunk may be requested.
+ *
</p>
+ *
+ * @see NativeFileSystemStore#list(String, int, int, String)
+ */
+@InterfaceAudience.Private
+class PartialListing {
+
+  private final String priorLastKey;
+  private final FileMetadata[] files;
+  private final String[] commonPrefixes;
+
+  public PartialListing(String priorLastKey, FileMetadata[] files,
+      String[] commonPrefixes) {
+    this.priorLastKey = priorLastKey;
+    this.files = files;
+    this.commonPrefixes = commonPrefixes;
+  }
+
+  public FileMetadata[] getFiles() {
+    return files;
+  }
+
+  public String[] getCommonPrefixes() {
+    return commonPrefixes;
+  }
+
+  public String getPriorLastKey() {
+    return priorLastKey;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java
new file mode 100644
index 0000000..25f2883
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.net.HttpURLConnection;
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.microsoft.windowsazure.storage.OperationContext;
+import com.microsoft.windowsazure.storage.RequestResult;
+import com.microsoft.windowsazure.storage.ResponseReceivedEvent;
+import com.microsoft.windowsazure.storage.SendingRequestEvent;
+import com.microsoft.windowsazure.storage.StorageEvent;
+
+/*
+ * Self throttling is implemented by hooking into send & response callbacks.
+ * One instance of this class is created per OperationContext, so each
+ * blob upload/download/etc. gets its own throttling instance.
+ *
+ * Self throttling only applies to 2nd and subsequent packets of an operation.
+ * This is a simple way to ensure it only affects bulk transfers and not every
+ * tiny request.
+ *
+ * A blob download involves sequential packet transmissions, so there are no
+ * concurrency concerns.
+ * A blob upload will generally involve concurrent upload worker threads that
+ * share one OperationContext and one throttling instance.
+ * -- we do not track the latencies for each worker thread as they are doing
+ *    similar work and will rarely collide in practice.
+ * -- concurrent access to lastE2Elatency must be protected.
+ * -- volatile is necessary and should be sufficient to protect simple access
+ *    to primitive values (Java 1.5 onwards).
+ * -- synchronized{} blocks are also used to be conservative and for easier
+ *    maintenance.
+ *
+ * If an operation were to perform concurrent GETs and PUTs there is the
+ * possibility of getting confused regarding whether lastE2Elatency was a read
+ * or write measurement. This scenario does not occur.
+ *
+ * readFactor = target read throughput as factor of unrestricted throughput.
+ * writeFactor = target write throughput as factor of unrestricted throughput.
+ *
+ * As we introduce delays it is important to only measure the actual E2E
+ * latency and not the augmented latency. To achieve this, we fiddle the
+ * 'startDate' of the transfer tracking object.
+ */
+
+/**
+ * Introduces delays in our Azure traffic to prevent overrunning the
+ * server-side throttling limits.
+ */
+@InterfaceAudience.Private
+public class SelfThrottlingIntercept {
+  public static final Log LOG = LogFactory
+      .getLog(SelfThrottlingIntercept.class);
+
+  private final float readFactor;
+  private final float writeFactor;
+
+  // Concurrency: access to non-final members must be thread-safe.
+  private volatile long lastE2Elatency;
+
+  public SelfThrottlingIntercept(OperationContext operationContext,
+      float readFactor, float writeFactor) {
+    this.readFactor = readFactor;
+    this.writeFactor = writeFactor;
+  }
+
+  public static void hook(OperationContext operationContext, float readFactor,
+      float writeFactor) {
+
+    SelfThrottlingIntercept throttler = new SelfThrottlingIntercept(
+        operationContext, readFactor, writeFactor);
+    ResponseReceivedListener responseListener = throttler.new ResponseReceivedListener();
+    SendingRequestListener sendingListener = throttler.new SendingRequestListener();
+
+    operationContext.getResponseReceivedEventHandler().addListener(
+        responseListener);
+    operationContext.getSendingRequestEventHandler().addListener(
+        sendingListener);
+  }
+
+  public void responseReceived(ResponseReceivedEvent event) {
+    RequestResult result = event.getRequestResult();
+    Date startDate = result.getStartDate();
+    Date stopDate = result.getStopDate();
+    long elapsed = stopDate.getTime() - startDate.getTime();
+
+    synchronized (this) {
+      this.lastE2Elatency = elapsed;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      int statusCode = result.getStatusCode();
+      String etag = result.getEtag();
+      HttpURLConnection urlConnection = (HttpURLConnection) event
+          .getConnectionObject();
+      int contentLength = urlConnection.getContentLength();
+      String requestMethod = urlConnection.getRequestMethod();
+      long threadId = Thread.currentThread().getId();
+      LOG.debug(String.format(
+          "SelfThrottlingIntercept:: ResponseReceived: threadId=%d, Status=%d, Elapsed(ms)=%d, ETAG=%s, contentLength=%d, requestMethod=%s",
+          threadId, statusCode, elapsed, etag, contentLength, requestMethod));
+    }
+  }
+
+  public void sendingRequest(SendingRequestEvent sendEvent) {
+    long lastLatency;
+    boolean operationIsRead; // for logging
+    synchronized (this) {
+      lastLatency = this.lastE2Elatency;
+    }
+
+    float sleepMultiple;
+    HttpURLConnection urlConnection = (HttpURLConnection) sendEvent
+        .getConnectionObject();
+
+    // Azure REST API never uses POST, so PUT is a sufficient test for an
+    // upload.
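+    // Sleeping for (1/factor - 1) times the last observed latency before
+    // each request stretches every round trip to roughly latency/factor,
+    // capping throughput at about 'factor' of the unrestricted rate.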
+    if (urlConnection.getRequestMethod().equalsIgnoreCase("PUT")) {
+      operationIsRead = false;
+      sleepMultiple = (1 / writeFactor) - 1;
+    } else {
+      operationIsRead = true;
+      sleepMultiple = (1 / readFactor) - 1;
+    }
+
+    long sleepDuration = (long) (sleepMultiple * lastLatency);
+    if (sleepDuration < 0) {
+      sleepDuration = 0;
+    }
+
+    if (sleepDuration > 0) {
+      try {
+        // Thread.sleep() is not exact but it seems sufficiently accurate for
+        // our needs. If needed this could become a loop of small waits that
+        // tracks actual elapsed time.
+        Thread.sleep(sleepDuration);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+
+      // Reset the start date so the sleep is not counted against request
+      // latency.
+      sendEvent.getRequestResult().setStartDate(new Date());
+    }
+
+    if (LOG.isDebugEnabled()) {
+      boolean isFirstRequest = (lastLatency == 0);
+      long threadId = Thread.currentThread().getId();
+      LOG.debug(String.format(
+          " SelfThrottlingIntercept:: SendingRequest: threadId=%d, requestType=%s, isFirstRequest=%b, sleepDuration=%d",
+          threadId, operationIsRead ? "read " : "write", isFirstRequest,
+          sleepDuration));
+    }
+  }
+
+  // Simply forwards back to the main class. This is necessary because the
+  // main class cannot extend two StorageEvent base classes.
+  @InterfaceAudience.Private
+  class SendingRequestListener extends StorageEvent<SendingRequestEvent> {
+
+    @Override
+    public void eventOccurred(SendingRequestEvent event) {
+      sendingRequest(event);
+    }
+  }
+
+  // Simply forwards back to the main class. This is necessary because the
+  // main class cannot extend two StorageEvent base classes.
+  @InterfaceAudience.Private
+  class ResponseReceivedListener extends StorageEvent<ResponseReceivedEvent> {
+
+    @Override
+    public void eventOccurred(ResponseReceivedEvent event) {
+      responseReceived(event);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java
new file mode 100644
index 0000000..18f173e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.net.HttpURLConnection;
+import java.security.InvalidKeyException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.microsoft.windowsazure.storage.Constants.HeaderConstants;
+import com.microsoft.windowsazure.storage.OperationContext;
+import com.microsoft.windowsazure.storage.SendingRequestEvent;
+import com.microsoft.windowsazure.storage.StorageCredentials;
+import com.microsoft.windowsazure.storage.StorageEvent;
+import com.microsoft.windowsazure.storage.StorageException;
+
+/**
+ * Manages the lifetime of binding on the operation contexts to intercept send
+ * request events to Azure storage.
+ */
+@InterfaceAudience.Private
+public final class SendRequestIntercept extends
+    StorageEvent<SendingRequestEvent> {
+
+  public static final Log LOG = LogFactory.getLog(SendRequestIntercept.class);
+
+  private static final String ALLOW_ALL_REQUEST_PRECONDITIONS = "*";
+  private final StorageCredentials storageCreds;
+  private final boolean allowConcurrentOOBIo;
+  private final OperationContext opContext;
+
+  /**
+   * Getter returning the storage account credentials.
+   *
+   * @return storageCreds - account storage credentials.
+   */
+  private StorageCredentials getCredentials() {
+    return storageCreds;
+  }
+
+  /**
+   * Query if out-of-band I/Os are allowed.
+   *
+   * @return allowConcurrentOOBIo - true if OOB I/O is allowed, and false
+   *         otherwise.
+   */
+  private boolean isOutOfBandIoAllowed() {
+    return allowConcurrentOOBIo;
+  }
+
+  /**
+   * Getter returning the operation context.
+   *
+   * @return opContext - the operation context.
+   */
+  private OperationContext getOperationContext() {
+    return opContext;
+  }
+
+  /**
+   * Constructor for SendRequestIntercept.
+   *
+   * @param storageCreds
+   *          - storage account credentials for signing packets.
+   * @param allowConcurrentOOBIo
+   *          - true if reads are allowed with concurrent OOB writes.
+   * @param opContext
+   *          - the operation context to intercept.
+   */
+  private SendRequestIntercept(StorageCredentials storageCreds,
+      boolean allowConcurrentOOBIo, OperationContext opContext) {
+    // Capture the credentials, the OOB I/O setting, and the operation
+    // context.
+    this.storageCreds = storageCreds;
+    this.allowConcurrentOOBIo = allowConcurrentOOBIo;
+    this.opContext = opContext;
+  }
+
+  /**
+   * Binds a new listener to the operation context so the WASB file system can
+   * appropriately intercept sends. By allowing concurrent OOB I/Os, we bypass
+   * the blob immutability check when reading streams.
+   *
+   * @param storageCreds
+   *          Storage account credentials used to sign requests.
+   * @param opContext
+   *          The operation context to bind to listener.
+   * @param allowConcurrentOOBIo
+   *          True if reads are allowed with concurrent OOB writes.
+   */
+  public static void bind(StorageCredentials storageCreds,
+      OperationContext opContext, boolean allowConcurrentOOBIo) {
+    SendRequestIntercept sendListener = new SendRequestIntercept(storageCreds,
+        allowConcurrentOOBIo, opContext);
+    opContext.getSendingRequestEventHandler().addListener(sendListener);
+  }
+
+  /**
+   * Handler which processes the sending request event from Azure SDK. The
+   * handler simply resets the conditional header to make all read requests
+   * unconditional if reads with concurrent OOB writes are allowed.
+   *
+   * @param sendEvent
+   *          - send event context from Windows Azure SDK.
+   */
+  @Override
+  public void eventOccurred(SendingRequestEvent sendEvent) {
+
+    if (!(sendEvent.getConnectionObject() instanceof HttpURLConnection)) {
+      // Pass if there is no HTTP connection associated with this send
+      // request.
+      return;
+    }
+
+    // Capture the HTTP URL connection object for the request.
+    HttpURLConnection urlConnection = (HttpURLConnection) sendEvent
+        .getConnectionObject();
+
+    // Determine whether this is a download request by checking that the
+    // request method is a "GET" operation.
+    if (urlConnection.getRequestMethod().equalsIgnoreCase("GET")
+        && isOutOfBandIoAllowed()) {
+      // If concurrent reads on OOB writes are allowed, reset the if-match
+      // condition on the conditional header.
+      urlConnection.setRequestProperty(HeaderConstants.IF_MATCH,
+          ALLOW_ALL_REQUEST_PRECONDITIONS);
+
+      // In the Java Azure SDK the packet is signed before firing the
+      // SendingRequestEvent. Setting the conditional packet header property
+      // changes the contents of the packet, therefore the packet has to be
+      // re-signed.
+      try {
+        // Sign the request. GETs have no payload, so -1 is passed for the
+        // content length.
+        getCredentials().signBlobAndQueueRequest(urlConnection, -1L,
+            getOperationContext());
+      } catch (InvalidKeyException e) {
+        // Log invalid key exception to track signing error before the send
+        // fails.
+        String errString = String.format(
+            "Received invalid key exception when attempting to sign packet."
+                + " Cause: %s", e.getMessage());
+        LOG.error(errString);
+      } catch (StorageException e) {
+        // Log storage exception to track signing error before the call fails.
+        String errString = String.format(
+            "Received storage exception when attempting to sign packet."
+                + " Cause: %s", e.getMessage());
+        LOG.error(errString);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java
new file mode 100644
index 0000000..2ce8ebd
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell;
+
+/**
+ * Shell decryption key provider which invokes an external script that will
+ * perform the key decryption.
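+ * The encrypted key is read from the standard account key configuration
+ * property and passed to the script as its final argument; the script's
+ * trimmed standard output is used as the decrypted key.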
+ */
+@InterfaceAudience.Private
+public class ShellDecryptionKeyProvider extends SimpleKeyProvider {
+  static final String KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
+
+  @Override
+  public String getStorageAccountKey(String accountName, Configuration conf)
+      throws KeyProviderException {
+    String envelope = super.getStorageAccountKey(accountName, conf);
+
+    final String command = conf.get(KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT);
+    if (command == null) {
+      throw new KeyProviderException("Script path is not specified via "
+          + KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT);
+    }
+
+    String[] cmd = command.split(" ");
+    String[] cmdWithEnvelope = Arrays.copyOf(cmd, cmd.length + 1);
+    cmdWithEnvelope[cmdWithEnvelope.length - 1] = envelope;
+
+    String decryptedKey = null;
+    try {
+      decryptedKey = Shell.execCommand(cmdWithEnvelope);
+    } catch (IOException ex) {
+      throw new KeyProviderException(ex);
+    }
+
+    // Trim any whitespace around the decrypted key.
+    return decryptedKey.trim();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java
new file mode 100644
index 0000000..ef44a85
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Key provider that simply returns the storage account key from the
+ * configuration as plaintext.
+ */
+@InterfaceAudience.Private
+public class SimpleKeyProvider implements KeyProvider {
+
+  protected static final String KEY_ACCOUNT_KEY_PREFIX = "fs.azure.account.key.";
+
+  @Override
+  public String getStorageAccountKey(String accountName, Configuration conf)
+      throws KeyProviderException {
+    return conf.get(getStorageAccountKeyName(accountName));
+  }
+
+  protected String getStorageAccountKeyName(String accountName) {
+    return KEY_ACCOUNT_KEY_PREFIX + accountName;
+  }
+}
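For readers wiring this up, here is a minimal usage sketch (not part of the patch) showing how the two key providers above resolve an account key. The property names are the ones defined in SimpleKeyProvider and ShellDecryptionKeyProvider; the account name, key value, and script path are illustrative placeholders only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azure.ShellDecryptionKeyProvider;
import org.apache.hadoop.fs.azure.SimpleKeyProvider;

public class KeyProviderSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Plaintext key under fs.azure.account.key.<account>, as read by
    // SimpleKeyProvider. Account name and value are illustrative.
    conf.set("fs.azure.account.key.myaccount.blob.core.windows.net",
        "BASE64-ACCOUNT-KEY");
    String plain = new SimpleKeyProvider()
        .getStorageAccountKey("myaccount.blob.core.windows.net", conf);

    // ShellDecryptionKeyProvider treats the configured value as an encrypted
    // envelope and pipes it through an external script (illustrative path),
    // returning the script's trimmed stdout as the key.
    conf.set("fs.azure.shellkeyprovider.script", "/usr/local/bin/decryptkey");
    String decrypted = new ShellDecryptionKeyProvider()
        .getStorageAccountKey("myaccount.blob.core.windows.net", conf);

    System.out.println(plain);
    System.out.println(decrypted);
  }
}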