hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mackror...@apache.org
Subject [35/45] hadoop git commit: HADOOP-15664. ABFS: Reduce test run time via parallelization and grouping. Contributed by Da Zhou.
Date Sun, 23 Sep 2018 03:24:32 GMT
HADOOP-15664. ABFS: Reduce test run time via parallelization and grouping.
Contributed by Da Zhou.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4410eacb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4410eacb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4410eacb

Branch: refs/heads/trunk
Commit: 4410eacba7862ec24173356fe3fd468fd79aeb8f
Parents: 81dc4a9
Author: Thomas Marquardt <tmarq@microsoft.com>
Authored: Sat Sep 1 20:39:34 2018 +0000
Committer: Thomas Marquardt <tmarq@microsoft.com>
Committed: Mon Sep 17 19:54:01 2018 +0000

----------------------------------------------------------------------
 hadoop-tools/hadoop-azure/pom.xml               | 350 ++++++++++++++++++-
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java |   8 +-
 .../fs/azurebfs/services/AbfsOutputStream.java  |   6 +
 .../azure/ITestNativeFileSystemStatistics.java  |  99 ++++++
 .../fs/azure/NativeAzureFileSystemBaseTest.java |  80 +----
 .../fs/azure/integration/AzureTestUtils.java    |  53 ++-
 .../ITestAzureBlobFileSystemE2EScale.java       |  11 +-
 .../ITestAzureBlobFileSystemFileStatus.java     |   3 +
 .../azurebfs/ITestAzureBlobFileSystemFlush.java | 167 +++++----
 .../fs/azurebfs/ITestWasbAbfsCompatibility.java |   2 +-
 10 files changed, 631 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4410eacb/hadoop-tools/hadoop-azure/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml
index 7152f638..42f4d05 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -253,6 +253,351 @@
 
   <profiles>
     <profile>
