Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A97EF200D20 for ; Tue, 12 Sep 2017 02:22:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A86421609C4; Tue, 12 Sep 2017 00:22:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 328771609CB for ; Tue, 12 Sep 2017 02:22:19 +0200 (CEST) Received: (qmail 79585 invoked by uid 500); 12 Sep 2017 00:22:18 -0000 Mailing-List: contact reviews-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list reviews@spark.apache.org Received: (qmail 79272 invoked by uid 99); 12 Sep 2017 00:22:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 Sep 2017 00:22:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 27C8EF57D3; Tue, 12 Sep 2017 00:22:13 +0000 (UTC) From: zsxwing To: reviews@spark.apache.org Reply-To: reviews@spark.apache.org References: In-Reply-To: Subject: [GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am... Content-Type: text/plain Message-Id: <20170912002214.27C8EF57D3@git1-us-west.apache.org> Date: Tue, 12 Sep 2017 00:22:13 +0000 (UTC) archived-at: Tue, 12 Sep 2017 00:22:20 -0000 Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138194101 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. 
+ */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. 
+ */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { + Preconditions.checkArgument(bufferSizeInBytes > 0, + "bufferSizeInBytes should be greater than 0"); + Preconditions.checkArgument(readAheadThresholdInBytes > 0 && + readAheadThresholdInBytes < bufferSizeInBytes, + "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); + activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); + readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); + this.readAheadThresholdInBytes = readAheadThresholdInBytes; + this.underlyingInputStream = inputStream; + activeBuffer.flip(); + readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { + if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; + } + return false; + } + private void readAsync(final ByteBuffer byteBuffer) throws IOException { + stateChangeLock.lock(); + if (endOfStream || readInProgress) { + stateChangeLock.unlock(); + return; + } + byteBuffer.position(0); + byteBuffer.flip(); + readInProgress = true; + final byte[] arr = byteBuffer.array(); + stateChangeLock.unlock(); + executorService.execute(new Runnable() { + @Override + public void run() { + // Please note that it is safe to release the lock and read into the read ahead buffer + // because either of following two conditions will hold - 1. The active buffer has + // data available to read so the reader will not read from the read ahead buffer. + // 2. This is the first time read is called or the active buffer is exhausted, + // in that case the reader waits for this async read to complete. + // So there is no race condition in both the situations. 
+ boolean handled = false; + int read = 0; + Exception exception = new Exception("Unknown exception in ReadAheadInputStream"); + try { + while (true) { + read = underlyingInputStream.read(arr); + if (0 != read) break; + } + handled = true; + } catch (Exception ex) { + exception = ex; + } finally { + stateChangeLock.lock(); + if (read < 0 || (exception instanceof EOFException) ) { + endOfStream = true; + } else if (!handled) { + readAborted = true; + readException = exception != null ? exception: new Exception("Unknown exception in ReadAheadInputStream"); + } else { + byteBuffer.limit(read); + } + readInProgress = false; + signalAsyncReadComplete(); + stateChangeLock.unlock(); + } + } + }); + } + + + private void signalAsyncReadComplete() { + stateChangeLock.lock(); + try { + asyncReadComplete.signalAll(); + } finally { + stateChangeLock.unlock(); + } + } + + private void waitForAsyncReadComplete() { + stateChangeLock.lock(); + try { + if (readInProgress) + asyncReadComplete.await(); + } catch (InterruptedException e) { + } finally { + stateChangeLock.unlock(); + } + } + + @Override + public int read() throws IOException { + int val = read(oneByte.get(), 0, 1); + if (val == -1) { + return -1; + } + return oneByte.get()[0] & 0xFF; + } + + @Override + public int read(byte[] b, int offset, int len) throws IOException { + if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return 0; + } + stateChangeLock.lock(); + try { + len = readInternal(b, offset, len); + } + finally { + stateChangeLock.unlock(); + } + return len; + } + + /** + * Internal read function which should be called only from read() api. The assumption is that + * the stateChangeLock is already acquired in the caller before calling this function. 
+ */ + private int readInternal(byte[] b, int offset, int len) throws IOException { + assert (stateChangeLock.isLocked()); + if (!activeBuffer.hasRemaining()) { + if (!readInProgress) { + // This condition will only be triggered for the first time read is called. + readAsync(activeBuffer); + } + waitForAsyncReadComplete(); + } + if (readAborted) { + throw new IOException(readException); + } + if (isEndOfStream()) { + return -1; + } + len = Math.min(len, activeBuffer.remaining()); + activeBuffer.get(b, offset, len); + + if (activeBuffer.remaining() <= readAheadThresholdInBytes && !readAheadBuffer.hasRemaining()) { + readAsync(readAheadBuffer); + } + if (!activeBuffer.hasRemaining()) { + ByteBuffer temp = activeBuffer; + activeBuffer = readAheadBuffer; + readAheadBuffer = temp; + } + return len; + } + + @Override + public int available() throws IOException { + stateChangeLock.lock(); + // Make sure we have no integer overflow. + int val = (int) Math.min((long) Integer.MAX_VALUE, + (long) activeBuffer.remaining() + readAheadBuffer.remaining()); + stateChangeLock.unlock(); + return val; + } + + @Override + public long skip(long n) throws IOException { + if (n <= 0L) { + return 0L; + } + stateChangeLock.lock(); + long skipped; + try { + skipped = skipInternal(n); + } finally { + stateChangeLock.unlock(); + } + return skipped; + } + + /** + * Internal skip function which should be called only from skip() api. The assumption is that + * the stateChangeLock is already acquired in the caller before calling this function. 
+ */ + private long skipInternal(long n) throws IOException { + assert (stateChangeLock.isLocked()); + if (readInProgress) { + waitForAsyncReadComplete(); + } + if (available() >= n) { + // we can skip from the internal buffers + int toSkip = (int)n; + if (toSkip <= activeBuffer.remaining()) { + // Only skipping from active buffer is sufficient + activeBuffer.position(toSkip + activeBuffer.position()); + return n; + } + // We need to skip from both active buffer and read ahead buffer + toSkip -= activeBuffer.remaining(); + activeBuffer.position(0); + activeBuffer.flip(); + readAheadBuffer.position(toSkip + readAheadBuffer.position()); + // flip the active and read ahead buffer + ByteBuffer temp = activeBuffer; + activeBuffer = readAheadBuffer; + readAheadBuffer = temp; + readAsync(readAheadBuffer); + return n; + } + int skippedBytes = available(); + long toSkip = n - skippedBytes; + activeBuffer.position(0); + activeBuffer.flip(); + readAheadBuffer.position(0); + readAheadBuffer.flip(); + long skippedFromInputStream = underlyingInputStream.skip(toSkip); + readAsync(activeBuffer); + return skippedBytes + skippedFromInputStream; + } + + @Override + public void close() throws IOException { + executorService.shutdown(); + try { + executorService.awaitTermination(10, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + } + underlyingInputStream.close(); + stateChangeLock.lock(); + try { + StorageUtils.dispose(activeBuffer); + StorageUtils.dispose(readAheadBuffer); + } + finally { --- End diff -- nit: use `} finally {` --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org