hadoop-common-commits mailing list archives

From cnaur...@apache.org
Subject [04/24] hadoop git commit: HADOOP-9629. Support Windows Azure Storage - Blob as a file system in Hadoop. Contributed by Dexter Bradshaw, Mostafa Elhemali, Xi Fang, Johannes Klein, David Lao, Mike Liddell, Chuan Liu, Lengning Liu, Ivan Mitic, Michael Rys,
Date Wed, 17 Dec 2014 22:59:50 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/FileMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/FileMetadata.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/FileMetadata.java
new file mode 100644
index 0000000..5085a0f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/FileMetadata.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+
+/**
+ * <p>
+ * Holds basic metadata for a file stored in a {@link NativeFileSystemStore}.
+ * </p>
+ */
+@InterfaceAudience.Private
+class FileMetadata {
+  private final String key;
+  private final long length;
+  private final long lastModified;
+  private final boolean isDir;
+  private final PermissionStatus permissionStatus;
+  private final BlobMaterialization blobMaterialization;
+
+  /**
+   * Constructs a FileMetadata object for a file.
+   * 
+   * @param key
+   *          The key (path) to the file.
+   * @param length
+   *          The length in bytes of the file.
+   * @param lastModified
+   *          The last modified date (milliseconds since January 1, 1970 UTC.)
+   * @param permissionStatus
+   *          The permission for the file.
+   */
+  public FileMetadata(String key, long length, long lastModified,
+      PermissionStatus permissionStatus) {
+    this.key = key;
+    this.length = length;
+    this.lastModified = lastModified;
+    this.isDir = false;
+    this.permissionStatus = permissionStatus;
+    this.blobMaterialization = BlobMaterialization.Explicit; // Files are never
+                                                             // implicit.
+  }
+
+  /**
+   * Constructs a FileMetadata object for a directory.
+   * 
+   * @param key
+   *          The key (path) to the directory.
+   * @param lastModified
+   *          The last modified date (milliseconds since January 1, 1970 UTC.)
+   * @param permissionStatus
+   *          The permission for the directory.
+   * @param blobMaterialization
+   *          Whether this is an implicit (no real blob backing it) or explicit
+   *          directory.
+   */
+  public FileMetadata(String key, long lastModified,
+      PermissionStatus permissionStatus, BlobMaterialization blobMaterialization) {
+    this.key = key;
+    this.isDir = true;
+    this.length = 0;
+    this.lastModified = lastModified;
+    this.permissionStatus = permissionStatus;
+    this.blobMaterialization = blobMaterialization;
+  }
+
+  public boolean isDir() {
+    return isDir;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public long getLength() {
+    return length;
+  }
+
+  public long getLastModified() {
+    return lastModified;
+  }
+
+  public PermissionStatus getPermissionStatus() {
+    return permissionStatus;
+  }
+
+  /**
+   * Indicates whether this is an implicit directory (no real blob backing it)
+   * or an explicit one.
+   * 
+   * @return Implicit if this is an implicit directory, or Explicit if it's an
+   *         explicit directory or a file.
+   */
+  public BlobMaterialization getBlobMaterialization() {
+    return blobMaterialization;
+  }
+
+  @Override
+  public String toString() {
+    return "FileMetadata[" + key + ", " + length + ", " + lastModified + ", "
+        + permissionStatus + "]";
+  }
+}
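For context, a minimal sketch (not part of the patch) of how the two FileMetadata constructors are meant to be used. FileMetadata and the BlobMaterialization enum are package-private, so the sketch assumes it lives in org.apache.hadoop.fs.azure; the key, user and group names are made up for illustration.

    package org.apache.hadoop.fs.azure;

    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.fs.permission.PermissionStatus;

    public class FileMetadataSketch {
      public static void main(String[] args) {
        PermissionStatus perms = new PermissionStatus("hadoopuser",
            "supergroup", FsPermission.getDefault());

        // File entry: key (path), length in bytes, last-modified time,
        // permissions. Files are always materialized explicitly.
        FileMetadata file = new FileMetadata("user/hadoopuser/data.txt", 1024L,
            System.currentTimeMillis(), perms);

        // Directory entry: no length; the materialization flag records whether
        // a real directory blob backs it (Explicit) or the directory is only
        // implied by the keys of child blobs (Implicit).
        FileMetadata dir = new FileMetadata("user/hadoopuser",
            System.currentTimeMillis(), perms, BlobMaterialization.Explicit);

        System.out.println(file + " isDir=" + file.isDir());
        System.out.println(dir + " isDir=" + dir.isDir());
      }
    }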

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/KeyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/KeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/KeyProvider.java
new file mode 100644
index 0000000..4c3a369
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/KeyProvider.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * The interface that every Azure file system key provider must implement.
+ */
+@InterfaceAudience.Private
+public interface KeyProvider {
+  /**
+   * Given the Hadoop configuration for the specified Azure storage account,
+   * retrieves the plaintext storage account key.
+   * 
+   * @param accountName
+   *          the storage account name
+   * @param conf
+   *          Hadoop configuration parameters
+   * @return the plaintext storage account key
+   * @throws KeyProviderException
+   *           if the storage account key cannot be retrieved
+   */
+  String getStorageAccountKey(String accountName, Configuration conf)
+      throws KeyProviderException;
+}
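As an illustration of the contract above (again, not part of the patch), a hypothetical provider that simply reads the key from the Hadoop configuration might look like the following; the fs.azure.account.key.<account> property name is an assumption made for this sketch.

    package org.apache.hadoop.fs.azure;

    import org.apache.hadoop.conf.Configuration;

    /**
     * Hypothetical KeyProvider that reads the plaintext account key directly
     * from the Hadoop configuration.
     */
    public class ConfigurationKeyProvider implements KeyProvider {
      @Override
      public String getStorageAccountKey(String accountName, Configuration conf)
          throws KeyProviderException {
        // Property name assumed for illustration purposes.
        String key = conf.get("fs.azure.account.key." + accountName);
        if (key == null) {
          throw new KeyProviderException(
              "No storage account key configured for account " + accountName);
        }
        return key;
      }
    }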

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/KeyProviderException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/KeyProviderException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/KeyProviderException.java
new file mode 100644
index 0000000..b65b2e6
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/KeyProviderException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Thrown if there is a problem instantiating a KeyProvider or retrieving a key
+ * using a KeyProvider object.
+ */
+@InterfaceAudience.Private
+public class KeyProviderException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  public KeyProviderException(String message) {
+    super(message);
+  }
+
+  public KeyProviderException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public KeyProviderException(Throwable t) {
+    super(t);
+  }
+}
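Before the NativeAzureFileSystem implementation itself (below), a rough sketch of how a client would exercise the new wasb scheme once the patch is in place. The account, container, path and configuration property shown are placeholders, not values defined by the patch.

    import java.io.OutputStream;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WasbQuickStart {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder account name and key; the property prefix is an
        // assumption for this sketch.
        conf.set("fs.azure.account.key.myaccount.blob.core.windows.net",
            "<base64-encoded-account-key>");

        // wasb://<container>@<account>.blob.core.windows.net/<path>
        FileSystem fs = FileSystem.get(
            URI.create("wasb://mycontainer@myaccount.blob.core.windows.net/"),
            conf);

        Path file = new Path("/user/hadoopuser/hello.txt");
        OutputStream out = fs.create(file);
        try {
          out.write("hello wasb".getBytes("UTF-8"));
        } finally {
          out.close();
        }
        System.out.println(fs.getFileStatus(file));
      }
    }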

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
new file mode 100644
index 0000000..30e6b30
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -0,0 +1,1465 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Progressable;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import com.microsoft.windowsazure.storage.core.Utility;
+
+/**
+ * <p>
+ * A {@link FileSystem} for reading and writing files stored on <a
+ * href="http://store.azure.com/">Windows Azure</a>. This implementation is
+ * blob-based and stores files on Azure in their native form so they can be read
+ * by other Azure tools.
+ * </p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class NativeAzureFileSystem extends FileSystem {
+
+  @Override
+  public String getScheme() {
+    return "wasb";
+  }
+
+  
+  /**
+   * <p>
+   * A {@link FileSystem} for reading and writing files stored on <a
+   * href="http://store.azure.com/">Windows Azure</a>. This implementation is
+   * blob-based and stores files on Azure in their native form so they can be read
+   * by other Azure tools. This implementation uses HTTPS for secure network communication.
+   * </p>
+   */
+  public static class Secure extends NativeAzureFileSystem {
+    @Override
+    public String getScheme() {
+      return "wasbs";
+    }
+  }
+
+  public static final Log LOG = LogFactory.getLog(NativeAzureFileSystem.class);
+
+  static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
+  /**
+   * The time span in seconds after which we consider a temp blob to be
+   * dangling (not being actively uploaded to) and up for reclamation.
+   * 
+   * So e.g. if this is 60, then any temporary blobs more than a minute old
+   * would be considered dangling.
+   */
+  static final String AZURE_TEMP_EXPIRY_PROPERTY_NAME = "fs.azure.fsck.temp.expiry.seconds";
+  private static final int AZURE_TEMP_EXPIRY_DEFAULT = 3600;
+  static final String PATH_DELIMITER = Path.SEPARATOR;
+  static final String AZURE_TEMP_FOLDER = "_$azuretmpfolder$";
+
+  private static final int AZURE_LIST_ALL = -1;
+  private static final int AZURE_UNBOUNDED_DEPTH = -1;
+
+  private static final long MAX_AZURE_BLOCK_SIZE = 512 * 1024 * 1024L;
+
+  /**
+   * The configuration property that determines which group owns files created
+   * in WASB.
+   */
+  private static final String AZURE_DEFAULT_GROUP_PROPERTY_NAME = "fs.azure.permissions.supergroup";
+  /**
+   * The default value for fs.azure.permissions.supergroup. Chosen as the same
+   * default as DFS.
+   */
+  static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup";
+
+  static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost";
+  private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost";
+
+  private class NativeAzureFsInputStream extends FSInputStream {
+    private InputStream in;
+    private final String key;
+    private long pos = 0;
+
+    public NativeAzureFsInputStream(DataInputStream in, String key) {
+      this.in = in;
+      this.key = key;
+    }
+
+    /*
+     * Reads the next byte of data from the input stream. The value byte is
+     * returned as an integer in the range 0 to 255. If no byte is available
+     * because the end of the stream has been reached, the value -1 is returned.
+     * This method blocks until input data is available, the end of the stream
+     * is detected, or an exception is thrown.
+     * 
+     * @returns int An integer corresponding to the byte read.
+     */
+    @Override
+    public synchronized int read() throws IOException {
+      int result = in.read();
+      if (result != -1) {
+        pos++;
+        if (statistics != null) {
+          statistics.incrementBytesRead(1);
+        }
+      }
+
+      // Return to the caller with the result.
+      //
+      return result;
+    }
+
+    /*
+     * Reads up to len bytes of data from the input stream into an array of
+     * bytes. An attempt is made to read as many as len bytes, but a smaller
+     * number may be read. The number of bytes actually read is returned as an
+     * integer. This method blocks until input data is available, end of file is
+     * detected, or an exception is thrown. If len is zero, then no bytes are
+     * read and 0 is returned; otherwise, there is an attempt to read at least
+     * one byte. If no byte is available because the stream is at end of file,
+     * the value -1 is returned; otherwise, at least one byte is read and stored
+     * into b.
+     * 
+     * @param b -- the buffer into which data is read
+     * 
+     * @param off -- the start offset in the array b at which data is written
+     * 
+     * @param len -- the maximum number of bytes read
+     * 
+     * @return int The total number of bytes read into the buffer, or -1 if
+     * there is no more data because the end of stream is reached.
+     */
+    @Override
+    public synchronized int read(byte[] b, int off, int len) throws IOException {
+      int result = in.read(b, off, len);
+      if (result > 0) {
+        pos += result;
+        // Only count bytes actually read; at end of stream result is -1 and
+        // must not be added to the statistics.
+        if (null != statistics) {
+          statistics.incrementBytesRead(result);
+        }
+      }
+
+      // Return to the caller with the result.
+      return result;
+    }
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+    }
+
+    @Override
+    public synchronized void seek(long pos) throws IOException {
+      in.close();
+      in = store.retrieve(key, pos);
+      this.pos = pos;
+    }
+
+    @Override
+    public synchronized long getPos() throws IOException {
+      return pos;
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
+    }
+  }
+
+  private class NativeAzureFsOutputStream extends OutputStream {
+    // We should not override flush() to actually close current block and flush
+    // to DFS, this will break applications that assume flush() is a no-op.
+    // Applications are advised to use Syncable.hflush() for that purpose.
+    // NativeAzureFsOutputStream needs to implement Syncable if needed.
+    private String key;
+    private String keyEncoded;
+    private OutputStream out;
+
+    public NativeAzureFsOutputStream(OutputStream out, String aKey,
+        String anEncodedKey) throws IOException {
+      // Check input arguments. The output stream should be non-null and the
+      // keys should be valid strings.
+      if (null == out) {
+        throw new IllegalArgumentException(
+            "Illegal argument: the output stream is null.");
+      }
+
+      if (null == aKey || 0 == aKey.length()) {
+        throw new IllegalArgumentException(
+            "Illegal argument: the key string is null or empty");
+      }
+
+      if (null == anEncodedKey || 0 == anEncodedKey.length()) {
+        throw new IllegalArgumentException(
+            "Illegal argument: the encoded key string is null or empty");
+      }
+
+      // Initialize the member variables with the incoming parameters.
+      this.out = out;
+
+      setKey(aKey);
+      setEncodedKey(anEncodedKey);
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+      if (out != null) {
+        // Close the output stream and decode the key for the output stream
+        // before returning to the caller.
+        //
+        out.close();
+        restoreKey();
+        out = null;
+      }
+    }
+
+    /**
+     * Writes the specified byte to this output stream. The general contract for
+     * write is that one byte is written to the output stream. The byte to be
+     * written is the eight low-order bits of the argument b. The 24 high-order
+     * bits of b are ignored.
+     * 
+     * @param b
+     *          The byte to write, passed as an int; only the eight low-order
+     *          bits are written.
+     */
+    @Override
+    public void write(int b) throws IOException {
+      out.write(b);
+    }
+
+    /**
+     * Writes b.length bytes from the specified byte array to this output
+     * stream. The general contract for write(b) is that it should have exactly
+     * the same effect as the call write(b, 0, b.length).
+     * 
+     * @param b
+     *          Block of bytes to be written to the output stream.
+     */
+    @Override
+    public void write(byte[] b) throws IOException {
+      out.write(b);
+    }
+
+    /**
+     * Writes <code>len</code> bytes from the specified byte array starting at
+     * offset <code>off</code> to the output stream. The general contract for
+     * write(b, off, len) is that some of the bytes in the array
+     * <code>b</code> are written to the output stream in order; element
+     * <code>b[off]</code> is the first byte written and
+     * <code>b[off+len-1]</code> is the last byte written by this operation.
+     * 
+     * @param b
+     *          Byte array to be written.
+     * @param off
+     *          Start offset in the byte array at which to begin writing.
+     * @param len
+     *          Number of bytes to be written.
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      out.write(b, off, len);
+    }
+
+    /**
+     * Get the blob name.
+     * 
+     * @return String Blob name.
+     */
+    public String getKey() {
+      return key;
+    }
+
+    /**
+     * Set the blob name.
+     * 
+     * @param key
+     *          Blob name.
+     */
+    public void setKey(String key) {
+      this.key = key;
+    }
+
+    /**
+     * Get the encoded blob name.
+     * 
+     * @return String Encoded blob name.
+     */
+    public String getEncodedKey() {
+      return keyEncoded;
+    }
+
+    /**
+     * Set the encoded blob name.
+     * 
+     * @param anEncodedKey
+     *          Encoded blob name.
+     */
+    public void setEncodedKey(String anEncodedKey) {
+      this.keyEncoded = anEncodedKey;
+    }
+
+    /**
+     * Restore the original key name from the key member variable. Note: The
+     * output file stream is created with an encoded blob store key to guarantee
+     * load balancing on the front end of the Azure storage partition servers.
+     * The create also records the name of the original key, which is stored in
+     * the key member variable. This method should only be called when the
+     * stream is closed.
+     */
+    private void restoreKey() throws IOException {
+      store.rename(getEncodedKey(), getKey());
+    }
+  }
+
+  private URI uri;
+  private NativeFileSystemStore store;
+  private AzureNativeFileSystemStore actualStore;
+  private Path workingDir;
+  private long blockSize = MAX_AZURE_BLOCK_SIZE;
+  private static boolean suppressRetryPolicy = false;
+
+  public NativeAzureFileSystem() {
+    // set store in initialize()
+  }
+
+  public NativeAzureFileSystem(NativeFileSystemStore store) {
+    this.store = store;
+  }
+
+  /**
+   * Suppress the default retry policy for the Storage, useful in unit tests to
+   * test negative cases without waiting forever.
+   */
+  @VisibleForTesting
+  static void suppressRetryPolicy() {
+    suppressRetryPolicy = true;
+  }
+
+  /**
+   * Undo the effect of suppressRetryPolicy.
+   */
+  @VisibleForTesting
+  static void resumeRetryPolicy() {
+    suppressRetryPolicy = false;
+  }
+
+  /**
+   * Checks if the given URI scheme is a scheme that's affiliated with the Azure
+   * File System.
+   * 
+   * @param scheme
+   *          The URI scheme.
+   * @return true iff it's an Azure File System URI scheme.
+   */
+  private static boolean isWasbScheme(String scheme) {
+    // The valid schemes are: asv (old name), asvs (old name over HTTPS),
+    // wasb (new name), wasbs (new name over HTTPS).
+    return scheme != null
+        && (scheme.equalsIgnoreCase("asv") || scheme.equalsIgnoreCase("asvs")
+            || scheme.equalsIgnoreCase("wasb") || scheme
+              .equalsIgnoreCase("wasbs"));
+  }
+
+  /**
+   * Puts in the authority of the default file system if it is a WASB file
+   * system and the given URI's authority is null.
+   * 
+   * @return The URI with reconstructed authority if necessary and possible.
+   */
+  private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) {
+    if (null == uri.getAuthority()) {
+      // If WASB is the default file system, get the authority from there
+      URI defaultUri = FileSystem.getDefaultUri(conf);
+      if (defaultUri != null && isWasbScheme(defaultUri.getScheme())) {
+        try {
+          // Reconstruct the URI with the authority from the default URI.
+          return new URI(uri.getScheme(), defaultUri.getAuthority(),
+              uri.getPath(), uri.getQuery(), uri.getFragment());
+        } catch (URISyntaxException e) {
+          // This should never happen.
+          throw new Error("Bad URI construction", e);
+        }
+      }
+    }
+    return uri;
+  }
+
+  @Override
+  protected void checkPath(Path path) {
+    // Make sure to reconstruct the path's authority if needed
+    super.checkPath(new Path(reconstructAuthorityIfNeeded(path.toUri(),
+        getConf())));
+  }
+
+  @Override
+  public void initialize(URI uri, Configuration conf) throws IOException {
+    // Check authority for the URI to guarantee that it is non-null.
+    uri = reconstructAuthorityIfNeeded(uri, conf);
+    if (null == uri.getAuthority()) {
+      final String errMsg =
+          "Cannot initialize WASB file system, URI authority not recognized.";
+      throw new IllegalArgumentException(errMsg);
+    }
+    super.initialize(uri, conf);
+
+    if (store == null) {
+      store = createDefaultStore(conf);
+    }
+
+    store.initialize(uri, conf);
+    setConf(conf);
+    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+    this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser()
+        .getShortUserName()).makeQualified(getUri(), getWorkingDirectory());
+    this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME,
+        MAX_AZURE_BLOCK_SIZE);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("NativeAzureFileSystem. Initializing.");
+      LOG.debug("  blockSize  = "
+          + conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
+    }
+
+  }
+
+  private NativeFileSystemStore createDefaultStore(Configuration conf) {
+    actualStore = new AzureNativeFileSystemStore();
+
+    if (suppressRetryPolicy) {
+      actualStore.suppressRetryPolicy();
+    }
+    return actualStore;
+  }
+
+  // Note: The logic for this method is confusing as to whether it strips the
+  // last slash or not (it adds it in the beginning, then strips it at the end).
+  // We should revisit that.
+  private String pathToKey(Path path) {
+    // Convert the path to a URI to parse the scheme, the authority, and the
+    // path from the path object.
+    URI tmpUri = path.toUri();
+    String pathUri = tmpUri.getPath();
+
+    // The scheme and authority are valid. If the path is empty, add a "/"
+    // separator to list the root of the container.
+    Path newPath = path;
+    if ("".equals(pathUri)) {
+      newPath = new Path(tmpUri.toString() + Path.SEPARATOR);
+    }
+
+    // Verify path is absolute if the path refers to a windows drive scheme.
+    if (!newPath.isAbsolute()) {
+      throw new IllegalArgumentException("Path must be absolute: " + path);
+    }
+
+    String key = newPath.toUri().getPath();
+    if (key.length() == 1) {
+      return key;
+    } else {
+      return key.substring(1); // remove initial slash
+    }
+  }
+
+  private static Path keyToPath(String key) {
+    if (key.equals("/")) {
+      return new Path("/"); // container
+    }
+    return new Path("/" + key);
+  }
+
+  private Path makeAbsolute(Path path) {
+    if (path.isAbsolute()) {
+      return path;
+    }
+    return new Path(workingDir, path);
+  }
+
+  /**
+   * For unit test purposes, retrieves the AzureNativeFileSystemStore store
+   * backing this file system.
+   * 
+   * @return The store object.
+   */
+  @VisibleForTesting
+  AzureNativeFileSystemStore getStore() {
+    return actualStore;
+  }
+
+  /** This optional operation is not yet supported. */
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
+      throws IOException {
+    throw new IOException("Not supported");
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission,
+      boolean overwrite, int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Creating file: " + f.toString());
+    }
+
+    if (containsColon(f)) {
+      throw new IOException("Cannot create file " + f
+          + " through WASB that has colons in the name");
+    }
+
+    Path absolutePath = makeAbsolute(f);
+    String key = pathToKey(absolutePath);
+
+    FileMetadata existingMetadata = store.retrieveMetadata(key);
+    if (existingMetadata != null) {
+      if (existingMetadata.isDir()) {
+        throw new IOException("Cannot create file " + f
+            + "; already exists as a directory.");
+      }
+      if (!overwrite) {
+        throw new IOException("File already exists:" + f);
+      }
+    }
+
+    Path parentFolder = absolutePath.getParent();
+    if (parentFolder != null && parentFolder.getParent() != null) { // skip root
+      // Update the parent folder last modified time if the parent folder
+      // already exists.
+      String parentKey = pathToKey(parentFolder);
+      FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
+      if (parentMetadata != null
+          && parentMetadata.isDir()
+          && parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
+        store.updateFolderLastModifiedTime(parentKey);
+      } else {
+        // Make sure that the parent folder exists.
+        mkdirs(parentFolder, permission);
+      }
+    }
+
+    // Open the output blob stream based on the encoded key.
+    String keyEncoded = encodeKey(key);
+
+    // Mask the permission first (with the default permission mask as well).
+    FsPermission masked = applyUMask(permission, UMaskApplyMode.NewFile);
+    PermissionStatus permissionStatus = createPermissionStatus(masked);
+
+    // First create a blob at the real key, pointing back to the temporary file
+    // This accomplishes a few things:
+    // 1. Makes sure we can create a file there.
+    // 2. Makes it visible to other concurrent threads/processes/nodes what
+    //    we're doing.
+    // 3. Makes it easier to restore/cleanup data in the event of us crashing.
+    store.storeEmptyLinkFile(key, keyEncoded, permissionStatus);
+
+    // The key is encoded to point to a common container at the storage server.
+    // This reduces the number of splits on the server side when load balancing.
+    // Ingress to Azure storage can take advantage of earlier splits. We remove
+    // the root path to the key and prefix a random GUID to the tail (or leaf
+    // filename) of the key. Keys are thus broadly and randomly distributed over
+    // a single container to ease load balancing on the storage server. When the
+    // blob is committed it is renamed to its earlier key. Uncommitted blocks
+    // are not cleaned up and we leave it to Azure storage to garbage collect
+    // these blocks.
+    OutputStream bufOutStream = new NativeAzureFsOutputStream(store.storefile(
+        keyEncoded, permissionStatus), key, keyEncoded);
+
+    // Construct the data output stream from the buffered output stream.
+    FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);
+
+    // Return data output stream to caller.
+    return fsOut;
+  }
+
+  @Override
+  @Deprecated
+  public boolean delete(Path path) throws IOException {
+    return delete(path, true);
+  }
+
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Deleting file: " + f.toString());
+    }
+
+    Path absolutePath = makeAbsolute(f);
+    String key = pathToKey(absolutePath);
+
+    // Capture the metadata for the path.
+    //
+    FileMetadata metaFile = store.retrieveMetadata(key);
+
+    if (null == metaFile) {
+      // The path to be deleted does not exist.
+      return false;
+    }
+
+    // The path exists, determine if it is a folder containing objects,
+    // an empty folder, or a simple file and take the appropriate actions.
+    if (!metaFile.isDir()) {
+      // The path specifies a file. We need to check the parent path
+      // to make sure it's a proper materialized directory before we
+      // delete the file. Otherwise we may get into a situation where
+      // the file we were deleting was the last one in an implicit directory
+      // (e.g. the blob store only contains the blob a/b and there's no
+      // corresponding directory blob a) and that would implicitly delete
+      // the directory as well, which is not correct.
+      Path parentPath = absolutePath.getParent();
+      if (parentPath.getParent() != null) {// Not root
+        String parentKey = pathToKey(parentPath);
+        FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
+        if (!parentMetadata.isDir()) {
+          // Invalid state: the parent path is actually a file. Throw.
+          throw new AzureException("File " + f + " has a parent directory "
+              + parentPath + " which is also a file. Can't resolve.");
+        }
+        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found an implicit parent directory while trying to"
+                + " delete the file " + f + ". Creating the directory blob for"
+                + " it in " + parentKey + ".");
+          }
+          store.storeEmptyFolder(parentKey,
+              createPermissionStatus(FsPermission.getDefault()));
+        } else {
+          store.updateFolderLastModifiedTime(parentKey);
+        }
+      }
+      store.delete(key);
+    } else {
+      // The path specifies a folder. Recursively delete all entries under the
+      // folder.
+      Path parentPath = absolutePath.getParent();
+      if (parentPath.getParent() != null) {
+        String parentKey = pathToKey(parentPath);
+        FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
+
+        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found an implicit parent directory while trying to"
+                + " delete the directory " + f
+                + ". Creating the directory blob for" + " it in " + parentKey
+                + ".");
+          }
+          store.storeEmptyFolder(parentKey,
+              createPermissionStatus(FsPermission.getDefault()));
+        }
+      }
+
+      // List all the blobs in the current folder.
+      String priorLastKey = null;
+      PartialListing listing = store.listAll(key, AZURE_LIST_ALL, 1,
+          priorLastKey);
+      FileMetadata[] contents = listing.getFiles();
+      if (!recursive && contents.length > 0) {
+        // The folder is non-empty and recursive delete was not specified.
+        // Throw an exception indicating that a non-recursive delete was
+        // specified for a non-empty folder.
+        throw new IOException("Non-recursive delete of non-empty directory "
+            + f.toString());
+      }
+
+      // Delete all the files in the folder.
+      for (FileMetadata p : contents) {
+        // Tag on the directory name found as the suffix of the
+        // parent directory to get the new absolute path.
+        String suffix = p.getKey().substring(
+            p.getKey().lastIndexOf(PATH_DELIMITER));
+        if (!p.isDir()) {
+          store.delete(key + suffix);
+        } else {
+          // Recursively delete contents of the sub-folders. Notice this also
+          // deletes the blob for the directory.
+          if (!delete(new Path(f.toString() + suffix), true)) {
+            return false;
+          }
+        }
+      }
+      store.delete(key);
+
+      // Update parent directory last modified time
+      Path parent = absolutePath.getParent();
+      if (parent != null && parent.getParent() != null) { // not root
+        String parentKey = pathToKey(parent);
+        store.updateFolderLastModifiedTime(parentKey);
+      }
+    }
+
+    // File or directory was successfully deleted.
+    return true;
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Getting the file status for " + f.toString());
+    }
+
+    // Capture the absolute path and the path to key.
+    Path absolutePath = makeAbsolute(f);
+    String key = pathToKey(absolutePath);
+    if (key.length() == 0) { // root always exists
+      return newDirectory(null, absolutePath);
+    }
+
+    // The path is either a folder or a file. Retrieve metadata to
+    // determine if it is a directory or file.
+    FileMetadata meta = store.retrieveMetadata(key);
+    if (meta != null) {
+      if (meta.isDir()) {
+        // The path is a folder with files in it.
+        //
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Path " + f.toString() + "is a folder.");
+        }
+
+        // Return reference to the directory object.
+        return newDirectory(meta, absolutePath);
+      }
+
+      // The path is a file.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Found the path: " + f.toString() + " as a file.");
+      }
+
+      // Return with reference to a file object.
+      return newFile(meta, absolutePath);
+    }
+
+    // File not found. Throw a "no such file or directory" exception.
+    // Note: The root case was handled above, so reaching this point simply
+    // means the path does not exist.
+    throw new FileNotFoundException(absolutePath
+        + ": No such file or directory.");
+  }
+
+  @Override
+  public URI getUri() {
+    return uri;
+  }
+
+  /**
+   * Retrieve the status of a given path if it is a file, or of all the
+   * contained files if it is a directory.
+   */
+  @Override
+  public FileStatus[] listStatus(Path f) throws IOException {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Listing status for " + f.toString());
+    }
+
+    Path absolutePath = makeAbsolute(f);
+    String key = pathToKey(absolutePath);
+    Set<FileStatus> status = new TreeSet<FileStatus>();
+    FileMetadata meta = store.retrieveMetadata(key);
+
+    if (meta != null) {
+      if (!meta.isDir()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Found path as a file");
+        }
+        return new FileStatus[] { newFile(meta, absolutePath) };
+      }
+      String partialKey = null;
+      PartialListing listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
+      for (FileMetadata fileMetadata : listing.getFiles()) {
+        Path subpath = keyToPath(fileMetadata.getKey());
+
+        // Test whether the metadata represents a file or directory and
+        // add the appropriate metadata object.
+        //
+        // Note: There was a very old bug here where directories were added
+        // to the status set as files flattening out recursive listings
+        // using "-lsr" down the file system hierarchy.
+        if (fileMetadata.isDir()) {
+          // Make sure we hide the temp upload folder
+          if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) {
+            // Don't expose that.
+            continue;
+          }
+          status.add(newDirectory(fileMetadata, subpath));
+        } else {
+          status.add(newFile(fileMetadata, subpath));
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Found path as a directory with " + status.size()
+            + " files in it.");
+      }
+    } else {
+      // There is no metadata found for the path.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Did not find any metadata for path: " + key);
+      }
+
+      throw new FileNotFoundException("File" + f + " does not exist.");
+    }
+
+    return status.toArray(new FileStatus[0]);
+  }
+
+  private FileStatus newFile(FileMetadata meta, Path path) {
+    return new FileStatus(meta.getLength(), false, 1, blockSize,
+        meta.getLastModified(), 0, meta.getPermissionStatus().getPermission(),
+        meta.getPermissionStatus().getUserName(), meta.getPermissionStatus()
+            .getGroupName(),
+        path.makeQualified(getUri(), getWorkingDirectory()));
+  }
+
+  private FileStatus newDirectory(FileMetadata meta, Path path) {
+    return new FileStatus(0, true, 1, blockSize, meta == null ? 0
+        : meta.getLastModified(), 0, meta == null ? FsPermission.getDefault()
+        : meta.getPermissionStatus().getPermission(), meta == null ? "" : meta
+        .getPermissionStatus().getUserName(), meta == null ? "" : meta
+        .getPermissionStatus().getGroupName(), path.makeQualified(getUri(),
+        getWorkingDirectory()));
+  }
+
+  private static enum UMaskApplyMode {
+    NewFile, NewDirectory, ChangeExistingFile, ChangeExistingDirectory,
+  }
+
+  /**
+   * Applies the applicable umasks to the given permission.
+   * 
+   * @param permission
+   *          The permission to mask.
+   * @param applyMode
+   *          Whether the permission is for a new or an existing file or
+   *          directory; the default umask is applied only for new ones.
+   * @return The masked permission.
+   */
+  private FsPermission applyUMask(final FsPermission permission,
+      final UMaskApplyMode applyMode) {
+    FsPermission newPermission = new FsPermission(permission);
+    // Apply the default umask - this applies for new files or directories.
+    if (applyMode == UMaskApplyMode.NewFile
+        || applyMode == UMaskApplyMode.NewDirectory) {
+      newPermission = newPermission
+          .applyUMask(FsPermission.getUMask(getConf()));
+    }
+    return newPermission;
+  }
+
+  /**
+   * Creates the PermissionStatus object to use for the given permission, based
+   * on the current user in context.
+   * 
+   * @param permission
+   *          The permission for the file.
+   * @return The permission status object to use.
+   * @throws IOException
+   *           If login fails in getCurrentUser
+   */
+  private PermissionStatus createPermissionStatus(FsPermission permission)
+      throws IOException {
+    // Create the permission status for this file based on current user
+    return new PermissionStatus(UserGroupInformation.getCurrentUser()
+        .getShortUserName(), getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME,
+        AZURE_DEFAULT_GROUP_DEFAULT), permission);
+  }
+
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Creating directory: " + f.toString());
+    }
+
+    if (containsColon(f)) {
+      throw new IOException("Cannot create directory " + f
+          + " through WASB that has colons in the name");
+    }
+
+    Path absolutePath = makeAbsolute(f);
+    PermissionStatus permissionStatus = createPermissionStatus(applyUMask(
+        permission, UMaskApplyMode.NewDirectory));
+
+    ArrayList<String> keysToCreateAsFolder = new ArrayList<String>();
+    ArrayList<String> keysToUpdateAsFolder = new ArrayList<String>();
+    boolean childCreated = false;
+    // Check that there is no file in the parent chain of the given path.
+    // Stop when you get to the root
+    for (Path current = absolutePath, parent = current.getParent(); parent != null; current = parent, parent = current
+        .getParent()) {
+      String currentKey = pathToKey(current);
+      FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
+      if (currentMetadata != null && !currentMetadata.isDir()) {
+        throw new IOException("Cannot create directory " + f + " because "
+            + current + " is an existing file.");
+      } else if (currentMetadata == null
+          || (currentMetadata.isDir() && currentMetadata
+              .getBlobMaterialization() == BlobMaterialization.Implicit)) {
+        keysToCreateAsFolder.add(currentKey);
+        childCreated = true;
+      } else {
+        // The directory already exists. Its last modified time needs to be
+        // updated if a child directory was created under it.
+        if (childCreated) {
+          keysToUpdateAsFolder.add(currentKey);
+        }
+        childCreated = false;
+      }
+    }
+
+    for (String currentKey : keysToCreateAsFolder) {
+      store.storeEmptyFolder(currentKey, permissionStatus);
+    }
+
+    // Take the time after finishing mkdirs as the modified time, and update all
+    // the existing directories' modified time to it uniformly.
+    final Calendar lastModifiedCalendar = Calendar
+        .getInstance(Utility.LOCALE_US);
+    lastModifiedCalendar.setTimeZone(Utility.UTC_ZONE);
+    Date lastModified = lastModifiedCalendar.getTime();
+    for (String key : keysToUpdateAsFolder) {
+      store.updateFolderLastModifiedTime(key, lastModified);
+    }
+
+    // If we get here everything succeeded; failures above throw an exception.
+    return true;
+  }
+
+  @Override
+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Opening file: " + f.toString());
+    }
+
+    Path absolutePath = makeAbsolute(f);
+    String key = pathToKey(absolutePath);
+    FileMetadata meta = store.retrieveMetadata(key);
+    if (meta == null) {
+      throw new FileNotFoundException(f.toString());
+    }
+    if (meta.isDir()) {
+      throw new FileNotFoundException(f.toString()
+          + " is a directory not a file.");
+    }
+
+    return new FSDataInputStream(new BufferedFSInputStream(
+        new NativeAzureFsInputStream(store.retrieve(key), key), bufferSize));
+  }
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Moving " + src + " to " + dst);
+    }
+
+    if (containsColon(dst)) {
+      throw new IOException("Cannot rename to file " + dst
+          + " through WASB that has colons in the name");
+    }
+
+    String srcKey = pathToKey(makeAbsolute(src));
+
+    if (srcKey.length() == 0) {
+      // Cannot rename root of file system
+      return false;
+    }
+
+    FileMetadata srcMetadata = store.retrieveMetadata(srcKey);
+    if (srcMetadata == null) {
+      // Source doesn't exist
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Source " + src + " doesn't exist, failing the rename.");
+      }
+      return false;
+    }
+
+    // Figure out the final destination
+    Path absoluteDst = makeAbsolute(dst);
+    String dstKey = pathToKey(absoluteDst);
+    FileMetadata dstMetadata = store.retrieveMetadata(dstKey);
+
+    // directory rename validations
+    if (srcMetadata.isDir()) {
+
+      // rename dir to self is an error
+      if (srcKey.equals(dstKey)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Renaming directory to itself is disallowed. path=" + src);
+        }
+        return false;
+      }
+
+      // rename dir to (sub-)child of self is an error. see
+      // FileSystemContractBaseTest.testRenameChildDirForbidden
+      if (dstKey.startsWith(srcKey + PATH_DELIMITER)) {
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Renaming directory to a itself is disallowed. src=" + src
+              + " dest=" + dst);
+        }
+        return false;
+      }
+    }
+
+    // file rename early checks
+    if (!srcMetadata.isDir()) {
+      if (srcKey.equals(dstKey)) {
+        // rename file to self is OK
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Renaming file to itself. This is allowed and is treated as no-op. path="
+              + src);
+        }
+        return true;
+      }
+    }
+
+    // More validations.
+    // If the target is a directory that already exists, alter the dst to be a
+    // child of it, e.g. move("/a/file.txt", "/b") where "/b" already exists
+    // causes the target to be "/b/file.txt".
+    if (dstMetadata != null && dstMetadata.isDir()) {
+      dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
+      // Best would be to update dstMetadata, but it is not used further, so set
+      // it to null and skip the additional cost
+      dstMetadata = null;
+      // dstMetadata = store.retrieveMetadata(dstKey);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Destination " + dst
+            + " is a directory, adjusted the destination to be " + dstKey);
+      }
+
+      // rename dir to self is an error
+      if (srcKey.equals(dstKey)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Renaming directory to itself is disallowed. path=" + src);
+        }
+        return false;
+      }
+
+    } else if (dstMetadata != null) {
+      // Otherwise, attempting to overwrite a file is an error.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Destination " + dst
+            + " is an already existing file, failing the rename.");
+      }
+      return false;
+    } else {
+      // Either dir or file, and the target doesn't exist. Check that the parent
+      // directory exists.
+      FileMetadata parentOfDestMetadata = store
+          .retrieveMetadata(pathToKey(absoluteDst.getParent()));
+      if (parentOfDestMetadata == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Parent of the destination " + dst
+              + " doesn't exist, failing the rename.");
+        }
+        return false;
+      } else if (!parentOfDestMetadata.isDir()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Parent of the destination " + dst
+              + " is a file, failing the rename.");
+        }
+        return false;
+      }
+    }
+
+    // Validations complete, do the move.
+    if (!srcMetadata.isDir()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Source " + src + " found as a file, renaming.");
+      }
+      store.rename(srcKey, dstKey);
+    } else {
+      // Move everything inside the folder.
+      String priorLastKey = null;
+
+      // Calculate the index of the part of the string to be moved. That
+      // is everything on the path up to the folder name.
+      do {
+        // List all blobs rooted at the source folder.
+        PartialListing listing = store.listAll(srcKey, AZURE_LIST_ALL,
+            AZURE_UNBOUNDED_DEPTH, priorLastKey);
+
+        // Rename all the files in the folder.
+        for (FileMetadata file : listing.getFiles()) {
+          // Rename all materialized entries under the folder to point to the
+          // final destination.
+          if (file.getBlobMaterialization() == BlobMaterialization.Explicit) {
+            String srcName = file.getKey();
+            String suffix = srcName.substring(srcKey.length());
+            String dstName = dstKey + suffix;
+            store.rename(srcName, dstName);
+          }
+        }
+        priorLastKey = listing.getPriorLastKey();
+      } while (priorLastKey != null);
+      // Rename the top level empty blob for the folder.
+      if (srcMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
+        store.rename(srcKey, dstKey);
+      }
+    }
+
+    // Update both source and destination parent folder last modified time.
+    Path srcParent = makeAbsolute(keyToPath(srcKey)).getParent();
+    if (srcParent != null && srcParent.getParent() != null) { // not root
+      String srcParentKey = pathToKey(srcParent);
+
+      // ensure the srcParent is a materialized folder
+      FileMetadata srcParentMetadata = store.retrieveMetadata(srcParentKey);
+      if (srcParentMetadata.isDir()
+          && srcParentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
+        store.storeEmptyFolder(srcParentKey,
+            createPermissionStatus(FsPermission.getDefault()));
+      }
+
+      store.updateFolderLastModifiedTime(srcParentKey);
+    }
+
+    Path destParent = makeAbsolute(keyToPath(dstKey)).getParent();
+    if (destParent != null && destParent.getParent() != null) { // not root
+      String dstParentKey = pathToKey(destParent);
+
+      // ensure the dstParent is a materialized folder
+      FileMetadata dstParentMetadata = store.retrieveMetadata(dstParentKey);
+      if (dstParentMetadata.isDir()
+          && dstParentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
+        store.storeEmptyFolder(dstParentKey,
+            createPermissionStatus(FsPermission.getDefault()));
+      }
+
+      store.updateFolderLastModifiedTime(dstParentKey);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renamed " + src + " to " + dst + " successfully.");
+    }
+    return true;
+  }
+
+  /**
+   * Return an array containing hostnames, offset and size of portions of the
+   * given file. For WASB we'll just lie and give fake hosts to make sure we get
+   * many splits in MR jobs.
+   */
+  @Override
+  public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
+      long len) throws IOException {
+    if (file == null) {
+      return null;
+    }
+
+    if ((start < 0) || (len < 0)) {
+      throw new IllegalArgumentException("Invalid start or len parameter");
+    }
+
+    if (file.getLen() < start) {
+      return new BlockLocation[0];
+    }
+    final String blobLocationHost = getConf().get(
+        AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
+        AZURE_BLOCK_LOCATION_HOST_DEFAULT);
+    final String[] name = { blobLocationHost };
+    final String[] host = { blobLocationHost };
+    long blockSize = file.getBlockSize();
+    if (blockSize <= 0) {
+      throw new IllegalArgumentException(
+          "The block size for the given file is not a positive number: "
+              + blockSize);
+    }
+    int numberOfLocations = (int) (len / blockSize)
+        + ((len % blockSize == 0) ? 0 : 1);
+    BlockLocation[] locations = new BlockLocation[numberOfLocations];
+    for (int i = 0; i < locations.length; i++) {
+      long currentOffset = start + (i * blockSize);
+      long currentLength = Math.min(blockSize, start + len - currentOffset);
+      locations[i] = new BlockLocation(name, host, currentOffset, currentLength);
+    }
+    return locations;
+  }
+
+  /**
+   * Set the working directory to the given directory.
+   */
+  @Override
+  public void setWorkingDirectory(Path newDir) {
+    workingDir = makeAbsolute(newDir);
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+
+  @Override
+  public void setPermission(Path p, FsPermission permission) throws IOException {
+    Path absolutePath = makeAbsolute(p);
+    String key = pathToKey(absolutePath);
+    FileMetadata metadata = store.retrieveMetadata(key);
+    if (metadata == null) {
+      throw new FileNotFoundException("File doesn't exist: " + p);
+    }
+    permission = applyUMask(permission,
+        metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory
+            : UMaskApplyMode.ChangeExistingFile);
+    if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
+      // It's an implicit folder, need to materialize it.
+      store.storeEmptyFolder(key, createPermissionStatus(permission));
+    } else if (!metadata.getPermissionStatus().getPermission()
+        .equals(permission)) {
+      store.changePermissionStatus(key, new PermissionStatus(metadata
+          .getPermissionStatus().getUserName(), metadata.getPermissionStatus()
+          .getGroupName(), permission));
+    }
+  }
+
+  @Override
+  public void setOwner(Path p, String username, String groupname)
+      throws IOException {
+    Path absolutePath = makeAbsolute(p);
+    String key = pathToKey(absolutePath);
+    FileMetadata metadata = store.retrieveMetadata(key);
+    if (metadata == null) {
+      throw new FileNotFoundException("File doesn't exist: " + p);
+    }
+    PermissionStatus newPermissionStatus = new PermissionStatus(
+        username == null ? metadata.getPermissionStatus().getUserName()
+            : username, groupname == null ? metadata.getPermissionStatus()
+            .getGroupName() : groupname, metadata.getPermissionStatus()
+            .getPermission());
+    if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
+      // It's an implicit folder, need to materialize it.
+      store.storeEmptyFolder(key, newPermissionStatus);
+    } else {
+      store.changePermissionStatus(key, newPermissionStatus);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Call the base close() to close any resources there.
+    super.close();
+    // Close the store
+    store.close();
+  }
+
+  /**
+   * A handler that defines what to do with blobs whose upload was interrupted.
+   */
+  private abstract class DanglingFileHandler {
+    abstract void handleFile(FileMetadata file, FileMetadata tempFile)
+        throws IOException;
+  }
+
+  /**
+   * Handler implementation for just deleting dangling files and cleaning them
+   * up.
+   */
+  private class DanglingFileDeleter extends DanglingFileHandler {
+    @Override
+    void handleFile(FileMetadata file, FileMetadata tempFile)
+        throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Deleting dangling file " + file.getKey());
+      }
+      store.delete(file.getKey());
+      store.delete(tempFile.getKey());
+    }
+  }
+
+  /**
+   * Handler implementation for just moving dangling files to recovery location
+   * (/lost+found).
+   */
+  private class DanglingFileRecoverer extends DanglingFileHandler {
+    private final Path destination;
+
+    DanglingFileRecoverer(Path destination) {
+      this.destination = destination;
+    }
+
+    @Override
+    void handleFile(FileMetadata file, FileMetadata tempFile)
+        throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Recovering " + file.getKey());
+      }
+      // Move to the final destination
+      String finalDestinationKey = pathToKey(new Path(destination,
+          file.getKey()));
+      store.rename(tempFile.getKey(), finalDestinationKey);
+      if (!finalDestinationKey.equals(file.getKey())) {
+        // Delete the empty link file now that we've restored it.
+        store.delete(file.getKey());
+      }
+    }
+  }
+
+  /**
+   * Check if a path has colons in its name
+   */
+  private boolean containsColon(Path p) {
+    return p.toUri().getPath().toString().contains(":");
+  }
+
+  /**
+   * Implements recover and delete (-move and -delete) behaviors for handling
+   * dangling files (blobs whose upload was interrupted).
+   * 
+   * @param root
+   *          The root path to check from.
+   * @param handler
+   *          The handler that deals with dangling files.
+   */
+  private void handleFilesWithDanglingTempData(Path root,
+      DanglingFileHandler handler) throws IOException {
+    // Calculate the cut-off for when to consider a blob to be dangling.
+    long cutoffForDangling = new Date().getTime()
+        - getConf().getInt(AZURE_TEMP_EXPIRY_PROPERTY_NAME,
+            AZURE_TEMP_EXPIRY_DEFAULT) * 1000;
+    // Go over all the blobs under the given root and look for blobs to
+    // recover.
+    String priorLastKey = null;
+    do {
+      PartialListing listing = store.listAll(pathToKey(root), AZURE_LIST_ALL,
+          AZURE_UNBOUNDED_DEPTH, priorLastKey);
+
+      for (FileMetadata file : listing.getFiles()) {
+        if (!file.isDir()) { // We don't recover directory blobs
+          // See if this blob has a link in it (meaning it's a place-holder
+          // blob for when the upload to the temp blob is complete).
+          String link = store.getLinkInFileMetadata(file.getKey());
+          if (link != null) {
+            // It has a link; see if the temp blob it points to exists and is
+            // old enough to be considered dangling.
+            FileMetadata linkMetadata = store.retrieveMetadata(link);
+            if (linkMetadata != null
+                && linkMetadata.getLastModified() >= cutoffForDangling) {
+              // Found one!
+              handler.handleFile(file, linkMetadata);
+            }
+          }
+        }
+      }
+      priorLastKey = listing.getPriorLastKey();
+    } while (priorLastKey != null);
+  }
+
+  /**
+   * Looks under the given root path for any blobs that are left "dangling",
+   * meaning they are place-holder blobs created while uploading the data to a
+   * temporary blob, where the upload crashed partway through and left them
+   * there. If any are found, they are moved to the given destination.
+   * 
+   * @param root
+   *          The root path to consider.
+   * @param destination
+   *          The destination path to move any recovered files to.
+   * @throws IOException
+   */
+  public void recoverFilesWithDanglingTempData(Path root, Path destination)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Recovering files with dangling temp data in " + root);
+    }
+    handleFilesWithDanglingTempData(root,
+        new DanglingFileRecoverer(destination));
+  }
+
+  /**
+   * Looks under the given root path for any blobs that are left "dangling",
+   * meaning they are place-holder blobs created while uploading the data to a
+   * temporary blob, where the upload crashed partway through and left them
+   * there. If any are found, they are deleted.
+   * 
+   * @param root
+   *          The root path to consider.
+   * @throws IOException
+   */
+  public void deleteFilesWithDanglingTempData(Path root) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Deleting files with dangling temp data in " + root);
+    }
+    handleFilesWithDanglingTempData(root, new DanglingFileDeleter());
+  }
+
+  /**
+   * Encodes the key with a random prefix for load balancing in Azure storage.
+   * Data is uploaded to a randomly named temporary blob and then renamed on
+   * the storage side to recover the original key.
+   * 
+   * @param aKey
+   *          The original key to encode.
+   * @return Encoded version of the original key.
+   */
+  private static String encodeKey(String aKey) {
+    // Get the tail end of the key name.
+    //
+    String fileName = aKey.substring(aKey.lastIndexOf(Path.SEPARATOR) + 1,
+        aKey.length());
+
+    // Construct the randomized prefix of the file name. The prefix ensures the
+    // file always drops into the same folder but with a varying tail key name.
+    String filePrefix = AZURE_TEMP_FOLDER + Path.SEPARATOR
+        + UUID.randomUUID().toString();
+
+    // Concatenate the randomized prefix with the tail of the key name.
+    String randomizedKey = filePrefix + fileName;
+
+    // Return to the caller with the randomized key.
+    return randomizedKey;
+  }
+}
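
For orientation, the dangling-data handlers above are reached through the two
public entry points at the end of this file. A minimal usage sketch, not part
of the patch, assuming the enclosing class (declared earlier in this diff) is
NativeAzureFileSystem; the wasb:// URI, account, and container names are
placeholders:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;

public class DanglingDataCleanup {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical account and container; substitute a real wasb:// URI with
    // credentials configured in conf.
    URI wasbUri = new URI("wasb://container@account.blob.core.windows.net/");
    NativeAzureFileSystem fs =
        (NativeAzureFileSystem) FileSystem.get(wasbUri, conf);
    // Move interrupted uploads under /lost+found (the -move behavior):
    fs.recoverFilesWithDanglingTempData(new Path("/"), new Path("/lost+found"));
    // Or discard them instead (the -delete behavior):
    // fs.deleteFilesWithDanglingTempData(new Path("/"));
    fs.close();
  }
}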

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
new file mode 100644
index 0000000..0fb3c22
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Date;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * <p>
+ * An abstraction for a key-based file store.
+ * </p>
+ */
+@InterfaceAudience.Private
+interface NativeFileSystemStore {
+
+  void initialize(URI uri, Configuration conf) throws IOException;
+
+  void storeEmptyFolder(String key, PermissionStatus permissionStatus)
+      throws AzureException;
+
+  FileMetadata retrieveMetadata(String key) throws IOException;
+
+  DataInputStream retrieve(String key) throws IOException;
+
+  DataInputStream retrieve(String key, long byteRangeStart) throws IOException;
+
+  DataOutputStream storefile(String key, PermissionStatus permissionStatus)
+      throws AzureException;
+
+  void storeEmptyLinkFile(String key, String tempBlobKey,
+      PermissionStatus permissionStatus) throws AzureException;
+
+  String getLinkInFileMetadata(String key) throws AzureException;
+
+  PartialListing list(String prefix, final int maxListingCount,
+      final int maxListingDepth) throws IOException;
+
+  PartialListing list(String prefix, final int maxListingCount,
+      final int maxListingDepth, String priorLastKey) throws IOException;
+
+  PartialListing listAll(String prefix, final int maxListingCount,
+      final int maxListingDepth, String priorLastKey) throws IOException;
+
+  void changePermissionStatus(String key, PermissionStatus newPermission)
+      throws AzureException;
+
+  void delete(String key) throws IOException;
+
+  void rename(String srcKey, String dstKey) throws IOException;
+
+  /**
+   * Delete all keys with the given prefix. Used for testing.
+   * 
+   * @throws IOException
+   */
+  @VisibleForTesting
+  void purge(String prefix) throws IOException;
+
+  /**
+   * Diagnostic method to dump state to the console.
+   * 
+   * @throws IOException
+   */
+  void dump() throws IOException;
+
+  void close();
+
+  void updateFolderLastModifiedTime(String key) throws AzureException;
+
+  void updateFolderLastModifiedTime(String key, Date lastModified)
+      throws AzureException;
+}
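
Listing results come back in chunks; callers feed priorLastKey back in until it
returns null, which is the same pattern handleFilesWithDanglingTempData uses
above. A minimal sketch of that loop, which would have to live inside the
org.apache.hadoop.fs.azure package since the interface and its return types are
package-private; the prefix and paging values are illustrative:

static void dumpAllKeys(NativeFileSystemStore store) throws IOException {
  String priorLastKey = null;
  do {
    PartialListing listing = store.listAll("" /* prefix */,
        1000 /* max results per page */, Integer.MAX_VALUE /* depth */,
        priorLastKey);
    for (FileMetadata file : listing.getFiles()) {
      System.out.println(file.getKey());
    }
    priorLastKey = listing.getPriorLastKey();
  } while (priorLastKey != null);
}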

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java
new file mode 100644
index 0000000..9e49de8
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * <p>
+ * Holds information on a directory listing for a {@link NativeFileSystemStore}.
+ * This includes the {@link FileMetadata files} and directories (their names)
+ * contained in a directory.
+ * </p>
+ * <p>
+ * This listing may be returned in chunks, so a <code>priorLastKey</code> is
+ * provided so that the next chunk may be requested.
+ * </p>
+ * 
+ * @see NativeFileSystemStore#list(String, int, int, String)
+ */
+@InterfaceAudience.Private
+class PartialListing {
+
+  private final String priorLastKey;
+  private final FileMetadata[] files;
+  private final String[] commonPrefixes;
+
+  public PartialListing(String priorLastKey, FileMetadata[] files,
+      String[] commonPrefixes) {
+    this.priorLastKey = priorLastKey;
+    this.files = files;
+    this.commonPrefixes = commonPrefixes;
+  }
+
+  public FileMetadata[] getFiles() {
+    return files;
+  }
+
+  public String[] getCommonPrefixes() {
+    return commonPrefixes;
+  }
+
+  public String getPriorLastKey() {
+    return priorLastKey;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java
new file mode 100644
index 0000000..25f2883
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.net.HttpURLConnection;
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.microsoft.windowsazure.storage.OperationContext;
+import com.microsoft.windowsazure.storage.RequestResult;
+import com.microsoft.windowsazure.storage.ResponseReceivedEvent;
+import com.microsoft.windowsazure.storage.SendingRequestEvent;
+import com.microsoft.windowsazure.storage.StorageEvent;
+
+/*
+ * Self-throttling is implemented by hooking into the send and response
+ * callbacks. One instance of this class is created per operation context,
+ * i.e. one per blob upload, blob download, etc.
+ * 
+ * Self-throttling only applies to the second and subsequent packets of an
+ * operation. This is a simple way to ensure it only affects bulk transfers
+ * and not every tiny request.
+ * 
+ * A blob download involves sequential packet transmissions, so there are no
+ * concurrency concerns. A blob upload generally involves concurrent upload
+ * worker threads that share one operation context and one throttling instance:
+ *   -- we do not track latencies per worker thread, since they do similar
+ *      work and rarely collide in practice;
+ *   -- concurrent access to lastE2Elatency must be protected:
+ *       -- volatile is necessary and should be sufficient to protect simple
+ *          access to primitive values (Java 1.5 onwards);
+ *       -- synchronized blocks are also used, to be conservative and for
+ *          easier maintenance.
+ * 
+ * If an operation performed concurrent GETs and PUTs, it could become unclear
+ * whether lastE2Elatency was a read or a write measurement; this scenario does
+ * not occur.
+ *
+ * readFactor  = target read throughput as a fraction of unrestricted throughput.
+ * writeFactor = target write throughput as a fraction of unrestricted throughput.
+ * 
+ * Because we introduce delays, it is important to measure only the actual
+ * end-to-end latency and not the augmented latency. To achieve this, we adjust
+ * the 'startDate' of the transfer tracking object.
+ */
+
+
+/**
+ * Introduces delays into Azure traffic to avoid overrunning the server-side
+ * throttling limits.
+ */
+@InterfaceAudience.Private
+public class SelfThrottlingIntercept {
+  public static final Log LOG = LogFactory
+      .getLog(SelfThrottlingIntercept.class);
+
+  private final float readFactor;
+  private final float writeFactor;
+
+  // Concurrency: access to non-final members must be thread-safe
+  private long lastE2Elatency;
+
+  public SelfThrottlingIntercept(OperationContext operationContext,
+      float readFactor, float writeFactor) {
+    this.readFactor = readFactor;
+    this.writeFactor = writeFactor;
+  }
+
+  public static void hook(OperationContext operationContext, float readFactor,
+      float writeFactor) {
+
+    SelfThrottlingIntercept throttler = new SelfThrottlingIntercept(
+        operationContext, readFactor, writeFactor);
+    ResponseReceivedListener responseListener = throttler.new ResponseReceivedListener();
+    SendingRequestListener sendingListener = throttler.new SendingRequestListener();
+
+    operationContext.getResponseReceivedEventHandler().addListener(
+        responseListener);
+    operationContext.getSendingRequestEventHandler().addListener(
+        sendingListener);
+  }
+
+  public void responseReceived(ResponseReceivedEvent event) {
+    RequestResult result = event.getRequestResult();
+    Date startDate = result.getStartDate();
+    Date stopDate = result.getStopDate();
+    long elapsed = stopDate.getTime() - startDate.getTime();
+
+    synchronized (this) {
+      this.lastE2Elatency = elapsed;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      int statusCode = result.getStatusCode();
+      String etag = result.getEtag();
+      HttpURLConnection urlConnection = (HttpURLConnection) event
+          .getConnectionObject();
+      int contentLength = urlConnection.getContentLength();
+      String requestMethod = urlConnection.getRequestMethod();
+      long threadId = Thread.currentThread().getId();
+      LOG.debug(String
+          .format(
+              "SelfThrottlingIntercept:: ResponseReceived: threadId=%d, Status=%d, Elapsed(ms)=%d, ETAG=%s, contentLength=%d, requestMethod=%s",
+              threadId, statusCode, elapsed, etag, contentLength, requestMethod));
+    }
+  }
+
+  public void sendingRequest(SendingRequestEvent sendEvent) {
+    long lastLatency;
+    boolean operationIsRead; // for logging
+    synchronized (this) {
+      lastLatency = this.lastE2Elatency;
+    }
+
+    float sleepMultiple;
+    HttpURLConnection urlConnection = (HttpURLConnection) sendEvent
+        .getConnectionObject();
+
+    // Azure REST API never uses POST, so PUT is a sufficient test for an
+    // upload.
+    if (urlConnection.getRequestMethod().equalsIgnoreCase("PUT")) {
+      operationIsRead = false;
+      sleepMultiple = (1 / writeFactor) - 1;
+    } else {
+      operationIsRead = true;
+      sleepMultiple = (1 / readFactor) - 1;
+    }
+
+    long sleepDuration = (long) (sleepMultiple * lastLatency);
+    if (sleepDuration < 0) {
+      sleepDuration = 0;
+    }
+
+    if (sleepDuration > 0) {
+      try {
+        // Thread.sleep() is not exact, but it seems sufficiently accurate for
+        // our needs. If needed, this could become a loop of small waits that
+        // tracks actual elapsed time.
+        Thread.sleep(sleepDuration);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+
+      // reset to avoid counting the sleep against request latency
+      sendEvent.getRequestResult().setStartDate(new Date());
+    }
+
+    if (LOG.isDebugEnabled()) {
+      boolean isFirstRequest = (lastLatency == 0);
+      long threadId = Thread.currentThread().getId();
+      LOG.debug(String
+          .format(
+              " SelfThrottlingIntercept:: SendingRequest:   threadId=%d, requestType=%s, isFirstRequest=%b, sleepDuration=%d",
+              threadId, operationIsRead ? "read " : "write", isFirstRequest,
+              sleepDuration));
+    }
+  }
+
+  // Simply forwards back to the main class. This is necessary because our
+  // main class cannot extend two base classes.
+  @InterfaceAudience.Private
+  class SendingRequestListener extends StorageEvent<SendingRequestEvent> {
+
+    @Override
+    public void eventOccurred(SendingRequestEvent event) {
+      sendingRequest(event);
+    }
+  }
+
+  // Simply forwards back to the main class. This is necessary because our
+  // main class cannot extend two base classes.
+  @InterfaceAudience.Private
+  class ResponseReceivedListener extends StorageEvent<ResponseReceivedEvent> {
+
+    @Override
+    public void eventOccurred(ResponseReceivedEvent event) {
+      responseReceived(event);
+    }
+  }
+}
\ No newline at end of file
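
The arithmetic is worth spelling out: for a target factor f, each subsequent
request sleeps (1/f - 1) times the last observed end-to-end latency, so f = 1.0
adds no delay and f = 0.5 sleeps one full latency period, roughly halving
throughput. A minimal sketch of attaching the intercept, with illustrative
factors:

// Inside the WASB store, before issuing the blob I/O for one operation:
OperationContext opContext = new OperationContext();
// Target roughly 80% of unrestricted read throughput and 50% of write
// throughput for requests issued under this operation context.
SelfThrottlingIntercept.hook(opContext, 0.8f, 0.5f);
// opContext is then passed to the Azure SDK upload/download calls, whose
// send and response events now carry the self-throttling delays.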

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java
new file mode 100644
index 0000000..18f173e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.net.HttpURLConnection;
+import java.security.InvalidKeyException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.microsoft.windowsazure.storage.Constants.HeaderConstants;
+import com.microsoft.windowsazure.storage.OperationContext;
+import com.microsoft.windowsazure.storage.SendingRequestEvent;
+import com.microsoft.windowsazure.storage.StorageCredentials;
+import com.microsoft.windowsazure.storage.StorageEvent;
+import com.microsoft.windowsazure.storage.StorageException;
+
+/**
+ * Manages the lifetime of binding on the operation contexts to intercept send
+ * request events to Azure storage.
+ */
+@InterfaceAudience.Private
+public final class SendRequestIntercept extends StorageEvent<SendingRequestEvent> {
+
+  public static final Log LOG = LogFactory.getLog(SendRequestIntercept.class);
+
+  private static final String ALLOW_ALL_REQUEST_PRECONDITIONS = "*";
+  private final StorageCredentials storageCreds;
+  private final boolean allowConcurrentOOBIo;
+  private final OperationContext opContext;
+
+  /**
+   * Getter returning the storage account credentials.
+   * 
+   * @return storageCreds - account storage credentials.
+   */
+  private StorageCredentials getCredentials() {
+    return storageCreds;
+  }
+
+  /**
+   * Query if out-of-band I/Os are allowed.
+   * 
+   * @return allowConcurrentOOBIo - true if OOB I/O is allowed, and false
+   * otherwise.
+   */
+  private boolean isOutOfBandIoAllowed() {
+    return allowConcurrentOOBIo;
+  }
+
+  /**
+   * Getter returning the operation context.
+   * 
+   * @return opContext - the operation context.
+   */
+  private OperationContext getOperationContext() {
+    return opContext;
+  }
+
+  /**
+   * Constructor for SendRequestIntercept.
+   * 
+   * @param storageCreds
+   *          - storage account credentials for signing packets.
+   * @param allowConcurrentOOBIo
+   *          - true if reads are allowed with concurrent OOB writes.
+   * @param opContext
+   *          - the operation context to bind the listener to.
+   */
+  private SendRequestIntercept(StorageCredentials storageCreds,
+      boolean allowConcurrentOOBIo, OperationContext opContext) {
+    // Capture the send delay callback interface.
+    this.storageCreds = storageCreds;
+    this.allowConcurrentOOBIo = allowConcurrentOOBIo;
+    this.opContext = opContext;
+  }
+
+  /**
+   * Binds a new listener to the operation context so the WASB file system can
+   * appropriately intercept sends. By allowing concurrent OOB I/Os, we bypass
+   * the blob immutability check when reading streams.
+   * 
+   * @param storageCreds
+   *          The storage account credentials used to re-sign requests.
+   * 
+   * @param opContext
+   *          The operation context to bind the listener to.
+   * 
+   * @param allowConcurrentOOBIo
+   *          True if reads are allowed with concurrent OOB writes.
+   */
+  public static void bind(StorageCredentials storageCreds,
+      OperationContext opContext, boolean allowConcurrentOOBIo) {
+    SendRequestIntercept sendListener = new SendRequestIntercept(storageCreds,
+        allowConcurrentOOBIo, opContext);
+    opContext.getSendingRequestEventHandler().addListener(sendListener);
+  }
+
+  /**
+   * Handler that processes the sending request event from the Azure SDK. The
+   * handler simply resets the conditional header to make all read requests
+   * unconditional if reads with concurrent OOB writes are allowed.
+   * 
+   * @param sendEvent
+   *          - send event context from Windows Azure SDK.
+   */
+  @Override
+  public void eventOccurred(SendingRequestEvent sendEvent) {
+
+    if (!(sendEvent.getConnectionObject() instanceof HttpURLConnection)) {
+      // Pass if there is no HTTP connection associated with this send
+      // request.
+      return;
+    }
+
+    // Capture the HTTP URL connection object and get size of the payload for
+    // the request.
+    HttpURLConnection urlConnection = (HttpURLConnection) sendEvent
+        .getConnectionObject();
+
+    // Determine whether this is a download request by checking that the
+    // request method is a "GET" operation.
+    if (urlConnection.getRequestMethod().equalsIgnoreCase("GET")
+        && isOutOfBandIoAllowed()) {
+      // If concurrent reads on OOB writes are allowed, reset the if-match
+      // condition on the conditional header.
+      urlConnection.setRequestProperty(HeaderConstants.IF_MATCH,
+          ALLOW_ALL_REQUEST_PRECONDITIONS);
+
+      // In the Java Azure SDK the packet is signed before firing the
+      // SendingRequest event. Setting the conditional packet header property
+      // changes the contents of the packet, so the packet has to be re-signed.
+      try {
+        // Sign the request. GET's have no payload so the content length is
+        // zero.
+        getCredentials().signBlobAndQueueRequest(urlConnection, -1L, getOperationContext());
+      } catch (InvalidKeyException e) {
+        // Log invalid key exception to track signing error before the send
+        // fails.
+        String errString = String.format(
+            "Received invalid key exception when attempting sign packet."
+                + " Cause: %s", e.getCause().toString());
+        LOG.error(errString);
+      } catch (StorageException e) {
+        // Log storage exception to track signing error before the call fails.
+        String errString = String.format(
+            "Received storage exception when attempting to sign packet."
+                + " Cause: %s", e.getCause().toString());
+        LOG.error(errString);
+      }
+    }
+  }
+}
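
Binding is a single call; with allowConcurrentOOBIo set to true, every GET
issued under the context has its If-Match precondition reset to "*" and is
re-signed with the supplied credentials. A minimal sketch with placeholder
credentials, where StorageCredentialsAccountAndKey is one concrete credentials
class from the same SDK:

OperationContext opContext = new OperationContext();
StorageCredentials creds =
    new StorageCredentialsAccountAndKey("myaccount", "BASE64_ACCOUNT_KEY");
// true: let reads proceed while another client writes the blob out-of-band,
// by making GETs unconditional.
SendRequestIntercept.bind(creds, opContext, true);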

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java
new file mode 100644
index 0000000..2ce8ebd
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell;
+
+/**
+ * Shell decryption key provider which invokes an external script that will
+ * perform the key decryption.
+ */
+@InterfaceAudience.Private
+public class ShellDecryptionKeyProvider extends SimpleKeyProvider {
+  static final String KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
+
+  @Override
+  public String getStorageAccountKey(String accountName, Configuration conf)
+      throws KeyProviderException {
+    String envelope = super.getStorageAccountKey(accountName, conf);
+
+    final String command = conf.get(KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT);
+    if (command == null) {
+      throw new KeyProviderException(
+          "Script path is not specified via fs.azure.shellkeyprovider.script");
+    }
+
+    String[] cmd = command.split(" ");
+    String[] cmdWithEnvelope = Arrays.copyOf(cmd, cmd.length + 1);
+    cmdWithEnvelope[cmdWithEnvelope.length - 1] = envelope;
+
+    String decryptedKey = null;
+    try {
+      decryptedKey = Shell.execCommand(cmdWithEnvelope);
+    } catch (IOException ex) {
+      throw new KeyProviderException(ex);
+    }
+
+    // trim any whitespace
+    return decryptedKey.trim();
+  }
+}
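
The script contract is simple: the configured command line is split on spaces,
the encrypted envelope read from the normal account-key property is appended as
the final argument, and whatever the script prints (trimmed) becomes the
account key. A hypothetical setup; the account name, envelope, and script path
are placeholders, and getStorageAccountKey declares KeyProviderException:

Configuration conf = new Configuration();
// The encrypted envelope sits where the plaintext key would normally go.
conf.set("fs.azure.account.key.myaccount.blob.core.windows.net",
    "ENCRYPTED_ENVELOPE");
// External command, invoked as: /usr/local/bin/decrypt-azure-key ENCRYPTED_ENVELOPE
conf.set("fs.azure.shellkeyprovider.script", "/usr/local/bin/decrypt-azure-key");
String key = new ShellDecryptionKeyProvider()
    .getStorageAccountKey("myaccount.blob.core.windows.net", conf);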

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82268d87/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java
new file mode 100644
index 0000000..ef44a85
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Key provider that simply returns the storage account key from the
+ * configuration as plaintext.
+ */
+@InterfaceAudience.Private
+public class SimpleKeyProvider implements KeyProvider {
+
+  protected static final String KEY_ACCOUNT_KEY_PREFIX = "fs.azure.account.key.";
+
+  @Override
+  public String getStorageAccountKey(String accountName, Configuration conf)
+      throws KeyProviderException {
+    return conf.get(getStorageAccountKeyName(accountName));
+  }
+
+  protected String getStorageAccountKeyName(String accountName) {
+    return KEY_ACCOUNT_KEY_PREFIX + accountName;
+  }
+}
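
For comparison, the default provider is a straight configuration lookup under
fs.azure.account.key.&lt;account name&gt;. A minimal sketch with placeholder account
name and key:

Configuration conf = new Configuration();
conf.set("fs.azure.account.key.myaccount.blob.core.windows.net",
    "BASE64_ACCOUNT_KEY");
String key = new SimpleKeyProvider()
    .getStorageAccountKey("myaccount.blob.core.windows.net", conf);
// key is now "BASE64_ACCOUNT_KEY"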

