From: stevel@apache.org
To: common-commits@hadoop.apache.org
Date: Wed, 04 Mar 2015 00:19:02 -0000
Subject: [2/2] hadoop git commit: HADOOP-11183. Memory-based S3AOutputstream. (Thomas Demoor via stevel)

HADOOP-11183. Memory-based S3AOutputstream. (Thomas Demoor via stevel)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/15b7076a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/15b7076a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/15b7076a

Branch: refs/heads/trunk
Commit: 15b7076ad5f2ae92d231140b2f8cebc392a92c87
Parents: e17e5ba
Author: Steve Loughran
Authored: Tue Mar 3 16:18:39 2015 -0800
Committer: Steve Loughran
Committed: Tue Mar 3 16:18:51 2015 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   2 +
 .../src/main/resources/core-default.xml         |  20 +-
 .../org/apache/hadoop/fs/s3a/Constants.java     |   8 +
 .../hadoop/fs/s3a/S3AFastOutputStream.java      | 413 +++++++++++++++++++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java |  24 +-
 .../src/site/markdown/tools/hadoop-aws/index.md |  46 ++-
 .../hadoop/fs/s3a/TestS3AFastOutputStream.java  |  74 ++++
 7 files changed, 570 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 11785f2..cb5cd4d 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -667,6 +667,8 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11620. Add support for load balancing across a group of KMS for HA.
     (Arun Suresh via wang)
 
+    HADOOP-11183. Memory-based S3AOutputstream. (Thomas Demoor via stevel)
+
   BUG FIXES
 
     HADOOP-11512. Use getTrimmedStrings when reading serialization keys
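
For context, a minimal usage sketch of the feature this patch adds: a client
opts in to the memory-based upload path through configuration and then writes
through the normal FileSystem API. The bucket URI, path and write size below
are placeholders, not values taken from the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.net.URI;

    public class FastUploadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Opt in to the memory-based output stream (the default remains false).
        conf.setBoolean("fs.s3a.fast.upload", true);
        // Initial in-memory buffer; it grows as data arrives, up to the multipart threshold.
        conf.setInt("fs.s3a.fast.buffer.size", 1048576);

        // "s3a://mybucket/" and "/example.dat" are placeholders.
        FileSystem fs = FileSystem.get(URI.create("s3a://mybucket/"), conf);
        try (FSDataOutputStream out = fs.create(new Path("/example.dat"), true)) {
          out.write(new byte[8 * 1024 * 1024]); // large enough to exercise the buffering path
        }
        fs.close();
      }
    }
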
http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 80dd15b..74390d8 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -763,13 +763,13 @@ for ldap providers in the same way as above does.
 <property>
   <name>fs.s3a.connection.establish.timeout</name>
   <value>5000</value>
-  <description>Socket connection setup timeout in seconds.</description>
+  <description>Socket connection setup timeout in milliseconds.</description>
 </property>
 
 <property>
   <name>fs.s3a.connection.timeout</name>
   <value>50000</value>
-  <description>Socket connection timeout in seconds.</description>
+  <description>Socket connection timeout in milliseconds.</description>
 </property>
 
@@ -846,6 +846,22 @@ for ldap providers in the same way as above does.
 </property>
 
+<property>
+  <name>fs.s3a.fast.upload</name>
+  <value>false</value>
+  <description>Upload directly from memory instead of buffering to
+  disk first. Memory usage and parallelism can be controlled as up to
+  fs.s3a.multipart.size memory is consumed for each (part)upload actively
+  uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.fast.buffer.size</name>
+  <value>1048576</value>
+  <description>Size of initial memory buffer in bytes allocated for an
+  upload. No effect if fs.s3a.fast.upload is false.</description>
+</property>
+
 <property>
   <name>fs.s3a.impl</name>
   <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
   <description>The implementation class of the S3A Filesystem</description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 1d4f67b..e7462dc 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -83,6 +83,14 @@ public class Constants {
   // comma separated list of directories
   public static final String BUFFER_DIR = "fs.s3a.buffer.dir";
 
+  // should we upload directly from memory rather than using a file buffer
+  public static final String FAST_UPLOAD = "fs.s3a.fast.upload";
+  public static final boolean DEFAULT_FAST_UPLOAD = false;
+
+  // initial size of memory buffer for a fast upload
+  public static final String FAST_BUFFER_SIZE = "fs.s3a.fast.buffer.size";
+  public static final int DEFAULT_FAST_BUFFER_SIZE = 1048576; // 1MB
+
   // private | public-read | public-read-write | authenticated-read |
   // log-delivery-write | bucket-owner-read | bucket-owner-full-control
   public static final String CANNED_ACL = "fs.s3a.acl.default";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
new file mode 100644
index 0000000..a29c47b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.event.ProgressEvent; +import com.amazonaws.event.ProgressListener; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; +import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.util.Progressable; +import org.slf4j.Logger; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadPoolExecutor; + + +/** + * Upload files/parts asap directly from a memory buffer (instead of buffering + * to a file). + *

+ * Uploads are managed low-level rather than through the AWS TransferManager.
+ * This allows for uploading each part of a multi-part upload as soon as
+ * the bytes are in memory, rather than waiting until the file is closed.
+ *

+ * Unstable: statistics and error handling might evolve + */ +@InterfaceStability.Unstable +public class S3AFastOutputStream extends OutputStream { + + private static final Logger LOG = S3AFileSystem.LOG; + private final String key; + private final String bucket; + private final AmazonS3Client client; + private final int partSize; + private final int multiPartThreshold; + private final S3AFileSystem fs; + private final CannedAccessControlList cannedACL; + private final FileSystem.Statistics statistics; + private final String serverSideEncryptionAlgorithm; + private final ProgressListener progressListener; + private final ListeningExecutorService executorService; + private MultiPartUpload multiPartUpload; + private boolean closed; + private ByteArrayOutputStream buffer; + private int bufferLimit; + + + /** + * Creates a fast OutputStream that uploads to S3 from memory. + * For MultiPartUploads, as soon as sufficient bytes have been written to + * the stream a part is uploaded immediately (by using the low-level + * multi-part upload API on the AmazonS3Client). + * + * @param client AmazonS3Client used for S3 calls + * @param fs S3AFilesystem + * @param bucket S3 bucket name + * @param key S3 key name + * @param progress report progress in order to prevent timeouts + * @param statistics track FileSystem.Statistics on the performed operations + * @param cannedACL used CannedAccessControlList + * @param serverSideEncryptionAlgorithm algorithm for server side encryption + * @param partSize size of a single part in a multi-part upload (except + * last part) + * @param multiPartThreshold files at least this size use multi-part upload + * @throws IOException + */ + public S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs, + String bucket, String key, Progressable progress, + FileSystem.Statistics statistics, CannedAccessControlList cannedACL, + String serverSideEncryptionAlgorithm, long partSize, + long multiPartThreshold, ThreadPoolExecutor threadPoolExecutor) + throws IOException { + this.bucket = bucket; + this.key = key; + this.client = client; + this.fs = fs; + this.cannedACL = cannedACL; + this.statistics = statistics; + this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm; + //Ensure limit as ByteArrayOutputStream size cannot exceed Integer.MAX_VALUE + if (partSize > Integer.MAX_VALUE) { + this.partSize = Integer.MAX_VALUE; + LOG.warn("s3a: MULTIPART_SIZE capped to ~2.14GB (maximum allowed size " + + "when using 'FAST_UPLOAD = true')"); + } else { + this.partSize = (int) partSize; + } + if (multiPartThreshold > Integer.MAX_VALUE) { + this.multiPartThreshold = Integer.MAX_VALUE; + LOG.warn("s3a: MIN_MULTIPART_THRESHOLD capped to ~2.14GB (maximum " + + "allowed size when using 'FAST_UPLOAD = true')"); + } else { + this.multiPartThreshold = (int) multiPartThreshold; + } + this.bufferLimit = this.multiPartThreshold; + this.closed = false; + int initialBufferSize = this.fs.getConf() + .getInt(Constants.FAST_BUFFER_SIZE, Constants.DEFAULT_FAST_BUFFER_SIZE); + if (initialBufferSize < 0) { + LOG.warn("s3a: FAST_BUFFER_SIZE should be a positive number. 
Using " + + "default value"); + initialBufferSize = Constants.DEFAULT_FAST_BUFFER_SIZE; + } else if (initialBufferSize > this.bufferLimit) { + LOG.warn("s3a: automatically adjusting FAST_BUFFER_SIZE to not " + + "exceed MIN_MULTIPART_THRESHOLD"); + initialBufferSize = this.bufferLimit; + } + this.buffer = new ByteArrayOutputStream(initialBufferSize); + this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor); + this.multiPartUpload = null; + this.progressListener = new ProgressableListener(progress); + if (LOG.isDebugEnabled()){ + LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'", + bucket, key); + } + } + + /** + * Writes a byte to the memory buffer. If this causes the buffer to reach + * its limit, the actual upload is submitted to the threadpool. + * @param b the int of which the lowest byte is written + * @throws IOException + */ + @Override + public synchronized void write(int b) throws IOException { + buffer.write(b); + if (buffer.size() == bufferLimit) { + uploadBuffer(); + } + } + + /** + * Writes a range of bytes from to the memory buffer. If this causes the + * buffer to reach its limit, the actual upload is submitted to the + * threadpool and the remainder of the array is written to memory + * (recursively). + * @param b byte array containing + * @param off offset in array where to start + * @param len number of bytes to be written + * @throws IOException + */ + @Override + public synchronized void write(byte b[], int off, int len) + throws IOException { + if (b == null) { + throw new NullPointerException(); + } else if ((off < 0) || (off > b.length) || (len < 0) || + ((off + len) > b.length) || ((off + len) < 0)) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return; + } + if (buffer.size() + len < bufferLimit) { + buffer.write(b, off, len); + } else { + int firstPart = bufferLimit - buffer.size(); + buffer.write(b, off, firstPart); + uploadBuffer(); + this.write(b, off + firstPart, len - firstPart); + } + } + + private synchronized void uploadBuffer() throws IOException { + if (multiPartUpload == null) { + multiPartUpload = initiateMultiPartUpload(); + /* Upload the existing buffer if it exceeds partSize. This possibly + requires multiple parts! */ + final byte[] allBytes = buffer.toByteArray(); + buffer = null; //earlier gc? 
+ if (LOG.isDebugEnabled()) { + LOG.debug("Total length of initial buffer: {}", allBytes.length); + } + int processedPos = 0; + while ((multiPartThreshold - processedPos) >= partSize) { + if (LOG.isDebugEnabled()) { + LOG.debug("Initial buffer: processing from byte {} to byte {}", + processedPos, (processedPos + partSize - 1)); + } + multiPartUpload.uploadPartAsync(new ByteArrayInputStream(allBytes, + processedPos, partSize), partSize); + processedPos += partSize; + } + //resize and reset stream + bufferLimit = partSize; + buffer = new ByteArrayOutputStream(bufferLimit); + buffer.write(allBytes, processedPos, multiPartThreshold - processedPos); + } else { + //upload next part + multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer + .toByteArray()), partSize); + buffer.reset(); + } + } + + + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + closed = true; + try { + if (multiPartUpload == null) { + putObject(); + } else { + if (buffer.size() > 0) { + //send last part + multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer + .toByteArray()), buffer.size()); + } + final List partETags = multiPartUpload + .waitForAllPartUploads(); + multiPartUpload.complete(partETags); + } + statistics.incrementWriteOps(1); + // This will delete unnecessary fake parent directories + fs.finishedWrite(key); + if (LOG.isDebugEnabled()) { + LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key); + } + } finally { + buffer = null; + super.close(); + } + } + + private ObjectMetadata createDefaultMetadata() { + ObjectMetadata om = new ObjectMetadata(); + if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { + om.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + return om; + } + + private MultiPartUpload initiateMultiPartUpload() throws IOException { + final ObjectMetadata om = createDefaultMetadata(); + final InitiateMultipartUploadRequest initiateMPURequest = + new InitiateMultipartUploadRequest(bucket, key, om); + initiateMPURequest.setCannedACL(cannedACL); + try { + return new MultiPartUpload( + client.initiateMultipartUpload(initiateMPURequest).getUploadId()); + } catch (AmazonServiceException ase) { + throw new IOException("Unable to initiate MultiPartUpload (server side)" + + ": " + ase, ase); + } catch (AmazonClientException ace) { + throw new IOException("Unable to initiate MultiPartUpload (client side)" + + ": " + ace, ace); + } + } + + private void putObject() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Executing regular upload for bucket '{}' key '{}'", bucket, + key); + } + final ObjectMetadata om = createDefaultMetadata(); + om.setContentLength(buffer.size()); + final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, + new ByteArrayInputStream(buffer.toByteArray()), om); + putObjectRequest.setCannedAcl(cannedACL); + putObjectRequest.setGeneralProgressListener(progressListener); + ListenableFuture putObjectResult = + executorService.submit(new Callable() { + @Override + public PutObjectResult call() throws Exception { + return client.putObject(putObjectRequest); + } + }); + //wait for completion + try { + putObjectResult.get(); + } catch (InterruptedException ie) { + LOG.warn("Interrupted object upload:" + ie, ie); + Thread.currentThread().interrupt(); + } catch (ExecutionException ee) { + throw new IOException("Regular upload failed", ee.getCause()); + } + } + + private class MultiPartUpload { + private final String uploadId; + private final List> partETagsFutures; + 
+ public MultiPartUpload(String uploadId) { + this.uploadId = uploadId; + this.partETagsFutures = new ArrayList>(); + if (LOG.isDebugEnabled()) { + LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " + + "id '{}'", bucket, key, uploadId); + } + } + + public void uploadPartAsync(ByteArrayInputStream inputStream, + int partSize) { + final int currentPartNumber = partETagsFutures.size() + 1; + final UploadPartRequest request = + new UploadPartRequest().withBucketName(bucket).withKey(key) + .withUploadId(uploadId).withInputStream(inputStream) + .withPartNumber(currentPartNumber).withPartSize(partSize); + request.setGeneralProgressListener(progressListener); + ListenableFuture partETagFuture = + executorService.submit(new Callable() { + @Override + public PartETag call() throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("Uploading part {} for id '{}'", currentPartNumber, + uploadId); + } + return client.uploadPart(request).getPartETag(); + } + }); + partETagsFutures.add(partETagFuture); + } + + public List waitForAllPartUploads() throws IOException { + try { + return Futures.allAsList(partETagsFutures).get(); + } catch (InterruptedException ie) { + LOG.warn("Interrupted partUpload:" + ie, ie); + Thread.currentThread().interrupt(); + } catch (ExecutionException ee) { + //there is no way of recovering so abort + //cancel all partUploads + for (ListenableFuture future : partETagsFutures) { + future.cancel(true); + } + //abort multipartupload + this.abort(); + throw new IOException("Part upload failed in multi-part upload with " + + "id '" +uploadId + "':" + ee, ee); + } + //should not happen? + return null; + } + + public void complete(List partETags) { + if (LOG.isDebugEnabled()) { + LOG.debug("Completing multi-part upload for key '{}', id '{}'", key, + uploadId); + } + final CompleteMultipartUploadRequest completeRequest = + new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags); + client.completeMultipartUpload(completeRequest); + + } + + public void abort() { + LOG.warn("Aborting multi-part upload with id '{}'", uploadId); + try { + client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, + key, uploadId)); + } catch (Exception e2) { + LOG.warn("Unable to abort multipart upload, you may need to purge " + + "uploaded parts: " + e2, e2); + } + } + } + + private static class ProgressableListener implements ProgressListener { + private final Progressable progress; + + public ProgressableListener(Progressable progress) { + this.progress = progress; + } + + public void progressChanged(ProgressEvent progressEvent) { + if (progress != null) { + progress.progress(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 2373e7e..1a30d6f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -88,7 +88,8 @@ public class S3AFileSystem extends FileSystem { private int maxKeys; private long partSize; private TransferManager transfers; - private int partSizeThreshold; + private ThreadPoolExecutor threadPoolExecutor; + private int multiPartThreshold; public static final 
Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class); private CannedAccessControlList cannedACL; private String serverSideEncryptionAlgorithm; @@ -237,7 +238,7 @@ public class S3AFileSystem extends FileSystem { maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS); partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); - partSizeThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD, + multiPartThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD); if (partSize < 5 * 1024 * 1024) { @@ -245,9 +246,9 @@ public class S3AFileSystem extends FileSystem { partSize = 5 * 1024 * 1024; } - if (partSizeThreshold < 5 * 1024 * 1024) { + if (multiPartThreshold < 5 * 1024 * 1024) { LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB"); - partSizeThreshold = 5 * 1024 * 1024; + multiPartThreshold = 5 * 1024 * 1024; } int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS); @@ -262,20 +263,20 @@ public class S3AFileSystem extends FileSystem { LinkedBlockingQueue workQueue = new LinkedBlockingQueue<>(maxThreads * conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS)); - ThreadPoolExecutor tpe = new ThreadPoolExecutor( + threadPoolExecutor = new ThreadPoolExecutor( coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, newDaemonThreadFactory("s3a-transfer-shared-")); - tpe.allowCoreThreadTimeOut(true); + threadPoolExecutor.allowCoreThreadTimeOut(true); TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration(); transferConfiguration.setMinimumUploadPartSize(partSize); - transferConfiguration.setMultipartUploadThreshold(partSizeThreshold); + transferConfiguration.setMultipartUploadThreshold(multiPartThreshold); - transfers = new TransferManager(s3, tpe); + transfers = new TransferManager(s3, threadPoolExecutor); transfers.setConfiguration(transferConfiguration); String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL); @@ -391,7 +392,12 @@ public class S3AFileSystem extends FileSystem { if (!overwrite && exists(f)) { throw new FileAlreadyExistsException(f + " already exists"); } - + if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) { + return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket, + key, progress, statistics, cannedACL, + serverSideEncryptionAlgorithm, partSize, (long)multiPartThreshold, + threadPoolExecutor), statistics); + } // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this, bucket, key, progress, cannedACL, statistics, http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index 8e80b92..bf62634 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -213,13 +213,13 @@ If you do any of these: change your credentials immediately! fs.s3a.connection.establish.timeout 5000 - Socket connection setup timeout in seconds. + Socket connection setup timeout in milliseconds. fs.s3a.connection.timeout 50000 - Socket connection timeout in seconds. + Socket connection timeout in milliseconds. 
@@ -292,7 +292,7 @@ If you do any of these: change your credentials immediately!
     <property>
       <name>fs.s3a.buffer.dir</name>
       <value>${hadoop.tmp.dir}/s3a</value>
       <description>Comma separated list of directories that will be used to buffer file
-        uploads to.</description>
+        uploads to. No effect if fs.s3a.fast.upload is true.</description>
     </property>
 
@@ -301,6 +301,40 @@ If you do any of these: change your credentials immediately!
       <description>The implementation class of the S3A Filesystem</description>
     </property>
 
+### S3AFastOutputStream
+ **Warning: NEW in hadoop 2.7. UNSTABLE, EXPERIMENTAL: use at own risk**
+
+    <property>
+      <name>fs.s3a.fast.upload</name>
+      <value>false</value>
+      <description>Upload directly from memory instead of buffering to
+      disk first. Memory usage and parallelism can be controlled as up to
+      fs.s3a.multipart.size memory is consumed for each (part)upload actively
+      uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)
+      </description>
+    </property>
+
+    <property>
+      <name>fs.s3a.fast.buffer.size</name>
+      <value>1048576</value>
+      <description>Size (in bytes) of initial memory buffer allocated for an
+      upload. No effect if fs.s3a.fast.upload is false.</description>
+    </property>
+
+Writes are buffered in memory instead of to a file on local disk. This
+removes the throughput bottleneck of the local disk write and read cycle
+before starting the actual upload. Furthermore, it allows handling files that
+are larger than the remaining local disk space.
+
+However, non-trivial memory tuning is needed for optimal results and careless
+settings could cause memory overflow. Up to `fs.s3a.threads.max` parallel
+(part)uploads are active. Furthermore, up to `fs.s3a.max.total.tasks`
+additional (part)uploads can be waiting (and thus memory buffers are created).
+The memory buffer is uploaded as a single upload if it is not larger than
+`fs.s3a.multipart.threshold`. Otherwise, a multi-part upload is initiated and
+parts of size `fs.s3a.multipart.size` are used to protect against overflowing
+the available memory. These settings should be tuned to the envisioned
+workflow (some large files, many small ones, ...) and the physical
+limitations of the machine and cluster (memory, network bandwidth).
 
 ## Testing the S3 filesystem clients
 
@@ -334,7 +368,7 @@ each filesystem for its testing.
 The contents of each bucket will be destroyed during the test process: do not
 use the bucket for any purpose other than testing. Furthermore, for s3a, all
 in-progress multi-part uploads to the bucket will be aborted at the
-start of a test (by forcing fs.s3a.multipart.purge=true) to clean up the
+start of a test (by forcing `fs.s3a.multipart.purge=true`) to clean up the
 temporary state of previously failed tests.
 
 Example:
 
@@ -392,14 +426,14 @@ Example:
 
 ## File `contract-test-options.xml`
 
 The file `hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml`
-must be created and configured for the test fileystems.
+must be created and configured for the test filesystems.
 
 If a specific file `fs.contract.test.fs.*` test path is not defined for
 any of the filesystems, those tests will be skipped.
 
 The standard S3 authentication details must also be provided. This can be
 through copy-and-paste of the `auth-keys.xml` credentials, or it can be
-through direct XInclude inclustion.
+through direct XInclude inclusion.
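
As an illustration of the XInclude route mentioned above, a sketch of what the
inclusion could look like inside `contract-test-options.xml`. This snippet is
not part of the patch, and the `auth-keys.xml` href is an assumption.

    <configuration>
      <include xmlns="http://www.w3.org/2001/XInclude" href="auth-keys.xml"/>
      <!-- per-filesystem fs.contract.test.fs.* entries follow here -->
    </configuration>
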
#### s3:// http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFastOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFastOutputStream.java new file mode 100644 index 0000000..e507cf6 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFastOutputStream.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +/** + * Tests regular and multi-part upload functionality for S3AFastOutputStream. + * File sizes are kept small to reduce test duration on slow connections + */ +public class TestS3AFastOutputStream { + private FileSystem fs; + + + @Rule + public Timeout testTimeout = new Timeout(30 * 60 * 1000); + + @Before + public void setUp() throws Exception { + Configuration conf = new Configuration(); + conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024); + conf.setInt(Constants.MULTIPART_SIZE, 5 * 1024 * 1024); + conf.setBoolean(Constants.FAST_UPLOAD, true); + fs = S3ATestUtils.createTestFileSystem(conf); + } + + @After + public void tearDown() throws Exception { + if (fs != null) { + fs.delete(getTestPath(), true); + } + } + + protected Path getTestPath() { + return new Path("/tests3a"); + } + + @Test + public void testRegularUpload() throws IOException { + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024); + } + + @Test + public void testMultiPartUpload() throws IOException { + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * + 1024); + } +}
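
To close, a rough illustration of the memory-tuning guidance in the new
hadoop-aws documentation above: a back-of-the-envelope bound on what a single
fast-upload stream can buffer. The configuration values below are assumptions
chosen for the example, not defaults taken from this patch.

    import org.apache.hadoop.conf.Configuration;

    public class FastUploadMemoryBound {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Assumed example values; real deployments read these from core-site.xml.
        long multipartSize = conf.getLong("fs.s3a.multipart.size", 100L * 1024 * 1024);
        int maxThreads = conf.getInt("fs.s3a.threads.max", 10);
        int maxQueuedTasks = conf.getInt("fs.s3a.max.total.tasks", 5);

        // Per the documentation text: up to maxThreads parts actively uploading plus
        // up to maxQueuedTasks parts queued, each holding one multipart.size buffer.
        long partBuffers = (long) (maxThreads + maxQueuedTasks) * multipartSize;

        // The stream's own accumulation buffer (bounded by fs.s3a.multipart.threshold
        // before the first part is sent) comes on top of this figure.
        System.out.printf("Rough per-stream upper bound: %d bytes (~%.2f GB)%n",
            partBuffers, partBuffers / (1024.0 * 1024 * 1024));
      }
    }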