+      <id>parallel-tests-wasb</id>
+      <activation>
+        <property>
+          <name>parallel-tests-wasb</name>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-antrun-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>create-parallel-tests-dirs</id>
+                <phase>test-compile</phase>
+                <configuration>
+                  <target>
+                    <script language="javascript"><![CDATA[
+                      var baseDirs = [
+                        project.getProperty("test.build.data"),
+                        project.getProperty("test.build.dir"),
+                        project.getProperty("hadoop.tmp.dir")
+                      ];
+                      for (var i in baseDirs) {
+                        for (var j = 1; j <= ${testsThreadCount}; ++j) {
+                          var mkdir = project.createTask("mkdir");
+                          mkdir.setDir(new java.io.File(baseDirs[i], j));
+                          mkdir.perform();
+                        }
+                      }
+                    ]]></script>
+                  </target>
+                </configuration>
+                <goals>
+                  <goal>run</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>default-test</id>
+                <goals>
+                  <goal>test</goal>
+                </goals>
+                <configuration>
+                  <forkCount>1</forkCount>
+                  <forkCount>${testsThreadCount}</forkCount>
+                  <reuseForks>false</reuseForks>
+                  <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
+                  <forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
+                  <systemPropertyVariables>
+                    <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
+                    <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
+                    <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
+                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
+                    <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
+                    <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
+                    <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+                    <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+                    <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
+                  </systemPropertyVariables>
+                  <includes>
+                    <include>**/azure/Test*.java</include>
+                    <include>**/azure/**/Test*.java</include>
+                  </includes>
+                  <excludes>
+                    <exclude>**/azure/**/TestRollingWindowAverage*.java</exclude>
+                  </excludes>
+                </configuration>
+              </execution>
+              <execution>
+                <id>serialized-test-wasb</id>
+                <goals>
+                  <goal>test</goal>
+                </goals>
+                <configuration>
+                  <forkCount>1</forkCount>
+                  <reuseForks>false</reuseForks>
+                  <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
+                  <forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
+                  <systemPropertyVariables>
+                    <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
+                    <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
+                    <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
+                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
+                    <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
+                    <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
+                    <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+                    <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+                    <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
+                  </systemPropertyVariables>
+                  <includes>
+                    <include>**/azure/**/TestRollingWindowAverage*.java</include>
+                  </includes>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-failsafe-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>default-integration-test-wasb</id>
+                <goals>
+                  <goal>integration-test</goal>
+                  <goal>verify</goal>
+                </goals>
+                <configuration>
+                  <forkCount>1</forkCount>
+                  <forkCount>${testsThreadCount}</forkCount>
+                  <reuseForks>false</reuseForks>
+                  <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
+                  <forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
+                  <trimStackTrace>false</trimStackTrace>
+                  <systemPropertyVariables>
+                    <!-- Tell tests that they are being executed in parallel -->
+                    <test.parallel.execution>true</test.parallel.execution>
+                    <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
+                    <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
+                    <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
+
+                    <!-- Due to a Maven quirk, setting this to just -->
+                    <!-- surefire.forkNumber won't do the parameter -->
+                    <!-- substitution.  Putting a prefix in front of it like -->
+                    <!-- "fork-" makes it work. -->
+                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <!-- Propagate scale parameters -->
+                    <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
+                    <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
+                    <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
+                    <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+                    <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+                    <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
+                  </systemPropertyVariables>
+                  <!-- Some tests cannot run in parallel-->
+                  <includes>
+                    <include>**/azure/ITest*.java</include>
+                    <include>**/azure/**/ITest*.java</include>
+                  </includes>
+                  <excludes>
+                    <exclude>**/azure/ITestNativeFileSystemStatistics.java</exclude>
+                  </excludes>
+                </configuration>
+              </execution>
+              <!-- Do a sequential run for tests that cannot handle -->
+              <!-- parallel execution. -->
+              <execution>
+                <id>sequential-integration-tests-wasb</id>
+                <goals>
+                  <goal>integration-test</goal>
+                  <goal>verify</goal>
+                </goals>
+                <configuration>
+                  <forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
+                  <trimStackTrace>false</trimStackTrace>
+                  <systemPropertyVariables>
+                    <test.parallel.execution>false</test.parallel.execution>
+                    <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
+                    <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
+                    <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
+                    <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+                    <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+                    <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
+                  </systemPropertyVariables>
+                  <includes>
+                    <include>**/azure/ITestNativeFileSystemStatistics.java</include>
+                  </includes>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+    <profile>
+      <id>parallel-tests-abfs</id>
+      <activation>
+        <property>
+          <name>parallel-tests-abfs</name>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-antrun-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>create-parallel-tests-dirs</id>
+                <phase>test-compile</phase>
+                <configuration>
+                  <target>
+                    <script language="javascript"><![CDATA[
+                      var baseDirs = [
+                        project.getProperty("test.build.data"),
+                        project.getProperty("test.build.dir"),
+                        project.getProperty("hadoop.tmp.dir")
+                      ];
+                      for (var i in baseDirs) {
+                        for (var j = 1; j <= ${testsThreadCount}; ++j) {
+                          var mkdir = project.createTask("mkdir");
+                          mkdir.setDir(new java.io.File(baseDirs[i], j));
+                          mkdir.perform();
+                        }
+                      }
+                    ]]></script>
+                  </target>
+                </configuration>
+                <goals>
+                  <goal>run</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>default-test</id>
+                <goals>
+                  <goal>test</goal>
+                </goals>
+                <configuration>
+                  <forkCount>${testsThreadCount}</forkCount>
+                  <reuseForks>false</reuseForks>
+                  <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
+                  <forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
+                  <systemPropertyVariables>
+                    <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
+                    <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
+                    <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
+                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
+                    <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
+                    <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
+                    <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+                    <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+                    <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
+                  </systemPropertyVariables>
+                  <includes>
+                    <include>**/azurebfs/Test*.java</include>
+                    <include>**/azurebfs/**/Test*.java</include>
+                  </includes>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-failsafe-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>integration-test-abfs-parallel-classesAndMethods</id>
+                <goals>
+                  <goal>integration-test</goal>
+                  <goal>verify</goal>
+                </goals>
+                <configuration>
+                  <forkCount>${testsThreadCount}</forkCount>
+                  <reuseForks>true</reuseForks>
+                  <parallel>both</parallel>
+                  <threadCount>${testsThreadCount}</threadCount>
+                  <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
+                  <forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
+                  <trimStackTrace>false</trimStackTrace>
+                  <systemPropertyVariables>
+                    <!-- Tell tests that they are being executed in parallel -->
+                    <test.parallel.execution>true</test.parallel.execution>
+                    <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
+                    <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
+                    <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
+                    <!-- Due to a Maven quirk, setting this to just -->
+                    <!-- surefire.forkNumber won't do the parameter -->
+                    <!-- substitution.  Putting a prefix in front of it like -->
+                    <!-- "fork-" makes it work. -->
+                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <!-- Propagate scale parameters -->
+                    <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
+                    <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+                  </systemPropertyVariables>
+
+                  <includes>
+                    <include>**/azurebfs/ITest*.java</include>
+                    <include>**/azurebfs/**/ITest*.java</include>
+                  </includes>
+                  <excludes>
+                    <exclude>**/azurebfs/contract/ITest*.java</exclude>
+                    <exclude>**/azurebfs/ITestAzureBlobFileSystemE2EScale.java</exclude>
+                    <exclude>**/azurebfs/ITestAbfsReadWriteAndSeek.java</exclude>
+                    <exclude>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</exclude>
+                  </excludes>
+
+                </configuration>
+              </execution>
+              <execution>
+                <id>integration-test-abfs-parallel-classes</id>
+                <goals>
+                  <goal>integration-test</goal>
+                  <goal>verify</goal>
+                </goals>
+                <configuration>
+                  <forkCount>${testsThreadCount}</forkCount>
+                  <reuseForks>false</reuseForks>
+                  <!--NOTICE: hadoop contract tests methods can not be ran in parallel-->
+                  <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
+                  <forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
+                  <trimStackTrace>false</trimStackTrace>
+                  <systemPropertyVariables>
+                    <!-- Tell tests that they are being executed in parallel -->
+                    <test.parallel.execution>true</test.parallel.execution>
+                    <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
+                    <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
+                    <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
+
+                    <!-- Due to a Maven quirk, setting this to just -->
+                    <!-- surefire.forkNumber won't do the parameter -->
+                    <!-- substitution.  Putting a prefix in front of it like -->
+                    <!-- "fork-" makes it work. -->
+                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <!-- Propagate scale parameters -->
+                    <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
+                    <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+                  </systemPropertyVariables>
+                  <includes>
+                    <include>**/azurebfs/contract/ITest*.java</include>
+                    <include>**/azurebfs/ITestAzureBlobFileSystemE2EScale.java</include>
+                    <include>**/azurebfs/ITestAbfsReadWriteAndSeek.java</include>
+                    <include>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</include>
+                  </includes>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+    <profile>
       <id>parallel-tests</id>
       <activation>
         <property>
