hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cnaur...@apache.org
Subject svn commit: r1605187 [1/2] - in /hadoop/common/trunk/hadoop-tools/hadoop-azure: ./ src/main/java/org/apache/hadoop/fs/azure/ src/main/java/org/apache/hadoop/fs/azure/metrics/ src/test/java/org/apache/hadoop/fs/azure/ src/test/java/org/apache/hadoop/fs/...
Date Tue, 24 Jun 2014 20:52:45 GMT
Author: cnauroth
Date: Tue Jun 24 20:52:44 2014
New Revision: 1605187

URL: http://svn.apache.org/r1605187
Log:
HADOOP-10728. Metrics system for Windows Azure Storage Filesystem. Contributed by Dexter Bradshaw, Mostafa Elhemali, Xi Fang, Johannes Klein, David Lao, Mike Liddell, Chuan Liu, Lengning Liu, Ivan Mitic, Michael Rys, Alexander Stojanovic, Brian Swan, and Min Wei.

Added:
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemMetricsSystem.java
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/BandwidthGaugeUpdater.java
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ErrorMetricUpdater.java
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/RollingWindowAverage.java
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/package.html
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/AzureMetricsTestUtil.java
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestAzureFileSystemInstrumentation.java
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestBandwidthGaugeUpdater.java
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestNativeAzureFileSystemMetricsSystem.java
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestRollingWindowAverage.java
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/resources/hadoop-metrics2-azure-file-system.properties
Modified:
    hadoop/common/trunk/hadoop-tools/hadoop-azure/.gitignore
    hadoop/common/trunk/hadoop-tools/hadoop-azure/README.txt
    hadoop/common/trunk/hadoop-tools/hadoop-azure/pom.xml
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
    hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml

Modified: hadoop/common/trunk/hadoop-tools/hadoop-azure/.gitignore
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-azure/.gitignore?rev=1605187&r1=1605186&r2=1605187&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-azure/.gitignore (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-azure/.gitignore Tue Jun 24 20:52:44 2014
@@ -1 +1,2 @@
-.checkstyle
\ No newline at end of file
+.checkstyle
+bin/
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-tools/hadoop-azure/README.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-azure/README.txt?rev=1605187&r1=1605186&r2=1605187&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-azure/README.txt (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-azure/README.txt Tue Jun 24 20:52:44 2014
@@ -12,9 +12,13 @@ Unit tests
 =============
 Most of the tests will run without additional configuration.
 For complete testing, configuration in src/test/resources is required:
-  src/test/resources/azure-test.xml
-  src/test/resources/log4j.properties
+  
+  src/test/resources/azure-test.xml -> Defines Azure storage dependencies, including account information 
 
+The other files in src/test/resources do not normally need alteration:
+  log4j.properties -> Test logging setup
+  hadoop-metrics2-azure-file-system.properties -> used to wire up instrumentation for testing
+  
 From command-line
 ------------------
 Basic execution:
@@ -59,6 +63,12 @@ Enable the Azure emulator tests by setti
   fs.azure.test.emulator -> true 
 in src\test\resources\azure-test.xml
 
+Known issues:
+  Symptom: When running tests against the emulator, you see the following failure message:
+           com.microsoft.windowsazure.storage.StorageException: The value for one of the HTTP headers is not in the correct format.
+  Issue:   The emulator can get into a confused state.  
+  Fix:     Restart the Azure Emulator.  Ensure it is v3.2 or later.
+ 
 Running tests against live Azure storage 
 -------------------------------------------------------------------------
 In order to run WASB unit tests against a live Azure Storage account, add credentials to 
@@ -101,4 +111,8 @@ Eclipse:
 NOTE:
 - After any change to the checkstyle rules xml, use window|preferences|checkstyle|{refresh}|OK
 
- 
\ No newline at end of file
+=============
+Javadoc
+============= 
+Command-line
+> mvn javadoc:javadoc
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-tools/hadoop-azure/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-azure/pom.xml?rev=1605187&r1=1605186&r2=1605187&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-azure/pom.xml (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-azure/pom.xml Tue Jun 24 20:52:44 2014
@@ -37,22 +37,6 @@
   </properties>
 
   <build>
-  
-    <testResources>
-      <testResource>
-        <directory>src/test/resources</directory>
-        <includes>
-          <include>log4j.properties</include>
-        </includes>
-      </testResource>
-      <testResource>
-        <directory>src/test/resources</directory>
-        <includes>
-          <include>azure-test.xml</include>
-        </includes>
-      </testResource>
-    </testResources>
-  
     <plugins>
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
@@ -198,5 +182,11 @@
       <type>test-jar</type>
     </dependency>
     
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    
   </dependencies>
 </project>

Modified: hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java?rev=1605187&r1=1605186&r2=1605187&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java Tue Jun 24 20:52:44 2014
@@ -17,7 +17,6 @@
  */
 
 package org.apache.hadoop.fs.azure;
-
 import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.PATH_DELIMITER;
 
 import java.io.BufferedInputStream;
@@ -46,6 +45,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobContainerWrapper;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobDirectoryWrapper;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
+import org.apache.hadoop.fs.azure.metrics.BandwidthGaugeUpdater;
+import org.apache.hadoop.fs.azure.metrics.ErrorMetricUpdater;
+import org.apache.hadoop.fs.azure.metrics.ResponseReceivedMetricUpdater;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.mortbay.util.ajax.JSON;
@@ -69,8 +72,15 @@ import com.microsoft.windowsazure.storag
 import com.microsoft.windowsazure.storage.blob.ListBlobItem;
 import com.microsoft.windowsazure.storage.core.Utility;
 
+
+/**
+ * Core implementation of Windows Azure Filesystem for Hadoop.
+ * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage.
+ *
+ */
 @InterfaceAudience.Private
-class AzureNativeFileSystemStore implements NativeFileSystemStore {
+@VisibleForTesting
+public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   
   /**
    * Configuration knob on whether we do block-level MD5 validation on
@@ -169,6 +179,8 @@ class AzureNativeFileSystemStore impleme
   private boolean isAnonymousCredentials = false;
   // Set to true if we are connecting using shared access signatures.
   private boolean connectingUsingSAS = false;
+  private AzureFileSystemInstrumentation instrumentation;
+  private BandwidthGaugeUpdater bandwidthGaugeUpdater;
   private static final JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer();
 
   private boolean suppressRetryPolicy = false;
@@ -301,6 +313,11 @@ class AzureNativeFileSystemStore impleme
     this.storageInteractionLayer = storageInteractionLayer;
   }
 
+  @VisibleForTesting
+  public BandwidthGaugeUpdater getBandwidthGaugeUpdater() {
+    return bandwidthGaugeUpdater;
+  }
+  
   /**
    * Check if concurrent reads and writes on the same blob are allowed.
    * 
@@ -325,12 +342,18 @@ class AzureNativeFileSystemStore impleme
    *           if URI or job object is null, or invalid scheme.
    */
   @Override
-  public void initialize(URI uri, Configuration conf) throws AzureException {
+  public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation) throws AzureException {
 
     if (null == this.storageInteractionLayer) {
       this.storageInteractionLayer = new StorageInterfaceImpl();
     }
 
+    this.instrumentation = instrumentation;
+    this.bandwidthGaugeUpdater = new BandwidthGaugeUpdater(instrumentation);
+    if (null == this.storageInteractionLayer) {
+      this.storageInteractionLayer = new StorageInterfaceImpl();
+    }
+    
     // Check that URI exists.
     //
     if (null == uri) {
@@ -775,8 +798,10 @@ class AzureNativeFileSystemStore impleme
         throw new AzureException(errMsg);
       }
 
+      instrumentation.setAccountName(accountName);
       String containerName = getContainerFromAuthority(sessionUri);
-
+      instrumentation.setContainerName(containerName);
+      
       // Check whether this is a storage emulator account.
       if (isStorageEmulatorAccount(accountName)) {
         // It is an emulator account, connect to it with no credentials.
@@ -1522,6 +1547,11 @@ class AzureNativeFileSystemStore impleme
           selfThrottlingWriteFactor);
     }
 
+    ResponseReceivedMetricUpdater.hook(
+        operationContext,
+        instrumentation,
+        bandwidthGaugeUpdater);
+    
     // Bind operation context to receive send request callbacks on this
     // operation.
     // If reads concurrent to OOB writes are allowed, the interception will
@@ -1535,6 +1565,8 @@ class AzureNativeFileSystemStore impleme
       operationContext = testHookOperationContext
           .modifyOperationContext(operationContext);
     }
+    
+    ErrorMetricUpdater.hook(operationContext, instrumentation);
 
     // Return the operation context.
     return operationContext;
@@ -2218,5 +2250,14 @@ class AzureNativeFileSystemStore impleme
 
   @Override
   public void close() {
+    bandwidthGaugeUpdater.close();
+  }
+  
+  // Finalizer to ensure complete shutdown
+  @Override
+  protected void finalize() throws Throwable {
+    LOG.debug("finalize() called");
+    close();
+    super.finalize();
   }
 }

Modified: hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java?rev=1605187&r1=1605186&r2=1605187&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java Tue Jun 24 20:52:44 2014
@@ -31,12 +31,13 @@ import java.util.Date;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BufferedFSInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -45,12 +46,14 @@ import org.apache.hadoop.fs.FSInputStrea
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 
-
 import com.google.common.annotations.VisibleForTesting;
 import com.microsoft.windowsazure.storage.core.Utility;
 
@@ -369,8 +372,12 @@ public class NativeAzureFileSystem exten
   private AzureNativeFileSystemStore actualStore;
   private Path workingDir;
   private long blockSize = MAX_AZURE_BLOCK_SIZE;
+  private AzureFileSystemInstrumentation instrumentation;
   private static boolean suppressRetryPolicy = false;
+  // A counter to create unique (within-process) names for my metrics sources.
+  private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
 
+  
   public NativeAzureFileSystem() {
     // set store in initialize()
   }
@@ -397,6 +404,20 @@ public class NativeAzureFileSystem exten
   }
 
   /**
+   * Creates a new metrics source name that's unique within this process.
+   */
+  @VisibleForTesting
+  public static String newMetricsSourceName() {
+    int number = metricsSourceNameCounter.incrementAndGet();
+    final String baseName = "AzureFileSystemMetrics";
+    if (number == 1) { // No need for a suffix for the first one
+      return baseName;
+    } else {
+      return baseName + number;
+    }
+  }
+  
+  /**
    * Checks if the given URI scheme is a scheme that's affiliated with the Azure
    * File System.
    * 
@@ -459,7 +480,16 @@ public class NativeAzureFileSystem exten
       store = createDefaultStore(conf);
     }
 
-    store.initialize(uri, conf);
+    // Make sure the metrics system is available before interacting with Azure
+    AzureFileSystemMetricsSystem.fileSystemStarted();
+    String sourceName = newMetricsSourceName(),
+        sourceDesc = "Azure Storage Volume File System metrics";
+    instrumentation = DefaultMetricsSystem.instance().register(sourceName,
+        sourceDesc, new AzureFileSystemInstrumentation(conf));
+    AzureFileSystemMetricsSystem.registerSource(sourceName, sourceDesc,
+        instrumentation);
+
+    store.initialize(uri, conf, instrumentation);
     setConf(conf);
     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
     this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser()
@@ -535,9 +565,19 @@ public class NativeAzureFileSystem exten
    * @return The store object.
    */
   @VisibleForTesting
-  AzureNativeFileSystemStore getStore() {
+  public AzureNativeFileSystemStore getStore() {
     return actualStore;
   }
+  
+  /**
+   * Gets the metrics source for this file system.
+   * This is mainly here for unit testing purposes.
+   *
+   * @return the metrics source.
+   */
+  public AzureFileSystemInstrumentation getInstrumentation() {
+    return instrumentation;
+  }
 
   /** This optional operation is not yet supported. */
   @Override
@@ -622,6 +662,10 @@ public class NativeAzureFileSystem exten
     // Construct the data output stream from the buffered output stream.
     FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);
 
+    
+    // Increment the counter
+    instrumentation.fileCreated();
+    
     // Return data output stream to caller.
     return fsOut;
   }
