spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxwing <...@git.apache.org>
Subject [GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Date Tue, 12 Sep 2017 00:22:13 GMT
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18317#discussion_r138194101
  
    --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
    @@ -0,0 +1,315 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.io;
    +
    +import com.google.common.base.Preconditions;
    +import org.apache.spark.storage.StorageUtils;
    +import org.apache.spark.util.ThreadUtils;
    +
    +import javax.annotation.concurrent.GuardedBy;
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.nio.ByteBuffer;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.locks.Condition;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +/**
    + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input
    + * stream once a specified amount of data has been read from the current buffer. It does so by
    + * maintaining two buffers: an active buffer and a read-ahead buffer. The active buffer contains
    + * the data that is returned when a read() call is issued. The read-ahead buffer is used to
    + * asynchronously read from the underlying input stream; once the current active buffer is
    + * exhausted, the two buffers are swapped so that reading can continue from the read-ahead
    + * buffer without being blocked on disk I/O.
    + */
    +public class ReadAheadInputStream extends InputStream {
    +
    +  // Guards every @GuardedBy field below. Declared final because asyncReadComplete is created
    +  // from this exact lock instance, so the reference must never be reassigned.
    +  private final ReentrantLock stateChangeLock = new ReentrantLock();
    +
    +  // Buffer that read calls drain.
    +  @GuardedBy("stateChangeLock")
    +  private ByteBuffer activeBuffer;
    +
    +  // Buffer that is filled in the background and swapped in once the active buffer is drained.
    +  @GuardedBy("stateChangeLock")
    +  private ByteBuffer readAheadBuffer;
    +
    +  // Set by the async read task when the underlying stream reports end of data.
    +  @GuardedBy("stateChangeLock")
    +  private boolean endOfStream;
    +
    +  @GuardedBy("stateChangeLock")
    +  // true if async read is in progress
    +  private boolean readInProgress;
    +
    +  @GuardedBy("stateChangeLock")
    +  // true if read is aborted due to an exception in reading from underlying input stream.
    +  private boolean readAborted;
    +
    +  // The failure recorded by the async read task when readAborted is set.
    +  @GuardedBy("stateChangeLock")
    +  private Exception readException;
    +
    +  // If the remaining data size in the current buffer is below this threshold,
    +  // we issue an async read from the underlying input stream.
    +  private final int readAheadThresholdInBytes;
    +
    +  private final InputStream underlyingInputStream;
    +
    +  // Executor on which the async read tasks are run.
    +  private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
    +
    +  // Signalled by the async read task each time it finishes (successfully or not).
    +  private final Condition asyncReadComplete = stateChangeLock.newCondition();
    +
    +  // Per-thread one-element scratch array used by the single-byte read().
    +  private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
    +
    +  /**
    +   * Creates a <code>ReadAheadInputStream</code> with the specified buffer
size and read-ahead
    +   * threshold
    +   *
    +   * @param   inputStream                 The underlying input stream.
    +   * @param   bufferSizeInBytes           The buffer size.
    +   * @param   readAheadThresholdInBytes   If the active buffer has less data than the
read-ahead
    +   *                                      threshold, an async read is triggered.
    +   */
    +  public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes)
{
    +    Preconditions.checkArgument(bufferSizeInBytes > 0,
    +            "bufferSizeInBytes should be greater than 0");
    +    Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
    +                    readAheadThresholdInBytes < bufferSizeInBytes,
    +            "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes"
);
    +    activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
    +    readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
    +    this.readAheadThresholdInBytes = readAheadThresholdInBytes;
    +    this.underlyingInputStream = inputStream;
    +    activeBuffer.flip();
    +    readAheadBuffer.flip();
    +  }
    +
    +  private boolean isEndOfStream() {
    +    if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() &&
endOfStream) {
    +      return true;
    +    }
    +    return  false;
    +  }
    +  /**
    +   * Schedules an asynchronous fill of {@code byteBuffer} from the underlying input stream.
    +   * Does nothing if end of stream was already reached or another async read is in progress.
    +   *
    +   * Before the task is submitted the buffer is reset to empty (position 0, limit 0); its limit
    +   * is raised to the number of bytes read only when the read succeeds. When the task finishes
    +   * it clears readInProgress and signals asyncReadComplete.
    +   *
    +   * @param byteBuffer the buffer to fill; its backing array is written by the background task.
    +   */
    +  private void readAsync(final ByteBuffer byteBuffer) throws IOException {
    +    final byte[] arr;
    +    stateChangeLock.lock();
    +    try {
    +      if (endOfStream || readInProgress) {
    +        return;
    +      }
    +      byteBuffer.position(0);
    +      byteBuffer.flip();
    +      readInProgress = true;
    +      arr = byteBuffer.array();
    +    } finally {
    +      // Always release the lock, even if one of the buffer operations above throws.
    +      stateChangeLock.unlock();
    +    }
    +    executorService.execute(new Runnable() {
    +      @Override
    +      public void run() {
    +        // Please note that it is safe to release the lock and read into the read ahead buffer
    +        // because either of following two conditions will hold - 1. The active buffer has
    +        // data available to read so the reader will not read from the read ahead buffer.
    +        // 2. This is the first time read is called or the active buffer is exhausted,
    +        // in that case the reader waits for this async read to complete.
    +        // So there is no race condition in both the situations.
    +        boolean handled = false;
    +        int read = 0;
    +        // Placeholder used when the read fails with something that is not an Exception.
    +        Exception exception = new Exception("Unknown exception in ReadAheadInputStream");
    +        try {
    +          // Retry zero-length reads until data arrives or the stream reports end of data.
    +          while (true) {
    +            read = underlyingInputStream.read(arr);
    +            if (0 != read) break;
    +          }
    +          handled = true;
    +        } catch (Exception ex) {
    +          exception = ex;
    +        } finally {
    +          stateChangeLock.lock();
    +          try {
    +            if (read < 0 || (exception instanceof EOFException)) {
    +              endOfStream = true;
    +            } else if (!handled) {
    +              readAborted = true;
    +              // exception is never null here: it starts as the placeholder and is only ever
    +              // replaced by a caught exception.
    +              readException = exception;
    +            } else {
    +              byteBuffer.limit(read);
    +            }
    +            readInProgress = false;
    +            signalAsyncReadComplete();
    +          } finally {
    +            stateChangeLock.unlock();
    +          }
    +        }
    +      }
    +    });
    +  }
    +
    +
    +  /**
    +   * Wakes up every thread waiting on asyncReadComplete. The state lock is taken for the
    +   * duration of the signal and released afterwards.
    +   */
    +  private void signalAsyncReadComplete() {
    +    stateChangeLock.lock();
    +    try {
    +      asyncReadComplete.signalAll();
    +    } finally {
    +      stateChangeLock.unlock();
    +    }
    +  }
    +
    +  /**
    +   * Blocks until no asynchronous read is in progress.
    +   *
    +   * @throws IOException an {@link java.io.InterruptedIOException} if the calling thread is
    +   *                     interrupted while waiting; the thread's interrupt status is restored.
    +   */
    +  private void waitForAsyncReadComplete() throws IOException {
    +    stateChangeLock.lock();
    +    try {
    +      // Condition.await() may return spuriously, so the predicate is re-checked in a loop.
    +      while (readInProgress) {
    +        asyncReadComplete.await();
    +      }
    +    } catch (InterruptedException e) {
    +      // Do not swallow the interrupt: returning normally here would let the caller touch a
    +      // buffer that the background task may still be filling.
    +      Thread.currentThread().interrupt();
    +      java.io.InterruptedIOException interrupted = new java.io.InterruptedIOException(
    +          "Interrupted while waiting for async read to complete");
    +      interrupted.initCause(e);
    +      throw interrupted;
    +    } finally {
    +      stateChangeLock.unlock();
    +    }
    +  }
    +
    +  /**
    +   * Reads a single byte through the bulk read path, using the calling thread's one-element
    +   * scratch array.
    +   *
    +   * @return the byte as an unsigned value, or -1 when the bulk read reports -1.
    +   */
    +  @Override
    +  public int read() throws IOException {
    +    final byte[] single = oneByte.get();
    +    return read(single, 0, 1) == -1 ? -1 : single[0] & 0xFF;
    +  }
    +
    +  @Override
    +  public int read(byte[] b, int offset, int len) throws IOException {
    +    if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length)
{
    +      throw new IndexOutOfBoundsException();
    +    }
    +    if (len == 0) {
    +      return 0;
    +    }
    +    stateChangeLock.lock();
    +    try {
    +      len = readInternal(b, offset, len);
    +    }
    +    finally {
    +      stateChangeLock.unlock();
    +    }
    +    return len;
    +  }
    +
    +  /**
    +   * Internal read function which should be called only from the read() api. The assumption is
    +   * that the stateChangeLock is already acquired in the caller before calling this function.
    +   *
    +   * Copies at most {@code len} bytes (bounded by what the active buffer holds) into {@code b}
    +   * at {@code offset}.
    +   *
    +   * @return the number of bytes copied, or -1 if isEndOfStream() reports true.
    +   * @throws IOException wrapping readException if the async read was aborted.
    +   */
    +  private int readInternal(byte[] b, int offset, int len) throws IOException {
    +    assert (stateChangeLock.isLocked());
    +    if (!activeBuffer.hasRemaining()) {
    +      if (!readInProgress) {
    +        // This condition will only be triggered for the first time read is called.
    +        readAsync(activeBuffer);
    +      }
    +      // Active buffer is empty: block until the in-flight async read finishes.
    +      waitForAsyncReadComplete();
    +    }
    +    if (readAborted) {
    +      throw new IOException(readException);
    +    }
    +    if (isEndOfStream()) {
    +      return -1;
    +    }
    +    // Never hand out more than the active buffer currently holds.
    +    len = Math.min(len, activeBuffer.remaining());
    +    activeBuffer.get(b, offset, len);
    +
    +    // Request a refill of the read-ahead buffer once the active buffer is at or below the
    +    // threshold and the read-ahead buffer has nothing left in it.
    +    if (activeBuffer.remaining() <= readAheadThresholdInBytes && !readAheadBuffer.hasRemaining())
{
    +      readAsync(readAheadBuffer);
    +    }
    +    // Active buffer drained: swap the two buffers so the next call reads from the other one.
    +    if (!activeBuffer.hasRemaining()) {
    +      ByteBuffer temp = activeBuffer;
    +      activeBuffer = readAheadBuffer;
    +      readAheadBuffer = temp;
    +    }
    +    return len;
    +  }
    +
    +  /**
    +   * Returns the number of bytes remaining in the active and read-ahead buffers combined,
    +   * clamped to {@link Integer#MAX_VALUE}.
    +   */
    +  @Override
    +  public int available() throws IOException {
    +    stateChangeLock.lock();
    +    try {
    +      // Sum as long so the addition cannot overflow before clamping.
    +      final long buffered = (long) activeBuffer.remaining() + readAheadBuffer.remaining();
    +      return (int) Math.min((long) Integer.MAX_VALUE, buffered);
    +    } finally {
    +      // Release the lock on every exit path, including exceptional ones.
    +      stateChangeLock.unlock();
    +    }
    +  }
    +
    +  /**
    +   * Skips up to {@code n} bytes by delegating to skipInternal while holding the state lock.
    +   *
    +   * @return 0 for non-positive {@code n}, otherwise the value returned by skipInternal.
    +   */
    +  @Override
    +  public long skip(long n) throws IOException {
    +    if (n > 0L) {
    +      stateChangeLock.lock();
    +      try {
    +        return skipInternal(n);
    +      } finally {
    +        stateChangeLock.unlock();
    +      }
    +    }
    +    return 0L;
    +  }
    +
    +  /**
    +   * Internal skip function which should be called only from the skip() api. The assumption is
    +   * that the stateChangeLock is already acquired in the caller before calling this function.
    +   *
    +   * If the two buffers together hold at least {@code n} bytes, the skip is served from the
    +   * buffers alone. Otherwise both buffers are emptied, the remainder is skipped on the
    +   * underlying stream, and an async refill of the active buffer is requested.
    +   *
    +   * @return {@code n} when served from the buffers; otherwise the buffered byte count plus
    +   *         whatever the underlying stream's skip() reported.
    +   */
    +  private long skipInternal(long n) throws IOException {
    +    assert (stateChangeLock.isLocked());
    +    if (readInProgress) {
    +      waitForAsyncReadComplete();
    +    }
    +    if (available() >= n) {
    +      // we can skip from the internal buffers
    +      int toSkip = (int)n;
    +      if (toSkip <= activeBuffer.remaining()) {
    +        // Only skipping from active buffer is sufficient
    +        activeBuffer.position(toSkip + activeBuffer.position());
    +        return n;
    +      }
    +      // We need to skip from both active buffer and read ahead buffer
    +      toSkip -= activeBuffer.remaining();
    +      // position(0) followed by flip() leaves the buffer with limit 0, i.e. empty.
    +      activeBuffer.position(0);
    +      activeBuffer.flip();
    +      readAheadBuffer.position(toSkip + readAheadBuffer.position());
    +      // flip the active and read ahead buffer
    +      ByteBuffer temp = activeBuffer;
    +      activeBuffer = readAheadBuffer;
    +      readAheadBuffer = temp;
    +      // The (now empty) former active buffer becomes the target of the next async read.
    +      readAsync(readAheadBuffer);
    +      return n;
    +    }
    +    // Not enough buffered data: count what is buffered, then discard both buffers.
    +    int skippedBytes = available();
    +    long toSkip = n - skippedBytes;
    +    activeBuffer.position(0);
    +    activeBuffer.flip();
    +    readAheadBuffer.position(0);
    +    readAheadBuffer.flip();
    +    // The underlying stream may skip fewer bytes than requested; its result is passed through.
    +    long skippedFromInputStream = underlyingInputStream.skip(toSkip);
    +    readAsync(activeBuffer);
    +    return skippedBytes + skippedFromInputStream;
    +  }
    +
    +  @Override
    +  public void close() throws IOException {
    +    executorService.shutdown();
    +    try {
    +      executorService.awaitTermination(10, TimeUnit.MILLISECONDS);
    +    } catch (InterruptedException e) {
    +    }
    +    underlyingInputStream.close();
    +    stateChangeLock.lock();
    +    try {
    +      StorageUtils.dispose(activeBuffer);
    +      StorageUtils.dispose(readAheadBuffer);
    +    }
    +    finally {
    --- End diff --
    
    nit: use `} finally {`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message