hadoop-hdfs-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (HDDS-1496) Support partial chunk reads and checksum verification
Date Thu, 23 May 2019 19:41:02 GMT

     [ https://issues.apache.org/jira/browse/HDDS-1496?focusedWorklogId=247642&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-247642 ]

ASF GitHub Bot logged work on HDDS-1496:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/May/19 19:40
            Start Date: 23/May/19 19:40
    Worklog Time Spent: 10m 
      Work Description: bharatviswa504 commented on pull request #804: HDDS-1496. Support
partial chunk reads and checksum verification
URL: https://github.com/apache/hadoop/pull/804#discussion_r287092731
 
 

 ##########
 File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
 ##########
 @@ -43,467 +41,334 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 
 /**
  * An {@link InputStream} used by the REST service in combination with the
  * SCMClient to read the value of a key from a sequence
  * of container chunks.  All bytes of the key value are stored in container
- * chunks.  Each chunk may contain multiple underlying {@link ByteBuffer}
+ * chunks. Each chunk may contain multiple underlying {@link ByteBuffer}
  * instances.  This class encapsulates all state management for iterating
- * through the sequence of chunks and the sequence of buffers within each chunk.
+ * through the sequence of chunks via {@link ChunkInputStream}.
  */
 public class BlockInputStream extends InputStream implements Seekable {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BlockInputStream.class);
+
   private static final int EOF = -1;
 
   private final BlockID blockID;
+  private final long length;
+  private Pipeline pipeline;
+  private final long containerKey;
+  private final Token<OzoneBlockTokenIdentifier> token;
+  private final boolean verifyChecksum;
   private final String traceID;
   private XceiverClientManager xceiverClientManager;
   private XceiverClientSpi xceiverClient;
-  private List<ChunkInfo> chunks;
-  // ChunkIndex points to the index current chunk in the buffers or the the
-  // index of chunk which will be read next into the buffers in
-  // readChunkFromContainer().
+  private boolean initialized = false;
+
+  // List of ChunkInputStreams, one for each chunk in the block
+  private List<ChunkInputStream> chunkStreams;
+
+  // chunkOffsets[i] stores the index of the first data byte in
+  // chunkStream i w.r.t. the block data.
+  // Let's say the chunk size is 40 bytes, and the parent block stores
+  // data from index 200 and has length 400.
+  // The first 40 bytes of this block will be stored in chunk[0], the next
+  // 40 in chunk[1] and so on. But since the chunkOffsets are w.r.t. the
+  // block only and not the key, the values in chunkOffsets will be
+  // [0, 40, 80, ...].
+  private long[] chunkOffsets = null;
+
+  // Index of the chunkStream corresponding to the current position of the
+  // BlockInputStream, i.e. the offset of the data to be read next from
+  // this block.
   private int chunkIndex;
-  // ChunkIndexOfCurrentBuffer points to the index of chunk read into the
-  // buffers or index of the last chunk in the buffers. It is updated only
-  // when a new chunk is read from container into the buffers.
-  private int chunkIndexOfCurrentBuffer;
-  private long[] chunkOffset;
-  private List<ByteBuffer> buffers;
-  private int bufferIndex;
-  private long bufferPosition;
-  private final boolean verifyChecksum;
 
-  /**
-   * Creates a new BlockInputStream.
-   *
-   * @param blockID block ID of the chunk
-   * @param xceiverClientManager client manager that controls client
-   * @param xceiverClient client to perform container calls
-   * @param chunks list of chunks to read
-   * @param traceID container protocol call traceID
-   * @param verifyChecksum verify checksum
-   * @param initialPosition the initial position of the stream pointer. This
-   *                        position is seeked now if the up-stream was seeked
-   *                        before this was created.
-   */
-  public BlockInputStream(
-      BlockID blockID, XceiverClientManager xceiverClientManager,
-      XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID,
-      boolean verifyChecksum, long initialPosition) throws IOException {
-    this.blockID = blockID;
-    this.traceID = traceID;
-    this.xceiverClientManager = xceiverClientManager;
-    this.xceiverClient = xceiverClient;
-    this.chunks = chunks;
-    this.chunkIndex = 0;
-    this.chunkIndexOfCurrentBuffer = -1;
-    // chunkOffset[i] stores offset at which chunk i stores data in
-    // BlockInputStream
-    this.chunkOffset = new long[this.chunks.size()];
-    initializeChunkOffset();
-    this.buffers = null;
-    this.bufferIndex = 0;
-    this.bufferPosition = -1;
+  // Position of the BlockInputStream is maintained by this variable till
+  // the stream is initialized. This position is w.r.t. the block only and
+  // not the key.
+  // For the above example, if we seek to position 240 of the key before
+  // the stream is initialized, then blockPosition will be set to 40
+  // (= 240 - 200).
+  // Once the stream is initialized, the position of the stream will be
+  // determined by the current chunkStream and its position.
+  private long blockPosition = 0;
+
+  // Tracks the chunkIndex corresponding to the last blockPosition so that
+  // it can be reset if a new position is seeked.
+  private int chunkIndexOfPrevPosition;
+
+  public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
+      long containerKey, Token<OzoneBlockTokenIdentifier> token,
+      boolean verifyChecksum, String traceId,
+      XceiverClientManager xceiverClientManager) {
+    this.blockID = blockId;
+    this.length = blockLen;
+    this.pipeline = pipeline;
+    this.containerKey = containerKey;
+    this.token = token;
     this.verifyChecksum = verifyChecksum;
-    if (initialPosition > 0) {
-      // The stream was seeked to a position before the stream was
-      // initialized. So seeking to the position now.
-      seek(initialPosition);
-    }
-  }
-
-  private void initializeChunkOffset() {
-    long tempOffset = 0;
-    for (int i = 0; i < chunks.size(); i++) {
-      chunkOffset[i] = tempOffset;
-      tempOffset += chunks.get(i).getLen();
-    }
+    this.traceID = traceId;
+    this.xceiverClientManager = xceiverClientManager;
   }
 
-  @Override
-  public synchronized int read()
-      throws IOException {
-    checkOpen();
-    int available = prepareRead(1);
-    int dataout = EOF;
+  /**
+   * Initialize the BlockInputStream. Get the BlockData (list of chunks) from
+   * the Container and create the ChunkInputStreams for each Chunk in the Block.
+   */
+  public synchronized void initialize() throws IOException {
 
-    if (available == EOF) {
-      Preconditions
-          .checkState(buffers == null); //should have released by now, see below
-    } else {
-      dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
-    }
+    List<ChunkInfo> chunks = getChunkInfos();
 
-    if (blockStreamEOF()) {
-      // consumer might use getPos to determine EOF,
-      // so release buffers when serving the last byte of data
-      releaseBuffers();
-    }
+    if (chunks != null && !chunks.isEmpty()) {
+      intializeChunkInputStreams(chunks);
 
-    return dataout;
-  }
-
-  @Override
-  public synchronized int read(byte[] b, int off, int len) throws IOException {
-    // According to the JavaDocs for InputStream, it is recommended that
-    // subclasses provide an override of bulk read if possible for performance
-    // reasons.  In addition to performance, we need to do it for correctness
-    // reasons.  The Ozone REST service uses PipedInputStream and
-    // PipedOutputStream to relay HTTP response data between a Jersey thread and
-    // a Netty thread.  It turns out that PipedInputStream/PipedOutputStream
-    // have a subtle dependency (bug?) on the wrapped stream providing separate
-    // implementations of single-byte read and bulk read.  Without this, get key
-    // responses might close the connection before writing all of the bytes
-    // advertised in the Content-Length.
-    if (b == null) {
-      throw new NullPointerException();
-    }
-    if (off < 0 || len < 0 || len > b.length - off) {
-      throw new IndexOutOfBoundsException();
-    }
-    if (len == 0) {
-      return 0;
-    }
-    checkOpen();
-    int total = 0;
-    while (len > 0) {
-      int available = prepareRead(len);
-      if (available == EOF) {
-        Preconditions
-            .checkState(buffers == null); //should have been released by now
-        return total != 0 ? total : EOF;
+      if (blockPosition > 0) {
+        // Stream was seeked to blockPosition before initialization. Seek to the
+        // blockPosition now.
+        seek(blockPosition);
       }
-      buffers.get(bufferIndex).get(b, off + total, available);
-      len -= available;
-      total += available;
-    }
-
-    if (blockStreamEOF()) {
-      // smart consumers determine EOF by calling getPos()
-      // so we release buffers when serving the final bytes of data
-      releaseBuffers();
+      initialized = true;
     }
-
-    return total;
   }
 
   /**
-   * Determines if all data in the stream has been consumed.
-   *
-   * @return true if EOF, false if more data is available
+   * Send RPC call to get the block info from the container.
+   * @return List of chunks in this block.
    */
-  protected boolean blockStreamEOF() {
-    if (buffersHaveData() || chunksRemaining()) {
-      return false;
-    } else {
-      // if there are any chunks, we better be at the last chunk for EOF
-      Preconditions.checkState(((chunks == null) || chunks.isEmpty() ||
-              chunkIndex == (chunks.size() - 1)),
-          "EOF detected, but not at the last chunk");
-      return true;
+  private List<ChunkInfo> getChunkInfos() throws IOException {
+    // irrespective of the container state, we will always read via Standalone
+    // protocol.
+    if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
+      pipeline = Pipeline.newBuilder(pipeline)
+          .setType(HddsProtos.ReplicationType.STAND_ALONE).build();
     }
-  }
-
-  private void releaseBuffers() {
-    //ashes to ashes, dust to dust
-    buffers = null;
-    bufferIndex = 0;
-  }
+    xceiverClient = xceiverClientManager.acquireClient(pipeline);
+    boolean success = false;
+    List<ChunkInfo> chunks;
+    try {
+      LOG.debug("Initializing BlockInputStream for get key to access {} {}",
+          blockID.getContainerID(), containerKey);
 
-  @Override
-  public synchronized void close() {
-    if (xceiverClientManager != null && xceiverClient != null) {
-      xceiverClientManager.releaseClient(xceiverClient, false);
-      xceiverClientManager = null;
-      xceiverClient = null;
+      if (token != null) {
+        UserGroupInformation.getCurrentUser().addToken(token);
+      }
+      DatanodeBlockID datanodeBlockID = blockID
+          .getDatanodeBlockIDProtobuf();
+      GetBlockResponseProto response = ContainerProtocolCalls
+          .getBlock(xceiverClient, datanodeBlockID, traceID);
+
+      chunks = response.getBlockData().getChunksList();
+      success = true;
+    } finally {
+      if (!success) {
+        xceiverClientManager.releaseClient(xceiverClient, false);
+      }
     }
+
+    return chunks;
   }
 
   /**
-   * Checks if the stream is open.  If not, throws an exception.
-   *
-   * @throws IOException if stream is closed
+   * For each chunk in the block, create a ChunkInputStream and compute
+   * its chunkOffset.
    */
-  private synchronized void checkOpen() throws IOException {
-    if (xceiverClient == null) {
-      throw new IOException("BlockInputStream has been closed.");
+  private void intializeChunkInputStreams(List<ChunkInfo> chunks) {
+    this.chunkOffsets = new long[chunks.size()];
+    long tempOffset = 0;
+
+    this.chunkStreams = new ArrayList<>(chunks.size());
+    for (int i = 0; i < chunks.size(); i++) {
+      addStream(chunks.get(i));
+      chunkOffsets[i] = tempOffset;
+      tempOffset += chunks.get(i).getLen();
     }
+
+    this.chunkIndex = 0;
   }
 
   /**
-   * Prepares to read by advancing through chunks and buffers as needed until it
-   * finds data to return or encounters EOF.
-   *
-   * @param len desired length of data to read
-   * @return length of data available to read, possibly less than desired length
+   * Append another ChunkInputStream to the end of the list. Note that the
+   * ChunkInputStream is only created here. The chunk will be read from the
+   * Datanode only when a read operation is performed on that chunk.
    */
-  private synchronized int prepareRead(int len) throws IOException {
-    for (;;) {
-      if (!buffersAllocated()) {
-        // The current chunk at chunkIndex has not been read from the
-        // container. Read the chunk and put the data into buffers.
-        readChunkFromContainer();
-      }
-      if (buffersHaveData()) {
-        // Data is available from buffers
-        ByteBuffer bb = buffers.get(bufferIndex);
-        return len > bb.remaining() ? bb.remaining() : len;
-      } else if (chunksRemaining()) {
-        // There are additional chunks available.
-        // Read the next chunk in the block.
-        chunkIndex += 1;
-        readChunkFromContainer();
-      } else {
-        // All available input has been consumed.
-        return EOF;
-      }
-    }
+  private synchronized void addStream(ChunkInfo chunkInfo) {
+    chunkStreams.add(new ChunkInputStream(chunkInfo, blockID, traceID,
+        xceiverClient, verifyChecksum));
   }
 
-  private boolean buffersAllocated() {
-    if (buffers == null || buffers.isEmpty()) {
-      return false;
-    }
-    return true;
+  public synchronized long getRemaining() throws IOException {
+    return length - getPos();
   }
 
-  private boolean buffersHaveData() {
-    boolean hasData = false;
-
-    if (buffersAllocated()) {
-      while (bufferIndex < (buffers.size())) {
-        if (buffers.get(bufferIndex).hasRemaining()) {
-          // current buffer has data
-          hasData = true;
-          break;
-        } else {
-          if (buffersRemaining()) {
-            // move to next available buffer
-            ++bufferIndex;
-            Preconditions.checkState(bufferIndex < buffers.size());
-          } else {
-            // no more buffers remaining
-            break;
-          }
-        }
-      }
+  @Override
+  public synchronized int read() throws IOException {
+    byte[] buf = new byte[1];
+    if (read(buf, 0, 1) == EOF) {
+      return EOF;
     }
-
-    return hasData;
-  }
-
-  private boolean buffersRemaining() {
-    return (bufferIndex < (buffers.size() - 1));
+    return Byte.toUnsignedInt(buf[0]);
   }
 
-  private boolean chunksRemaining() {
-    if ((chunks == null) || chunks.isEmpty()) {
-      return false;
+  @Override
+  public synchronized int read(byte[] b, int off, int len) throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
     }
-    // Check if more chunks are remaining in the stream after chunkIndex
-    if (chunkIndex < (chunks.size() - 1)) {
-      return true;
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
     }
-    // ChunkIndex is the last chunk in the stream. Check if this chunk has
-    // been read from container or not. Return true if chunkIndex has not
-    // been read yet and false otherwise.
-    return chunkIndexOfCurrentBuffer != chunkIndex;
-  }
-
-  /**
-   * Attempts to read the chunk at the specified offset in the chunk list.  If
-   * successful, then the data of the read chunk is saved so that its bytes can
-   * be returned from subsequent read calls.
-   *
-   * @throws IOException if there is an I/O error while performing the call
-   */
-  private synchronized void readChunkFromContainer() throws IOException {
-    // Read the chunk at chunkIndex
-    final ChunkInfo chunkInfo = chunks.get(chunkIndex);
-    List<DatanodeDetails> excludeDns = null;
-    ByteString byteString;
-    List<DatanodeDetails> dnList = getDatanodeList();
-    while (true) {
-      List<DatanodeDetails> dnListFromReadChunkCall = new ArrayList<>();
-      byteString = readChunk(chunkInfo, excludeDns, dnListFromReadChunkCall);
-      try {
-        if (byteString.size() != chunkInfo.getLen()) {
-          // Bytes read from chunk should be equal to chunk size.
-          throw new IOException(String
-              .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
-                  chunkInfo.getChunkName(), chunkInfo.getLen(),
-                  byteString.size()));
-        }
-        ChecksumData checksumData =
-            ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
-        if (verifyChecksum) {
-          Checksum.verifyChecksum(byteString, checksumData);
-        }
-        break;
-      } catch (IOException ioe) {
-        // we will end up in this situation only if the checksum mismatch
-        // happens or the length of the chunk mismatches.
-        // In this case, read should be retried on a different replica.
-        // TODO: Inform SCM of a possible corrupt container replica here
-        if (excludeDns == null) {
-          excludeDns = new ArrayList<>();
-        }
-        excludeDns.addAll(dnListFromReadChunkCall);
-        if (excludeDns.size() == dnList.size()) {
-          throw ioe;
-        }
-      }
+    if (len == 0) {
+      return 0;
     }
 
-    buffers = byteString.asReadOnlyByteBufferList();
-    bufferIndex = 0;
-    chunkIndexOfCurrentBuffer = chunkIndex;
+    if (!initialized) {
+      initialize();
+    }
 
-    // The bufferIndex and position might need to be adjusted if seek() was
-    // called on the stream before. This needs to be done so that the buffer
-    // position can be advanced to the 'seeked' position.
-    adjustBufferIndex();
-  }
+    checkOpen();
+    int totalReadLen = 0;
+    while (len > 0) {
+      // if we are at the last chunk and have read the entire chunk, return
+      if (chunkStreams.size() == 0 ||
+          (chunkStreams.size() - 1 <= chunkIndex &&
+              chunkStreams.get(chunkIndex)
+                  .getRemaining() == 0)) {
+        return totalReadLen == 0 ? EOF : totalReadLen;
+      }
 
-  /**
-   * Send RPC call to get the chunk from the container.
-   */
-  @VisibleForTesting
-  protected ByteString readChunk(final ChunkInfo chunkInfo,
-      List<DatanodeDetails> excludeDns, List<DatanodeDetails> dnListFromReply)
-      throws IOException {
-    XceiverClientReply reply;
-    ReadChunkResponseProto readChunkResponse = null;
-    try {
-      reply = ContainerProtocolCalls
-          .readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
-      ContainerProtos.ContainerCommandResponseProto response;
-      response = reply.getResponse().get();
-      ContainerProtocolCalls.validateContainerResponse(response);
-      readChunkResponse = response.getReadChunk();
-      dnListFromReply.addAll(reply.getDatanodes());
-    } catch (IOException e) {
-      if (e instanceof StorageContainerException) {
-        throw e;
+      // Get the current chunkStream and read data from it
+      ChunkInputStream current = chunkStreams.get(chunkIndex);
+      int numBytesToRead = Math.min(len, (int)current.getRemaining());
+      int numBytesRead = current.read(b, off, numBytesToRead);
+      if (numBytesRead != numBytesToRead) {
+        // This implies that there is either data loss or corruption in the
+        // chunk entries. Even EOF in the current stream would be covered in
+        // this case.
+        throw new IOException(String.format(
+            "Inconsistent read for chunkName=%s length=%d numBytesRead=%d",
+            current.getChunkName(), current.getLength(), numBytesRead));
+      }
+      totalReadLen += numBytesRead;
+      off += numBytesRead;
+      len -= numBytesRead;
+      if (current.getRemaining() <= 0 &&
+          ((chunkIndex + 1) < chunkStreams.size())) {
+        chunkIndex += 1;
       }
-      throw new IOException("Unexpected OzoneException: " + e.toString(), e);
-    } catch (ExecutionException | InterruptedException e) {
-      throw new IOException(
-          "Failed to execute ReadChunk command for chunk  " + chunkInfo
-              .getChunkName(), e);
     }
-    return readChunkResponse.getData();
-  }
-
-  @VisibleForTesting
-  protected List<DatanodeDetails> getDatanodeList() {
-    return xceiverClient.getPipeline().getNodes();
+    return totalReadLen;
   }
 
+  /**
+   * Seeks the BlockInputStream to the specified position. If the stream is
+   * not initialized, save the seeked position via blockPosition. Otherwise,
+   * update the position in 2 steps:
+   *    1. Update the chunkIndex to point to the chunkStream corresponding
+   *    to the seeked position.
+   *    2. Seek the corresponding chunkStream to the adjusted position.
+   *
+   * Let's say the chunk size is 40 bytes, and the parent block stores data
+   * from index 200 and has length 400. If the key is seeked to position
+   * 290, then this block will be seeked to position 90 (= 290 - 200).
+   * When seek(90) is called on this blockStream, then
+   *    1. chunkIndex will be set to 2 (as indices 80 - 119 reside in
+   *    chunk[2]).
+   *    2. chunkStream[2] will be seeked to position 10
+   *       (= 90 - chunkOffset[2] (= 80)).
+   */
   @Override
   public synchronized void seek(long pos) throws IOException {
-    if (pos < 0 || (chunks.size() == 0 && pos > 0)
-        || pos >= chunkOffset[chunks.size() - 1] + chunks.get(chunks.size() - 1)
-        .getLen()) {
-      throw new EOFException("EOF encountered pos: " + pos + " container key: "
-          + blockID.getLocalID());
+    if (!initialized) {
+      // Stream has not been initialized yet. Save the position so that it
+      // can be seeked when the stream is initialized.
+      blockPosition = pos;
+      return;
     }
 
-    if (pos < chunkOffset[chunkIndex]) {
-      chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos);
-    } else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex)
-        .getLen()) {
+    checkOpen();
+    if (pos < 0 || pos >= length) {
+      if (pos == 0) {
+        // It is possible for both length and pos to be zero, in which
+        // case seek should return instead of throwing an exception.
+        return;
+      }
+      throw new EOFException(
+          "EOF encountered at pos: " + pos + " for block: " + blockID);
+    }
+    Preconditions.assertTrue(chunkIndex >= 0);
+    if (chunkIndex >= chunkStreams.size()) {
+      chunkIndex = Arrays.binarySearch(chunkOffsets, pos);
+    } else if (pos < chunkOffsets[chunkIndex]) {
       chunkIndex =
-          Arrays.binarySearch(chunkOffset, chunkIndex + 1, chunks.size(), pos);
+          Arrays.binarySearch(chunkOffsets, 0, chunkIndex, pos);
+    } else if (pos >= chunkOffsets[chunkIndex] + chunkStreams
+        .get(chunkIndex).getLength()) {
+      chunkIndex = Arrays.binarySearch(chunkOffsets,
+          chunkIndex + 1, chunkStreams.size(), pos);
     }
     if (chunkIndex < 0) {
       // Binary search returns -insertionPoint - 1  if element is not present
       // in the array. insertionPoint is the point at which element would be
       // inserted in the sorted array. We need to adjust the chunkIndex
       // accordingly so that chunkIndex = insertionPoint - 1
-      chunkIndex = -chunkIndex -2;
+      chunkIndex = -chunkIndex - 2;
     }
 
-    // The bufferPosition should be adjusted to account for the chunk offset
-    // of the chunk the the pos actually points to.
-    bufferPosition = pos - chunkOffset[chunkIndex];
+    // Reset the previous chunkStream's position.
+    chunkStreams.get(chunkIndexOfPrevPosition).resetPosition();
 
-    // Check if current buffers correspond to the chunk index being seeked
-    // and if the buffers have any data.
-    if (chunkIndex == chunkIndexOfCurrentBuffer && buffersAllocated()) {
-      // Position the buffer to the seeked position.
-      adjustBufferIndex();
-    } else {
-      // Release the current buffers. The next readChunkFromContainer will
-      // read the required chunk and position the buffer to the seeked
-      // position.
-      releaseBuffers();
-    }
+    // seek to the proper offset in the ChunkInputStream
+    chunkStreams.get(chunkIndex).seek(pos - chunkOffsets[chunkIndex]);
+    chunkIndexOfPrevPosition = chunkIndex;
   }
 
-  private void adjustBufferIndex() {
-    if (bufferPosition == -1) {
-      // The stream has not been seeked to a position. No need to adjust the
-      // buffer Index and position.
-      return;
+  @Override
+  public long getPos() throws IOException {
+    if (length == 0) {
+      return 0;
     }
-    // The bufferPosition is w.r.t the buffers for current chunk.
-    // Adjust the bufferIndex and position to the seeked position.
-    long tempOffest = 0;
-    for (int i = 0; i < buffers.size(); i++) {
-      if (bufferPosition - tempOffest >= buffers.get(i).capacity()) {
-        tempOffest += buffers.get(i).capacity();
-      } else {
-        bufferIndex = i;
-        break;
-      }
+
+    if (!initialized) {
+      // The stream is not initialized yet. Return the blockPosition
+      return blockPosition;
+    } else {
+      return chunkOffsets[chunkIndex] + chunkStreams.get(chunkIndex).getPos();
     }
-    buffers.get(bufferIndex).position((int) (bufferPosition - tempOffest));
-    // Reset the bufferPosition as the seek() operation has been completed.
-    bufferPosition = -1;
   }
 
   @Override
-  public synchronized long getPos() throws IOException {
-    // position = chunkOffset of current chunk (at chunkIndex) + position of
-    // the buffer corresponding to the chunk.
-    long bufferPos = 0;
-
-    if (bufferPosition >= 0) {
-      // seek has been called but the buffers were empty. Hence, the buffer
-      // position will be advanced after the buffers are filled.
-      // We return the chunkOffset + bufferPosition here as that will be the
-      // position of the buffer pointer after reading the chunk file.
-      bufferPos = bufferPosition;
-
-    } else if (blockStreamEOF()) {
-      // all data consumed, buffers have been released.
-      // get position from the chunk offset and chunk length of last chunk
-      bufferPos = chunks.get(chunkIndex).getLen();
-
-    } else if (buffersAllocated()) {
-      // get position from available buffers of current chunk
-      bufferPos = buffers.get(bufferIndex).position();
+  public boolean seekToNewSource(long targetPos) throws IOException {
 
 Review comment:
   Why are we returning false here? I read the Javadoc, but it is not
   completely clear to me.
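
   For readers following the seek() arithmetic in the diff above, here is a
   minimal, self-contained sketch of the chunk lookup (the class and
   variable names are illustrative only, not part of the patch):

       import java.util.Arrays;

       public final class SeekSketch {
         public static void main(String[] args) {
           // Block of length 400 split into 40-byte chunks, as in the
           // Javadoc example: chunkOffsets = [0, 40, 80, 120, ...].
           long[] chunkOffsets = new long[10];
           for (int i = 0; i < chunkOffsets.length; i++) {
             chunkOffsets[i] = i * 40L;
           }

           long pos = 90;
           int chunkIndex = Arrays.binarySearch(chunkOffsets, pos);
           if (chunkIndex < 0) {
             // binarySearch returns (-insertionPoint - 1) when the key is
             // absent; the chunk containing pos starts at insertionPoint - 1.
             chunkIndex = -chunkIndex - 2;
           }
           long offsetInChunk = pos - chunkOffsets[chunkIndex];

           // Prints "chunkIndex=2 offsetInChunk=10", matching the example.
           System.out.println("chunkIndex=" + chunkIndex
               + " offsetInChunk=" + offsetInChunk);
         }
       }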
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 247642)
    Time Spent: 3.5h  (was: 3h 20m)

> Support partial chunk reads and checksum verification
> -----------------------------------------------------
>
>                 Key: HDDS-1496
>                 URL: https://issues.apache.org/jira/browse/HDDS-1496
>             Project: Hadoop Distributed Data Store
>          Issue Type: Improvement
>            Reporter: Hanisha Koneru
>            Assignee: Hanisha Koneru
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> BlockInputStream#readChunkFromContainer() reads the whole chunk from disk
> even if we need to read only a part of the chunk.
> This Jira aims to improve readChunkFromContainer so that it reads only the
> part of the chunk file needed by the client, plus the part of the chunk
> file required to verify the checksum.
> For example, let's say the client is reading from index 120 to 450 in the
> chunk, and a checksum is stored for every 100 bytes in the chunk, i.e. the
> first checksum is for bytes from index 0 to 99, the next for bytes from
> index 100 to 199, and so on. To verify bytes from 120 to 450, we would
> need to read bytes 100 to 499 so that checksum verification can be done.
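
To make the alignment arithmetic concrete, here is a standalone sketch
(method and variable names are illustrative only, not from the patch):

    public final class AlignedReadRange {
      // Return the smallest [start, end] byte range that covers the
      // requested range and is aligned to checksum boundaries, so that
      // every covering checksum can be verified. Clamped to chunk length.
      static long[] align(long readStart, long readEnd,
          long bytesPerChecksum, long chunkLen) {
        long alignedStart = (readStart / bytesPerChecksum) * bytesPerChecksum;
        long alignedEnd =
            ((readEnd / bytesPerChecksum) + 1) * bytesPerChecksum - 1;
        // Never read past the end of the chunk.
        alignedEnd = Math.min(alignedEnd, chunkLen - 1);
        return new long[] {alignedStart, alignedEnd};
      }

      public static void main(String[] args) {
        // Example from the description: bytes 120..450 with checksums
        // every 100 bytes must be read as bytes 100..499.
        long[] r = align(120, 450, 100, 1000);
        System.out.println(r[0] + ".." + r[1]); // prints 100..499
      }
    }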



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-help@hadoop.apache.org