@@ -417,6 +762,7 @@
                     <exclude>**/ITestWasbRemoteCallHelper.java</exclude>
                     <exclude>**/ITestBlockBlobInputStream.java</exclude>
                     <exclude>**/ITestWasbAbfsCompatibility.java</exclude>
+                    <exclude>**/ITestNativeFileSystemStatistics.java</exclude>
                   </excludes>
                 </configuration>
               </execution>
@@ -452,6 +798,7 @@
                     <include>**/ITestAzureBlobFileSystemRandomRead.java</include>
                     <include>**/ITestWasbRemoteCallHelper.java</include>
                     <include>**/ITestBlockBlobInputStream.java</include>
+                    <include>**/ITestNativeFileSystemStatistics.java</include>
                   </includes>
                 </configuration>
               </execution>
@@ -460,11 +807,12 @@
         </plugins>
       </build>
     </profile>
+
     <profile>
       <id>sequential-tests</id>
       <activation>
         <property>
-          <name>!parallel-tests</name>
+          <name>sequential-tests</name>
         </property>
       </activation>
       <build>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4410eacb/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 4bde9d8..b809192 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -107,7 +107,11 @@ public class AzureBlobFileSystem extends FileSystem {
 
     if (abfsStore.getAbfsConfiguration().getCreateRemoteFileSystemDuringInitialization()) {
       if (!this.fileSystemExists()) {
-        this.createFileSystem();
+        try {
+          this.createFileSystem();
+        } catch (AzureBlobFileSystemException ex) {
+          checkException(null, ex, AzureServiceErrorCode.FILE_SYSTEM_ALREADY_EXISTS);
+        }
       }
     }
 
@@ -121,7 +125,7 @@ public class AzureBlobFileSystem extends FileSystem {
     if (UserGroupInformation.isSecurityEnabled()) {
       this.delegationTokenEnabled = abfsStore.getAbfsConfiguration().isDelegationTokenManagerEnabled();
 
-      if(this.delegationTokenEnabled) {
+      if (this.delegationTokenEnabled) {
         LOG.debug("Initializing DelegationTokenManager for {}", uri);
         this.delegationTokenManager = abfsStore.getAbfsConfiguration().getDelegationTokenManager();
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4410eacb/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 92e081e..7e43090 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import org.apache.hadoop.fs.FSExceptionMessages;
@@ -369,4 +370,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
       this.length = length;
     }
   }
+
+  @VisibleForTesting
+  public synchronized void waitForPendingUploads() throws IOException {
+    waitForTaskToComplete();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4410eacb/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeFileSystemStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeFileSystemStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeFileSystemStatistics.java
new file mode 100644
index 0000000..cbb09dd
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeFileSystemStatistics.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static org.junit.Assume.assumeNotNull;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.cleanupTestAccount;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.readStringFromFile;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.writeStringToFile;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+/**
+ * Because FileSystem.Statistics is per FileSystem, so statistics can not be ran in
+ * parallel, hence in this test file, force them to run in sequential.
+ * */
+public class ITestNativeFileSystemStatistics extends AbstractWasbTestWithTimeout{
+
+  @Test
+  public void test_001_NativeAzureFileSystemMocked() throws Exception {
+    AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount.createMock();
+    assumeNotNull(testAccount);
+    testStatisticsWithAccount(testAccount);
+  }
+
+  @Test
+  public void test_002_NativeAzureFileSystemPageBlobLive() throws Exception {
+    Configuration conf = new Configuration();
+    // Configure the page blob directories key so every file created is a page blob.
+    conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+    // Configure the atomic rename directories key so every folder will have
+    // atomic rename applied.
+    conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
+    AzureBlobStorageTestAccount testAccount =  AzureBlobStorageTestAccount.create(conf);
+    assumeNotNull(testAccount);
+    testStatisticsWithAccount(testAccount);
+  }
+
+  @Test
+  public void test_003_NativeAzureFileSystem() throws Exception {
+    AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount.create();
+    assumeNotNull(testAccount);
+    testStatisticsWithAccount(testAccount);
+  }
+
+  private void testStatisticsWithAccount(AzureBlobStorageTestAccount testAccount) throws Exception {
+    assumeNotNull(testAccount);
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+    testStatistics(fs);
+    cleanupTestAccount(testAccount);
+  }
+
+  /**
+   * When tests are ran in parallel, this tests will fail because
+   * FileSystem.Statistics is per FileSystem class.
+   */
+  @SuppressWarnings("deprecation")
+  private void testStatistics(NativeAzureFileSystem fs) throws Exception {
+    FileSystem.clearStatistics();
+    FileSystem.Statistics stats = FileSystem.getStatistics("wasb",
+            NativeAzureFileSystem.class);
+    assertEquals(0, stats.getBytesRead());
+    assertEquals(0, stats.getBytesWritten());
+    Path newFile = new Path("testStats");
+    writeStringToFile(fs, newFile, "12345678");
+    assertEquals(8, stats.getBytesWritten());
+    assertEquals(0, stats.getBytesRead());
+    String readBack = readStringFromFile(fs, newFile);
+    assertEquals("12345678", readBack);
+    assertEquals(8, stats.getBytesRead());
+    assertEquals(8, stats.getBytesWritten());
+    assertTrue(fs.delete(newFile, true));
+    assertEquals(8, stats.getBytesRead());
+    assertEquals(8, stats.getBytesWritten());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4410eacb/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
index 726b504..19d370e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
@@ -18,14 +18,10 @@
 
 package org.apache.hadoop.fs.azure;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.OutputStream;
-import java.io.OutputStreamWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
@@ -51,6 +47,9 @@ import com.microsoft.azure.storage.AccessCondition;
 import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.blob.CloudBlob;
 
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.readStringFromFile;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.writeStringToFile;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.writeStringToStream;
 import static org.apache.hadoop.test.GenericTestUtils.*;
 
 /*
@@ -329,12 +328,12 @@ public abstract class NativeAzureFileSystemBaseTest
     FileSystem localFs = FileSystem.get(new Configuration());
     localFs.delete(localFilePath, true);
     try {
-      writeString(localFs, localFilePath, "Testing");
+      writeStringToFile(localFs, localFilePath, "Testing");
       Path dstPath = methodPath();
       assertTrue(FileUtil.copy(localFs, localFilePath, fs, dstPath, false,
           fs.getConf()));
       assertPathExists("coied from local", dstPath);
-      assertEquals("Testing", readString(fs, dstPath));
+      assertEquals("Testing", readStringFromFile(fs, dstPath));
       fs.delete(dstPath, true);
     } finally {
       localFs.delete(localFilePath, true);
@@ -364,26 +363,6 @@ public abstract class NativeAzureFileSystemBaseTest
   }
 
   @Test
-  public void testStatistics() throws Exception {
-    FileSystem.clearStatistics();
-    FileSystem.Statistics stats = FileSystem.getStatistics("wasb",
-        NativeAzureFileSystem.class);
-    assertEquals(0, stats.getBytesRead());
-    assertEquals(0, stats.getBytesWritten());
-    Path newFile = new Path("testStats");
-    writeString(newFile, "12345678");
-    assertEquals(8, stats.getBytesWritten());
-    assertEquals(0, stats.getBytesRead());
-    String readBack = readString(newFile);
-    assertEquals("12345678", readBack);
-    assertEquals(8, stats.getBytesRead());
-    assertEquals(8, stats.getBytesWritten());
-    assertTrue(fs.delete(newFile, true));
-    assertEquals(8, stats.getBytesRead());
-    assertEquals(8, stats.getBytesWritten());
-  }
-
-  @Test
   public void testUriEncoding() throws Exception {
     fs.create(new Path("p/t%5Fe")).close();
     FileStatus[] listing = fs.listStatus(new Path("p"));
@@ -767,7 +746,7 @@ public abstract class NativeAzureFileSystemBaseTest
     Path renamePendingFile = new Path(renamePendingStr);
     FSDataOutputStream out = fs.create(renamePendingFile, true);
     assertTrue(out != null);
-    writeString(out, renameDescription);
+    writeStringToStream(out, renameDescription);
 
     // Redo the rename operation based on the contents of the -RenamePending.json file.
     // Trigger the redo by checking for existence of the original folder. It must appear
@@ -831,7 +810,7 @@ public abstract class NativeAzureFileSystemBaseTest
     Path renamePendingFile = new Path(renamePendingStr);
     FSDataOutputStream out = fs.create(renamePendingFile, true);
     assertTrue(out != null);
-    writeString(out, pending.makeRenamePendingFileContents());
+    writeStringToStream(out, pending.makeRenamePendingFileContents());
 
     // Redo the rename operation based on the contents of the
     // -RenamePending.json file. Trigger the redo by checking for existence of
@@ -886,7 +865,7 @@ public abstract class NativeAzureFileSystemBaseTest
     Path renamePendingFile = new Path(renamePendingStr);
     FSDataOutputStream out = fs.create(renamePendingFile, true);
     assertTrue(out != null);
-    writeString(out, pending.makeRenamePendingFileContents());
+    writeStringToStream(out, pending.makeRenamePendingFileContents());
 
     // Rename inner folder to simulate the scenario where rename has started and
     // only one directory has been renamed but not the files under it
@@ -1000,7 +979,7 @@ public abstract class NativeAzureFileSystemBaseTest
     Path renamePendingFile = new Path(renamePendingStr);
     FSDataOutputStream out = fs.create(renamePendingFile, true);
     assertTrue(out != null);
-    writeString(out, pending.makeRenamePendingFileContents());
+    writeStringToStream(out, pending.makeRenamePendingFileContents());
 
     try {
       pending.redo();
@@ -1228,7 +1207,7 @@ public abstract class NativeAzureFileSystemBaseTest
       Path renamePendingFile = new Path(renamePendingStr);
       FSDataOutputStream out = fs.create(renamePendingFile, true);
       assertTrue(out != null);
-      writeString(out, renameDescription);
+      writeStringToStream(out, renameDescription);
     }
 
     // set whether a child is present or not
@@ -1488,7 +1467,7 @@ public abstract class NativeAzureFileSystemBaseTest
     Calendar utc = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
     long currentUtcTime = utc.getTime().getTime();
     FileStatus fileStatus = fs.getFileStatus(testPath);
-    final long errorMargin = 10 * 1000; // Give it +/-10 seconds
+    final long errorMargin = 60 * 1000; // Give it +/-60 seconds
     assertTrue("Modification time " +
         new Date(fileStatus.getModificationTime()) + " is not close to now: " +
         utc.getTime(),
@@ -1504,45 +1483,12 @@ public abstract class NativeAzureFileSystemBaseTest
   }
 
   private String readString(Path testFile) throws IOException {
-    return readString(fs, testFile);
+    return readStringFromFile(fs, testFile);
   }
 
-  private String readString(FileSystem fs, Path testFile) throws IOException {
-    FSDataInputStream inputStream = fs.open(testFile);
-    String ret = readString(inputStream);
-    inputStream.close();
-    return ret;
-  }
-
-  private String readString(FSDataInputStream inputStream) throws IOException {
-    BufferedReader reader = new BufferedReader(new InputStreamReader(
-        inputStream));
-    final int BUFFER_SIZE = 1024;
-    char[] buffer = new char[BUFFER_SIZE];
-    int count = reader.read(buffer, 0, BUFFER_SIZE);
-    if (count > BUFFER_SIZE) {
-      throw new IOException("Exceeded buffer size");
-    }
-    inputStream.close();
-    return new String(buffer, 0, count);
-  }
 
   private void writeString(Path path, String value) throws IOException {
-    writeString(fs, path, value);
-  }
-
-  private void writeString(FileSystem fs, Path path, String value)
-      throws IOException {
-    FSDataOutputStream outputStream = fs.create(path, true);
-    writeString(outputStream, value);
-  }
-
-  private void writeString(FSDataOutputStream outputStream, String value)
-      throws IOException {
-    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
-        outputStream));
-    writer.write(value);
-    writer.close();
+    writeStringToFile(fs, path, value);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4410eacb/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestUtils.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestUtils.java
index b438c8e..c46320a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestUtils.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestUtils.java
@@ -18,7 +18,11 @@
 
 package org.apache.hadoop.fs.azure.integration;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
 import java.net.URI;
 import java.util.List;
 
@@ -30,12 +34,15 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
 import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
 
+import static org.junit.Assume.assumeTrue;
 
 import static org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.WASB_ACCOUNT_NAME_DOMAIN_SUFFIX_REGEX;
 import static org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.WASB_TEST_ACCOUNT_NAME_WITH_DOMAIN;
@@ -43,7 +50,6 @@ import static org.apache.hadoop.fs.azure.integration.AzureTestConstants.*;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getLongGauge;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-import static org.junit.Assume.assumeTrue;
 
 /**
  * Utilities for the Azure tests. Based on {@code S3ATestUtils}, so
@@ -494,4 +500,49 @@ public final class AzureTestUtils extends Assert {
     return accountName;
   }
 
+  /**
+   * Write string into a file.
+   */
+  public static void writeStringToFile(FileSystem fs, Path path, String value)
+          throws IOException {
+    FSDataOutputStream outputStream = fs.create(path, true);
+    writeStringToStream(outputStream, value);
+  }
+
+  /**
+   * Write string into a file.
+   */
+  public static void writeStringToStream(FSDataOutputStream outputStream, String value)
+          throws IOException {
+    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
+            outputStream));
+    writer.write(value);
+    writer.close();
+  }
+
+  /**
+   * Read string from a file.
+   */
+  public static String readStringFromFile(FileSystem fs, Path testFile) throws IOException {
+    FSDataInputStream inputStream = fs.open(testFile);
+    String ret = readStringFromStream(inputStream);
+    inputStream.close();
+    return ret;
+  }
+
+  /**
+   * Read string from stream.
+   */
+  public static String readStringFromStream(FSDataInputStream inputStream) throws IOException {
+    BufferedReader reader = new BufferedReader(new InputStreamReader(
+            inputStream));
+    final int BUFFER_SIZE = 1024;
+    char[] buffer = new char[BUFFER_SIZE];
+    int count = reader.read(buffer, 0, BUFFER_SIZE);
+    if (count > BUFFER_SIZE) {
+      throw new IOException("Exceeded buffer size");
+    }
+    inputStream.close();
+    return new String(buffer, 0, count);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4410eacb/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java
index 522b635..7ed9d42 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java
@@ -44,7 +44,6 @@ public class ITestAzureBlobFileSystemE2EScale extends
   private static final int BASE_SIZE = 1024;
   private static final int ONE_MB = 1024 * 1024;
   private static final int DEFAULT_WRITE_TIMES = 100;
-  private static final Path TEST_FILE = new Path("ITestAzureBlobFileSystemE2EScale");
 
   public ITestAzureBlobFileSystemE2EScale() {
   }
@@ -52,7 +51,8 @@ public class ITestAzureBlobFileSystemE2EScale extends
   @Test
   public void testWriteHeavyBytesToFileAcrossThreads() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
-    final FSDataOutputStream stream = fs.create(TEST_FILE);
+    final Path testFile = path(methodName.getMethodName());
+    final FSDataOutputStream stream = fs.create(testFile);
     ExecutorService es = Executors.newFixedThreadPool(TEN);
 
     int testWriteBufferSize = 2 * TEN * ONE_THOUSAND * BASE_SIZE;
@@ -81,7 +81,7 @@ public class ITestAzureBlobFileSystemE2EScale extends
     stream.close();
 
     es.shutdownNow();
-    FileStatus fileStatus = fs.getFileStatus(TEST_FILE);
+    FileStatus fileStatus = fs.getFileStatus(testFile);
     assertEquals(testWriteBufferSize * operationCount, fileStatus.getLen());
   }
 
@@ -89,9 +89,10 @@ public class ITestAzureBlobFileSystemE2EScale extends
   public void testReadWriteHeavyBytesToFileWithStatistics() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
     final FileSystem.Statistics abfsStatistics;
+    final Path testFile = path(methodName.getMethodName());
     int testBufferSize;
     final byte[] sourceData;
-    try (FSDataOutputStream stream = fs.create(TEST_FILE)) {
+    try (FSDataOutputStream stream = fs.create(testFile)) {
       abfsStatistics = fs.getFsStatistics();
       abfsStatistics.reset();
 
@@ -103,7 +104,7 @@ public class ITestAzureBlobFileSystemE2EScale extends
 
     final byte[] remoteData = new byte[testBufferSize];
     int bytesRead;
-    try (FSDataInputStream inputStream = fs.open(TEST_FILE, 4 * ONE_MB)) {
+    try (FSDataInputStream inputStream = fs.open(testFile, 4 * ONE_MB)) {
       bytesRead = inputStream.read(remoteData);
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4410eacb/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
index 88f77b0..dba10f5 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -53,6 +54,7 @@ public class ITestAzureBlobFileSystemFileStatus extends
     assertEquals("root listing", 0, rootls.length);
   }
 
+  @Ignore("When running against live abfs with Oauth account, this test will fail. Need to check the tenant.")
   @Test
   public void testFileStatusPermissionsAndOwnerAndGroup() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
@@ -86,6 +88,7 @@ public class ITestAzureBlobFileSystemFileStatus extends
     return fileStatus;
   }
 
+  @Ignore("When running against live abfs with Oauth account, this test will fail. Need to check the tenant.")
   @Test
   public void testFolderStatusPermissionsAndOwnerAndGroup() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4410eacb/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
index 7c6bbb5..337f95c 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
@@ -18,20 +18,19 @@
 
 package org.apache.hadoop.fs.azurebfs;
 
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.EnumSet;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.io.IOException;
 
-import com.microsoft.azure.storage.blob.BlockEntry;
-import com.microsoft.azure.storage.blob.BlockListingFilter;
-import com.microsoft.azure.storage.blob.CloudBlockBlob;
 import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.hamcrest.core.IsEqual;
 import org.hamcrest.core.IsNot;
 import org.junit.Assume;
@@ -43,11 +42,12 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
 
 /**
  * Test flush operation.
+ * This class cannot be run in parallel test mode--check comments in
+ * testWriteHeavyBytesToFileSyncFlush().
  */
 public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
   private static final int BASE_SIZE = 1024;
@@ -55,11 +55,10 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
   private static final int TEST_BUFFER_SIZE = 5 * ONE_THOUSAND * BASE_SIZE;
   private static final int ONE_MB = 1024 * 1024;
   private static final int FLUSH_TIMES = 200;
-  private static final int THREAD_SLEEP_TIME = 6000;
+  private static final int THREAD_SLEEP_TIME = 1000;
 
-  private static final Path TEST_FILE_PATH = new Path("/testfile");
   private static final int TEST_FILE_LENGTH = 1024 * 1024 * 8;
-  private static final int WAITING_TIME = 4000;
+  private static final int WAITING_TIME = 1000;
 
   public ITestAzureBlobFileSystemFlush() {
     super();
@@ -68,8 +67,9 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
   @Test
   public void testAbfsOutputStreamAsyncFlushWithRetainUncommittedData() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
+    final Path testFilePath = path(methodName.getMethodName());
     final byte[] b;
-    try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
+    try (FSDataOutputStream stream = fs.create(testFilePath)) {
       b = new byte[TEST_BUFFER_SIZE];
       new Random().nextBytes(b);
 
@@ -84,7 +84,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
     }
 
     final byte[] r = new byte[TEST_BUFFER_SIZE];
-    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) {
+    try (FSDataInputStream inputStream = fs.open(testFilePath, 4 * ONE_MB)) {
       while (inputStream.available() != 0) {
         int result = inputStream.read(r);
 
@@ -97,8 +97,10 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
   @Test
   public void testAbfsOutputStreamSyncFlush() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
+    final Path testFilePath = path(methodName.getMethodName());
+
     final byte[] b;
-    try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
+    try (FSDataOutputStream stream = fs.create(testFilePath)) {
       b = new byte[TEST_BUFFER_SIZE];
       new Random().nextBytes(b);
       stream.write(b);
@@ -111,7 +113,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
     }
 
     final byte[] r = new byte[TEST_BUFFER_SIZE];
-    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) {
+    try (FSDataInputStream inputStream = fs.open(testFilePath, 4 * ONE_MB)) {
       int result = inputStream.read(r);
 
       assertNotEquals(-1, result);
@@ -123,12 +125,9 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
   @Test
   public void testWriteHeavyBytesToFileSyncFlush() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
-    final FileSystem.Statistics abfsStatistics;
+    final Path testFilePath = path(methodName.getMethodName());
     ExecutorService es;
-    try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
-      abfsStatistics = fs.getFsStatistics();
-      abfsStatistics.reset();
-
+    try (FSDataOutputStream stream = fs.create(testFilePath)) {
       es = Executors.newFixedThreadPool(10);
 
       final byte[] b = new byte[TEST_BUFFER_SIZE];
@@ -163,18 +162,18 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
     }
 
     es.shutdownNow();
-    FileStatus fileStatus = fs.getFileStatus(TEST_FILE_PATH);
+    FileStatus fileStatus = fs.getFileStatus(testFilePath);
     long expectedWrites = (long) TEST_BUFFER_SIZE * FLUSH_TIMES;
-    assertEquals("Wrong file length in " + fileStatus, expectedWrites, fileStatus.getLen());
-    assertEquals("wrong bytes Written count in " + abfsStatistics,
-        expectedWrites, abfsStatistics.getBytesWritten());
+    assertEquals("Wrong file length in " + testFilePath, expectedWrites, fileStatus.getLen());
   }
 
   @Test
   public void testWriteHeavyBytesToFileAsyncFlush() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
     ExecutorService es = Executors.newFixedThreadPool(10);
-    try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
+
+    final Path testFilePath = path(methodName.getMethodName());
+    try (FSDataOutputStream stream = fs.create(testFilePath)) {
 
       final byte[] b = new byte[TEST_BUFFER_SIZE];
       new Random().nextBytes(b);
@@ -207,54 +206,50 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
     }
 
     es.shutdownNow();
-    FileStatus fileStatus = fs.getFileStatus(TEST_FILE_PATH);
+    FileStatus fileStatus = fs.getFileStatus(testFilePath);
     assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen());
   }
 
   @Test
   public void testFlushWithFlushEnabled() throws Exception {
-    Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
-
-    AzureBlobStorageTestAccount testAccount = createWasbTestAccount();
-    String wasbUrl = testAccount.getFileSystem().getName();
-    String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);
-    final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl);
-    // test only valid for non-namespace enabled account
-    Assume.assumeFalse(fs.getIsNamespaceEnabeld());
-
-    byte[] buffer = getRandomBytesArray();
-    CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1));
-    try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
-      // Wait for write request to be executed
-      Thread.sleep(WAITING_TIME);
-      stream.flush();
-      ArrayList<BlockEntry> blockList = blob.downloadBlockList(
-              BlockListingFilter.COMMITTED, null, null, null);
-      // verify block has been committed
-      assertEquals(1, blockList.size());
-    }
+    testFlush(true);
   }
 
   @Test
   public void testFlushWithFlushDisabled() throws Exception {
+    testFlush(false);
+  }
+
+  private void testFlush(boolean flushEnabled) throws Exception {
     Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
-    AzureBlobStorageTestAccount testAccount = createWasbTestAccount();
-    String wasbUrl = testAccount.getFileSystem().getName();
-    String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);
-    final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl);
-    // test only valid for non-namespace enabled account
-    Assume.assumeFalse(fs.getIsNamespaceEnabeld());
 
+    final AzureBlobFileSystem fs = (AzureBlobFileSystem) getFileSystem();
+
+    // Simulate setting "fs.azure.enable.flush" to true or false
+    fs.getAbfsStore().getAbfsConfiguration().setEnableFlush(flushEnabled);
+
+    final Path testFilePath = path(methodName.getMethodName());
     byte[] buffer = getRandomBytesArray();
-    CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1));
-    try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
-      // Wait for write request to be executed
-      Thread.sleep(WAITING_TIME);
+
+    // The test case must write "fs.azure.write.request.size" bytes
+    // to the stream in order for the data to be uploaded to storage.
+    assertEquals(
+        fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize(),
+        buffer.length);
+
+    try (FSDataOutputStream stream = fs.create(testFilePath)) {
+      stream.write(buffer);
+
+      // Write asynchronously uploads data, so we must wait for completion
+      AbfsOutputStream abfsStream = (AbfsOutputStream) stream.getWrappedStream();
+      abfsStream.waitForPendingUploads();
+
+      // Flush commits the data so it can be read.
       stream.flush();
-      ArrayList<BlockEntry> blockList = blob.downloadBlockList(
-              BlockListingFilter.COMMITTED, null, null, null);
-      // verify block has not been committed
-      assertEquals(0, blockList.size());
+
+      // Verify that the data can be read if flushEnabled is true; and otherwise
+      // cannot be read.
+      validate(fs.open(testFilePath), buffer, flushEnabled);
     }
   }
 
