hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [20/52] [abbrv] hadoop git commit: HADOOP-13446. Support running isolated unit tests separate from AWS integration tests. Contributed by Chris Nauroth.
Date Fri, 26 Aug 2016 20:35:24 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContext.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContext.java
deleted file mode 100644
index 34d78a5..0000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContext.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs.s3a.fileContext;
-
-import org.apache.hadoop.fs.TestFileContext;
-
-/**
- * Implementation of TestFileContext for S3a
- */
-public class TestS3AFileContext extends TestFileContext{
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextCreateMkdir.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextCreateMkdir.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextCreateMkdir.java
deleted file mode 100644
index b0c4d84..0000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextCreateMkdir.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs.s3a.fileContext;
-
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContextCreateMkdirBaseTest;
-import org.apache.hadoop.fs.s3a.S3ATestUtils;
-import org.junit.Before;
-
-/**
- * Extends FileContextCreateMkdirBaseTest for a S3a FileContext
- */
-public class TestS3AFileContextCreateMkdir
-        extends FileContextCreateMkdirBaseTest {
-
-  @Before
-  public void setUp() throws IOException, Exception {
-    Configuration conf = new Configuration();
-    fc = S3ATestUtils.createTestFileContext(conf);
-    super.setUp();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextMainOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextMainOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextMainOperations.java
deleted file mode 100644
index 4d200d1..0000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextMainOperations.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs.s3a.fileContext;
-
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContextMainOperationsBaseTest;
-import org.apache.hadoop.fs.s3a.S3ATestUtils;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * S3A implementation of FileContextMainOperationsBaseTest
- */
-public class TestS3AFileContextMainOperations
-        extends FileContextMainOperationsBaseTest {
-
-  @Before
-  public void setUp() throws IOException, Exception {
-    Configuration conf = new Configuration();
-    fc = S3ATestUtils.createTestFileContext(conf);
-    super.setUp();
-  }
-
-  @Override
-  protected boolean listCorruptedBlocksSupported() {
-    return false;
-  }
-
-  @Test
-  @Ignore
-  public void testCreateFlagAppendExistingFile() throws IOException {
-    //append not supported, so test removed
-  }
-
-  @Test
-  @Ignore
-  public void testCreateFlagCreateAppendExistingFile() throws IOException {
-    //append not supported, so test removed
-  }
-
-  @Test
-  @Ignore
-  public void testSetVerifyChecksum() throws IOException {
-    //checksums ignored, so test removed
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextStatistics.java
deleted file mode 100644
index a9f4848..0000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextStatistics.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs.s3a.fileContext;
-
-import java.net.URI;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FCStatisticsBaseTest;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.s3a.S3ATestUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-
-/**
- * S3a implementation of FCStatisticsBaseTest
- */
-public class TestS3AFileContextStatistics extends FCStatisticsBaseTest {
-
-  @Before
-  public void setUp() throws Exception {
-    Configuration conf = new Configuration();
-    fc = S3ATestUtils.createTestFileContext(conf);
-    fc.mkdir(fileContextTestHelper.getTestRootPath(fc, "test"), FileContext.DEFAULT_PERM, true);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    fc.delete(fileContextTestHelper.getTestRootPath(fc, "test"), true);
-  }
-
-  @Override
-  protected void verifyReadBytes(FileSystem.Statistics stats) {
-    // one blockSize for read, one for pread
-    Assert.assertEquals(2 * blockSize, stats.getBytesRead());
-  }
-
-  @Override
-  protected void verifyWrittenBytes(FileSystem.Statistics stats) {
-    //No extra bytes are written
-    Assert.assertEquals(blockSize, stats.getBytesWritten());
-  }
-
-  @Override
-  protected URI getFsUri() {
-    return fc.getHomeDirectory().toUri();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextURI.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextURI.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextURI.java
deleted file mode 100644
index 3da7b19..0000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextURI.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs.s3a.fileContext;
-
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContextURIBase;
-import org.apache.hadoop.fs.s3a.S3ATestUtils;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * S3a implementation of FileContextURIBase
- */
-public class TestS3AFileContextURI extends FileContextURIBase {
-
-  @Before
-  public void setUp() throws IOException, Exception {
-    Configuration conf = new Configuration();
-    fc1 = S3ATestUtils.createTestFileContext(conf);
-    fc2 = S3ATestUtils.createTestFileContext(conf); //different object, same FS
-    super.setUp();
-  }
-
-  @Test
-  @Ignore
-  public void testFileStatus() throws IOException {
-    //test disabled (the statistics tested with this method are not relevant for an S3FS)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextUtil.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextUtil.java
deleted file mode 100644
index 666f4c2..0000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextUtil.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs.s3a.fileContext;
-
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContextUtilBase;
-import org.apache.hadoop.fs.s3a.S3ATestUtils;
-import org.junit.Before;
-
-/**
- * S3A implementation of FileContextUtilBase
- */
-public class TestS3AFileContextUtil extends FileContextUtilBase {
-
-  @Before
-  public void setUp() throws IOException, Exception {
-    Configuration conf = new Configuration();
-    fc = S3ATestUtils.createTestFileContext(conf);
-    super.setUp();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteFilesOneByOne.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteFilesOneByOne.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteFilesOneByOne.java
new file mode 100644
index 0000000..a375664
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteFilesOneByOne.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Tests file deletion with multi-delete disabled.
+ */
+public class ITestS3ADeleteFilesOneByOne extends ITestS3ADeleteManyFiles {
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration configuration = super.createConfiguration();
+    configuration.setBoolean(Constants.ENABLE_MULTI_DELETE, false);
+    return configuration;
+  }
+
+  @Override
+  @Test
+  public void testOpenCreate() throws IOException {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
new file mode 100644
index 0000000..208c491
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * Test some scalable operations related to file renaming and deletion.
+ */
+public class ITestS3ADeleteManyFiles extends S3AScaleTestBase {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3ADeleteManyFiles.class);
+
+  /**
+   * CAUTION: If this test starts failing, please make sure that the
+   * {@link org.apache.hadoop.fs.s3a.Constants#MAX_THREADS} configuration is not
+   * set too low. Alternatively, consider reducing the
+   * <code>scale.test.operation.count</code> parameter in
+   * <code>getOperationCount()</code>.
+   *
+   * @see #getOperationCount()
+   */
+  @Test
+  public void testBulkRenameAndDelete() throws Throwable {
+    final Path scaleTestDir = getTestPath();
+    final Path srcDir = new Path(scaleTestDir, "src");
+    final Path finalDir = new Path(scaleTestDir, "final");
+    final long count = getOperationCount();
+    ContractTestUtils.rm(fs, scaleTestDir, true, false);
+
+    fs.mkdirs(srcDir);
+    fs.mkdirs(finalDir);
+
+    int testBufferSize = fs.getConf()
+        .getInt(ContractTestUtils.IO_CHUNK_BUFFER_SIZE,
+            ContractTestUtils.DEFAULT_IO_CHUNK_BUFFER_SIZE);
+    // use Executor to speed up file creation
+    ExecutorService exec = Executors.newFixedThreadPool(16);
+    final ExecutorCompletionService<Boolean> completionService =
+        new ExecutorCompletionService<>(exec);
+    try {
+      final byte[] data = ContractTestUtils.dataset(testBufferSize, 'a', 'z');
+
+      for (int i = 0; i < count; ++i) {
+        final String fileName = "foo-" + i;
+        completionService.submit(new Callable<Boolean>() {
+          @Override
+          public Boolean call() throws IOException {
+            ContractTestUtils.createFile(fs, new Path(srcDir, fileName),
+                false, data);
+            return fs.exists(new Path(srcDir, fileName));
+          }
+        });
+      }
+      for (int i = 0; i < count; ++i) {
+        final Future<Boolean> future = completionService.take();
+        try {
+          if (!future.get()) {
+            LOG.warn("cannot create file");
+          }
+        } catch (ExecutionException e) {
+          LOG.warn("Error while uploading file", e.getCause());
+          throw e;
+        }
+      }
+    } finally {
+      exec.shutdown();
+    }
+
+    int nSrcFiles = fs.listStatus(srcDir).length;
+    fs.rename(srcDir, finalDir);
+    assertEquals(nSrcFiles, fs.listStatus(finalDir).length);
+    ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
+        new Path(srcDir, "foo-" + 0));
+    ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
+        new Path(srcDir, "foo-" + count / 2));
+    ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
+        new Path(srcDir, "foo-" + (count - 1)));
+    ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
+        new Path(finalDir, "foo-" + 0));
+    ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
+        new Path(finalDir, "foo-" + count/2));
+    ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
+        new Path(finalDir, "foo-" + (count-1)));
+
+    ContractTestUtils.assertDeleted(fs, finalDir, true, false);
+  }
+
+  @Test
+  public void testOpenCreate() throws IOException {
+    Path dir = new Path("/tests3a");
+    ContractTestUtils.createAndVerifyFile(fs, dir, 1024);
+    ContractTestUtils.createAndVerifyFile(fs, dir, 5 * 1024 * 1024);
+    ContractTestUtils.createAndVerifyFile(fs, dir, 20 * 1024 * 1024);
+
+
+    /*
+    Enable to test the multipart upload
+    try {
+      ContractTestUtils.createAndVerifyFile(fs, dir,
+          (long)6 * 1024 * 1024 * 1024);
+    } catch (IOException e) {
+      fail(e.getMessage());
+    }
+    */
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
new file mode 100644
index 0000000..b5f4eb3
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+
+/**
+ * Test the performance of listing files/directories.
+ */
+public class ITestS3ADirectoryPerformance extends S3AScaleTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ITestS3ADirectoryPerformance.class);
+
+  @Test
+  public void testListOperations() throws Throwable {
+    describe("Test recursive list operations");
+    final Path scaleTestDir = getTestPath();
+    final Path listDir = new Path(scaleTestDir, "lists");
+
+    // scale factor.
+    int scale = getConf().getInt(KEY_DIRECTORY_COUNT, DEFAULT_DIRECTORY_COUNT);
+    int width = scale;
+    int depth = scale;
+    int files = scale;
+    MetricDiff metadataRequests = new MetricDiff(fs, OBJECT_METADATA_REQUESTS);
+    MetricDiff listRequests = new MetricDiff(fs, OBJECT_LIST_REQUESTS);
+    MetricDiff listContinueRequests =
+        new MetricDiff(fs, OBJECT_CONTINUE_LIST_REQUESTS);
+    MetricDiff listStatusCalls = new MetricDiff(fs, INVOCATION_LIST_FILES);
+    MetricDiff getFileStatusCalls =
+        new MetricDiff(fs, INVOCATION_GET_FILE_STATUS);
+    NanoTimer createTimer = new NanoTimer();
+    TreeScanResults created =
+        createSubdirs(fs, listDir, depth, width, files, 0);
+    // add some empty directories
+    int emptyDepth = 1 * scale;
+    int emptyWidth = 3 * scale;
+
+    created.add(createSubdirs(fs, listDir, emptyDepth, emptyWidth, 0,
+        0, "empty", "f-", ""));
+    createTimer.end("Time to create %s", created);
+    LOG.info("Time per operation: {}",
+        toHuman(createTimer.nanosPerOperation(created.totalCount())));
+    printThenReset(LOG,
+        metadataRequests,
+        listRequests,
+        listContinueRequests,
+        listStatusCalls,
+        getFileStatusCalls);
+
+    describe("Listing files via treewalk");
+    try {
+      // Scan the directory via an explicit tree walk.
+      // This is the baseline for any listing speedups.
+      NanoTimer treeWalkTimer = new NanoTimer();
+      TreeScanResults treewalkResults = treeWalk(fs, listDir);
+      treeWalkTimer.end("List status via treewalk of %s", created);
+
+      printThenReset(LOG,
+          metadataRequests,
+          listRequests,
+          listContinueRequests,
+          listStatusCalls,
+          getFileStatusCalls);
+      assertEquals("Files found in listFiles(recursive=true) " +
+              " created=" + created + " listed=" + treewalkResults,
+          created.getFileCount(), treewalkResults.getFileCount());
+
+      describe("Listing files via listFiles(recursive=true)");
+      // listFiles() does the recursion internally
+      NanoTimer listFilesRecursiveTimer = new NanoTimer();
+
+      TreeScanResults listFilesResults = new TreeScanResults(
+          fs.listFiles(listDir, true));
+
+      listFilesRecursiveTimer.end("listFiles(recursive=true) of %s", created);
+      assertEquals("Files found in listFiles(recursive=true) " +
+          " created=" + created  + " listed=" + listFilesResults,
+          created.getFileCount(), listFilesResults.getFileCount());
+
+      // only two list operations should have taken place
+      print(LOG,
+          metadataRequests,
+          listRequests,
+          listContinueRequests,
+          listStatusCalls,
+          getFileStatusCalls);
+      assertEquals(listRequests.toString(), 2, listRequests.diff());
+      reset(metadataRequests,
+          listRequests,
+          listContinueRequests,
+          listStatusCalls,
+          getFileStatusCalls);
+
+
+    } finally {
+      describe("deletion");
+      // deletion at the end of the run
+      NanoTimer deleteTimer = new NanoTimer();
+      fs.delete(listDir, true);
+      deleteTimer.end("Deleting directory tree");
+      printThenReset(LOG,
+          metadataRequests,
+          listRequests,
+          listContinueRequests,
+          listStatusCalls,
+          getFileStatusCalls);
+    }
+  }
+
+  @Test
+  public void testTimeToStatEmptyDirectory() throws Throwable {
+    describe("Time to stat an empty directory");
+    Path path = new Path(getTestPath(), "empty");
+    fs.mkdirs(path);
+    timeToStatPath(path);
+  }
+
+  @Test
+  public void testTimeToStatNonEmptyDirectory() throws Throwable {
+    describe("Time to stat a non-empty directory");
+    Path path = new Path(getTestPath(), "dir");
+    fs.mkdirs(path);
+    touch(fs, new Path(path, "file"));
+    timeToStatPath(path);
+  }
+
+  @Test
+  public void testTimeToStatFile() throws Throwable {
+    describe("Time to stat a simple file");
+    Path path = new Path(getTestPath(), "file");
+    touch(fs, path);
+    timeToStatPath(path);
+  }
+
+  @Test
+  public void testTimeToStatRoot() throws Throwable {
+    describe("Time to stat the root path");
+    timeToStatPath(new Path("/"));
+  }
+
+  private void timeToStatPath(Path path) throws IOException {
+    describe("Timing getFileStatus(\"%s\")", path);
+    MetricDiff metadataRequests =
+        new MetricDiff(fs, Statistic.OBJECT_METADATA_REQUESTS);
+    MetricDiff listRequests =
+        new MetricDiff(fs, Statistic.OBJECT_LIST_REQUESTS);
+    long attempts = getOperationCount();
+    NanoTimer timer = new NanoTimer();
+    for (long l = 0; l < attempts; l++) {
+      fs.getFileStatus(path);
+    }
+    timer.end("Time to execute %d getFileStatusCalls", attempts);
+    LOG.info("Time per call: {}", toHuman(timer.nanosPerOperation(attempts)));
+    LOG.info("metadata: {}", metadataRequests);
+    LOG.info("metadata per operation {}", metadataRequests.diff() / attempts);
+    LOG.info("listObjects: {}", listRequests);
+    LOG.info("listObjects: per operation {}", listRequests.diff() / attempts);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
new file mode 100644
index 0000000..e2163c5
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.util.LineReader;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * Look at the performance of S3a operations.
+ */
+public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ITestS3AInputStreamPerformance.class);
+
+  private S3AFileSystem s3aFS;
+  private Path testData;
+  private S3AFileStatus testDataStatus;
+  private FSDataInputStream in;
+  private S3AInstrumentation.InputStreamStatistics streamStatistics;
+  public static final int BLOCK_SIZE = 32 * 1024;
+  public static final int BIG_BLOCK_SIZE = 256 * 1024;
+
+  /** Tests only run if the there is a named test file that can be read. */
+  private boolean testDataAvailable = true;
+  private String assumptionMessage = "test file";
+
+  /**
+   * Open the FS and the test data. The input stream is always set up here.
+   * @throws IOException IO Problems.
+   */
+  @Before
+  public void openFS() throws IOException {
+    Configuration conf = getConf();
+    conf.setInt(SOCKET_SEND_BUFFER, 16 * 1024);
+    conf.setInt(SOCKET_RECV_BUFFER, 16 * 1024);
+    String testFile =  conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE);
+    if (testFile.isEmpty()) {
+      assumptionMessage = "Empty test property: " + KEY_CSVTEST_FILE;
+      testDataAvailable = false;
+    } else {
+      S3ATestUtils.useCSVDataEndpoint(conf);
+      testData = new Path(testFile);
+      Path path = this.testData;
+      bindS3aFS(path);
+      try {
+        testDataStatus = s3aFS.getFileStatus(this.testData);
+      } catch (IOException e) {
+        LOG.warn("Failed to read file {} specified in {}",
+            testFile, KEY_CSVTEST_FILE, e);
+        throw e;
+      }
+    }
+  }
+
+  private void bindS3aFS(Path path) throws IOException {
+    s3aFS = (S3AFileSystem) FileSystem.newInstance(path.toUri(), getConf());
+  }
+
+  /**
+   * Cleanup: close the stream, close the FS.
+   */
+  @After
+  public void cleanup() {
+    describe("cleanup");
+    IOUtils.closeStream(in);
+    IOUtils.closeStream(s3aFS);
+  }
+
+  /**
+   * Declare that the test requires the CSV test dataset.
+   */
+  private void requireCSVTestData() {
+    Assume.assumeTrue(assumptionMessage, testDataAvailable);
+  }
+
+  /**
+   * Open the test file with the read buffer specified in the setting.
+   * {@link #KEY_READ_BUFFER_SIZE}; use the {@code Normal} policy
+   * @return the stream, wrapping an S3a one
+   * @throws IOException IO problems
+   */
+  FSDataInputStream openTestFile() throws IOException {
+    return openTestFile(S3AInputPolicy.Normal, 0);
+  }
+
+  /**
+   * Open the test file with the read buffer specified in the setting
+   * {@link #KEY_READ_BUFFER_SIZE}.
+   * This includes the {@link #requireCSVTestData()} assumption; so
+   * if called before any FS op, will automatically skip the test
+   * if the CSV file is absent.
+   *
+   * @param inputPolicy input policy to use
+   * @param readahead readahead/buffer size
+   * @return the stream, wrapping an S3a one
+   * @throws IOException IO problems
+   */
+  FSDataInputStream openTestFile(S3AInputPolicy inputPolicy, long readahead)
+      throws IOException {
+    requireCSVTestData();
+    return openDataFile(s3aFS, this.testData, inputPolicy, readahead);
+  }
+
+  /**
+   * Open a test file with the read buffer specified in the setting
+   * {@link #KEY_READ_BUFFER_SIZE}.
+   *
+   * @param path path to open
+   * @param inputPolicy input policy to use
+   * @param readahead readahead/buffer size
+   * @return the stream, wrapping an S3a one
+   * @throws IOException IO problems
+   */
+  private FSDataInputStream openDataFile(S3AFileSystem fs,
+      Path path,
+      S3AInputPolicy inputPolicy,
+      long readahead) throws IOException {
+    int bufferSize = getConf().getInt(KEY_READ_BUFFER_SIZE,
+        DEFAULT_READ_BUFFER_SIZE);
+    S3AInputPolicy policy = fs.getInputPolicy();
+    fs.setInputPolicy(inputPolicy);
+    try {
+      FSDataInputStream stream = fs.open(path, bufferSize);
+      if (readahead >= 0) {
+        stream.setReadahead(readahead);
+      }
+      streamStatistics = getInputStreamStatistics(stream);
+      return stream;
+    } finally {
+      fs.setInputPolicy(policy);
+    }
+  }
+
+  /**
+   * Assert that the stream was only ever opened once.
+   */
+  protected void assertStreamOpenedExactlyOnce() {
+    assertOpenOperationCount(1);
+  }
+
+  /**
+   * Make an assertion count about the number of open operations.
+   * @param expected the expected number
+   */
+  private void assertOpenOperationCount(long expected) {
+    assertEquals("open operations in\n" + in,
+        expected, streamStatistics.openOperations);
+  }
+
+  /**
+   * Log how long an IOP took, by dividing the total time by the
+   * count of operations, printing in a human-readable form.
+   * @param operation operation being measured
+   * @param timer timing data
+   * @param count IOP count.
+   */
+  protected void logTimePerIOP(String operation,
+      NanoTimer timer,
+      long count) {
+    LOG.info("Time per {}: {} nS",
+        operation, toHuman(timer.duration() / count));
+  }
+
+  @Test
+  public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable {
+    requireCSVTestData();
+    int blockSize = _1MB;
+    describe("Open the test file %s and read it in blocks of size %d",
+        testData, blockSize);
+    long len = testDataStatus.getLen();
+    in = openTestFile();
+    byte[] block = new byte[blockSize];
+    NanoTimer timer2 = new NanoTimer();
+    long count = 0;
+    // implicitly rounding down here
+    long blockCount = len / blockSize;
+    for (long i = 0; i < blockCount; i++) {
+      int offset = 0;
+      int remaining = blockSize;
+      NanoTimer blockTimer = new NanoTimer();
+      int reads = 0;
+      while (remaining > 0) {
+        int bytesRead = in.read(block, offset, remaining);
+        reads++;
+        if (bytesRead == 1) {
+          break;
+        }
+        remaining -= bytesRead;
+        offset += bytesRead;
+        count += bytesRead;
+      }
+      blockTimer.end("Reading block %d in %d reads", i, reads);
+    }
+    timer2.end("Time to read %d bytes in %d blocks", len, blockCount);
+    bandwidth(timer2, count);
+    logStreamStatistics();
+  }
+
+  @Test
+  public void testLazySeekEnabled() throws Throwable {
+    describe("Verify that seeks do not trigger any IO");
+    in = openTestFile();
+    long len = testDataStatus.getLen();
+    NanoTimer timer = new NanoTimer();
+    long blockCount = len / BLOCK_SIZE;
+    for (long i = 0; i < blockCount; i++) {
+      in.seek(in.getPos() + BLOCK_SIZE - 1);
+    }
+    in.seek(0);
+    blockCount++;
+    timer.end("Time to execute %d seeks", blockCount);
+    logTimePerIOP("seek()", timer, blockCount);
+    logStreamStatistics();
+    assertOpenOperationCount(0);
+    assertEquals("bytes read", 0, streamStatistics.bytesRead);
+  }
+
+  @Test
+  public void testReadaheadOutOfRange() throws Throwable {
+    try {
+      in = openTestFile();
+      in.setReadahead(-1L);
+      fail("Stream should have rejected the request "+ in);
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testReadWithNormalPolicy() throws Throwable {
+    describe("Read big blocks with a big readahead");
+    executeSeekReadSequence(BIG_BLOCK_SIZE, BIG_BLOCK_SIZE * 2,
+        S3AInputPolicy.Normal);
+    assertStreamOpenedExactlyOnce();
+  }
+
+  @Test
+  public void testDecompressionSequential128K() throws Throwable {
+    describe("Decompress with a 128K readahead");
+    executeDecompression(128 * 1024, S3AInputPolicy.Sequential);
+    assertStreamOpenedExactlyOnce();
+  }
+
+  /**
+   * Execute a decompression + line read with the given input policy.
+   * @param readahead byte readahead
+   * @param inputPolicy read policy
+   * @throws IOException IO Problems
+   */
+  private void executeDecompression(long readahead,
+      S3AInputPolicy inputPolicy) throws IOException {
+    CompressionCodecFactory factory
+        = new CompressionCodecFactory(getConf());
+    CompressionCodec codec = factory.getCodec(testData);
+    long bytesRead = 0;
+    int lines = 0;
+
+    FSDataInputStream objectIn = openTestFile(inputPolicy, readahead);
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    try (LineReader lineReader = new LineReader(
+        codec.createInputStream(objectIn), getConf())) {
+      Text line = new Text();
+      int read;
+      while ((read = lineReader.readLine(line)) > 0) {
+        bytesRead += read;
+        lines++;
+      }
+    } catch (EOFException eof) {
+      // done
+    }
+    timer.end("Time to read %d lines [%d bytes expanded, %d raw]" +
+        " with readahead = %d",
+        lines,
+        bytesRead,
+        testDataStatus.getLen(),
+        readahead);
+    logTimePerIOP("line read", timer, lines);
+    logStreamStatistics();
+  }
+
+  private void logStreamStatistics() {
+    LOG.info(String.format("Stream Statistics%n{}"), streamStatistics);
+  }
+
+  /**
+   * Execute a seek+read sequence.
+   * @param blockSize block size for seeks
+   * @param readahead what the readahead value of the stream should be
+   * @throws IOException IO problems
+   */
+  protected void executeSeekReadSequence(long blockSize,
+      long readahead,
+      S3AInputPolicy policy) throws IOException {
+    in = openTestFile(policy, readahead);
+    long len = testDataStatus.getLen();
+    NanoTimer timer = new NanoTimer();
+    long blockCount = len / blockSize;
+    LOG.info("Reading {} blocks, readahead = {}",
+        blockCount, readahead);
+    for (long i = 0; i < blockCount; i++) {
+      in.seek(in.getPos() + blockSize - 1);
+      // this is the read
+      assertTrue(in.read() >= 0);
+    }
+    timer.end("Time to execute %d seeks of distance %d with readahead = %d",
+        blockCount,
+        blockSize,
+        readahead);
+    logTimePerIOP("seek(pos + " + blockCount+"); read()", timer, blockCount);
+    LOG.info("Effective bandwidth {} MB/S",
+        timer.bandwidthDescription(streamStatistics.bytesRead -
+            streamStatistics.bytesSkippedOnSeek));
+    logStreamStatistics();
+  }
+
+  public static final int _4K = 4 * 1024;
+  public static final int _8K = 8 * 1024;
+  public static final int _16K = 16 * 1024;
+  public static final int _32K = 32 * 1024;
+  public static final int _64K = 64 * 1024;
+  public static final int _128K = 128 * 1024;
+  public static final int _256K = 256 * 1024;
+  public static final int _1MB = 1024 * 1024;
+  public static final int _2MB = 2 * _1MB;
+  public static final int _10MB = _1MB * 10;
+  public static final int _5MB = _1MB * 5;
+
+  private static final int[][] RANDOM_IO_SEQUENCE = {
+      {_2MB, _128K},
+      {_128K, _128K},
+      {_5MB, _64K},
+      {_1MB, _1MB},
+  };
+
+  @Test
+  public void testRandomIORandomPolicy() throws Throwable {
+    executeRandomIO(S3AInputPolicy.Random, (long) RANDOM_IO_SEQUENCE.length);
+    assertEquals("streams aborted in " + streamStatistics,
+        0, streamStatistics.aborted);
+  }
+
+  @Test
+  public void testRandomIONormalPolicy() throws Throwable {
+    long expectedOpenCount = RANDOM_IO_SEQUENCE.length;
+    executeRandomIO(S3AInputPolicy.Normal, expectedOpenCount);
+    assertEquals("streams aborted in " + streamStatistics,
+        4, streamStatistics.aborted);
+  }
+
+  /**
+   * Execute the random IO {@code readFully(pos, bytes[])} sequence defined by
+   * {@link #RANDOM_IO_SEQUENCE}. The stream is closed afterwards; that's used
+   * in the timing too
+   * @param policy read policy
+   * @param expectedOpenCount expected number of stream openings
+   * @throws IOException IO problems
+   * @return the timer
+   */
+  private ContractTestUtils.NanoTimer executeRandomIO(S3AInputPolicy policy,
+      long expectedOpenCount)
+      throws IOException {
+    describe("Random IO with policy \"%s\"", policy);
+    byte[] buffer = new byte[_1MB];
+    long totalBytesRead = 0;
+
+    in = openTestFile(policy, 0);
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    for (int[] action : RANDOM_IO_SEQUENCE) {
+      int position = action[0];
+      int range = action[1];
+      in.readFully(position, buffer, 0, range);
+      totalBytesRead += range;
+    }
+    int reads = RANDOM_IO_SEQUENCE.length;
+    timer.end("Time to execute %d reads of total size %d bytes",
+        reads,
+        totalBytesRead);
+    in.close();
+    assertOpenOperationCount(expectedOpenCount);
+    logTimePerIOP("byte read", timer, totalBytesRead);
+    LOG.info("Effective bandwidth {} MB/S",
+        timer.bandwidthDescription(streamStatistics.bytesRead -
+            streamStatistics.bytesSkippedOnSeek));
+    logStreamStatistics();
+    return timer;
+  }
+
+  S3AInputStream getS3aStream() {
+    return (S3AInputStream) in.getWrappedStream();
+  }
+
+  @Test
+  public void testRandomReadOverBuffer() throws Throwable {
+    describe("read over a buffer, making sure that the requests" +
+        " spans readahead ranges");
+    int datasetLen = _32K;
+    Path dataFile = new Path(getTestPath(), "testReadOverBuffer.bin");
+    byte[] sourceData = dataset(datasetLen, 0, 64);
+    // relies on the field 'fs' referring to the R/W FS
+    writeDataset(fs, dataFile, sourceData, datasetLen, _16K, true);
+    byte[] buffer = new byte[datasetLen];
+    int readahead = _8K;
+    int halfReadahead = _4K;
+    in = openDataFile(fs, dataFile, S3AInputPolicy.Random, readahead);
+
+    LOG.info("Starting initial reads");
+    S3AInputStream s3aStream = getS3aStream();
+    assertEquals(readahead, s3aStream.getReadahead());
+    byte[] oneByte = new byte[1];
+    assertEquals(1, in.read(0, oneByte, 0, 1));
+    // make some assertions about the current state
+    assertEquals("remaining in\n" + in,
+        readahead - 1, s3aStream.remainingInCurrentRequest());
+    assertEquals("range start in\n" + in,
+        0, s3aStream.getContentRangeStart());
+    assertEquals("range finish in\n" + in,
+        readahead, s3aStream.getContentRangeFinish());
+
+    assertStreamOpenedExactlyOnce();
+
+    describe("Starting sequence of positioned read calls over\n%s", in);
+    NanoTimer readTimer = new NanoTimer();
+    int currentPos = halfReadahead;
+    int offset = currentPos;
+    int bytesRead = 0;
+    int readOps = 0;
+
+    // make multiple read() calls
+    while (bytesRead < halfReadahead) {
+      int length = buffer.length - offset;
+      int read = in.read(currentPos, buffer, offset, length);
+      bytesRead += read;
+      offset += read;
+      readOps++;
+      assertEquals("open operations on request #" + readOps
+              + " after reading " + bytesRead
+              + " current position in stream " + currentPos
+              + " in\n" + fs
+              + "\n " + in,
+          1, streamStatistics.openOperations);
+      for (int i = currentPos; i < currentPos + read; i++) {
+        assertEquals("Wrong value from byte " + i,
+            sourceData[i], buffer[i]);
+      }
+      currentPos += read;
+    }
+    assertStreamOpenedExactlyOnce();
+    // assert at the end of the original block
+    assertEquals(readahead, currentPos);
+    readTimer.end("read %d in %d operations", bytesRead, readOps);
+    bandwidth(readTimer, bytesRead);
+    LOG.info("Time per byte(): {} nS",
+        toHuman(readTimer.nanosPerOperation(bytesRead)));
+    LOG.info("Time per read(): {} nS",
+        toHuman(readTimer.nanosPerOperation(readOps)));
+
+    describe("read last byte");
+    // read one more
+    int read = in.read(currentPos, buffer, bytesRead, 1);
+    assertTrue("-1 from last read", read >= 0);
+    assertOpenOperationCount(2);
+    assertEquals("Wrong value from read ", sourceData[currentPos],
+        (int) buffer[currentPos]);
+    currentPos++;
+
+
+    // now scan all the way to the end of the file, using single byte read()
+    // calls
+    describe("read() to EOF over \n%s", in);
+    long readCount = 0;
+    NanoTimer timer = new NanoTimer();
+    LOG.info("seeking");
+    in.seek(currentPos);
+    LOG.info("reading");
+    while(currentPos < datasetLen) {
+      int r = in.read();
+      assertTrue("Negative read() at position " + currentPos + " in\n" + in,
+          r >= 0);
+      buffer[currentPos] = (byte)r;
+      assertEquals("Wrong value from read from\n" + in,
+          sourceData[currentPos], r);
+      currentPos++;
+      readCount++;
+    }
+    timer.end("read %d bytes", readCount);
+    bandwidth(timer, readCount);
+    LOG.info("Time per read(): {} nS",
+        toHuman(timer.nanosPerOperation(readCount)));
+
+    assertEquals("last read in " + in, -1, in.read());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java
deleted file mode 100644
index 5e07dcb..0000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.scale;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.contract.ContractTestUtils;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/**
- * Test some scalable operations related to file renaming and deletion.
- */
-public class TestS3ADeleteManyFiles extends S3AScaleTestBase {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestS3ADeleteManyFiles.class);
-
-  /**
-   * CAUTION: If this test starts failing, please make sure that the
-   * {@link org.apache.hadoop.fs.s3a.Constants#MAX_THREADS} configuration is not
-   * set too low. Alternatively, consider reducing the
-   * <code>scale.test.operation.count</code> parameter in
-   * <code>getOperationCount()</code>.
-   *
-   * @see #getOperationCount()
-   */
-  @Test
-  public void testBulkRenameAndDelete() throws Throwable {
-    final Path scaleTestDir = getTestPath();
-    final Path srcDir = new Path(scaleTestDir, "src");
-    final Path finalDir = new Path(scaleTestDir, "final");
-    final long count = getOperationCount();
-    ContractTestUtils.rm(fs, scaleTestDir, true, false);
-
-    fs.mkdirs(srcDir);
-    fs.mkdirs(finalDir);
-
-    int testBufferSize = fs.getConf()
-        .getInt(ContractTestUtils.IO_CHUNK_BUFFER_SIZE,
-            ContractTestUtils.DEFAULT_IO_CHUNK_BUFFER_SIZE);
-    // use Executor to speed up file creation
-    ExecutorService exec = Executors.newFixedThreadPool(16);
-    final ExecutorCompletionService<Boolean> completionService =
-        new ExecutorCompletionService<>(exec);
-    try {
-      final byte[] data = ContractTestUtils.dataset(testBufferSize, 'a', 'z');
-
-      for (int i = 0; i < count; ++i) {
-        final String fileName = "foo-" + i;
-        completionService.submit(new Callable<Boolean>() {
-          @Override
-          public Boolean call() throws IOException {
-            ContractTestUtils.createFile(fs, new Path(srcDir, fileName),
-                false, data);
-            return fs.exists(new Path(srcDir, fileName));
-          }
-        });
-      }
-      for (int i = 0; i < count; ++i) {
-        final Future<Boolean> future = completionService.take();
-        try {
-          if (!future.get()) {
-            LOG.warn("cannot create file");
-          }
-        } catch (ExecutionException e) {
-          LOG.warn("Error while uploading file", e.getCause());
-          throw e;
-        }
-      }
-    } finally {
-      exec.shutdown();
-    }
-
-    int nSrcFiles = fs.listStatus(srcDir).length;
-    fs.rename(srcDir, finalDir);
-    assertEquals(nSrcFiles, fs.listStatus(finalDir).length);
-    ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
-        new Path(srcDir, "foo-" + 0));
-    ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
-        new Path(srcDir, "foo-" + count / 2));
-    ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
-        new Path(srcDir, "foo-" + (count - 1)));
-    ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
-        new Path(finalDir, "foo-" + 0));
-    ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
-        new Path(finalDir, "foo-" + count/2));
-    ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
-        new Path(finalDir, "foo-" + (count-1)));
-
-    ContractTestUtils.assertDeleted(fs, finalDir, true, false);
-  }
-
-  @Test
-  public void testOpenCreate() throws IOException {
-    Path dir = new Path("/tests3a");
-    ContractTestUtils.createAndVerifyFile(fs, dir, 1024);
-    ContractTestUtils.createAndVerifyFile(fs, dir, 5 * 1024 * 1024);
-    ContractTestUtils.createAndVerifyFile(fs, dir, 20 * 1024 * 1024);
-
-
-    /*
-    Enable to test the multipart upload
-    try {
-      ContractTestUtils.createAndVerifyFile(fs, dir,
-          (long)6 * 1024 * 1024 * 1024);
-    } catch (IOException e) {
-      fail(e.getMessage());
-    }
-    */
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java
deleted file mode 100644
index 35ea3ad..0000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.scale;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3a.Statistic;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-import static org.apache.hadoop.fs.s3a.Statistic.*;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
-import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
-
-/**
- * Test the performance of listing files/directories.
- */
-public class TestS3ADirectoryPerformance extends S3AScaleTestBase {
-  private static final Logger LOG = LoggerFactory.getLogger(
-      TestS3ADirectoryPerformance.class);
-
-  @Test
-  public void testListOperations() throws Throwable {
-    describe("Test recursive list operations");
-    final Path scaleTestDir = getTestPath();
-    final Path listDir = new Path(scaleTestDir, "lists");
-
-    // scale factor.
-    int scale = getConf().getInt(KEY_DIRECTORY_COUNT, DEFAULT_DIRECTORY_COUNT);
-    int width = scale;
-    int depth = scale;
-    int files = scale;
-    MetricDiff metadataRequests = new MetricDiff(fs, OBJECT_METADATA_REQUESTS);
-    MetricDiff listRequests = new MetricDiff(fs, OBJECT_LIST_REQUESTS);
-    MetricDiff listContinueRequests =
-        new MetricDiff(fs, OBJECT_CONTINUE_LIST_REQUESTS);
-    MetricDiff listStatusCalls = new MetricDiff(fs, INVOCATION_LIST_FILES);
-    MetricDiff getFileStatusCalls =
-        new MetricDiff(fs, INVOCATION_GET_FILE_STATUS);
-    NanoTimer createTimer = new NanoTimer();
-    TreeScanResults created =
-        createSubdirs(fs, listDir, depth, width, files, 0);
-    // add some empty directories
-    int emptyDepth = 1 * scale;
-    int emptyWidth = 3 * scale;
-
-    created.add(createSubdirs(fs, listDir, emptyDepth, emptyWidth, 0,
-        0, "empty", "f-", ""));
-    createTimer.end("Time to create %s", created);
-    LOG.info("Time per operation: {}",
-        toHuman(createTimer.nanosPerOperation(created.totalCount())));
-    printThenReset(LOG,
-        metadataRequests,
-        listRequests,
-        listContinueRequests,
-        listStatusCalls,
-        getFileStatusCalls);
-
-    describe("Listing files via treewalk");
-    try {
-      // Scan the directory via an explicit tree walk.
-      // This is the baseline for any listing speedups.
-      NanoTimer treeWalkTimer = new NanoTimer();
-      TreeScanResults treewalkResults = treeWalk(fs, listDir);
-      treeWalkTimer.end("List status via treewalk of %s", created);
-
-      printThenReset(LOG,
-          metadataRequests,
-          listRequests,
-          listContinueRequests,
-          listStatusCalls,
-          getFileStatusCalls);
-      assertEquals("Files found in listFiles(recursive=true) " +
-              " created=" + created + " listed=" + treewalkResults,
-          created.getFileCount(), treewalkResults.getFileCount());
-
-      describe("Listing files via listFiles(recursive=true)");
-      // listFiles() does the recursion internally
-      NanoTimer listFilesRecursiveTimer = new NanoTimer();
-
-      TreeScanResults listFilesResults = new TreeScanResults(
-          fs.listFiles(listDir, true));
-
-      listFilesRecursiveTimer.end("listFiles(recursive=true) of %s", created);
-      assertEquals("Files found in listFiles(recursive=true) " +
-          " created=" + created  + " listed=" + listFilesResults,
-          created.getFileCount(), listFilesResults.getFileCount());
-
-      // only two list operations should have taken place
-      print(LOG,
-          metadataRequests,
-          listRequests,
-          listContinueRequests,
-          listStatusCalls,
-          getFileStatusCalls);
-      assertEquals(listRequests.toString(), 2, listRequests.diff());
-      reset(metadataRequests,
-          listRequests,
-          listContinueRequests,
-          listStatusCalls,
-          getFileStatusCalls);
-
-
-    } finally {
-      describe("deletion");
-      // deletion at the end of the run
-      NanoTimer deleteTimer = new NanoTimer();
-      fs.delete(listDir, true);
-      deleteTimer.end("Deleting directory tree");
-      printThenReset(LOG,
-          metadataRequests,
-          listRequests,
-          listContinueRequests,
-          listStatusCalls,
-          getFileStatusCalls);
-    }
-  }
-
-  @Test
-  public void testTimeToStatEmptyDirectory() throws Throwable {
-    describe("Time to stat an empty directory");
-    Path path = new Path(getTestPath(), "empty");
-    fs.mkdirs(path);
-    timeToStatPath(path);
-  }
-
-  @Test
-  public void testTimeToStatNonEmptyDirectory() throws Throwable {
-    describe("Time to stat a non-empty directory");
-    Path path = new Path(getTestPath(), "dir");
-    fs.mkdirs(path);
-    touch(fs, new Path(path, "file"));
-    timeToStatPath(path);
-  }
-
-  @Test
-  public void testTimeToStatFile() throws Throwable {
-    describe("Time to stat a simple file");
-    Path path = new Path(getTestPath(), "file");
-    touch(fs, path);
-    timeToStatPath(path);
-  }
-
-  @Test
-  public void testTimeToStatRoot() throws Throwable {
-    describe("Time to stat the root path");
-    timeToStatPath(new Path("/"));
-  }
-
-  private void timeToStatPath(Path path) throws IOException {
-    describe("Timing getFileStatus(\"%s\")", path);
-    MetricDiff metadataRequests =
-        new MetricDiff(fs, Statistic.OBJECT_METADATA_REQUESTS);
-    MetricDiff listRequests =
-        new MetricDiff(fs, Statistic.OBJECT_LIST_REQUESTS);
-    long attempts = getOperationCount();
-    NanoTimer timer = new NanoTimer();
-    for (long l = 0; l < attempts; l++) {
-      fs.getFileStatus(path);
-    }
-    timer.end("Time to execute %d getFileStatusCalls", attempts);
-    LOG.info("Time per call: {}", toHuman(timer.nanosPerOperation(attempts)));
-    LOG.info("metadata: {}", metadataRequests);
-    LOG.info("metadata per operation {}", metadataRequests.diff() / attempts);
-    LOG.info("listObjects: {}", listRequests);
-    LOG.info("listObjects: per operation {}", listRequests.diff() / attempts);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java
deleted file mode 100644
index d6d9d66..0000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java
+++ /dev/null
@@ -1,534 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.scale;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.contract.ContractTestUtils;
-import org.apache.hadoop.fs.s3a.S3AFileStatus;
-import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.fs.s3a.S3AInputPolicy;
-import org.apache.hadoop.fs.s3a.S3AInputStream;
-import org.apache.hadoop.fs.s3a.S3AInstrumentation;
-import org.apache.hadoop.fs.s3a.S3ATestUtils;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.util.LineReader;
-import org.junit.After;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.EOFException;
-import java.io.IOException;
-
-import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
-import static org.apache.hadoop.fs.s3a.Constants.*;
-
-/**
- * Look at the performance of S3a operations.
- */
-public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
-  private static final Logger LOG = LoggerFactory.getLogger(
-      TestS3AInputStreamPerformance.class);
-
-  private S3AFileSystem s3aFS;
-  private Path testData;
-  private S3AFileStatus testDataStatus;
-  private FSDataInputStream in;
-  private S3AInstrumentation.InputStreamStatistics streamStatistics;
-  public static final int BLOCK_SIZE = 32 * 1024;
-  public static final int BIG_BLOCK_SIZE = 256 * 1024;
-
-  /** Tests only run if the there is a named test file that can be read. */
-  private boolean testDataAvailable = true;
-  private String assumptionMessage = "test file";
-
-  /**
-   * Open the FS and the test data. The input stream is always set up here.
-   * @throws IOException IO Problems.
-   */
-  @Before
-  public void openFS() throws IOException {
-    Configuration conf = getConf();
-    conf.setInt(SOCKET_SEND_BUFFER, 16 * 1024);
-    conf.setInt(SOCKET_RECV_BUFFER, 16 * 1024);
-    String testFile =  conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE);
-    if (testFile.isEmpty()) {
-      assumptionMessage = "Empty test property: " + KEY_CSVTEST_FILE;
-      testDataAvailable = false;
-    } else {
-      S3ATestUtils.useCSVDataEndpoint(conf);
-      testData = new Path(testFile);
-      Path path = this.testData;
-      bindS3aFS(path);
-      try {
-        testDataStatus = s3aFS.getFileStatus(this.testData);
-      } catch (IOException e) {
-        LOG.warn("Failed to read file {} specified in {}",
-            testFile, KEY_CSVTEST_FILE, e);
-        throw e;
-      }
-    }
-  }
-
-  private void bindS3aFS(Path path) throws IOException {
-    s3aFS = (S3AFileSystem) FileSystem.newInstance(path.toUri(), getConf());
-  }
-
-  /**
-   * Cleanup: close the stream, close the FS.
-   */
-  @After
-  public void cleanup() {
-    describe("cleanup");
-    IOUtils.closeStream(in);
-    IOUtils.closeStream(s3aFS);
-  }
-
-  /**
-   * Declare that the test requires the CSV test dataset.
-   */
-  private void requireCSVTestData() {
-    Assume.assumeTrue(assumptionMessage, testDataAvailable);
-  }
-
-  /**
-   * Open the test file with the read buffer specified in the setting.
-   * {@link #KEY_READ_BUFFER_SIZE}; use the {@code Normal} policy
-   * @return the stream, wrapping an S3a one
-   * @throws IOException IO problems
-   */
-  FSDataInputStream openTestFile() throws IOException {
-    return openTestFile(S3AInputPolicy.Normal, 0);
-  }
-
-  /**
-   * Open the test file with the read buffer specified in the setting
-   * {@link #KEY_READ_BUFFER_SIZE}.
-   * This includes the {@link #requireCSVTestData()} assumption; so
-   * if called before any FS op, will automatically skip the test
-   * if the CSV file is absent.
-   *
-   * @param inputPolicy input policy to use
-   * @param readahead readahead/buffer size
-   * @return the stream, wrapping an S3a one
-   * @throws IOException IO problems
-   */
-  FSDataInputStream openTestFile(S3AInputPolicy inputPolicy, long readahead)
-      throws IOException {
-    requireCSVTestData();
-    return openDataFile(s3aFS, this.testData, inputPolicy, readahead);
-  }
-
-  /**
-   * Open a test file with the read buffer specified in the setting
-   * {@link #KEY_READ_BUFFER_SIZE}.
-   *
-   * @param path path to open
-   * @param inputPolicy input policy to use
-   * @param readahead readahead/buffer size
-   * @return the stream, wrapping an S3a one
-   * @throws IOException IO problems
-   */
-  private FSDataInputStream openDataFile(S3AFileSystem fs,
-      Path path,
-      S3AInputPolicy inputPolicy,
-      long readahead) throws IOException {
-    int bufferSize = getConf().getInt(KEY_READ_BUFFER_SIZE,
-        DEFAULT_READ_BUFFER_SIZE);
-    S3AInputPolicy policy = fs.getInputPolicy();
-    fs.setInputPolicy(inputPolicy);
-    try {
-      FSDataInputStream stream = fs.open(path, bufferSize);
-      if (readahead >= 0) {
-        stream.setReadahead(readahead);
-      }
-      streamStatistics = getInputStreamStatistics(stream);
-      return stream;
-    } finally {
-      fs.setInputPolicy(policy);
-    }
-  }
-
-  /**
-   * Assert that the stream was only ever opened once.
-   */
-  protected void assertStreamOpenedExactlyOnce() {
-    assertOpenOperationCount(1);
-  }
-
-  /**
-   * Make an assertion count about the number of open operations.
-   * @param expected the expected number
-   */
-  private void assertOpenOperationCount(long expected) {
-    assertEquals("open operations in\n" + in,
-        expected, streamStatistics.openOperations);
-  }
-
-  /**
-   * Log how long an IOP took, by dividing the total time by the
-   * count of operations, printing in a human-readable form.
-   * @param operation operation being measured
-   * @param timer timing data
-   * @param count IOP count.
-   */
-  protected void logTimePerIOP(String operation,
-      NanoTimer timer,
-      long count) {
-    LOG.info("Time per {}: {} nS",
-        operation, toHuman(timer.duration() / count));
-  }
-
-  @Test
-  public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable {
-    requireCSVTestData();
-    int blockSize = _1MB;
-    describe("Open the test file %s and read it in blocks of size %d",
-        testData, blockSize);
-    long len = testDataStatus.getLen();
-    in = openTestFile();
-    byte[] block = new byte[blockSize];
-    NanoTimer timer2 = new NanoTimer();
-    long count = 0;
-    // implicitly rounding down here
-    long blockCount = len / blockSize;
-    for (long i = 0; i < blockCount; i++) {
-      int offset = 0;
-      int remaining = blockSize;
-      NanoTimer blockTimer = new NanoTimer();
-      int reads = 0;
-      while (remaining > 0) {
-        int bytesRead = in.read(block, offset, remaining);
-        reads ++;
-        if (bytesRead == 1) {
-          break;
-        }
-        remaining -= bytesRead;
-        offset += bytesRead;
-        count += bytesRead;
-      }
-      blockTimer.end("Reading block %d in %d reads", i, reads);
-    }
-    timer2.end("Time to read %d bytes in %d blocks", len, blockCount );
-    bandwidth(timer2, count);
-    logStreamStatistics();
-  }
-
-  @Test
-  public void testLazySeekEnabled() throws Throwable {
-    describe("Verify that seeks do not trigger any IO");
-    in = openTestFile();
-    long len = testDataStatus.getLen();
-    NanoTimer timer = new NanoTimer();
-    long blockCount = len / BLOCK_SIZE;
-    for (long i = 0; i < blockCount; i++) {
-      in.seek(in.getPos() + BLOCK_SIZE - 1);
-    }
-    in.seek(0);
-    blockCount++;
-    timer.end("Time to execute %d seeks", blockCount);
-    logTimePerIOP("seek()", timer, blockCount);
-    logStreamStatistics();
-    assertOpenOperationCount(0);
-    assertEquals("bytes read", 0, streamStatistics.bytesRead);
-  }
-
-  @Test
-  public void testReadaheadOutOfRange() throws Throwable {
-    try {
-      in = openTestFile();
-      in.setReadahead(-1L);
-      fail("Stream should have rejected the request "+ in);
-    } catch (IllegalArgumentException e) {
-      // expected
-    }
-  }
-
-  @Test
-  public void testReadWithNormalPolicy() throws Throwable {
-    describe("Read big blocks with a big readahead");
-    executeSeekReadSequence(BIG_BLOCK_SIZE, BIG_BLOCK_SIZE * 2,
-        S3AInputPolicy.Normal);
-    assertStreamOpenedExactlyOnce();
-  }
-
-  @Test
-  public void testDecompressionSequential128K() throws Throwable {
-    describe("Decompress with a 128K readahead");
-    executeDecompression(128 * 1024, S3AInputPolicy.Sequential);
-    assertStreamOpenedExactlyOnce();
-  }
-
-  /**
-   * Execute a decompression + line read with the given input policy.
-   * @param readahead byte readahead
-   * @param inputPolicy read policy
-   * @throws IOException IO Problems
-   */
-  private void executeDecompression(long readahead,
-      S3AInputPolicy inputPolicy) throws IOException {
-    CompressionCodecFactory factory
-        = new CompressionCodecFactory(getConf());
-    CompressionCodec codec = factory.getCodec(testData);
-    long bytesRead = 0;
-    int lines = 0;
-
-    FSDataInputStream objectIn = openTestFile(inputPolicy, readahead);
-    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
-    try (LineReader lineReader = new LineReader(
-        codec.createInputStream(objectIn), getConf())) {
-      Text line = new Text();
-      int read;
-      while ((read = lineReader.readLine(line)) > 0) {
-        bytesRead += read;
-        lines++;
-      }
-    } catch (EOFException eof) {
-      // done
-    }
-    timer.end("Time to read %d lines [%d bytes expanded, %d raw]" +
-        " with readahead = %d",
-        lines,
-        bytesRead,
-        testDataStatus.getLen(),
-        readahead);
-    logTimePerIOP("line read", timer, lines);
-    logStreamStatistics();
-  }
-
-  private void logStreamStatistics() {
-    LOG.info(String.format("Stream Statistics%n{}"), streamStatistics);
-  }
-
-  /**
-   * Execute a seek+read sequence.
-   * @param blockSize block size for seeks
-   * @param readahead what the readahead value of the stream should be
-   * @throws IOException IO problems
-   */
-  protected void executeSeekReadSequence(long blockSize,
-      long readahead,
-      S3AInputPolicy policy) throws IOException {
-    in = openTestFile(policy, readahead);
-    long len = testDataStatus.getLen();
-    NanoTimer timer = new NanoTimer();
-    long blockCount = len / blockSize;
-    LOG.info("Reading {} blocks, readahead = {}",
-        blockCount, readahead);
-    for (long i = 0; i < blockCount; i++) {
-      in.seek(in.getPos() + blockSize - 1);
-      // this is the read
-      assertTrue(in.read() >= 0);
-    }
-    timer.end("Time to execute %d seeks of distance %d with readahead = %d",
-        blockCount,
-        blockSize,
-        readahead);
-    logTimePerIOP("seek(pos + " + blockCount+"); read()", timer, blockCount);
-    LOG.info("Effective bandwidth {} MB/S",
-        timer.bandwidthDescription(streamStatistics.bytesRead -
-            streamStatistics.bytesSkippedOnSeek));
-    logStreamStatistics();
-  }
-
-  public static final int _4K = 4 * 1024;
-  public static final int _8K = 8 * 1024;
-  public static final int _16K = 16 * 1024;
-  public static final int _32K = 32 * 1024;
-  public static final int _64K = 64 * 1024;
-  public static final int _128K = 128 * 1024;
-  public static final int _256K = 256 * 1024;
-  public static final int _1MB = 1024 * 1024;
-  public static final int _2MB = 2 * _1MB;
-  public static final int _10MB = _1MB * 10;
-  public static final int _5MB = _1MB * 5;
-
-  private static final int[][] RANDOM_IO_SEQUENCE = {
-      {_2MB, _128K},
-      {_128K, _128K},
-      {_5MB, _64K},
-      {_1MB, _1MB},
-  };
-
-  @Test
-  public void testRandomIORandomPolicy() throws Throwable {
-    executeRandomIO(S3AInputPolicy.Random, (long) RANDOM_IO_SEQUENCE.length);
-    assertEquals("streams aborted in " + streamStatistics,
-        0, streamStatistics.aborted);
-  }
-
-  @Test
-  public void testRandomIONormalPolicy() throws Throwable {
-    long expectedOpenCount = RANDOM_IO_SEQUENCE.length;
-    executeRandomIO(S3AInputPolicy.Normal, expectedOpenCount);
-    assertEquals("streams aborted in " + streamStatistics,
-        4, streamStatistics.aborted);
-  }
-
-  /**
-   * Execute the random IO {@code readFully(pos, bytes[])} sequence defined by
-   * {@link #RANDOM_IO_SEQUENCE}. The stream is closed afterwards; that's used
-   * in the timing too
-   * @param policy read policy
-   * @param expectedOpenCount expected number of stream openings
-   * @throws IOException IO problems
-   * @return the timer
-   */
-  private ContractTestUtils.NanoTimer executeRandomIO(S3AInputPolicy policy,
-      long expectedOpenCount)
-      throws IOException {
-    describe("Random IO with policy \"%s\"", policy);
-    byte[] buffer = new byte[_1MB];
-    long totalBytesRead = 0;
-
-    in = openTestFile(policy, 0);
-    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
-    for (int[] action : RANDOM_IO_SEQUENCE) {
-      int position = action[0];
-      int range = action[1];
-      in.readFully(position, buffer, 0, range);
-      totalBytesRead += range;
-    }
-    int reads = RANDOM_IO_SEQUENCE.length;
-    timer.end("Time to execute %d reads of total size %d bytes",
-        reads,
-        totalBytesRead);
-    in.close();
-    assertOpenOperationCount(expectedOpenCount);
-    logTimePerIOP("byte read", timer, totalBytesRead);
-    LOG.info("Effective bandwidth {} MB/S",
-        timer.bandwidthDescription(streamStatistics.bytesRead -
-            streamStatistics.bytesSkippedOnSeek));
-    logStreamStatistics();
-    return timer;
-  }
-
-  S3AInputStream getS3aStream() {
-    return (S3AInputStream) in.getWrappedStream();
-  }
-
-  @Test
-  public void testRandomReadOverBuffer() throws Throwable {
-    describe("read over a buffer, making sure that the requests" +
-        " spans readahead ranges");
-    int datasetLen = _32K;
-    Path dataFile = new Path(getTestPath(), "testReadOverBuffer.bin");
-    byte[] sourceData = dataset(datasetLen, 0, 64);
-    // relies on the field 'fs' referring to the R/W FS
-    writeDataset(fs, dataFile, sourceData, datasetLen, _16K, true);
-    byte[] buffer = new byte[datasetLen];
-    int readahead = _8K;
-    int halfReadahead = _4K;
-    in = openDataFile(fs, dataFile, S3AInputPolicy.Random, readahead);
-
-    LOG.info("Starting initial reads");
-    S3AInputStream s3aStream = getS3aStream();
-    assertEquals(readahead, s3aStream.getReadahead());
-    byte[] oneByte = new byte[1];
-    assertEquals(1, in.read(0, oneByte, 0, 1));
-    // make some assertions about the current state
-    assertEquals("remaining in\n" + in,
-        readahead - 1, s3aStream.remainingInCurrentRequest());
-    assertEquals("range start in\n" + in,
-        0, s3aStream.getContentRangeStart());
-    assertEquals("range finish in\n" + in,
-        readahead, s3aStream.getContentRangeFinish());
-
-    assertStreamOpenedExactlyOnce();
-
-    describe("Starting sequence of positioned read calls over\n%s", in);
-    NanoTimer readTimer = new NanoTimer();
-    int currentPos = halfReadahead;
-    int offset = currentPos;
-    int bytesRead = 0;
-    int readOps = 0;
-
-    // make multiple read() calls
-    while (bytesRead < halfReadahead) {
-      int length = buffer.length - offset;
-      int read = in.read(currentPos, buffer, offset, length);
-      bytesRead += read;
-      offset += read;
-      readOps++;
-      assertEquals("open operations on request #" + readOps
-              + " after reading " + bytesRead
-              + " current position in stream " + currentPos
-              + " in\n" + fs
-              + "\n " + in,
-          1, streamStatistics.openOperations);
-      for (int i = currentPos; i < currentPos + read; i++) {
-        assertEquals("Wrong value from byte " + i,
-            sourceData[i], buffer[i]);
-      }
-      currentPos += read;
-    }
-    assertStreamOpenedExactlyOnce();
-    // assert at the end of the original block
-    assertEquals(readahead, currentPos);
-    readTimer.end("read %d in %d operations", bytesRead, readOps);
-    bandwidth(readTimer, bytesRead);
-    LOG.info("Time per byte(): {} nS",
-        toHuman(readTimer.nanosPerOperation(bytesRead)));
-    LOG.info("Time per read(): {} nS",
-        toHuman(readTimer.nanosPerOperation(readOps)));
-
-    describe("read last byte");
-    // read one more
-    int read = in.read(currentPos, buffer, bytesRead, 1);
-    assertTrue("-1 from last read", read >= 0);
-    assertOpenOperationCount(2);
-    assertEquals("Wrong value from read ", sourceData[currentPos],
-        (int) buffer[currentPos]);
-    currentPos++;
-
-
-    // now scan all the way to the end of the file, using single byte read()
-    // calls
-    describe("read() to EOF over \n%s", in);
-    long readCount = 0;
-    NanoTimer timer = new NanoTimer();
-    LOG.info("seeking");
-    in.seek(currentPos);
-    LOG.info("reading");
-    while(currentPos < datasetLen) {
-      int r = in.read();
-      assertTrue("Negative read() at position " + currentPos + " in\n" + in,
-          r >= 0);
-      buffer[currentPos] = (byte)r;
-      assertEquals("Wrong value from read from\n" + in,
-          sourceData[currentPos], r);
-      currentPos++;
-      readCount++;
-    }
-    timer.end("read %d bytes", readCount);
-    bandwidth(timer, readCount);
-    LOG.info("Time per read(): {} nS",
-        toHuman(timer.nanosPerOperation(readCount)));
-
-    assertEquals("last read in " + in, -1, in.read());
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3A.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3A.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3A.java
new file mode 100644
index 0000000..ca57da6
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3A.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3a.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.EnumSet;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * S3A tests through the {@link FileContext} API.
+ */
+public class ITestS3A {
+  private FileContext fc;
+
+  @Rule
+  public final Timeout testTimeout = new Timeout(90000);
+
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    fc = S3ATestUtils.createTestFileContext(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (fc != null) {
+      fc.delete(getTestPath(), true);
+    }
+  }
+
+  protected Path getTestPath() {
+    return new Path("/tests3afc");
+  }
+
+  @Test
+  public void testS3AStatus() throws Exception {
+    FsStatus fsStatus = fc.getFsStatus(null);
+    assertNotNull(fsStatus);
+    assertTrue("Used capacity should be positive: " + fsStatus.getUsed(),
+        fsStatus.getUsed() >= 0);
+    assertTrue("Remaining capacity should be positive: " + fsStatus
+            .getRemaining(),
+        fsStatus.getRemaining() >= 0);
+    assertTrue("Capacity should be positive: " + fsStatus.getCapacity(),
+        fsStatus.getCapacity() >= 0);
+  }
+
+  @Test
+  public void testS3ACreateFileInSubDir() throws Exception {
+    Path dirPath = getTestPath();
+    fc.mkdir(dirPath, FileContext.DIR_DEFAULT_PERM, true);
+    Path filePath = new Path(dirPath, "file");
+    try (FSDataOutputStream file = fc.create(filePath, EnumSet.of(CreateFlag
+        .CREATE))) {
+      file.write(666);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java
new file mode 100644
index 0000000..772d8c7
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.fs.s3a.yarn;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.WordCount;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+
+import org.junit.After;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests that S3A is usable through a YARN application.
+ */
+public class ITestS3AMiniYarnCluster {
+
+  private final Configuration conf = new YarnConfiguration();
+  private S3AFileSystem fs;
+  private MiniYARNCluster yarnCluster;
+  private final String rootPath = "/tests/MiniClusterWordCount/";
+
+  @Before
+  public void beforeTest() throws IOException {
+    fs = S3ATestUtils.createTestFileSystem(conf);
+    fs.mkdirs(new Path(rootPath + "input/"));
+
+    yarnCluster = new MiniYARNCluster("MiniClusterWordCount", // testName
+            1, // number of node managers
+            1, // number of local log dirs per node manager
+            1); // number of hdfs dirs per node manager
+    yarnCluster.init(conf);
+    yarnCluster.start();
+  }
+
+  @After
+  public void afterTest() throws IOException {
+    fs.delete(new Path(rootPath), true);
+    yarnCluster.stop();
+  }
+
+  @Test
+  public void testWithMiniCluster() throws Exception {
+    Path input = new Path(rootPath + "input/in.txt");
+    input = input.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path output = new Path(rootPath + "output/");
+    output = output.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+
+    writeStringToFile(input, "first line\nsecond line\nthird line");
+
+    Job job = Job.getInstance(conf, "word count");
+    job.setJarByClass(WordCount.class);
+    job.setMapperClass(WordCount.TokenizerMapper.class);
+    job.setCombinerClass(WordCount.IntSumReducer.class);
+    job.setReducerClass(WordCount.IntSumReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    FileInputFormat.addInputPath(job, input);
+    FileOutputFormat.setOutputPath(job, output);
+
+    int exitCode = (job.waitForCompletion(true) ? 0 : 1);
+    assertEquals("Returned error code.", 0, exitCode);
+
+    assertTrue(fs.exists(new Path(output, "_SUCCESS")));
+    String outputAsStr = readStringFromFile(new Path(output, "part-r-00000"));
+    Map<String, Integer> resAsMap = getResultAsMap(outputAsStr);
+
+    assertEquals(4, resAsMap.size());
+    assertEquals(1, (int) resAsMap.get("first"));
+    assertEquals(1, (int) resAsMap.get("second"));
+    assertEquals(1, (int) resAsMap.get("third"));
+    assertEquals(3, (int) resAsMap.get("line"));
+  }
+
+  /**
+   * helper method.
+   */
+  private Map<String, Integer> getResultAsMap(String outputAsStr)
+      throws IOException {
+    Map<String, Integer> result = new HashMap<>();
+    for (String line : outputAsStr.split("\n")) {
+      String[] tokens = line.split("\t");
+      result.put(tokens[0], Integer.parseInt(tokens[1]));
+    }
+    return result;
+  }
+
+  /**
+   * helper method.
+   */
+  private void writeStringToFile(Path path, String string) throws IOException {
+    FileContext fc = S3ATestUtils.createTestFileContext(conf);
+    try (FSDataOutputStream file = fc.create(path,
+            EnumSet.of(CreateFlag.CREATE))) {
+      file.write(string.getBytes());
+    }
+  }
+
+  /**
+   * helper method.
+   */
+  private String readStringFromFile(Path path) {
+    try (FSDataInputStream in = fs.open(path)) {
+      long bytesLen = fs.getFileStatus(path).getLen();
+      byte[] buffer = new byte[(int) bytesLen];
+      IOUtils.readFully(in, buffer, 0, buffer.length);
+      return new String(buffer);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to read from [" + path + "]", e);
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message