Date: Thu, 23 May 2019 19:41:02 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: hdfs-issues@hadoop.apache.org
Subject: [jira] [Work logged] (HDDS-1496) Support partial chunk reads and checksum verification

[ https://issues.apache.org/jira/browse/HDDS-1496?focusedWorklogId=247642&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-247642 ]

ASF GitHub Bot logged work on HDDS-1496:
----------------------------------------

Author: ASF GitHub Bot
Created on:
23/May/19 19:40
Start Date: 23/May/19 19:40
Worklog Time Spent: 10m
Work Description: bharatviswa504 commented on pull request #804: HDDS-1496. Support partial chunk reads and checksum verification
URL: https://github.com/apache/hadoop/pull/804#discussion_r287092731

##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
##########
@@ -43,467 +41,334 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.ExecutionException; /** * An {@link InputStream} used by the REST service in combination with the * SCMClient to read the value of a key from a sequence * of container chunks. All bytes of the key value are stored in container - * chunks. Each chunk may contain multiple underlying {@link ByteBuffer} + * chunks. Each chunk may contain multiple underlying {@link ByteBuffer} * instances. This class encapsulates all state management for iterating - * through the sequence of chunks and the sequence of buffers within each chunk. + * through the sequence of chunks through {@link ChunkInputStream}. */ public class BlockInputStream extends InputStream implements Seekable { + private static final Logger LOG = + LoggerFactory.getLogger(BlockInputStream.class); + private static final int EOF = -1; private final BlockID blockID; + private final long length; + private Pipeline pipeline; + private final long containerKey; + private final Token token; + private final boolean verifyChecksum; private final String traceID; private XceiverClientManager xceiverClientManager; private XceiverClientSpi xceiverClient; - private List chunks; - // ChunkIndex points to the index current chunk in the buffers or the the - // index of chunk which will be read next into the buffers in - // readChunkFromContainer().
+ private boolean initialized = false; + + // List of ChunkInputStreams, one for each chunk in the block + private List chunkStreams; + + // chunkOffsets[i] stores the index of the first data byte in + // chunkStream i w.r.t the block data. + // Let’s say we have chunk size as 40 bytes. And let's say the parent + // block stores data from index 200 and has length 400. + // The first 40 bytes of this block will be stored in chunk[0], next 40 in + // chunk[1] and so on. But since the chunkOffsets are w.r.t the block only + // and not the key, the values in chunkOffsets will be [0, 40, 80,....]. + private long[] chunkOffsets = null; + + // Index of the chunkStream corresponding to the current postion of the + // BlockInputStream i.e offset of the data to be read next from this block private int chunkIndex; - // ChunkIndexOfCurrentBuffer points to the index of chunk read into the - // buffers or index of the last chunk in the buffers. It is updated only - // when a new chunk is read from container into the buffers. - private int chunkIndexOfCurrentBuffer; - private long[] chunkOffset; - private List buffers; - private int bufferIndex; - private long bufferPosition; - private final boolean verifyChecksum; - /** - * Creates a new BlockInputStream. - * - * @param blockID block ID of the chunk - * @param xceiverClientManager client manager that controls client - * @param xceiverClient client to perform container calls - * @param chunks list of chunks to read - * @param traceID container protocol call traceID - * @param verifyChecksum verify checksum - * @param initialPosition the initial position of the stream pointer. This - * position is seeked now if the up-stream was seeked - * before this was created.
- */ - public BlockInputStream( - BlockID blockID, XceiverClientManager xceiverClientManager, - XceiverClientSpi xceiverClient, List chunks, String traceID, - boolean verifyChecksum, long initialPosition) throws IOException { - this.blockID = blockID; - this.traceID = traceID; - this.xceiverClientManager = xceiverClientManager; - this.xceiverClient = xceiverClient; - this.chunks = chunks; - this.chunkIndex = 0; - this.chunkIndexOfCurrentBuffer = -1; - // chunkOffset[i] stores offset at which chunk i stores data in - // BlockInputStream - this.chunkOffset = new long[this.chunks.size()]; - initializeChunkOffset(); - this.buffers = null; - this.bufferIndex = 0; - this.bufferPosition = -1; + // Position of the BlockInputStream is maintainted by this variable till + // the stream is initialized. This postion is w.r.t to the block only and + // not the key. + // For the above example, if we seek to postion 240 before the stream is + // initialized, then value of postion will be set to 40. + // Once, the stream is initialized, the postion of the stream + // will be determined by the current chunkStream and its postion. + private long blockPosition = 0; + + // Tracks the chunkIndex corresponding to the last blockPosition so that it + // can be reset if a new postion is seeked. + private int chunkIndexOfPrevPosition; + + public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, + long containerKey, Token token, + boolean verifyChecksum, String traceId, + XceiverClientManager xceiverClientManager) { + this.blockID = blockId; + this.length = blockLen; + this.pipeline = pipeline; + this.containerKey = containerKey; + this.token = token; this.verifyChecksum = verifyChecksum; - if (initialPosition > 0) { - // The stream was seeked to a position before the stream was - // initialized. So seeking to the position now.
- seek(initialPosition); - } - } - - private void initializeChunkOffset() { - long tempOffset = 0; - for (int i = 0; i < chunks.size(); i++) { - chunkOffset[i] = tempOffset; - tempOffset += chunks.get(i).getLen(); - } + this.traceID = traceId; + this.xceiverClientManager = xceiverClientManager; } - @Override - public synchronized int read() - throws IOException { - checkOpen(); - int available = prepareRead(1); - int dataout = EOF; + /** + * Initialize the BlockInputStream. Get the BlockData (list of chunks) from + * the Container and create the ChunkInputStreams for each Chunk in the Block. + */ + public synchronized void initialize() throws IOException { - if (available == EOF) { - Preconditions - .checkState(buffers == null); //should have released by now, see below - } else { - dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get()); - } + List chunks = getChunkInfos(); - if (blockStreamEOF()) { - // consumer might use getPos to determine EOF, - // so release buffers when serving the last byte of data - releaseBuffers(); - } + if (chunks != null && !chunks.isEmpty()) { + intializeChunkInputStreams(chunks); - return dataout; - } - - @Override - public synchronized int read(byte[] b, int off, int len) throws IOException { - // According to the JavaDocs for InputStream, it is recommended that - // subclasses provide an override of bulk read if possible for performance - // reasons. In addition to performance, we need to do it for correctness - // reasons. The Ozone REST service uses PipedInputStream and - // PipedOutputStream to relay HTTP response data between a Jersey thread and - // a Netty thread. It turns out that PipedInputStream/PipedOutputStream - // have a subtle dependency (bug?) on the wrapped stream providing separate - // implementations of single-byte read and bulk read.
Without this, get key - // responses might close the connection before writing all of the bytes - // advertised in the Content-Length. - if (b == null) { - throw new NullPointerException(); - } - if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - if (len == 0) { - return 0; - } - checkOpen(); - int total = 0; - while (len > 0) { - int available = prepareRead(len); - if (available == EOF) { - Preconditions - .checkState(buffers == null); //should have been released by now - return total != 0 ? total : EOF; + if (blockPosition > 0) { + // Stream was seeked to blockPosition before initialization. Seek to the + // blockPosition now. + seek(blockPosition); } - buffers.get(bufferIndex).get(b, off + total, available); - len -= available; - total += available; - } - - if (blockStreamEOF()) { - // smart consumers determine EOF by calling getPos() - // so we release buffers when serving the final bytes of data - releaseBuffers(); + initialized = true; } - - return total; } /** - * Determines if all data in the stream has been consumed. - * - * @return true if EOF, false if more data is available + * Send RPC call to get the block info from the container. + * @return List of chunks in this block. */ - protected boolean blockStreamEOF() { - if (buffersHaveData() || chunksRemaining()) { - return false; - } else { - // if there are any chunks, we better be at the last chunk for EOF - Preconditions.checkState(((chunks == null) || chunks.isEmpty() || - chunkIndex == (chunks.size() - 1)), - "EOF detected, but not at the last chunk"); - return true; + private List getChunkInfos() throws IOException { + // irrespective of the container state, we will always read via Standalone + // protocol.
+ if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) { + pipeline = Pipeline.newBuilder(pipeline) + .setType(HddsProtos.ReplicationType.STAND_ALONE).build(); } - } - - private void releaseBuffers() { - //ashes to ashes, dust to dust - buffers = null; - bufferIndex = 0; - } + xceiverClient = xceiverClientManager.acquireClient(pipeline); + boolean success = false; + List chunks; + try { + LOG.debug("Initializing BlockInputStream for get key to access {} {}", + blockID.getContainerID(), containerKey); - @Override - public synchronized void close() { - if (xceiverClientManager != null && xceiverClient != null) { - xceiverClientManager.releaseClient(xceiverClient, false); - xceiverClientManager = null; - xceiverClient = null; + if (token != null) { + UserGroupInformation.getCurrentUser().addToken(token); + } + DatanodeBlockID datanodeBlockID = blockID + .getDatanodeBlockIDProtobuf(); + GetBlockResponseProto response = ContainerProtocolCalls + .getBlock(xceiverClient, datanodeBlockID, traceID); + + chunks = response.getBlockData().getChunksList(); + success = true; + } finally { + if (!success) { + xceiverClientManager.releaseClient(xceiverClient, false); + } } + + return chunks; } /** - * Checks if the stream is open. If not, throws an exception. - * - * @throws IOException if stream is closed + * For each chunk in the block, create a ChunkInputStream and compute + * its chunkOffset.
*/ - private synchronized void checkOpen() throws IOException { - if (xceiverClient == null) { - throw new IOException("BlockInputStream has been closed."); + private void intializeChunkInputStreams(List chunks) { + this.chunkOffsets = new long[chunks.size()]; + long tempOffset = 0; + + this.chunkStreams = new ArrayList<>(chunks.size()); + for (int i = 0; i < chunks.size(); i++) { + addStream(chunks.get(i)); + chunkOffsets[i] = tempOffset; + tempOffset += chunks.get(i).getLen(); } + + this.chunkIndex = 0; } /** - * Prepares to read by advancing through chunks and buffers as needed until it - * finds data to return or encounters EOF. - * - * @param len desired length of data to read - * @return length of data available to read, possibly less than desired length + * Append another ChunkInputStream to the end of the list. Note that the + * ChunkInputStream is only created here. The chunk will be read from the + * Datanode only when a read operation is performed on for that chunk. */ - private synchronized int prepareRead(int len) throws IOException { - for (;;) { - if (!buffersAllocated()) { - // The current chunk at chunkIndex has not been read from the - // container. Read the chunk and put the data into buffers. - readChunkFromContainer(); - } - if (buffersHaveData()) { - // Data is available from buffers - ByteBuffer bb = buffers.get(bufferIndex); - return len > bb.remaining() ? bb.remaining() : len; - } else if (chunksRemaining()) { - // There are additional chunks available. - // Read the next chunk in the block. - chunkIndex += 1; - readChunkFromContainer(); - } else { - // All available input has been consumed.
- return EOF; - } - } + private synchronized void addStream(ChunkInfo chunkInfo) { + chunkStreams.add(new ChunkInputStream(chunkInfo, blockID, traceID, + xceiverClient, verifyChecksum)); } - private boolean buffersAllocated() { - if (buffers == null || buffers.isEmpty()) { - return false; - } - return true; + public synchronized long getRemaining() throws IOException { + return length - getPos(); } - private boolean buffersHaveData() { - boolean hasData = false; - - if (buffersAllocated()) { - while (bufferIndex < (buffers.size())) { - if (buffers.get(bufferIndex).hasRemaining()) { - // current buffer has data - hasData = true; - break; - } else { - if (buffersRemaining()) { - // move to next available buffer - ++bufferIndex; - Preconditions.checkState(bufferIndex < buffers.size()); - } else { - // no more buffers remaining - break; - } - } - } + @Override + public synchronized int read() throws IOException { + byte[] buf = new byte[1]; + if (read(buf, 0, 1) == EOF) { + return EOF; } - - return hasData; - } - - private boolean buffersRemaining() { - return (bufferIndex < (buffers.size() - 1)); + return Byte.toUnsignedInt(buf[0]); } - private boolean chunksRemaining() { - if ((chunks == null) || chunks.isEmpty()) { - return false; + @Override + public synchronized int read(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); } - // Check if more chunks are remaining in the stream after chunkIndex - if (chunkIndex < (chunks.size() - 1)) { - return true; + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); } - // ChunkIndex is the last chunk in the stream. Check if this chunk has - // been read from container or not. Return true if chunkIndex has not - // been read yet and false otherwise.
- return chunkIndexOfCurrentBuffer != chunkIndex; - } - - /** - * Attempts to read the chunk at the specified offset in the chunk list. If - * successful, then the data of the read chunk is saved so that its bytes can - * be returned from subsequent read calls. - * - * @throws IOException if there is an I/O error while performing the call - */ - private synchronized void readChunkFromContainer() throws IOException { - // Read the chunk at chunkIndex - final ChunkInfo chunkInfo = chunks.get(chunkIndex); - List excludeDns = null; - ByteString byteString; - List dnList = getDatanodeList(); - while (true) { - List dnListFromReadChunkCall = new ArrayList<>(); - byteString = readChunk(chunkInfo, excludeDns, dnListFromReadChunkCall); - try { - if (byteString.size() != chunkInfo.getLen()) { - // Bytes read from chunk should be equal to chunk size. - throw new IOException(String - .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", - chunkInfo.getChunkName(), chunkInfo.getLen(), - byteString.size())); - } - ChecksumData checksumData = - ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData()); - if (verifyChecksum) { - Checksum.verifyChecksum(byteString, checksumData); - } - break; - } catch (IOException ioe) { - // we will end up in this situation only if the checksum mismatch - // happens or the length of the chunk mismatches. - // In this case, read should be retried on a different replica.
- // TODO: Inform SCM of a possible corrupt container replica here - if (excludeDns == null) { - excludeDns = new ArrayList<>(); - } - excludeDns.addAll(dnListFromReadChunkCall); - if (excludeDns.size() == dnList.size()) { - throw ioe; - } - } + if (len == 0) { + return 0; } - buffers = byteString.asReadOnlyByteBufferList(); - bufferIndex = 0; - chunkIndexOfCurrentBuffer = chunkIndex; + if (!initialized) { + initialize(); + } - // The bufferIndex and position might need to be adjusted if seek() was - // called on the stream before. This needs to be done so that the buffer - // position can be advanced to the 'seeked' position. - adjustBufferIndex(); - } + checkOpen(); + int totalReadLen = 0; + while (len > 0) { + // if we are at the last chunk and have read the entire chunk, return + if (chunkStreams.size() == 0 || + (chunkStreams.size() - 1 <= chunkIndex && + chunkStreams.get(chunkIndex) + .getRemaining() == 0)) { + return totalReadLen == 0 ? EOF : totalReadLen; + } - /** - * Send RPC call to get the chunk from the container.
- */ - @VisibleForTesting - protected ByteString readChunk(final ChunkInfo chunkInfo, - List excludeDns, List dnListFromReply) - throws IOException { - XceiverClientReply reply; - ReadChunkResponseProto readChunkResponse = null; - try { - reply = ContainerProtocolCalls - .readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns); - ContainerProtos.ContainerCommandResponseProto response; - response = reply.getResponse().get(); - ContainerProtocolCalls.validateContainerResponse(response); - readChunkResponse = response.getReadChunk(); - dnListFromReply.addAll(reply.getDatanodes()); - } catch (IOException e) { - if (e instanceof StorageContainerException) { - throw e; + // Get the current chunkStream and read data from it + ChunkInputStream current = chunkStreams.get(chunkIndex); + int numBytesToRead = Math.min(len, (int)current.getRemaining()); + int numBytesRead = current.read(b, off, numBytesToRead); + if (numBytesRead != numBytesToRead) { + // This implies that there is either data loss or corruption in the + // chunk entries. Even EOF in the current stream would be covered in + // this case.
+ throw new IOException(String.format( + "Inconsistent read for chunkName=%s length=%d numBytesRead=%d", + current.getChunkName(), current.getLength(), numBytesRead)); + } + totalReadLen += numBytesRead; + off += numBytesRead; + len -= numBytesRead; + if (current.getRemaining() <= 0 && + ((chunkIndex + 1) < chunkStreams.size())) { + chunkIndex += 1; } - throw new IOException("Unexpected OzoneException: " + e.toString(), e); - } catch (ExecutionException | InterruptedException e) { - throw new IOException( - "Failed to execute ReadChunk command for chunk " + chunkInfo - .getChunkName(), e); } - return readChunkResponse.getData(); - } - - @VisibleForTesting - protected List getDatanodeList() { - return xceiverClient.getPipeline().getNodes(); + return totalReadLen; } + /** + * Seeks the BlockInputStream to the specified postion. If the stream is + * not initialized, save the seeked postion via blockPosition. Otherwise, + * update the postion in 2 steps: + * 1. Updating the chunkIndex to the chunkStream corresponding to the + * seeked postion. + * 2. Seek the corresponding chunkStream to the adjusted postion. + * + * Let’s say we have chunk size as 40 bytes. And let's say the parent block + * stores data from index 200 and has length 400. If the key was seeked to + * postion 90, then this block will be seeked to postion 90. + * When seek(90) is called on this blockStream, then + * 1. chunkIndex will be set to 2 (as indices 80 - 120 reside in chunk[2]). + * 2. chunkStream[2] will be seeked to postion 10 + * (= 90 - chunkOffset[2] (= 80)).
+ */ @Override public synchronized void seek(long pos) throws IOException { - if (pos < 0 || (chunks.size() == 0 && pos > 0) - || pos >= chunkOffset[chunks.size() - 1] + chunks.get(chunks.size() - 1) - .getLen()) { - throw new EOFException("EOF encountered pos: " + pos + " container key: " - + blockID.getLocalID()); + if (!initialized) { + // Stream has not been initialized yet. Save the postion so that it + // can be seeked when the stream is initialized. + blockPosition = pos; + return; } - if (pos < chunkOffset[chunkIndex]) { - chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos); - } else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex) - .getLen()) { + checkOpen(); + if (pos < 0 || pos >= length) { + if (pos == 0) { + // It is possible for length and pos to be zero in which case + // seek should return instead of throwing exception + return; + } + throw new EOFException( + "EOF encountered at pos: " + pos + " for block: " + blockID); + } + Preconditions.assertTrue(chunkIndex >= 0); + if (chunkIndex >= chunkStreams.size()) { + chunkIndex = Arrays.binarySearch(chunkOffsets, pos); + } else if (pos < chunkOffsets[chunkIndex]) { chunkIndex = - Arrays.binarySearch(chunkOffset, chunkIndex + 1, chunks.size(), pos); + Arrays.binarySearch(chunkOffsets, 0, chunkIndex, pos); + } else if (pos >= chunkOffsets[chunkIndex] + chunkStreams + .get(chunkIndex).getLength()) { + chunkIndex = Arrays.binarySearch(chunkOffsets, + chunkIndex + 1, chunkStreams.size(), pos); } if (chunkIndex < 0) { // Binary search returns -insertionPoint - 1 if element is not present // in the array. insertionPoint is the point at which element would be // inserted in the sorted array.
We need to adjust the chunkIndex // accordingly so that chunkIndex = insertionPoint - 1 - chunkIndex = -chunkIndex -2; + chunkIndex = -chunkIndex - 2; } - // The bufferPosition should be adjusted to account for the chunk offset - // of the chunk the the pos actually points to. - bufferPosition = pos - chunkOffset[chunkIndex]; + // Reset the previous chunkStream's postion + chunkStreams.get(chunkIndexOfPrevPosition).resetPosition(); - // Check if current buffers correspond to the chunk index being seeked - // and if the buffers have any data. - if (chunkIndex == chunkIndexOfCurrentBuffer && buffersAllocated()) { - // Position the buffer to the seeked position. - adjustBufferIndex(); - } else { - // Release the current buffers. The next readChunkFromContainer will - // read the required chunk and position the buffer to the seeked - // position. - releaseBuffers(); - } + // seek to the proper offset in the ChunkInputStream + chunkStreams.get(chunkIndex).seek(pos - chunkOffsets[chunkIndex]); + chunkIndexOfPrevPosition = chunkIndex; } - private void adjustBufferIndex() { - if (bufferPosition == -1) { - // The stream has not been seeked to a position. No need to adjust the - // buffer Index and position. - return; + @Override + public long getPos() throws IOException { + if (length == 0) { + return 0; } - // The bufferPosition is w.r.t the buffers for current chunk. - // Adjust the bufferIndex and position to the seeked position. - long tempOffest = 0; - for (int i = 0; i < buffers.size(); i++) { - if (bufferPosition - tempOffest >= buffers.get(i).capacity()) { - tempOffest += buffers.get(i).capacity(); - } else { - bufferIndex = i; - break; - } + + if (!initialized) { + // The stream is not initialized yet.
Return the blockPosition + return blockPosition; + } else { + return chunkOffsets[chunkIndex] + chunkStreams.get(chunkIndex).getPos(); } - buffers.get(bufferIndex).position((int) (bufferPosition - tempOffest)); - // Reset the bufferPosition as the seek() operation has been completed. - bufferPosition = -1; } @Override - public synchronized long getPos() throws IOException { - // position = chunkOffset of current chunk (at chunkIndex) + position of - // the buffer corresponding to the chunk. - long bufferPos = 0; - - if (bufferPosition >= 0) { - // seek has been called but the buffers were empty. Hence, the buffer - // position will be advanced after the buffers are filled. - // We return the chunkOffset + bufferPosition here as that will be the - // position of the buffer pointer after reading the chunk file. - bufferPos = bufferPosition; - - } else if (blockStreamEOF()) { - // all data consumed, buffers have been released. - // get position from the chunk offset and chunk length of last chunk - bufferPos = chunks.get(chunkIndex).getLen(); - - } else if (buffersAllocated()) { - // get position from available buffers of current chunk - bufferPos = buffers.get(bufferIndex).position(); + public boolean seekToNewSource(long targetPos) throws IOException {

Review comment:
Why does this return false here? I read the Javadoc and it is not completely clear on this.

----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: users@infra.apache.org

Issue Time Tracking
-------------------

Worklog Id: (was: 247642)
Time Spent: 3.5h (was: 3h 20m)

> Support partial chunk reads and checksum verification
> -----------------------------------------------------
>
> Key: HDDS-1496
> URL: https://issues.apache.org/jira/browse/HDDS-1496
> Project: Hadoop Distributed Data Store
> Issue Type: Improvement
> Reporter: Hanisha Koneru
> Assignee: Hanisha Koneru
> Priority: Major
> Labels: pull-request-available
> Time Spent: 3.5h
> Remaining Estimate: 0h
>
> BlockInputStream#readChunkFromContainer() reads the whole chunk from disk even if we need to read only a part of the chunk.
> This Jira aims to improve readChunkFromContainer so that only the part of the chunk file needed by the client is read, plus the part of the chunk file required to verify the checksum.
> For example, let's say the client is reading from index 120 to 450 in the chunk. And let's say a checksum is stored for every 100 bytes in the chunk, i.e. the first checksum is for bytes from index 0 to 99, the next for bytes from index 100 to 199, and so on. To verify bytes from 120 to 450, we would need to read bytes 100 to 499 so that checksum verification can be done.
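
The checksum example in the issue description (client reads index 120 to 450, one checksum per 100 bytes, so bytes 100 to 499 must be read) comes down to rounding the requested range outward to checksum boundaries. Below is a minimal sketch of that arithmetic only. The class ChecksumAlignedRange, its method names, and the chunkLen cap are hypothetical and are not taken from the HDDS-1496 patch; the real ChunkInputStream may compute this differently.

```java
/** Hypothetical helper for illustration; not part of the HDDS-1496 patch. */
final class ChecksumAlignedRange {

  private ChecksumAlignedRange() {
  }

  /** First byte to read: the requested start rounded down to a checksum boundary. */
  static long alignedStart(long start, long bytesPerChecksum) {
    return (start / bytesPerChecksum) * bytesPerChecksum;
  }

  /**
   * Last byte to read (inclusive): the requested end rounded up to the last
   * byte of its checksum window, capped at the last byte of the chunk
   * (the cap is an assumption for chunks whose length is not a multiple
   * of bytesPerChecksum).
   */
  static long alignedEnd(long end, long bytesPerChecksum, long chunkLen) {
    long windowEnd = (end / bytesPerChecksum + 1) * bytesPerChecksum - 1;
    return Math.min(windowEnd, chunkLen - 1);
  }

  public static void main(String[] args) {
    long bytesPerChecksum = 100; // from the issue description
    long chunkLen = 1000;        // assumed chunk length for the demo
    long from = alignedStart(120, bytesPerChecksum);
    long to = alignedEnd(450, bytesPerChecksum, chunkLen);
    System.out.println("read bytes " + from + " to " + to); // read bytes 100 to 499
  }
}
```

With these inputs the helper reproduces the range stated in the issue (100 to 499). Only the bytes in the client's original range would then be handed back after the checksums over the wider range are verified.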