@@ -262,9 +257,12 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
   public void testHflushWithFlushEnabled() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
     byte[] buffer = getRandomBytesArray();
-    try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
+    String fileName = UUID.randomUUID().toString();
+    final Path testFilePath = path(fileName);
+
+    try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, true)) {
       stream.hflush();
-      validate(fs, TEST_FILE_PATH, buffer, true);
+      validate(fs, testFilePath, buffer, true);
     }
   }
 
@@ -272,9 +270,11 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
   public void testHflushWithFlushDisabled() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
     byte[] buffer = getRandomBytesArray();
-    try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
+    final Path testFilePath = path(methodName.getMethodName());
+
+    try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) {
       stream.hflush();
-      validate(fs, TEST_FILE_PATH, buffer, false);
+      validate(fs, testFilePath, buffer, false);
     }
   }
 
@@ -282,9 +282,12 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
   public void testHsyncWithFlushEnabled() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
     byte[] buffer = getRandomBytesArray();
-    try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
+
+    final Path testFilePath = path(methodName.getMethodName());
+
+    try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, true)) {
       stream.hsync();
-      validate(fs, TEST_FILE_PATH, buffer, true);
+      validate(fs, testFilePath, buffer, true);
     }
   }
 
@@ -292,7 +295,10 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
   public void testStreamCapabilitiesWithFlushDisabled() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
     byte[] buffer = getRandomBytesArray();
