hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sj...@apache.org
Subject [45/50] [abbrv] hadoop git commit: HADOOP-13560. S3ABlockOutputStream to support huge (many GB) file writes. Contributed by Steve Loughran
Date Tue, 18 Oct 2016 23:45:31 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
new file mode 100644
index 0000000..a60d084
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressEventType;
+import com.amazonaws.event.ProgressListener;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageStatistics;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.util.Progressable;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+
+/**
+ * Scale test which creates a huge file.
+ *
+ * <b>Important:</b> the order in which these tests execute is fixed to
+ * alphabetical order. Test cases are numbered {@code test_123_} to impose
+ * an ordering based on the numbers.
+ *
+ * Having this ordering allows the tests to assume that the huge file
+ * exists. Even so: they should all have a {@link #assumeHugeFileExists()}
+ * check at the start, in case an individual test is executed.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbstractSTestS3AHugeFiles.class);
+  public static final int DEFAULT_UPLOAD_BLOCKSIZE = 64 * _1KB;
+  public static final String DEFAULT_PARTITION_SIZE = "8M";
+  private Path scaleTestDir;
+  private Path hugefile;
+  private Path hugefileRenamed;
+
+  private int uploadBlockSize = DEFAULT_UPLOAD_BLOCKSIZE;
+  private int partitionSize;
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    final Path testPath = getTestPath();
+    scaleTestDir = new Path(testPath, "scale");
+    hugefile = new Path(scaleTestDir, "hugefile");
+    hugefileRenamed = new Path(scaleTestDir, "hugefileRenamed");
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    // do nothing. Specifically: do not delete the test dir
+  }
+
+  /**
+   * Note that this can get called before test setup.
+   * @return the configuration to use.
+   */
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    partitionSize = (int)getTestPropertyBytes(conf,
+        KEY_HUGE_PARTITION_SIZE,
+        DEFAULT_PARTITION_SIZE);
+    assertTrue("Partition size too small: " + partitionSize,
+        partitionSize > MULTIPART_MIN_SIZE);
+    conf.setLong(SOCKET_SEND_BUFFER, _1MB);
+    conf.setLong(SOCKET_RECV_BUFFER, _1MB);
+    conf.setLong(MIN_MULTIPART_THRESHOLD, partitionSize);
+    conf.setInt(MULTIPART_SIZE, partitionSize);
+    conf.set(USER_AGENT_PREFIX, "STestS3AHugeFileCreate");
+    conf.setBoolean(FAST_UPLOAD, true);
+    conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName());
+    return conf;
+  }
+
+  /**
+   * The name of the buffering mechanism to use.
+   * @return a buffering mechanism
+   */
+  protected abstract String getBlockOutputBufferName();
+
+  @Test
+  public void test_010_CreateHugeFile() throws IOException {
+    assertFalse("Please run this test sequentially to avoid timeouts" +
+            " and bandwidth problems", isParallelExecution());
+    long filesize = getTestPropertyBytes(getConf(), KEY_HUGE_FILESIZE,
+        DEFAULT_HUGE_FILESIZE);
+    long filesizeMB = filesize / _1MB;
+
+    // clean up from any previous attempts
+    deleteHugeFile();
+
+    describe("Creating file %s of size %d MB" +
+            " with partition size %d buffered by %s",
+        hugefile, filesizeMB, partitionSize, getBlockOutputBufferName());
+
+    // now do a check of available upload time, with a pessimistic bandwidth
+    // (that of remote upload tests). If the test times out then not only is
+    // the test outcome lost: the follow-on tests still run, and they will
+    // overlap with the ongoing upload, causing much confusion.
+    int timeout = getTestTimeoutSeconds();
+    // assume 1 MB/s upload bandwidth
+    int bandwidth = _1MB;
+    long uploadTime = filesize / bandwidth;
+    assertTrue(String.format("Timeout set in %s seconds is too low;" +
+            " estimating upload time of %d seconds at 1 MB/s." +
+            " Rerun tests with -D%s=%d",
+            timeout, uploadTime, KEY_TEST_TIMEOUT, uploadTime * 2),
+        uploadTime < timeout);
+    assertEquals("File size set in " + KEY_HUGE_FILESIZE + " = " + filesize
+            + " is not a multiple of " + uploadBlockSize,
+        0, filesize % uploadBlockSize);
+
+    byte[] data = new byte[uploadBlockSize];
+    for (int i = 0; i < uploadBlockSize; i++) {
+      data[i] = (byte) (i % 256);
+    }
+
+    long blocks = filesize / uploadBlockSize;
+    long blocksPerMB = _1MB / uploadBlockSize;
+
+    // perform the upload.
+    // there's lots of logging here, so that a tail -f on the output log
+    // can give a view of what is happening.
+    StorageStatistics storageStatistics = fs.getStorageStatistics();
+    String putRequests = Statistic.OBJECT_PUT_REQUESTS.getSymbol();
+    String putBytes = Statistic.OBJECT_PUT_BYTES.getSymbol();
+    Statistic putRequestsActive = Statistic.OBJECT_PUT_REQUESTS_ACTIVE;
+    Statistic putBytesPending = Statistic.OBJECT_PUT_BYTES_PENDING;
+
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+
+    long blocksPer10MB = blocksPerMB * 10;
+    ProgressCallback progress = new ProgressCallback(timer);
+    try (FSDataOutputStream out = fs.create(hugefile,
+        true,
+        uploadBlockSize,
+        progress)) {
+
+      for (long block = 1; block <= blocks; block++) {
+        out.write(data);
+        long written = block * uploadBlockSize;
+        // every 10 MB and on file upload @ 100%, print some stats
+        if (block % blocksPer10MB == 0 || written == filesize) {
+          long percentage = written * 100 / filesize;
+          double elapsedTime = timer.elapsedTime() / 1.0e9;
+          double writtenMB = 1.0 * written / _1MB;
+          LOG.info(String.format("[%02d%%] Buffered %.2f MB out of %d MB;" +
+                  " PUT %d bytes (%d pending) in %d operations (%d active);" +
+                  " elapsedTime=%.2fs; write to buffer bandwidth=%.2f MB/s",
+              percentage,
+              writtenMB,
+              filesizeMB,
+              storageStatistics.getLong(putBytes),
+              gaugeValue(putBytesPending),
+              storageStatistics.getLong(putRequests),
+              gaugeValue(putRequestsActive),
+              elapsedTime,
+              writtenMB / elapsedTime));
+        }
+      }
+      // now close the file
+      LOG.info("Closing file and completing write operation");
+      ContractTestUtils.NanoTimer closeTimer
+          = new ContractTestUtils.NanoTimer();
+      out.close();
+      closeTimer.end("time to close() output stream");
+    }
+
+    timer.end("time to write %d MB in blocks of %d",
+        filesizeMB, uploadBlockSize);
+    logFSState();
+    bandwidth(timer, filesize);
+    long putRequestCount = storageStatistics.getLong(putRequests);
+    Long putByteCount = storageStatistics.getLong(putBytes);
+    LOG.info("PUT {} bytes in {} operations; {} MB/operation",
+        putByteCount, putRequestCount,
+        putByteCount / (putRequestCount * _1MB));
+    LOG.info("Time per PUT {} nS",
+        toHuman(timer.nanosPerOperation(putRequestCount)));
+    assertEquals("active put requests in \n" + fs,
+        0, gaugeValue(putRequestsActive));
+    ContractTestUtils.assertPathExists(fs, "Huge file", hugefile);
+    S3AFileStatus status = fs.getFileStatus(hugefile);
+    ContractTestUtils.assertIsFile(hugefile, status);
+    assertEquals("File size in " + status, filesize, status.getLen());
+    progress.verifyNoFailures("Put file " + hugefile + " of size " + filesize);
+  }
+
+  /**
+   * Progress callback from AWS. Likely to come in on a different thread.
+   */
+  private final class ProgressCallback implements Progressable,
+      ProgressListener {
+    private AtomicLong bytesTransferred = new AtomicLong(0);
+    private AtomicInteger failures = new AtomicInteger(0);
+    private final ContractTestUtils.NanoTimer timer;
+
+    private ProgressCallback(NanoTimer timer) {
+      this.timer = timer;
+    }
+
+    @Override
+    public void progress() {
+    }
+
+    @Override
+    public void progressChanged(ProgressEvent progressEvent) {
+      ProgressEventType eventType = progressEvent.getEventType();
+      if (eventType.isByteCountEvent()) {
+        bytesTransferred.addAndGet(progressEvent.getBytesTransferred());
+      }
+      switch (eventType) {
+      case TRANSFER_PART_FAILED_EVENT:
+        // failure
+        failures.incrementAndGet();
+        LOG.warn("Transfer failure");
+        break;
+      case TRANSFER_PART_COMPLETED_EVENT:
+        // completion
+        long elapsedTime = timer.elapsedTime();
+        double elapsedTimeS = elapsedTime / 1.0e9;
+        long written = bytesTransferred.get();
+        long writtenMB = written / _1MB;
+        LOG.info(String.format(
+            "Event %s; total uploaded=%d MB in %.1fs;" +
+                " effective upload bandwidth = %.2f MB/s",
+            progressEvent,
+            writtenMB, elapsedTimeS, writtenMB / elapsedTimeS));
+        break;
+      default:
+        if (eventType.isByteCountEvent()) {
+          LOG.debug("Event {}", progressEvent);
+        } else {
+          LOG.info("Event {}", progressEvent);
+        }
+        break;
+      }
+    }
+
+    @Override
+    public String toString() {
+      String sb = "ProgressCallback{"
+          + "bytesTransferred=" + bytesTransferred +
+          ", failures=" + failures +
+          '}';
+      return sb;
+    }
+
+    private void verifyNoFailures(String operation) {
+      assertEquals("Failures in " + operation +": " + this, 0, failures.get());
+    }
+  }
+
+  void assumeHugeFileExists() throws IOException {
+    ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile);
+    ContractTestUtils.assertIsFile(fs, hugefile);
+  }
+
+  private void logFSState() {
+    LOG.info("File System state after operation:\n{}", fs);
+  }
+
+  @Test
+  public void test_040_PositionedReadHugeFile() throws Throwable {
+    assumeHugeFileExists();
+    final String encryption = getConf().getTrimmed(
+        SERVER_SIDE_ENCRYPTION_ALGORITHM);
+    boolean encrypted = encryption != null;
+    if (encrypted) {
+      LOG.info("File is encrypted with algorithm {}", encryption);
+    }
+    String filetype = encrypted ? "encrypted file" : "file";
+    describe("Positioned reads of %s %s", filetype, hugefile);
+    S3AFileStatus status = fs.getFileStatus(hugefile);
+    long filesize = status.getLen();
+    int ops = 0;
+    final int bufferSize = 8192;
+    byte[] buffer = new byte[bufferSize];
+    long eof = filesize - 1;
+
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    ContractTestUtils.NanoTimer readAtByte0, readAtByte0Again, readAtEOF;
+    try (FSDataInputStream in = fs.open(hugefile, uploadBlockSize)) {
+      readAtByte0 = new ContractTestUtils.NanoTimer();
+      in.readFully(0, buffer);
+      readAtByte0.end("time to read data at start of file");
+      ops++;
+
+      readAtEOF = new ContractTestUtils.NanoTimer();
+      in.readFully(eof - bufferSize, buffer);
+      readAtEOF.end("time to read data at end of file");
+      ops++;
+
+      readAtByte0Again = new ContractTestUtils.NanoTimer();
+      in.readFully(0, buffer);
+      readAtByte0Again.end("time to read data at start of file again");
+      ops++;
+      LOG.info("Final stream state: {}", in);
+    }
+    long mb = Math.max(filesize / _1MB, 1);
+
+    logFSState();
+    timer.end("time to performed positioned reads of %s of %d MB ",
+        filetype, mb);
+    LOG.info("Time per positioned read = {} nS",
+        toHuman(timer.nanosPerOperation(ops)));
+  }
+
+  @Test
+  public void test_050_readHugeFile() throws Throwable {
+    assumeHugeFileExists();
+    describe("Reading %s", hugefile);
+    S3AFileStatus status = fs.getFileStatus(hugefile);
+    long filesize = status.getLen();
+    long blocks = filesize / uploadBlockSize;
+    byte[] data = new byte[uploadBlockSize];
+
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    try (FSDataInputStream in = fs.open(hugefile, uploadBlockSize)) {
+      for (long block = 0; block < blocks; block++) {
+        in.readFully(data);
+      }
+      LOG.info("Final stream state: {}", in);
+    }
+
+    long mb = Math.max(filesize / _1MB, 1);
+    timer.end("time to read file of %d MB ", mb);
+    LOG.info("Time per MB to read = {} nS",
+        toHuman(timer.nanosPerOperation(mb)));
+    bandwidth(timer, filesize);
+    logFSState();
+  }
+
+  @Test
+  public void test_100_renameHugeFile() throws Throwable {
+    assumeHugeFileExists();
+    describe("renaming %s to %s", hugefile, hugefileRenamed);
+    S3AFileStatus status = fs.getFileStatus(hugefile);
+    long filesize = status.getLen();
+    fs.delete(hugefileRenamed, false);
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    fs.rename(hugefile, hugefileRenamed);
+    long mb = Math.max(filesize / _1MB, 1);
+    timer.end("time to rename file of %d MB", mb);
+    LOG.info("Time per MB to rename = {} nS",
+        toHuman(timer.nanosPerOperation(mb)));
+    bandwidth(timer, filesize);
+    logFSState();
+    S3AFileStatus destFileStatus = fs.getFileStatus(hugefileRenamed);
+    assertEquals(filesize, destFileStatus.getLen());
+
+    // rename back
+    ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
+    fs.rename(hugefileRenamed, hugefile);
+    timer2.end("Renaming back");
+    LOG.info("Time per MB to rename = {} nS",
+        toHuman(timer2.nanosPerOperation(mb)));
+    bandwidth(timer2, filesize);
+  }
+
+  @Test
+  public void test_999_DeleteHugeFiles() throws IOException {
+    deleteHugeFile();
+    ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
+
+    fs.delete(hugefileRenamed, false);
+    timer2.end("time to delete %s", hugefileRenamed);
+    ContractTestUtils.rm(fs, getTestPath(), true, true);
+  }
+
+  protected void deleteHugeFile() throws IOException {
+    describe("Deleting %s", hugefile);
+    NanoTimer timer = new NanoTimer();
+    fs.delete(hugefile, false);
+    timer.end("time to delete %s", hugefile);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
index 208c491..4e1a734 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
@@ -116,20 +116,9 @@ public class ITestS3ADeleteManyFiles extends S3AScaleTestBase {
 
   @Test
   public void testOpenCreate() throws IOException {
-    Path dir = new Path("/tests3a");
-    ContractTestUtils.createAndVerifyFile(fs, dir, 1024);
-    ContractTestUtils.createAndVerifyFile(fs, dir, 5 * 1024 * 1024);
-    ContractTestUtils.createAndVerifyFile(fs, dir, 20 * 1024 * 1024);
-
-
-    /*
-    Enable to test the multipart upload
-    try {
-      ContractTestUtils.createAndVerifyFile(fs, dir,
-          (long)6 * 1024 * 1024 * 1024);
-    } catch (IOException e) {
-      fail(e.getMessage());
-    }
-    */
+    final Path scaleTestDir = getTestPath();
+    final Path srcDir = new Path(scaleTestDir, "opencreate");
+    ContractTestUtils.createAndVerifyFile(fs, srcDir, 1024);
+    ContractTestUtils.createAndVerifyFile(fs, srcDir, 50 * 1024);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java
new file mode 100644
index 0000000..d6f15c8
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.fs.s3a.Constants;
+
+/**
+ * Use {@link Constants#FAST_UPLOAD_BUFFER_ARRAY} for buffering.
+ */
+public class ITestS3AHugeFilesArrayBlocks extends AbstractSTestS3AHugeFiles {
+
+  protected String getBlockOutputBufferName() {
+    return Constants.FAST_UPLOAD_BUFFER_ARRAY;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java
new file mode 100644
index 0000000..b1323c4
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.fs.s3a.Constants;
+
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BYTEBUFFER;
+
+/**
+ * Use {@link Constants#FAST_UPLOAD_BYTEBUFFER} for buffering.
+ */
+public class ITestS3AHugeFilesByteBufferBlocks
+    extends AbstractSTestS3AHugeFiles {
+
+  protected String getBlockOutputBufferName() {
+    return FAST_UPLOAD_BYTEBUFFER;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesClassicOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesClassicOutput.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesClassicOutput.java
new file mode 100644
index 0000000..45eef24
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesClassicOutput.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.Constants;
+
+/**
+ * Use classic output for writing things; tweaks the configuration to do
+ * this after it has been set up in the superclass.
+ * The generator test has been copied and re
+ */
+public class ITestS3AHugeFilesClassicOutput extends AbstractSTestS3AHugeFiles {
+
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+    conf.setBoolean(Constants.FAST_UPLOAD, false);
+    return conf;
+  }
+
+  protected String getBlockOutputBufferName() {
+    return "classic";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesDiskBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesDiskBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesDiskBlocks.java
new file mode 100644
index 0000000..2be5769
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesDiskBlocks.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.fs.s3a.Constants;
+
+/**
+ * Use {@link Constants#FAST_UPLOAD_BUFFER_DISK} for buffering.
+ */
+public class ITestS3AHugeFilesDiskBlocks extends AbstractSTestS3AHugeFiles {
+
+  protected String getBlockOutputBufferName() {
+    return Constants.FAST_UPLOAD_BUFFER_DISK;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
index d861a16..af6d468 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
@@ -20,18 +20,18 @@ package org.apache.hadoop.fs.s3a.scale;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.fs.Path;
-
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3AInstrumentation;
 import org.apache.hadoop.fs.s3a.S3ATestConstants;
-import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TestName;
 import org.junit.rules.Timeout;
@@ -40,6 +40,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.InputStream;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+
 /**
  * Base class for scale tests; here is where the common scale configuration
  * keys are defined.
@@ -47,71 +49,18 @@ import java.io.InputStream;
 public class S3AScaleTestBase extends Assert implements S3ATestConstants {
 
   @Rule
-  public TestName methodName = new TestName();
+  public final TestName methodName = new TestName();
 
   @Rule
-  public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+  public Timeout testTimeout = createTestTimeout();
 
-  @BeforeClass
-  public static void nameThread() {
+  @Before
+  public void nameThread() {
     Thread.currentThread().setName("JUnit");
   }
 
-  /**
-   * The number of operations to perform: {@value}.
-   */
-  public static final String KEY_OPERATION_COUNT =
-      SCALE_TEST + "operation.count";
-
-  /**
-   * The number of directory operations to perform: {@value}.
-   */
-  public static final String KEY_DIRECTORY_COUNT =
-      SCALE_TEST + "directory.count";
-
-  /**
-   * The readahead buffer: {@value}.
-   */
-  public static final String KEY_READ_BUFFER_SIZE =
-      S3A_SCALE_TEST + "read.buffer.size";
-
-  public static final int DEFAULT_READ_BUFFER_SIZE = 16384;
-
-  /**
-   * Key for a multi MB test file: {@value}.
-   */
-  public static final String KEY_CSVTEST_FILE =
-      S3A_SCALE_TEST + "csvfile";
-  /**
-   * Default path for the multi MB test file: {@value}.
-   */
-  public static final String DEFAULT_CSVTEST_FILE
-      = "s3a://landsat-pds/scene_list.gz";
-
-  /**
-   * Endpoint for the S3 CSV/scale tests. This defaults to
-   * being us-east.
-   */
-  public static final String KEY_CSVTEST_ENDPOINT =
-      S3A_SCALE_TEST + "csvfile.endpoint";
-
-  /**
-   * Endpoint for the S3 CSV/scale tests. This defaults to
-   * being us-east.
-   */
-  public static final String DEFAULT_CSVTEST_ENDPOINT =
-      "s3.amazonaws.com";
-
-  /**
-   * The default number of operations to perform: {@value}.
-   */
-  public static final long DEFAULT_OPERATION_COUNT = 2005;
-
-  /**
-   * Default number of directories to create when performing
-   * directory performance/scale tests.
-   */
-  public static final int DEFAULT_DIRECTORY_COUNT = 2;
+  public static final int _1KB = 1024;
+  public static final int _1MB = _1KB * _1KB;
 
   protected S3AFileSystem fs;
 
@@ -120,6 +69,8 @@ public class S3AScaleTestBase extends Assert implements S3ATestConstants {
 
   private Configuration conf;
 
+  private boolean enabled;
+
   /**
    * Configuration generator. May be overridden to inject
    * some custom options.
@@ -137,11 +88,33 @@ public class S3AScaleTestBase extends Assert implements S3ATestConstants {
     return conf;
   }
 
+  /**
+   * Setup. This triggers creation of the configuration.
+   */
   @Before
   public void setUp() throws Exception {
-    conf = createConfiguration();
+    demandCreateConfiguration();
     LOG.debug("Scale test operation count = {}", getOperationCount());
-    fs = S3ATestUtils.createTestFileSystem(conf);
+    // multipart purges are disabled on the scale tests
+    fs = createTestFileSystem(conf, false);
+    // check for the test being enabled
+    enabled = getTestPropertyBool(
+        getConf(),
+        KEY_SCALE_TESTS_ENABLED,
+        DEFAULT_SCALE_TESTS_ENABLED);
+    Assume.assumeTrue("Scale test disabled: to enable set property " +
+        KEY_SCALE_TESTS_ENABLED, enabled);
+  }
+
+  /**
+   * Create the configuration if it is not already set up.
+   * @return the configuration.
+   */
+  private synchronized Configuration demandCreateConfiguration() {
+    if (conf == null) {
+      conf = createConfiguration();
+    }
+    return conf;
   }
 
   @After
@@ -160,7 +133,27 @@ public class S3AScaleTestBase extends Assert implements S3ATestConstants {
   }
 
   /**
-   * Describe a test in the logs
+   * Create the timeout for tests. Some large tests may need a larger value.
+   * @return the test timeout to use
+   */
+  protected Timeout createTestTimeout() {
+    demandCreateConfiguration();
+    return new Timeout(
+        getTestTimeoutSeconds() * 1000);
+  }
+
+  /**
+   * Get the test timeout in seconds.
+   * @return the test timeout as set in system properties or the default.
+   */
+  protected static int getTestTimeoutSeconds() {
+    return getTestPropertyInt(null,
+        KEY_TEST_TIMEOUT,
+        DEFAULT_TEST_TIMEOUT);
+  }
+
+  /**
+   * Describe a test in the logs.
    * @param text text to print
    * @param args arguments to format in the printing
    */
@@ -189,4 +182,30 @@ public class S3AScaleTestBase extends Assert implements S3ATestConstants {
     }
   }
 
+  /**
+   * Get the gauge value of a statistic. Raises an assertion if
+   * there is no such gauge.
+   * @param statistic statistic to look up
+   * @return the value.
+   */
+  public long gaugeValue(Statistic statistic) {
+    S3AInstrumentation instrumentation = fs.getInstrumentation();
+    MutableGaugeLong gauge = instrumentation.lookupGauge(statistic.getSymbol());
+    assertNotNull("No gauge " + statistic
+        + " in " + instrumentation.dump("", " = ", "\n", true), gauge);
+    return gauge.value();
+  }
+
+  protected boolean isEnabled() {
+    return enabled;
+  }
+
+  /**
+   * Flag to indicate whether this test is being run in parallel. This
+   * is used by some of the scale tests to validate test time expectations.
+   * @return true if the build indicates this test is being run in parallel.
+   */
+  protected boolean isParallelExecution() {
+    return Boolean.getBoolean(S3ATestConstants.KEY_PARALLEL_TEST_EXECUTION);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message