@@ -682,6 +726,7 @@ public class NativeAzureFileSystem exten
           store.updateFolderLastModifiedTime(parentKey);
         }
       }
+      instrumentation.fileDeleted();
       store.delete(key);
     } else {
       // The path specifies a folder. Recursively delete all entries under the
@@ -724,6 +769,7 @@ public class NativeAzureFileSystem exten
             p.getKey().lastIndexOf(PATH_DELIMITER));
         if (!p.isDir()) {
           store.delete(key + suffix);
+          instrumentation.fileDeleted();
         } else {
           // Recursively delete contents of the sub-folders. Notice this also
           // deletes the blob for the directory.
@@ -740,6 +786,7 @@ public class NativeAzureFileSystem exten
         String parentKey = pathToKey(parent);
         store.updateFolderLastModifiedTime(parentKey);
       }
+      instrumentation.directoryDeleted();
     }
 
     // File or directory was successfully deleted.
@@ -972,6 +1019,8 @@ public class NativeAzureFileSystem exten
       store.updateFolderLastModifiedTime(key, lastModified);
     }
 
+    instrumentation.directoryCreated();
+    
     // otherwise throws exception
     return true;
   }
@@ -1293,6 +1342,19 @@ public class NativeAzureFileSystem exten
     super.close();
     // Close the store
     store.close();
