spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sitalkedia <...@git.apache.org>
Subject [GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Date Tue, 29 Aug 2017 19:29:38 GMT
Github user sitalkedia commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18317#discussion_r135889304
  
    --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
    @@ -0,0 +1,292 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.io;
    +
    +import com.google.common.base.Preconditions;
    +import org.apache.spark.storage.StorageUtils;
    +
    +import javax.annotation.concurrent.GuardedBy;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.nio.ByteBuffer;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.locks.Condition;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    /**
     * {@link InputStream} implementation which asynchronously reads ahead from the underlying input
     * stream when the amount of unread data in the current buffer falls below a specified threshold.
     * It does this by maintaining two buffers - an active buffer and a read-ahead buffer. The active
     * buffer contains data which should be returned when a read() call is issued. The read-ahead
     * buffer is used to asynchronously read from the underlying input stream, and once the current
     * active buffer is exhausted, we flip the two buffers so that we can start reading from the
     * read-ahead buffer without being blocked on disk I/O.
     */
    +public class ReadAheadInputStream extends InputStream {
    +
    +  private ReentrantLock stateChangeLock = new ReentrantLock();
    +
    +  @GuardedBy("stateChangeLock")
    +  private ByteBuffer activeBuffer;
    +
    +  @GuardedBy("stateChangeLock")
    +  private ByteBuffer readAheadBuffer;
    +
    +  @GuardedBy("stateChangeLock")
    +  private boolean endOfStream;
    +
    +  @GuardedBy("stateChangeLock")
    +  // true if async read is in progress
    +  private boolean isReadInProgress;
    +
    +  @GuardedBy("stateChangeLock")
    +  // true if read is aborted due to an exception in reading from underlying input stream.
    +  private boolean isReadAborted;
    +
    +  @GuardedBy("stateChangeLock")
    +  private Exception readException;
    +
    +  // If the remaining data size in the current buffer is below this threshold,
    +  // we issue an async read from the underlying input stream.
    +  private final int readAheadThresholdInBytes;
    +
    +  private final InputStream underlyingInputStream;
    +
    +  private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    +
    +  private final Condition asyncReadComplete = stateChangeLock.newCondition();
    +
    +  private final byte[] oneByte = new byte[1];
    +
    /**
     * Creates a {@code ReadAheadInputStream} with the specified buffer size and read-ahead
     * threshold.
     *
     * @param inputStream               the underlying input stream; must not be null
     * @param bufferSizeInBytes         the size of each of the two internal buffers; must be > 0
     * @param readAheadThresholdInBytes if the active buffer has less data than this read-ahead
     *                                  threshold, an async read is triggered; must be > 0 and
     *                                  less than {@code bufferSizeInBytes}
     * @throws IllegalArgumentException if either size argument is out of range
     * @throws NullPointerException     if {@code inputStream} is null
     */
    public ReadAheadInputStream(
        InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) {
      Preconditions.checkArgument(bufferSizeInBytes > 0,
          "bufferSizeInBytes should be greater than 0");
      Preconditions.checkArgument(
          readAheadThresholdInBytes > 0 && readAheadThresholdInBytes < bufferSizeInBytes,
          "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes");
      // Fail fast: a null stream would otherwise only fail later, inside the background read task.
      Preconditions.checkNotNull(inputStream, "inputStream must not be null");
      activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
      readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
      this.readAheadThresholdInBytes = readAheadThresholdInBytes;
      this.underlyingInputStream = inputStream;
      // flip() on a freshly allocated buffer sets limit to 0, so both buffers start out empty.
      activeBuffer.flip();
      readAheadBuffer.flip();
    }
    +
    +  private boolean isEndOfStream() {
    +    if(activeBuffer.remaining() == 0 && readAheadBuffer.remaining() == 0 &&
endOfStream) {
    +      return true;
    +    }
    +    return  false;
    +  }
    +
    +
    +  private void readAsync(final ByteBuffer byteBuffer) throws IOException {
    +    stateChangeLock.lock();
    +    if (endOfStream || isReadInProgress) {
    +      stateChangeLock.unlock();
    +      return;
    +    }
    +    byteBuffer.position(0);
    +    byteBuffer.flip();
    +    isReadInProgress = true;
    +    stateChangeLock.unlock();
    +    executorService.execute(() -> {
    +      byte[] arr;
    +      stateChangeLock.lock();
    +      arr = byteBuffer.array();
    +      stateChangeLock.unlock();
    +      // Please note that it is safe to release the lock and read into the read ahead
buffer
    +      // because either of following two conditions will hold - 1. The active buffer
has
    +      // data available to read so the reader will not read from the read ahead buffer.
    +      // 2. This is the first time read is called or the active buffer is exhausted,
    +      // in that case the reader waits for this async read to complete.
    +      // So there is no race condition in both the situations.
    +      int nRead = 0;
    +      while (nRead == 0) {
    +        try {
    +          nRead = underlyingInputStream.read(arr);
    +          if (nRead < 0) {
    +            // We hit end of the underlying input stream
    +            break;
    +          }
    +        } catch (Exception e) {
    +          stateChangeLock.lock();
    +          // We hit a read exception, which should be propagated to the reader
    +          // in the next read() call.
    +          isReadAborted = true;
    +          readException = e;
    +          stateChangeLock.unlock();
    +        }
    +      }
    +      stateChangeLock.lock();
    +      if (nRead < 0) {
    +        endOfStream = true;
    +      }
    +      else {
    +        // fill the byte buffer
    +        byteBuffer.limit(nRead);
    +      }
    +      isReadInProgress = false;
    +      signalAsyncReadComplete();
    +      stateChangeLock.unlock();
    +    });
    +  }
    +
    +  private void signalAsyncReadComplete() {
    +    stateChangeLock.lock();
    +    try {
    +      asyncReadComplete.signalAll();
    +    } finally {
    +      stateChangeLock.unlock();
    +    }
    +  }
    +
    +  private void waitForAsyncReadComplete() {
    +    stateChangeLock.lock();
    +    try {
    +      asyncReadComplete.await();
    +    } catch (InterruptedException e) {
    +    } finally {
    +      stateChangeLock.unlock();
    +    }
    +  }
    +
    +  @Override
    +  public synchronized int read() throws IOException {
    +    int val = read(oneByte, 0, 1);
    +    if (val == -1) {
    +      return -1;
    +    }
    +    return oneByte[0] & 0xFF;
    +  }
    +
    +  @Override
    +  public synchronized int read(byte[] b, int offset, int len) throws IOException {
    --- End diff --
    
    There are a few reasons I am hesitant to remove `synchronized` from the public APIs.
    
    1. Some variables, like executorService and underlyingInputStream, are not protected by the stateChangeLock. There might be a race condition if executorService.execute() is called at the same time as close() is shutting down the executor service.
    2. I am not sure what the behavior will be if multiple threads are waiting on the asyncReadComplete.await() call. Will the signaling thread wake only one of them? If so, will it wake them in order? If the callers are not guaranteed to be woken up in order, then read calls issued from different threads at different points in time are not guaranteed to read the data in order. Is that acceptable?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message