-    try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
+
+    final Path testFilePath = path(methodName.getMethodName());
+
+    try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) {
       assertFalse(stream.hasCapability(StreamCapabilities.HFLUSH));
       assertFalse(stream.hasCapability(StreamCapabilities.HSYNC));
       assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
@@ -305,7 +311,8 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
   public void testStreamCapabilitiesWithFlushEnabled() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
     byte[] buffer = getRandomBytesArray();
-    try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
+    final Path testFilePath = path(methodName.getMethodName());
+    try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, true)) {
       assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
       assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
       assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
@@ -318,9 +325,10 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
   public void testHsyncWithFlushDisabled() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
     byte[] buffer = getRandomBytesArray();
-    try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
+    final Path testFilePath = path(methodName.getMethodName());
+    try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) {
       stream.hsync();
-      validate(fs, TEST_FILE_PATH, buffer, false);
+      validate(fs, testFilePath, buffer, false);
     }
   }
 
@@ -337,11 +345,28 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
     return stream;
   }
 
-  private AzureBlobStorageTestAccount createWasbTestAccount() throws Exception {
-    return AzureBlobStorageTestAccount.create("", EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
-            this.getConfiguration());
-  }
+  private void validate(InputStream stream, byte[] writeBuffer, boolean isEqual)
+      throws IOException {
+    try {
+      byte[] readBuffer = new byte[writeBuffer.length];
 
+      int numBytesRead = stream.read(readBuffer, 0, readBuffer.length);
+
+      if (isEqual) {
+        assertArrayEquals(
+            "Bytes read do not match bytes written.",
+            writeBuffer,
+            readBuffer);
+      } else {
+        assertThat(
+            "Bytes read unexpectedly match bytes written.",
+            readBuffer,
+            IsNot.not(IsEqual.equalTo(writeBuffer)));
+      }
+    } finally {
+      stream.close();
+    }
+  }
   private void validate(FileSystem fs, Path path, byte[] writeBuffer, boolean isEqual) throws IOException {
     String filePath = path.toUri().toString();
     try (FSDataInputStream inputStream = fs.open(path)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4410eacb/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
index c4bfee2..33a5805 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
@@ -98,7 +98,7 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
     NativeAzureFileSystem wasb = getWasbFileSystem();
 
     for (int i = 0; i< 4; i++) {
-      Path path = new Path("/testfiles/~12/!008/testfile" + i);
+      Path path = new Path("/testReadFile/~12/!008/testfile" + i);
       final FileSystem createFs = createFileWithAbfs[i] ? abfs : wasb;
 
       // Write


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message