+    
+    // Notify the metrics system that this file system is closed, which may
+    // trigger one final metrics push to get the accurate final file system
+    // metrics out.
+
+    long startTime = System.currentTimeMillis();
+
+    AzureFileSystemMetricsSystem.fileSystemClosed();
+
+    if (LOG.isDebugEnabled()) {
+        LOG.debug("Submitting metrics when file system closed took "
+                + (System.currentTimeMillis() - startTime) + " ms.");
+    }
   }
 
   /**

Modified: hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java?rev=1605187&r1=1605186&r2=1605187&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java Tue Jun 24 20:52:44 2014
@@ -26,6 +26,7 @@ import java.util.Date;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -38,7 +39,7 @@ import com.google.common.annotations.Vis
 @InterfaceAudience.Private
 interface NativeFileSystemStore {
 
-  void initialize(URI uri, Configuration conf) throws IOException;
+  void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation) throws IOException;
 
   void storeEmptyFolder(String key, PermissionStatus permissionStatus)
       throws AzureException;

Added: hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java?rev=1605187&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java Tue Jun 24 20:52:44 2014
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.metrics;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+
+/**
+ * A metrics source for the WASB file system to track all the metrics we care
+ * about for getting a clear picture of the performance/reliability/interaction
+ * of the Hadoop cluster with Azure Storage.
+ */
+@Metrics(about="Metrics for WASB", context="azureFileSystem")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class AzureFileSystemInstrumentation implements MetricsSource {
+  
+  public static final String METRIC_TAG_FILESYSTEM_ID = "wasbFileSystemId";
+  public static final String METRIC_TAG_ACCOUNT_NAME = "accountName";
+  public static final String METRIC_TAG_CONTAINTER_NAME = "containerName";
+  
+  public static final String WASB_WEB_RESPONSES = "wasb_web_responses";
+  public static final String WASB_BYTES_WRITTEN =
+      "wasb_bytes_written_last_second";
+  public static final String WASB_BYTES_READ =
+      "wasb_bytes_read_last_second";
+  public static final String WASB_RAW_BYTES_UPLOADED =
+      "wasb_raw_bytes_uploaded";
+  public static final String WASB_RAW_BYTES_DOWNLOADED =
+      "wasb_raw_bytes_downloaded";
+  public static final String WASB_FILES_CREATED = "wasb_files_created";
+  public static final String WASB_FILES_DELETED = "wasb_files_deleted";
+  public static final String WASB_DIRECTORIES_CREATED = "wasb_directories_created";
+  public static final String WASB_DIRECTORIES_DELETED = "wasb_directories_deleted";
+  public static final String WASB_UPLOAD_RATE =
+      "wasb_maximum_upload_bytes_per_second";
+  public static final String WASB_DOWNLOAD_RATE =
+      "wasb_maximum_download_bytes_per_second";
+  public static final String WASB_UPLOAD_LATENCY =
+      "wasb_average_block_upload_latency_ms";
+  public static final String WASB_DOWNLOAD_LATENCY =
+      "wasb_average_block_download_latency_ms";
+  public static final String WASB_CLIENT_ERRORS = "wasb_client_errors";
+  public static final String WASB_SERVER_ERRORS = "wasb_server_errors";
+
+  /**
+   * Config key for how big the rolling window size for latency metrics should
+   * be (in seconds).
+   */
+  private static final String KEY_ROLLING_WINDOW_SIZE = "fs.azure.metrics.rolling.window.size";
+
+  private final MetricsRegistry registry =
+      new MetricsRegistry("azureFileSystem")
+      .setContext("azureFileSystem");
+  private final MutableCounterLong numberOfWebResponses =
+      registry.newCounter(
+          WASB_WEB_RESPONSES,
+          "Total number of web responses obtained from Azure Storage",
+          0L);
+  private AtomicLong inMemoryNumberOfWebResponses = new AtomicLong(0);
+  private final MutableCounterLong numberOfFilesCreated =
+      registry.newCounter(
+          WASB_FILES_CREATED,
+          "Total number of files created through the WASB file system.",
+          0L);
+  private final MutableCounterLong numberOfFilesDeleted =
+      registry.newCounter(
+          WASB_FILES_DELETED,
+          "Total number of files deleted through the WASB file system.",
+          0L);
+  private final MutableCounterLong numberOfDirectoriesCreated =
+      registry.newCounter(
+          WASB_DIRECTORIES_CREATED,
+          "Total number of directories created through the WASB file system.",
+          0L);
+  private final MutableCounterLong numberOfDirectoriesDeleted =
+      registry.newCounter(
+          WASB_DIRECTORIES_DELETED,
+          "Total number of directories deleted through the WASB file system.",
+          0L);
+  private final MutableGaugeLong bytesWrittenInLastSecond =
+      registry.newGauge(
+          WASB_BYTES_WRITTEN,
+          "Total number of bytes written to Azure Storage during the last second.",
+          0L);
+  private final MutableGaugeLong bytesReadInLastSecond =
+      registry.newGauge(
+          WASB_BYTES_READ,
+          "Total number of bytes read from Azure Storage during the last second.",
+          0L);
+  private final MutableGaugeLong maximumUploadBytesPerSecond =
+      registry.newGauge(
+          WASB_UPLOAD_RATE,
+          "The maximum upload rate encountered to Azure Storage in bytes/second.",
+          0L);
+  private final MutableGaugeLong maximumDownloadBytesPerSecond =
+      registry.newGauge(
+          WASB_DOWNLOAD_RATE,
+          "The maximum download rate encountered to Azure Storage in bytes/second.",
+          0L);
+  private final MutableCounterLong rawBytesUploaded =
+      registry.newCounter(
+          WASB_RAW_BYTES_UPLOADED,
+          "Total number of raw bytes (including overhead) uploaded to Azure" 
+          + " Storage.",
+          0L);
+  private final MutableCounterLong rawBytesDownloaded =
+      registry.newCounter(
+          WASB_RAW_BYTES_DOWNLOADED,
+          "Total number of raw bytes (including overhead) downloaded from Azure" 
+          + " Storage.",
+          0L);
+  private final MutableCounterLong clientErrors =
+      registry.newCounter(
+          WASB_CLIENT_ERRORS,
+          "Total number of client-side errors by WASB (excluding 404).",
+          0L);
+  private final MutableCounterLong serverErrors =
+      registry.newCounter(
+          WASB_SERVER_ERRORS,
+          "Total number of server-caused errors by WASB.",
+          0L);
+  private final MutableGaugeLong averageBlockUploadLatencyMs;
+  private final MutableGaugeLong averageBlockDownloadLatencyMs;
+  private long currentMaximumUploadBytesPerSecond;
+  private long currentMaximumDownloadBytesPerSecond;
+  private static final int DEFAULT_LATENCY_ROLLING_AVERAGE_WINDOW =
+      5; // seconds
+  private final RollingWindowAverage currentBlockUploadLatency;
+  private final RollingWindowAverage currentBlockDownloadLatency;
+  private UUID fileSystemInstanceId;
+
+  public AzureFileSystemInstrumentation(Configuration conf) {
+    fileSystemInstanceId = UUID.randomUUID();
+    registry.tag("wasbFileSystemId",
+        "A unique identifier for the file ",
+        fileSystemInstanceId.toString());
+    final int rollingWindowSizeInSeconds =
+        conf.getInt(KEY_ROLLING_WINDOW_SIZE,
+            DEFAULT_LATENCY_ROLLING_AVERAGE_WINDOW);
+    averageBlockUploadLatencyMs =
+        registry.newGauge(
+            WASB_UPLOAD_LATENCY,
+            String.format("The average latency in milliseconds of uploading a single block" 
+            + ". The average latency is calculated over a %d-second rolling" 
+            + " window.", rollingWindowSizeInSeconds),
+            0L);
+    averageBlockDownloadLatencyMs =
+        registry.newGauge(
+            WASB_DOWNLOAD_LATENCY,
+            String.format("The average latency in milliseconds of downloading a single block" 
+            + ". The average latency is calculated over a %d-second rolling" 
+            + " window.", rollingWindowSizeInSeconds),
+            0L);
+    currentBlockUploadLatency =
+        new RollingWindowAverage(rollingWindowSizeInSeconds * 1000);
+    currentBlockDownloadLatency =
+        new RollingWindowAverage(rollingWindowSizeInSeconds * 1000);
+  }
+
+  /**
+   * The unique identifier for this file system in the metrics.
+   */
+  public UUID getFileSystemInstanceId() {
+    return fileSystemInstanceId;
+  }
+  
+  /**
+   * Get the metrics registry information.
+   */
+  public MetricsInfo getMetricsRegistryInfo() {
+    return registry.info();
+  }
+
+  /**
+   * Sets the account name to tag all the metrics with.
+   * @param accountName The account name.
+   */
+  public void setAccountName(String accountName) {
+    registry.tag("accountName",
+        "Name of the Azure Storage account that these metrics are going against",
+        accountName);
+  }
+
+  /**
+   * Sets the container name to tag all the metrics with.
+   * @param containerName The container name.
+   */
+  public void setContainerName(String containerName) {
+    registry.tag("containerName",
+        "Name of the Azure Storage container that these metrics are going against",
+        containerName);
+  }
+
+  /**
+   * Indicate that we just got a web response from Azure Storage. This should
+   * be called for every web request/response we do (to get accurate metrics
+   * of how we're hitting the storage service).
+   */
+  public void webResponse() {
+    numberOfWebResponses.incr();
+    inMemoryNumberOfWebResponses.incrementAndGet();
+  }
+
+  /**
+   * Gets the current number of web responses obtained from Azure Storage.
+   * @return The number of web responses.
+   */
+  public long getCurrentWebResponses() {
+    return inMemoryNumberOfWebResponses.get();
+  }
+
+  /**
+   * Indicate that we just created a file through WASB.
+   */
+  public void fileCreated() {
+    numberOfFilesCreated.incr();
+  }
+
+  /**
+   * Indicate that we just deleted a file through WASB.
+   */
+  public void fileDeleted() {
+    numberOfFilesDeleted.incr();
+  }
+
+  /**
+   * Indicate that we just created a directory through WASB.
+   * Calls incr() on the numberOfDirectoriesCreated metric.
+   */
+  public void directoryCreated() {
+    numberOfDirectoriesCreated.incr();
+  }
+
+  /**
+   * Indicate that we just deleted a directory through WASB.
+   * Calls incr() on the numberOfDirectoriesDeleted metric.
+   */
+  public void directoryDeleted() {
+    numberOfDirectoriesDeleted.incr();
+  }
+
+  /**
+   * Sets the current gauge value for how many bytes were written in the last
+   * second. The value overwrites whatever the gauge held before.
+   * @param currentBytesWritten The number of bytes.
+   */
+  public void updateBytesWrittenInLastSecond(long currentBytesWritten) {
+    bytesWrittenInLastSecond.set(currentBytesWritten);
+  }
+
+  /**
+   * Sets the current gauge value for how many bytes were read in the last
+   * second. The value overwrites whatever the gauge held before.
+   * @param currentBytesRead The number of bytes.
+   */
+  public void updateBytesReadInLastSecond(long currentBytesRead) {
+    bytesReadInLastSecond.set(currentBytesRead);
+  }
+
+  /**
+   * Reports an observed upload rate. The stored maximum and its gauge are
+   * updated only when the new rate exceeds the maximum recorded so far.
+   * @param bytesPerSecond The bytes per second.
+   */
+  public synchronized void currentUploadBytesPerSecond(long bytesPerSecond) {
+    final boolean isNewMaximum =
+        bytesPerSecond > currentMaximumUploadBytesPerSecond;
+    if (!isNewMaximum) {
+      return;
+    }
+    currentMaximumUploadBytesPerSecond = bytesPerSecond;
+    maximumUploadBytesPerSecond.set(bytesPerSecond);
+  }
+
+  /**
+   * Reports an observed download rate. The stored maximum and its gauge are
+   * updated only when the new rate exceeds the maximum recorded so far.
+   * @param bytesPerSecond The bytes per second.
+   */
+  public synchronized void currentDownloadBytesPerSecond(long bytesPerSecond) {
+    final boolean isNewMaximum =
+        bytesPerSecond > currentMaximumDownloadBytesPerSecond;
+    if (!isNewMaximum) {
+      return;
+    }
+    currentMaximumDownloadBytesPerSecond = bytesPerSecond;
+    maximumDownloadBytesPerSecond.set(bytesPerSecond);
+  }
+
+  /**
+   * Indicate that we just uploaded some data to Azure storage.
+   * Adds the given amount to the rawBytesUploaded metric.
+   * @param numberOfBytes The raw number of bytes uploaded (including overhead).
+   */
+  public void rawBytesUploaded(long numberOfBytes) {
+    rawBytesUploaded.incr(numberOfBytes);
+  }
+
+  /**
+   * Indicate that we just downloaded some data from Azure storage.
+   * Adds the given amount to the rawBytesDownloaded metric.
+   * @param numberOfBytes The raw number of bytes downloaded (including overhead).
+   */
+  public void rawBytesDownloaded(long numberOfBytes) {
+    rawBytesDownloaded.incr(numberOfBytes);
+  }
+
+  /**
+   * Indicate that we just uploaded a block and record its latency.
+   * The latency is added as a data point to the rolling-window average that
+   * backs {@link #getBlockUploadLatency()}.
+   * @param latency The latency in milliseconds.
+   */
+  public void blockUploaded(long latency) {
+    currentBlockUploadLatency.addPoint(latency);
+  }
+
+  /**
+   * Indicate that we just downloaded a block and record its latency.
+   * The latency is added as a data point to the rolling-window average that
+   * backs {@link #getBlockDownloadLatency()}.
+   * @param latency The latency in milliseconds.
+   */
+  public void blockDownloaded(long latency) {
+    currentBlockDownloadLatency.addPoint(latency);
+  }
+
+  /**
+   * Indicate that we just encountered a client-side error.
+   * Calls incr() on the clientErrors metric.
+   */
+  public void clientErrorEncountered() {
+    clientErrors.incr();
+  }
+
+  /**
+   * Indicate that we just encountered a server-caused error.
+   * Calls incr() on the serverErrors metric.
+   */
+  public void serverErrorEncountered() {
+    serverErrors.incr();
+  }
+
+  /**
+   * Get the current rolling average of the upload latency, computed over the
+   * data points recorded through {@link #blockUploaded(long)}.
+   * @return rolling average of upload latency in milliseconds.
+   */
+  public long getBlockUploadLatency() {
+    return currentBlockUploadLatency.getCurrentAverage();
+  }
+
+  /**
+   * Get the current rolling average of the download latency, computed over
+   * the data points recorded through {@link #blockDownloaded(long)}.
+   * @return rolling average of download latency in milliseconds.
+   */
+  public long getBlockDownloadLatency() {
+    return currentBlockDownloadLatency.getCurrentAverage();
+  }
+
+  /**
+   * Get the current maximum upload bandwidth, i.e. the value last stored by
+   * {@link #currentUploadBytesPerSecond(long)}.
+   * NOTE(review): the field is written inside a synchronized method but read
+   * here without the lock; confirm it is declared volatile or that a stale
+   * read is acceptable for callers.
+   * @return maximum upload bandwidth in bytes per second.
+   */
+  public long getCurrentMaximumUploadBandwidth() {
+    return currentMaximumUploadBytesPerSecond;
+  }
+
+  /**
+   * Get the current maximum download bandwidth, i.e. the value last stored
+   * by {@link #currentDownloadBytesPerSecond(long)}.
+   * NOTE(review): the field is written inside a synchronized method but read
+   * here without the lock; confirm it is declared volatile or that a stale
+   * read is acceptable for callers.
+   * @return maximum download bandwidth in bytes per second.
+   */
+  public long getCurrentMaximumDownloadBandwidth() {
+    return currentMaximumDownloadBytesPerSecond;
+
+  }
+
+  /**
+   * Refreshes the two average-latency gauges from their rolling windows and
+   * then snapshots the registry into a new record on the collector. The
+   * {@code all} argument is not consulted: the snapshot is always requested
+   * with {@code true}.
+   */
+  @Override
+  public void getMetrics(MetricsCollector builder, boolean all) {
+    final long downloadLatencyMs =
+        currentBlockDownloadLatency.getCurrentAverage();
+    averageBlockDownloadLatencyMs.set(downloadLatencyMs);
+
+    final long uploadLatencyMs =
+        currentBlockUploadLatency.getCurrentAverage();
+    averageBlockUploadLatencyMs.set(uploadLatencyMs);
+
+    registry.snapshot(builder.addRecord(registry.info().name()), true);
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemMetricsSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemMetricsSystem.java?rev=1605187&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemMetricsSystem.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemMetricsSystem.java Tue Jun 24 20:52:44 2014
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.metrics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+
+/**
+ * Static holder for the metrics system shared by the WASB file system
+ * instances in this process. The underlying {@link MetricsSystemImpl} is
+ * created when the first file system starts and is stopped and discarded
+ * when the last one closes. All access to the shared state goes through
+ * synchronized static methods, i.e. it is guarded by the class lock.
+ */
+@InterfaceAudience.Private
+public final class AzureFileSystemMetricsSystem {
+  private static MetricsSystemImpl instance;
+  private static int numFileSystems;
+
+  // Static holder only: never instantiated.
+  private AzureFileSystemMetricsSystem() {
+  }
+
+  /**
+   * Records that a file system has started. The first start creates the
+   * shared metrics system and initializes it via init("azure-file-system").
+   */
+  public static synchronized void fileSystemStarted() {
+    if (numFileSystems == 0) {
+      instance = new MetricsSystemImpl();
+      instance.init("azure-file-system");
+    }
+    numFileSystems++;
+  }
+
+  /**
+   * Records that a file system has closed. Metrics are published
+   * immediately on every close; when the last file system closes, the
+   * shared metrics system is also stopped, shut down and discarded.
+   * A close that has no matching start is ignored so that the reference
+   * count can never go negative (which would prevent any later start from
+   * re-creating the metrics system).
+   */
+  public static synchronized void fileSystemClosed() {
+    if (numFileSystems <= 0 || instance == null) {
+      // Unbalanced close: there is nothing to publish or tear down.
+      return;
+    }
+    instance.publishMetricsNow();
+    numFileSystems--;
+    if (numFileSystems == 0) {
+      instance.stop();
+      instance.shutdown();
+      instance = null;
+    }
+  }
+
+  /**
+   * Registers a metrics source with the shared metrics system. Synchronized
+   * so that it observes the instance published by
+   * {@link #fileSystemStarted()} and cannot race with a concurrent close.
+   *
+   * @param name The source name; "-WasbSystem" is appended to it.
+   * @param desc The description of the source.
+   * @param source The metrics source to register.
+   * @throws IllegalStateException if no file system is currently started.
+   */
+  public static synchronized void registerSource(String name, String desc,
+      MetricsSource source) {
+    if (instance == null) {
+      throw new IllegalStateException("Cannot register metrics source "
+          + name + ": the Azure file system metrics system is not started");
+    }
+    // Register the source with the name appended with -WasbSystem
+    // so that the name is globally unique.
+    instance.register(name + "-WasbSystem", desc, source);
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/BandwidthGaugeUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/BandwidthGaugeUpdater.java?rev=1605187&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/BandwidthGaugeUpdater.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/BandwidthGaugeUpdater.java Tue Jun 24 20:52:44 2014
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.metrics;
+
+import java.util.ArrayList;
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Internal implementation class to help calculate the current bytes
+ * uploaded/downloaded and the maximum bandwidth gauges.
+ */
+@InterfaceAudience.Private
+public final class BandwidthGaugeUpdater {
+  public static final Log LOG = LogFactory
+      .getLog(BandwidthGaugeUpdater.class);
+  
+  public static final String THREAD_NAME = "AzureNativeFilesystemStore-UploadBandwidthUpdater";
+  
+  private static final int DEFAULT_WINDOW_SIZE_MS = 1000;
+  private static final int PROCESS_QUEUE_INITIAL_CAPACITY = 1000;
+  private int windowSizeMs;
+  private ArrayList<BlockTransferWindow> allBlocksWritten =
+      createNewToProcessQueue();
+  private ArrayList<BlockTransferWindow> allBlocksRead =
+      createNewToProcessQueue();
+  private final Object blocksWrittenLock = new Object();
+  private final Object blocksReadLock = new Object();
+  private final AzureFileSystemInstrumentation instrumentation;
+  private Thread uploadBandwidthUpdater;
+  private volatile boolean suppressAutoUpdate = false;
+
+  /**
+   * Create a new updater object with default values.
+   * @param instrumentation The metrics source to update.
+   */
+  public BandwidthGaugeUpdater(AzureFileSystemInstrumentation instrumentation) {
+    this(instrumentation, DEFAULT_WINDOW_SIZE_MS, false);
+  }
+
+  /**
+   * Create a new updater object with some overrides (used in unit tests).
+   * @param instrumentation The metrics source to update.
+   * @param windowSizeMs The window size to use for calculating bandwidth
+   *                    (in milliseconds).
+   * @param manualUpdateTrigger If true, then this object won't create the
+   *                            auto-update threads, and will wait for manual
+   *                            calls to triggerUpdate to occur.
+   */
+  public BandwidthGaugeUpdater(AzureFileSystemInstrumentation instrumentation,
+      int windowSizeMs, boolean manualUpdateTrigger) {
+    this.windowSizeMs = windowSizeMs;
+    this.instrumentation = instrumentation;
+    if (!manualUpdateTrigger) {
+      uploadBandwidthUpdater = new Thread(new UploadBandwidthUpdater(), THREAD_NAME);
+      uploadBandwidthUpdater.setDaemon(true);
+      uploadBandwidthUpdater.start();
+    }
+  }
+
+  /**
+   * Indicate that a block has been uploaded.
+   * @param startDate The exact time the upload started.
+   * @param endDate The exact time the upload ended.
+   * @param length The number of bytes uploaded in the block.
+   */
+  public void blockUploaded(Date startDate, Date endDate, long length) {
+    synchronized (blocksWrittenLock) {
+      allBlocksWritten.add(new BlockTransferWindow(startDate, endDate, length));
+    }
+  }
+
+  /**
+   * Indicate that a block has been downloaded.
+   * @param startDate The exact time the download started.
+   * @param endDate The exact time the download ended.
+   * @param length The number of bytes downloaded in the block.
+   */
+  public void blockDownloaded(Date startDate, Date endDate, long length) {
+    synchronized (blocksReadLock) {
+      allBlocksRead.add(new BlockTransferWindow(startDate, endDate, length));
+    }
+  }
+
+  /**
+   * Creates a new ArrayList to hold incoming block transfer notifications
+   * before they're processed.
+   * @return The newly created ArrayList.
+   */
+  private static ArrayList<BlockTransferWindow> createNewToProcessQueue() {
+    return new ArrayList<BlockTransferWindow>(PROCESS_QUEUE_INITIAL_CAPACITY);
+  }
+
+  /**
+   * Update the metrics source gauge for how many bytes were transferred
+   * during the last time window.
+   * @param updateWrite If true, update the write (upload) counter.
+   *                    Otherwise update the read (download) counter.
+   * @param bytes The number of bytes transferred.
+   */
+  private void updateBytesTransferred(boolean updateWrite, long bytes) {
+    if (updateWrite) {
+      instrumentation.updateBytesWrittenInLastSecond(bytes);
+    }
+    else {
+      instrumentation.updateBytesReadInLastSecond(bytes);
+    }
+  }
+
+  /**
+   * Update the metrics source gauge for what the current transfer rate
+   * is.
+   * @param updateWrite If true, update the write (upload) counter.
+   *                    Otherwise update the read (download) counter.
+   * @param bytesPerSecond The number of bytes per second we're seeing.
+   */
+  private void updateBytesTransferRate(boolean updateWrite, long bytesPerSecond) {
+    if (updateWrite) {
+      instrumentation.currentUploadBytesPerSecond(bytesPerSecond);
+    }
+    else {
+      instrumentation.currentDownloadBytesPerSecond(bytesPerSecond);
+    }
+  }
+
+  /**
+   * For unit test purposes, suppresses auto-update of the metrics
+   * from the dedicated thread.
+   */
+  public void suppressAutoUpdate() {
+    suppressAutoUpdate = true;
+  }
+
+  /**
+   * Resumes auto-update (undo suppressAutoUpdate).
+   */
+  public void resumeAutoUpdate() {
+    suppressAutoUpdate = false;
+  }
+
+  /**
+   * Triggers the update of the metrics gauge based on all the blocks
+   * uploaded/downloaded so far. This is typically done periodically in a
+   * dedicated update thread, but exposing as public for unit test purposes.
+   * 
+   * @param updateWrite If true, we'll update the write (upload) metrics.
+   *                    Otherwise we'll update the read (download) ones.
+   */
+  public void triggerUpdate(boolean updateWrite) {
+    ArrayList<BlockTransferWindow> toProcess = null;
+    synchronized (updateWrite ? blocksWrittenLock : blocksReadLock) {
+      if (updateWrite && !allBlocksWritten.isEmpty()) {
+        toProcess = allBlocksWritten;
+        allBlocksWritten = createNewToProcessQueue();
+      } else if (!updateWrite && !allBlocksRead.isEmpty()) {
+        toProcess = allBlocksRead;
+        allBlocksRead = createNewToProcessQueue();        
+      }
+    }
+
+    // Check to see if we have any blocks to process.
+    if (toProcess == null) {
+      // Nothing to process, set the current bytes and rate to zero.
+      updateBytesTransferred(updateWrite, 0);
+      updateBytesTransferRate(updateWrite, 0);
+      return;
+    }
+
+    // The cut-off time for when we want to calculate rates is one
+    // window size ago from now.
+    long cutoffTime = new Date().getTime() - windowSizeMs;
+
+    // Go through all the blocks we're processing, and calculate the
+    // total number of bytes processed as well as the maximum transfer
+    // rate we experienced for any single block during our time window.
+    long maxSingleBlockTransferRate = 0;
+    long bytesInLastSecond = 0;
+    for (BlockTransferWindow currentWindow : toProcess) {
+      long windowDuration = currentWindow.getEndDate().getTime() 
+          - currentWindow.getStartDate().getTime();
+      if (windowDuration == 0) {
+        // Edge case, assume it took 1 ms but we were too fast
+        windowDuration = 1;
+      }
+      if (currentWindow.getStartDate().getTime() > cutoffTime) {
+        // This block was transferred fully within our time window,
+        // just add its bytes to the total.
+        bytesInLastSecond += currentWindow.bytesTransferred;
+      } else if (currentWindow.getEndDate().getTime() > cutoffTime) {
+        // This block started its transfer before our time window,
+        // interpolate to estimate how many bytes from that block
+        // were actually transferred during our time window.
+        long adjustedBytes = (currentWindow.getBytesTransferred() 
+            * (currentWindow.getEndDate().getTime() - cutoffTime)) 
+            / windowDuration;
+        bytesInLastSecond += adjustedBytes;
+      }
+      // Calculate the transfer rate for this block.
+      long currentBlockTransferRate =
+          (currentWindow.getBytesTransferred() * 1000) / windowDuration;
+      maxSingleBlockTransferRate =
+          Math.max(maxSingleBlockTransferRate, currentBlockTransferRate);
+    }
+    updateBytesTransferred(updateWrite, bytesInLastSecond);
+    // The transfer rate we saw in the last second is a tricky concept to
+    // define: If we saw two blocks, one 2 MB block transferred in 0.2 seconds,
+    // and one 4 MB block transferred in 0.2 seconds, then the maximum rate
+    // is 20 MB/s (the 4 MB block), the average of the two blocks is 15 MB/s,
+    // and the aggregate rate is 6 MB/s (total of 6 MB transferred in one
+    // second). As a first cut, I'm taking the definition to be the maximum
+    // of aggregate or of any single block's rate (so in the example case it's
+    // 6 MB/s).
+    long aggregateTransferRate = bytesInLastSecond;
+    long maxObservedTransferRate =
+        Math.max(aggregateTransferRate, maxSingleBlockTransferRate);
+    updateBytesTransferRate(updateWrite, maxObservedTransferRate);
+  }
+
+  /**
+   * A single block transfer.
+   */
+  private static final class BlockTransferWindow {
+    private final Date startDate;
+    private final Date endDate;
+    private final long bytesTransferred;
+
+    public BlockTransferWindow(Date startDate, Date endDate,
+        long bytesTransferred) {
+      this.startDate = startDate;
+      this.endDate = endDate;
+      this.bytesTransferred = bytesTransferred;
+    }
+
+    public Date getStartDate() { return startDate; }
+    public Date getEndDate() { return endDate; }
+    public long getBytesTransferred() { return bytesTransferred; }
+  }
+
+  /**
+   * The auto-update thread.
+   */
+  private final class UploadBandwidthUpdater implements Runnable {
+    @Override
+    public void run() {
+      try {
+        while (true) {
+          Thread.sleep(windowSizeMs);
+          if (!suppressAutoUpdate) {
+            triggerUpdate(true);
+            triggerUpdate(false);
+          }
+        }
+      } catch (InterruptedException e) {
+      }
+    }
+  }
+
+  public void close() {
+    if (uploadBandwidthUpdater != null) {
+      // Interrupt and join the updater thread in death.
+      uploadBandwidthUpdater.interrupt();
+      try {
+        uploadBandwidthUpdater.join();
+      } catch (InterruptedException e) {
+      }
+      uploadBandwidthUpdater = null;
+    }
+  }
+
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ErrorMetricUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ErrorMetricUpdater.java?rev=1605187&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ErrorMetricUpdater.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ErrorMetricUpdater.java Tue Jun 24 20:52:44 2014
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.metrics;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND; //404
+import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;  //400
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; //500
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.microsoft.windowsazure.storage.OperationContext;
+import com.microsoft.windowsazure.storage.RequestResult;
+import com.microsoft.windowsazure.storage.ResponseReceivedEvent;
+import com.microsoft.windowsazure.storage.StorageEvent;
+
+
+/**
+ * An event listener to the ResponseReceived event from Azure Storage that will
+ * update error metrics appropriately when it gets that event.
+ */
+@InterfaceAudience.Private
+public final class ErrorMetricUpdater extends StorageEvent<ResponseReceivedEvent> {
+  private final AzureFileSystemInstrumentation instrumentation;
+  private final OperationContext operationContext;
+
+  private ErrorMetricUpdater(OperationContext operationContext,
+      AzureFileSystemInstrumentation instrumentation) {
+    this.instrumentation = instrumentation;
+    this.operationContext = operationContext;
+  }
+
+  /**
+   * Hooks a new listener to the given operationContext that will update the
+   * error metrics for the WASB file system appropriately in response to
+   * ResponseReceived events.
+   *
+   * @param operationContext The operationContext to hook.
+   * @param instrumentation The metrics source to update.
+   */
+  public static void hook(
+      OperationContext operationContext,
+      AzureFileSystemInstrumentation instrumentation) {
+    ErrorMetricUpdater listener =
+        new ErrorMetricUpdater(operationContext,
+            instrumentation);
+    operationContext.getResponseReceivedEventHandler().addListener(listener);
+  }
+
+  /**
+   * Classifies the status code of the operation context's last result and
+   * bumps the client-error or server-error metric accordingly. Status codes
+   * below 400, and 404 itself, are not counted.
+   *
+   * @param eventArg The response-received event (the status is read from
+   *                 the hooked operation context, not from this argument).
+   */
+  @Override
+  public void eventOccurred(ResponseReceivedEvent eventArg) {
+    RequestResult currentResult = operationContext.getLastResult();
+    if (currentResult == null) {
+      // Typically shouldn't happen, but a metrics listener must never fail
+      // the storage request it is observing; just let it pass.
+      return;
+    }
+    int statusCode = currentResult.getStatusCode();
+    // Check if it's a client-side error: a 4xx status
+    // We exclude 404 because it happens frequently during the normal
+    // course of operation (each call to exists() would generate that
+    // if it's not found).
+    if (statusCode >= HTTP_BAD_REQUEST && statusCode < HTTP_INTERNAL_ERROR
+        && statusCode != HTTP_NOT_FOUND) {
+      instrumentation.clientErrorEncountered();
+    } else if (statusCode >= HTTP_INTERNAL_ERROR) {
+      // It's a server error: a 5xx status. Could be an Azure Storage
+      // bug or (more likely) throttling.
+      instrumentation.serverErrorEncountered();
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java?rev=1605187&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java Tue Jun 24 20:52:44 2014
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.metrics;
+
+import java.net.HttpURLConnection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.microsoft.windowsazure.storage.Constants.HeaderConstants;
+import com.microsoft.windowsazure.storage.OperationContext;
+import com.microsoft.windowsazure.storage.RequestResult;
+import com.microsoft.windowsazure.storage.ResponseReceivedEvent;
+import com.microsoft.windowsazure.storage.StorageEvent;
+
+
+/**
+ * An event listener to the ResponseReceived event from Azure Storage that will
+ * update metrics appropriately.
+ *
+ */
+@InterfaceAudience.Private
+public final class ResponseReceivedMetricUpdater extends StorageEvent<ResponseReceivedEvent> {
+
+  public static final Log LOG = LogFactory.getLog(ResponseReceivedMetricUpdater.class);
+
+  private final AzureFileSystemInstrumentation instrumentation;
+  private final BandwidthGaugeUpdater blockUploadGaugeUpdater;
+  
+  private ResponseReceivedMetricUpdater(OperationContext operationContext,
+      AzureFileSystemInstrumentation instrumentation,
+      BandwidthGaugeUpdater blockUploadGaugeUpdater) {
+    this.instrumentation = instrumentation;
+    this.blockUploadGaugeUpdater = blockUploadGaugeUpdater;
+  }
+
+  /**
+   * Hooks a new listener to the given operationContext that will update the
+   * metrics for the WASB file system appropriately in response to
+   * ResponseReceived events.
+   *
+   * @param operationContext The operationContext to hook.
+   * @param instrumentation The metrics source to update.
+   * @param blockUploadGaugeUpdater The blockUploadGaugeUpdater to use.
+   */
+  public static void hook(
+      OperationContext operationContext,
+      AzureFileSystemInstrumentation instrumentation,
+      BandwidthGaugeUpdater blockUploadGaugeUpdater) {
+    ResponseReceivedMetricUpdater listener =
+        new ResponseReceivedMetricUpdater(operationContext,
+            instrumentation, blockUploadGaugeUpdater);
+    operationContext.getResponseReceivedEventHandler().addListener(listener);
+  }
+
+  /**
+   * Get the content length of the request in the given HTTP connection.
+   * @param connection The connection.
+   * @return The content length, or zero if not found.
+   */
+  private long getRequestContentLength(HttpURLConnection connection) {
+    String lengthString = connection.getRequestProperty(
+        HeaderConstants.CONTENT_LENGTH);
+    if (lengthString != null){
+      return Long.parseLong(lengthString);
+    }
+    else{
+      return 0;
+    }
+  }
+
+  /**
+   * Gets the content length of the response in the given HTTP connection.
+   * @param connection The connection.
+   * @return The content length.
+   */
+  private long getResponseContentLength(HttpURLConnection connection) {
+    return connection.getContentLength();
+  }
+
+  /**
+   * Handle the response-received event from Azure SDK.
+   */
+  @Override
+  public void eventOccurred(ResponseReceivedEvent eventArg) {
+    instrumentation.webResponse();
+    if (!(eventArg.getConnectionObject() instanceof HttpURLConnection)) {
+      // Typically this shouldn't happen, but just let it pass
+      return;
+    }
+    HttpURLConnection connection =
+        (HttpURLConnection) eventArg.getConnectionObject();
+    RequestResult currentResult = eventArg.getRequestResult();
+    if (currentResult == null) {
+      // Again, typically shouldn't happen, but let it pass
+      return;
+    }
+
+    long requestLatency = currentResult.getStopDate().getTime() 
+        - currentResult.getStartDate().getTime();
+
+    if (currentResult.getStatusCode() == HttpURLConnection.HTTP_CREATED 
+        && connection.getRequestMethod().equalsIgnoreCase("PUT")) {
+      // If it's a PUT with an HTTP_CREATED status then it's a successful
+      // block upload.
+      long length = getRequestContentLength(connection);
+      if (length > 0) {
+        blockUploadGaugeUpdater.blockUploaded(
+            currentResult.getStartDate(),
+            currentResult.getStopDate(),
+            length);
+        instrumentation.rawBytesUploaded(length);
+        instrumentation.blockUploaded(requestLatency);
+      }
+    } else if (currentResult.getStatusCode() == HttpURLConnection.HTTP_PARTIAL 
+        && connection.getRequestMethod().equalsIgnoreCase("GET")) {
+      // If it's a GET with an HTTP_PARTIAL status then it's a successful
+      // block download.
+      long length = getResponseContentLength(connection);
+      if (length > 0) {
+        blockUploadGaugeUpdater.blockDownloaded(
+            currentResult.getStartDate(),
+            currentResult.getStopDate(),
+            length);
+        instrumentation.rawBytesDownloaded(length);
+        instrumentation.blockDownloaded(requestLatency);
+      }
+    } 
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/RollingWindowAverage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/RollingWindowAverage.java?rev=1605187&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/RollingWindowAverage.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/RollingWindowAverage.java Tue Jun 24 20:52:44 2014
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.metrics;
+
+import java.util.ArrayDeque;
+import java.util.Date;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Helper class to calculate rolling-window averages.
+ * Used to calculate rolling-window metrics for the Windows Azure Storage
+ * file system (WASB).
+ */
+@InterfaceAudience.Private
+final class RollingWindowAverage {
+  /** Length of the rolling window, in milliseconds. */
+  private final long windowSizeMs;
+
+  /** Points currently retained, in insertion order (oldest at the head). */
+  private final ArrayDeque<DataPoint> currentPoints =
+      new ArrayDeque<DataPoint>();
+
+  /**
+   * Creates a rolling-window average covering the given span of time.
+   * @param windowSizeMs The size of the window in milliseconds.
+   */
+  public RollingWindowAverage(long windowSizeMs) {
+    this.windowSizeMs = windowSizeMs;
+  }
+
+  /**
+   * Records a data point stamped with the current time, then evicts any
+   * points that have fallen out of the window.
+   * @param value The value of the data point.
+   */
+  public synchronized void addPoint(long value) {
+    DataPoint point = new DataPoint(new Date(), value);
+    currentPoints.addLast(point);
+    cleanupOldPoints();
+  }
+
+  /**
+   * Computes the average of the points still inside the window.
+   * @return The integer (truncating) average of the retained values, or 0
+   *         when no points remain in the window.
+   */
+  public synchronized long getCurrentAverage() {
+    cleanupOldPoints();
+    int count = currentPoints.size();
+    if (count == 0) {
+      return 0;
+    }
+    long total = 0;
+    for (DataPoint point : currentPoints) {
+      total += point.getValue();
+    }
+    return total / count;
+  }
+
+  /**
+   * Drops points from the head of the queue whose event time is strictly
+   * earlier than (now - window size). Only called from the synchronized
+   * public methods, so it does no locking of its own.
+   */
+  private void cleanupOldPoints() {
+    long cutoffMs = System.currentTimeMillis() - windowSizeMs;
+    for (DataPoint oldest = currentPoints.peekFirst();
+        oldest != null && oldest.getEventTime().getTime() < cutoffMs;
+        oldest = currentPoints.peekFirst()) {
+      currentPoints.pollFirst();
+    }
+  }
+
+  /**
+   * An immutable (event time, value) pair.
+   */
+  private static class DataPoint {
+    private final Date eventTime;
+    private final long value;
+
+    public DataPoint(Date eventTime, long value) {
+      this.eventTime = eventTime;
+      this.value = value;
+    }
+
+    public Date getEventTime() {
+      return eventTime;
+    }
+
+    public long getValue() {
+      return value;
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/package.html
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/package.html?rev=1605187&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/package.html (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/package.html Tue Jun 24 20:52:44 2014
@@ -0,0 +1,28 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+
+<p>
+Infrastructure for a Metrics2 source that provides information on Windows 
+Azure Filesystem for Hadoop instances. 
+</p>
+
+</body>
+</html>

Modified: hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java?rev=1605187&r1=1605186&r2=1605187&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java Tue Jun 24 20:52:44 2014
@@ -27,9 +27,18 @@ import java.util.Date;
 import java.util.EnumSet;
 import java.util.GregorianCalendar;
 import java.util.TimeZone;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
+import org.apache.commons.configuration.SubsetConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 
 import com.microsoft.windowsazure.storage.AccessCondition;
 import com.microsoft.windowsazure.storage.CloudStorageAccount;
@@ -76,7 +85,10 @@ public final class AzureBlobStorageTestA
   private NativeAzureFileSystem fs;
   private AzureNativeFileSystemStore storage;
   private MockStorageInterface mockStorage;
-
+  private static final ConcurrentLinkedQueue<MetricsRecord> allMetrics =
+      new ConcurrentLinkedQueue<MetricsRecord>();
+  
+  
   private AzureBlobStorageTestAccount(NativeAzureFileSystem fs,
       CloudStorageAccount account, CloudBlobContainer container) {
     this.account = account;
@@ -124,6 +136,10 @@ public final class AzureBlobStorageTestA
     this.fs = fs;
     this.mockStorage = mockStorage;
   }
+  
+  /**
+   * Appends a metrics record to the static queue of collected records,
+   * which is shared by all instances of this class.
+   * @param record The metrics record to store.
+   */
+  private static void addRecord(MetricsRecord record) {
+    allMetrics.add(record);
+  }
 
   public static String getMockContainerUri() {
     return String.format("http://%s/%s",
@@ -141,6 +157,47 @@ public final class AzureBlobStorageTestA
     // Remove the first SEPARATOR
     return toMockUri(path.toUri().getRawPath().substring(1)); 
   }
+  
+  /**
+   * Looks up the value of a metric among the collected records that were
+   * generated by this account's file system instance. Records are scanned
+   * in queue order and later matches overwrite earlier ones, so the value
+   * from the last matching record is returned.
+   * @param metricName The metric name to look for (case-insensitive).
+   * @param defaultValue The value to return when no matching metric was
+   *        collected; pass null to get an exception instead.
+   * @return The value from the last matching record, or defaultValue if
+   *         there was no match and defaultValue is non-null.
+   * @throws IndexOutOfBoundsException if no matching metric was collected
+   *         and defaultValue is null.
+   */
+  public Number getLatestMetricValue(String metricName, Number defaultValue)
+      throws IndexOutOfBoundsException {
+    Number latest = null;
+    boolean seen = false;
+    for (MetricsRecord record : allMetrics) {
+      // Skip records that belong to some other file system instance.
+      if (!wasGeneratedByMe(record)) {
+        continue;
+      }
+      for (AbstractMetric metric : record.metrics()) {
+        if (metric.name().equalsIgnoreCase(metricName)) {
+          seen = true;
+          latest = metric.value();
+          // Only the first match within a record counts; move on to the
+          // next record so a newer one can supersede this value.
+          break;
+        }
+      }
+    }
+    if (seen) {
+      return latest;
+    }
+    if (defaultValue == null) {
+      throw new IndexOutOfBoundsException(metricName);
+    }
+    return defaultValue;
+  }
+
+  /**
+   * Checks if the given record was generated by my WASB file system instance.
+   * Only the first tag named "wasbFileSystemId" (compared case-insensitively)
+   * is examined.
+   * @param currentRecord The metrics record to check.
+   * @return true if that tag's value equals this file system's instance ID;
+   *         false if it differs or the record carries no such tag.
+   */
+  private boolean wasGeneratedByMe(MetricsRecord currentRecord) {
+    final String expectedId =
+        fs.getInstrumentation().getFileSystemInstanceId().toString();
+    MetricsTag idTag = null;
+    for (MetricsTag candidate : currentRecord.tags()) {
+      if (candidate.name().equalsIgnoreCase("wasbFileSystemId")) {
+        idTag = candidate;
+        break;
+      }
+    }
+    return idTag != null && idTag.value().equals(expectedId);
+  }
+
 
   /**
    * Gets the blob reference to the given blob key.
@@ -236,7 +293,6 @@ public final class AzureBlobStorageTestA
 
   public static AzureBlobStorageTestAccount createOutOfBandStore(
       int uploadBlockSize, int downloadBlockSize) throws Exception {
-
     CloudBlobContainer container = null;
     Configuration conf = createTestConfiguration();
     CloudStorageAccount account = createTestAccount(conf);
@@ -262,11 +318,25 @@ public final class AzureBlobStorageTestA
     // Set account URI and initialize Azure file system.
     URI accountUri = createAccountUri(accountName, containerName);
 
+    // Set up instrumentation.
+    //
+    AzureFileSystemMetricsSystem.fileSystemStarted();
+    String sourceName = NativeAzureFileSystem.newMetricsSourceName();
+    String sourceDesc = "Azure Storage Volume File System metrics";
+
+    AzureFileSystemInstrumentation instrumentation =
+        DefaultMetricsSystem.instance().register(sourceName,
+                sourceDesc, new AzureFileSystemInstrumentation(conf));
+
+    AzureFileSystemMetricsSystem.registerSource(
+        sourceName, sourceDesc, instrumentation);
+    
+    
     // Create a new AzureNativeFileSystemStore object.
     AzureNativeFileSystemStore testStorage = new AzureNativeFileSystemStore();
 
     // Initialize the store with the throttling feedback interfaces.
-    testStorage.initialize(accountUri, conf);
+    testStorage.initialize(accountUri, conf, instrumentation);
 
     // Create test account initializing the appropriate member variables.
     AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(
@@ -722,5 +792,20 @@ public final class AzureBlobStorageTestA
   public MockStorageInterface getMockStorage() {
     return mockStorage;
   }
+  
+  /**
+   * A metrics sink that hands every record it receives to addRecord, which
+   * stores it in the static queue read by getLatestMetricValue. It keeps no
+   * state of its own.
+   */
+  public static class StandardCollector implements MetricsSink {
+    /**
+     * Does nothing; the supplied configuration is ignored.
+     */
+    @Override
+    public void init(SubsetConfiguration conf) {
+    }
+
+    /**
+     * Stores the record in the collected-records queue.
+     * @param record The metrics record delivered to this sink.
+     */
+    @Override
+    public void putMetrics(MetricsRecord record) {
+      addRecord(record);
+    }
+
+    /**
+     * Does nothing; records are stored as soon as they are received.
+     */
+    @Override
+    public void flush() {
+    }
+  }
  
 }

Modified: hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java?rev=1605187&r1=1605186&r2=1605187&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java Tue Jun 24 20:52:44 2014
@@ -33,11 +33,9 @@ import java.util.HashMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.TestHookOperationContext;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.junit.Test;
 
 import com.microsoft.windowsazure.storage.OperationContext;

Added: hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/AzureMetricsTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/AzureMetricsTestUtil.java?rev=1605187&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/AzureMetricsTestUtil.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/AzureMetricsTestUtil.java Tue Jun 24 20:52:44 2014
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.metrics;
+
+import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_BYTES_READ;
+import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_BYTES_WRITTEN;
+import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_RAW_BYTES_DOWNLOADED;
+import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_RAW_BYTES_UPLOADED;
+import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_WEB_RESPONSES;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongGauge;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
+/**
+ * Static helpers for reading metric values out of an
+ * {@link AzureFileSystemInstrumentation} in tests.
+ */
+public final class AzureMetricsTestUtil {
+  /**
+   * Gets the current value of the given gauge.
+   * @param instrumentation The instrumentation to read from.
+   * @param gaugeName The name of the gauge.
+   * @return The current value of the gauge.
+   */
+  public static long getLongGaugeValue(AzureFileSystemInstrumentation instrumentation,
+      String gaugeName) {
+    return getLongGauge(gaugeName, getMetrics(instrumentation));
+  }
+
+  /**
+   * Gets the current value of the given counter.
+   * @param instrumentation The instrumentation to read from.
+   * @param counterName The name of the counter.
+   * @return The current value of the counter.
+   */
+  public static long getLongCounterValue(AzureFileSystemInstrumentation instrumentation,
+      String counterName) {
+    return getLongCounter(counterName, getMetrics(instrumentation));
+  }
+
+  /**
+   * Gets the current value of the wasb_bytes_written_last_second gauge.
+   * @param instrumentation The instrumentation to read from.
+   * @return The current value of the {@code WASB_BYTES_WRITTEN} gauge.
+   */
+  public static long getCurrentBytesWritten(AzureFileSystemInstrumentation instrumentation) {
+    return getLongGaugeValue(instrumentation, WASB_BYTES_WRITTEN);
+  }
+
+  /**
+   * Gets the current value of the wasb_bytes_read_last_second gauge.
+   * @param instrumentation The instrumentation to read from.
+   * @return The current value of the {@code WASB_BYTES_READ} gauge.
+   */
+  public static long getCurrentBytesRead(AzureFileSystemInstrumentation instrumentation) {
+    return getLongGaugeValue(instrumentation, WASB_BYTES_READ);
+  }
+
+  /**
+   * Gets the current value of the wasb_raw_bytes_uploaded counter.
+   * @param instrumentation The instrumentation to read from.
+   * @return The current value of the {@code WASB_RAW_BYTES_UPLOADED} counter.
+   */
+  public static long getCurrentTotalBytesWritten(
+      AzureFileSystemInstrumentation instrumentation) {
+    return getLongCounterValue(instrumentation, WASB_RAW_BYTES_UPLOADED);
+  }
+
+  /**
+   * Gets the current value of the wasb_raw_bytes_downloaded counter.
+   * @param instrumentation The instrumentation to read from.
+   * @return The current value of the {@code WASB_RAW_BYTES_DOWNLOADED} counter.
+   */
+  public static long getCurrentTotalBytesRead(
+      AzureFileSystemInstrumentation instrumentation) {
+    return getLongCounterValue(instrumentation, WASB_RAW_BYTES_DOWNLOADED);
+  }
+
+  /**
+   * Gets the current value of the {@code WASB_WEB_RESPONSES} counter.
+   * @param instrumentation The instrumentation to read from.
+   * @return The current value of the {@code WASB_WEB_RESPONSES} counter.
+   */
+  public static long getCurrentWebResponses(
+      AzureFileSystemInstrumentation instrumentation) {
+    // Go through the shared helper, like the other counter accessors.
+    return getLongCounterValue(instrumentation, WASB_WEB_RESPONSES);
+  }
+}



Mime
View raw message