Date: Fri, 14 Oct 2016 09:22:24 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: common-issues@hadoop.apache.org
Subject: [jira] [Commented] (HADOOP-13560) S3ABlockOutputStream to support huge (many GB) file writes

    [ https://issues.apache.org/jira/browse/HADOOP-13560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15574765#comment-15574765 ]

ASF GitHub Bot commented on HADOOP-13560:
-----------------------------------------

Github user thodemoor commented on a diff in the pull request:

    https://github.com/apache/hadoop/pull/130#discussion_r83381587

--- Diff: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java ---
@@ -0,0 +1,819 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.util.DirectBufferPool;
+
+import static org.apache.hadoop.fs.s3a.S3ADataBlocks.DataBlock.DestState.*;
+
+/**
+ * Set of classes to support output streaming into blocks which are then
+ * uploaded as partitions.
+ */
+final class S3ADataBlocks {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3ADataBlocks.class);
+
+  private S3ADataBlocks() {
+  }
+
+  /**
+   * Validate args to a write command. These are the same validation checks
+   * expected for any implementation of {@code OutputStream.write()}.
+   * @param b byte array containing data
+   * @param off offset in array where to start
+   * @param len number of bytes to be written
+   * @throws NullPointerException for a null buffer
+   * @throws IndexOutOfBoundsException if indices are out of range
+   */
+  static void validateWriteArgs(byte[] b, int off, int len)
+      throws IOException {
+    Preconditions.checkNotNull(b);
+    if ((off < 0) || (off > b.length) || (len < 0) ||
+        ((off + len) > b.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException(
+          "write (b[" + b.length + "], " + off + ", " + len + ')');
+    }
+  }
+
+  /**
+   * Create a factory.
+   * @param owner factory owner
+   * @param name factory name -the option from {@link Constants}.
+   * @return the factory, ready to be initialized.
+   * @throws IllegalArgumentException if the name is unknown.
+   */
+  static BlockFactory createFactory(S3AFileSystem owner,
+      String name) {
+    switch (name) {
+    case Constants.FAST_UPLOAD_BUFFER_ARRAY:
+      return new ArrayBlockFactory(owner);
+    case Constants.FAST_UPLOAD_BUFFER_DISK:
+      return new DiskBlockFactory(owner);
+    case Constants.FAST_UPLOAD_BYTEBUFFER:
+      return new ByteBufferBlockFactory(owner);
+    default:
+      throw new IllegalArgumentException("Unsupported block buffer" +
+          " \"" + name + '"');
+    }
+  }
+
+  /**
+   * Base class for block factories.
+   */
+  static abstract class BlockFactory implements Closeable {
+
+    private final S3AFileSystem owner;
+
+    protected BlockFactory(S3AFileSystem owner) {
+      this.owner = owner;
+    }
+
+
+    /**
+     * Create a block.
+     * @param limit limit of the block.
+     * @return a new block.
+     */
+    abstract DataBlock create(int limit) throws IOException;
+
+    /**
+     * Implement any close/cleanup operation.
+     * Base class is a no-op
+     * @throws IOException -ideally, it shouldn't.
+     */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /**
+     * Owner.
+     */
+    protected S3AFileSystem getOwner() {
+      return owner;
+    }
+  }
+
+  /**
+   * This represents a block being uploaded.
+   */
+  static abstract class DataBlock implements Closeable {
+
+    enum DestState {Writing, Upload, Closed}
+
+    private volatile DestState state = Writing;
+
+    /**
+     * Atomically enter a state, verifying current state.
+     * @param current current state. null means "no check"
+     * @param next next state
+     * @throws IllegalStateException if the current state is not as expected
+     */
+    protected synchronized final void enterState(DestState current,
+        DestState next)
+        throws IllegalStateException {
+      verifyState(current);
+      LOG.debug("{}: entering state {}", this, next);
+      state = next;
+    }
+
+    /**
+     * Verify that the block is in the declared state.
+     * @param expected expected state.
+     * @throws IllegalStateException if the DataBlock is in the wrong state
+     */
+    protected final void verifyState(DestState expected)
+        throws IllegalStateException {
+      if (expected != null && state != expected) {
+        throw new IllegalStateException("Expected stream state " + expected
+            + " -but actual state is " + state + " in " + this);
+      }
+    }
+
+    /**
+     * Current state.
+     * @return the current state.
+     */
+    final DestState getState() {
+      return state;
+    }
+
+    /**
+     * Return the current data size.
+     * @return the size of the data
+     */
+    abstract int dataSize();
+
+    /**
+     * Predicate to verify that the block has the capacity to write
+     * the given set of bytes.
+     * @param bytes number of bytes desired to be written.
+     * @return true if there is enough space.
+     */
+    abstract boolean hasCapacity(long bytes);
+
+    /**
+     * Predicate to check if there is data in the block.
+     * @return true if there is
+     */
+    boolean hasData() {
+      return dataSize() > 0;
+    }
+
+    /**
+     * The remaining capacity in the block before it is full.
+     * @return the number of bytes remaining.
+     */
+    abstract int remainingCapacity();
+
+    /**
+     * Write a series of bytes from the buffer, from the offset.
+     * Returns the number of bytes written.
+     * Only valid in the state {@code Writing}.
+     * Base class verifies the state but does no writing.
+     * @param buffer buffer
+     * @param offset offset
+     * @param length length of write
+     * @return number of bytes written
+     * @throws IOException trouble
+     */
+    int write(byte[] buffer, int offset, int length) throws IOException {
+      verifyState(Writing);
+      Preconditions.checkArgument(buffer != null, "Null buffer");
+      Preconditions.checkArgument(length >= 0, "length is negative");
+      Preconditions.checkArgument(offset >= 0, "offset is negative");
+      Preconditions.checkArgument(
+          !(buffer.length - offset < length),
+          "buffer shorter than amount of data to write");
+      return 0;
+    }
+
+    /**
+     * Flush the output.
+     * Only valid in the state {@code Writing}.
+     * In the base class, this is a no-op
+     * @throws IOException any IO problem.
+     */
+    void flush() throws IOException {
+      verifyState(Writing);
+    }
+
+    /**
+     * Switch to the upload state and return a stream for uploading.
+     * Base class calls {@link #enterState(DestState, DestState)} to
+     * manage the state machine.
+     * @return the stream
+     * @throws IOException trouble
+     */
+    InputStream startUpload() throws IOException {
+      LOG.debug("Start datablock upload");
+      enterState(Writing, Upload);
+      return null;
+    }
+
+    /**
+     * Enter the closed state.
+     * @return true if the class was in any other state, implying that
+     * the subclass should do its close operations
+     */
+    protected synchronized boolean enterClosedState() {
+      if (!state.equals(Closed)) {
+        enterState(null, Closed);
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (enterClosedState()) {
+        LOG.debug("Closed {}", this);
+        innerClose();
+      }
+    }
+
+    /**
+     * Inner close logic for subclasses to implement.
+     */
+    protected void innerClose() throws IOException {
+
+    }
+
+  }
+
+  // ====================================================================
+
+  /**
+   * Use byte arrays on the heap for storage.
+   */
+  static class ArrayBlockFactory extends BlockFactory {
+
+    ArrayBlockFactory(S3AFileSystem owner) {
+      super(owner);
+    }
+
+    @Override
+    DataBlock create(int limit) throws IOException {
+      return new ByteArrayBlock(limit);
+    }
+
+  }
+
+  /**
+   * Stream to memory via a {@code ByteArrayOutputStream}.
+   *
+   * This was taken from {@code S3AFastOutputStream} and has the
+   * same problem which surfaced there: it consumes heap space
+   * proportional to the mismatch between writes to the stream and
+   * the JVM-wide upload bandwidth to the S3 endpoint.
--- End diff --

    but bounded by ...


> S3ABlockOutputStream to support huge (many GB) file writes
> -----------------------------------------------------------
>
>                 Key: HADOOP-13560
>                 URL: https://issues.apache.org/jira/browse/HADOOP-13560
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 2.9.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>         Attachments: HADOOP-13560-branch-2-001.patch, HADOOP-13560-branch-2-002.patch, HADOOP-13560-branch-2-003.patch, HADOOP-13560-branch-2-004.patch
>
>
> An AWS SDK [issue|https://github.com/aws/aws-sdk-java/issues/367] highlights that metadata isn't copied on large copies.
> 1. Add a test to do that large copy/rename and verify that the copy really works.
> 2. Verify that metadata makes it over.
> Verifying large file rename is important on its own, as it is needed for very large commit operations by committers that use rename.
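To make the test plan in the description concrete, here is a minimal sketch of a large rename plus metadata check along the lines it asks for. It is illustrative only and not part of the patch: the bucket, key names, file size, and class name are placeholders, and the metadata probe assumes the AWS SDK v1 AmazonS3 client that hadoop-aws builds against.

// Illustrative sketch of the "large copy/rename + metadata" check described above.
// Bucket, paths, and class name are hypothetical; the real tests would live in the
// hadoop-aws test suite rather than a main() method.
import java.util.Map;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ObjectMetadata;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HugeRenameMetadataCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path source = new Path("s3a://example-bucket/huge/source.bin");   // assumed multi-GB object
    Path dest = new Path("s3a://example-bucket/huge/renamed.bin");

    FileSystem fs = source.getFileSystem(conf);
    long sourceLen = fs.getFileStatus(source).getLen();

    // 1. The rename: in S3A this is a server-side copy plus delete, so a
    //    multi-GB source exercises the large-copy path the issue is about.
    if (!fs.rename(source, dest)) {
      throw new AssertionError("rename() returned false");
    }
    if (fs.getFileStatus(dest).getLen() != sourceLen) {
      throw new AssertionError("length changed during rename");
    }

    // 2. Check that user metadata survived the copy (the SDK issue cited above
    //    reports it being lost on large copies). Uses the raw SDK client.
    AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
    ObjectMetadata md = s3.getObjectMetadata("example-bucket", "huge/renamed.bin");
    Map<String, String> userMetadata = md.getUserMetadata();
    System.out.println("user metadata after rename: " + userMetadata);
  }
}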