hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dran...@apache.org
Subject hadoop git commit: HDFS-8905. Refactor DFSInputStream#ReaderStrategy. Contributed by Kai Zheng and Sammi Chen
Date Wed, 24 Aug 2016 13:58:22 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk ec252ce0f -> 793447f79


HDFS-8905. Refactor DFSInputStream#ReaderStrategy. Contributed by Kai Zheng and Sammi Chen


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/793447f7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/793447f7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/793447f7

Branch: refs/heads/trunk
Commit: 793447f79924c97c2b562d5e41fa85adf19673fe
Parents: ec252ce
Author: Kai Zheng <kai.zheng@intel.com>
Authored: Wed Aug 24 21:57:23 2016 +0800
Committer: Kai Zheng <kai.zheng@intel.com>
Committed: Wed Aug 24 21:57:23 2016 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 235 ++-----------------
 .../hadoop/hdfs/DFSStripedInputStream.java      |  23 +-
 .../org/apache/hadoop/hdfs/ReadStatistics.java  | 106 +++++++++
 .../org/apache/hadoop/hdfs/ReaderStrategy.java  | 215 +++++++++++++++++
 .../hadoop/hdfs/client/HdfsDataInputStream.java |   3 +-
 .../apache/hadoop/hdfs/util/IOUtilsClient.java  |  16 ++
 .../hadoop/hdfs/TestExternalBlockReader.java    |   4 +-
 7 files changed, 374 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/793447f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 6132f83..7a10ba4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -145,94 +146,6 @@ public class DFSInputStream extends FSInputStream
     return extendedReadBuffers;
   }
 
-  public static class ReadStatistics {
-    public ReadStatistics() {
-      clear();
-    }
-
-    public ReadStatistics(ReadStatistics rhs) {
-      this.totalBytesRead = rhs.getTotalBytesRead();
-      this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
-      this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
-      this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead();
-    }
-
-    /**
-     * @return The total bytes read.  This will always be at least as
-     * high as the other numbers, since it includes all of them.
-     */
-    public long getTotalBytesRead() {
-      return totalBytesRead;
-    }
-
-    /**
-     * @return The total local bytes read.  This will always be at least
-     * as high as totalShortCircuitBytesRead, since all short-circuit
-     * reads are also local.
-     */
-    public long getTotalLocalBytesRead() {
-      return totalLocalBytesRead;
-    }
-
-    /**
-     * @return The total short-circuit local bytes read.
-     */
-    public long getTotalShortCircuitBytesRead() {
-      return totalShortCircuitBytesRead;
-    }
-
-    /**
-     * @return The total number of zero-copy bytes read.
-     */
-    public long getTotalZeroCopyBytesRead() {
-      return totalZeroCopyBytesRead;
-    }
-
-    /**
-     * @return The total number of bytes read which were not local.
-     */
-    public long getRemoteBytesRead() {
-      return totalBytesRead - totalLocalBytesRead;
-    }
-
-    void addRemoteBytes(long amt) {
-      this.totalBytesRead += amt;
-    }
-
-    void addLocalBytes(long amt) {
-      this.totalBytesRead += amt;
-      this.totalLocalBytesRead += amt;
-    }
-
-    void addShortCircuitBytes(long amt) {
-      this.totalBytesRead += amt;
-      this.totalLocalBytesRead += amt;
-      this.totalShortCircuitBytesRead += amt;
-    }
-
-    void addZeroCopyBytes(long amt) {
-      this.totalBytesRead += amt;
-      this.totalLocalBytesRead += amt;
-      this.totalShortCircuitBytesRead += amt;
-      this.totalZeroCopyBytesRead += amt;
-    }
-
-    void clear() {
-      this.totalBytesRead = 0;
-      this.totalLocalBytesRead = 0;
-      this.totalShortCircuitBytesRead = 0;
-      this.totalZeroCopyBytesRead = 0;
-    }
-
-    private long totalBytesRead;
-
-    private long totalLocalBytesRead;
-
-    private long totalShortCircuitBytesRead;
-
-    private long totalZeroCopyBytesRead;
-  }
-
   /**
    * This variable tracks the number of failures since the start of the
    * most recent user-facing operation. That is to say, it should be reset
@@ -767,116 +680,11 @@ public class DFSInputStream extends FSInputStream
     return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
   }
 
-  /**
-   * Wraps different possible read implementations so that readBuffer can be
-   * strategy-agnostic.
-   */
-  interface ReaderStrategy {
-    int doRead(BlockReader blockReader, int off, int len)
-        throws IOException;
-
-    /**
-     * Copy data from the src ByteBuffer into the read buffer.
-     * @param src The src buffer where the data is copied from
-     * @param offset Useful only when the ReadStrategy is based on a byte array.
-     *               Indicate the offset of the byte array for copy.
-     * @param length Useful only when the ReadStrategy is based on a byte array.
-     *               Indicate the length of the data to copy.
-     */
-    int copyFrom(ByteBuffer src, int offset, int length);
-  }
-
-  protected void updateReadStatistics(ReadStatistics readStatistics,
-        int nRead, BlockReader blockReader) {
-    if (nRead <= 0) return;
-    synchronized(infoLock) {
-      if (blockReader.isShortCircuit()) {
-        readStatistics.addShortCircuitBytes(nRead);
-      } else if (blockReader.getNetworkDistance() == 0) {
-        readStatistics.addLocalBytes(nRead);
-      } else {
-        readStatistics.addRemoteBytes(nRead);
-      }
-    }
-  }
-
-  /**
-   * Used to read bytes into a byte[]
-   */
-  private class ByteArrayStrategy implements ReaderStrategy {
-    final byte[] buf;
-
-    public ByteArrayStrategy(byte[] buf) {
-      this.buf = buf;
-    }
-
-    @Override
-    public int doRead(BlockReader blockReader, int off, int len)
-        throws IOException {
-      int nRead = blockReader.read(buf, off, len);
-      updateReadStatistics(readStatistics, nRead, blockReader);
-      dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
-          nRead);
-      return nRead;
-    }
-
-    @Override
-    public int copyFrom(ByteBuffer src, int offset, int length) {
-      ByteBuffer writeSlice = src.duplicate();
-      writeSlice.get(buf, offset, length);
-      return length;
-    }
-  }
-
-  /**
-   * Used to read bytes into a user-supplied ByteBuffer
-   */
-  protected class ByteBufferStrategy implements ReaderStrategy {
-    final ByteBuffer buf;
-    ByteBufferStrategy(ByteBuffer buf) {
-      this.buf = buf;
-    }
-
-    @Override
-    public int doRead(BlockReader blockReader, int off, int len)
-        throws IOException {
-      int oldpos = buf.position();
-      int oldlimit = buf.limit();
-      boolean success = false;
-      try {
-        int ret = blockReader.read(buf);
-        success = true;
-        updateReadStatistics(readStatistics, ret, blockReader);
-        dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
-            ret);
-        if (ret == 0) {
-          DFSClient.LOG.warn("zero");
-        }
-        return ret;
-      } finally {
-        if (!success) {
-          // Reset to original state so that retries work correctly.
-          buf.position(oldpos);
-          buf.limit(oldlimit);
-        }
-      }
-    }
-
-    @Override
-    public int copyFrom(ByteBuffer src, int offset, int length) {
-      ByteBuffer writeSlice = src.duplicate();
-      int remaining = Math.min(buf.remaining(), writeSlice.remaining());
-      writeSlice.limit(writeSlice.position() + remaining);
-      buf.put(writeSlice);
-      return remaining;
-    }
-  }
-
   /* This is a used by regular read() and handles ChecksumExceptions.
    * name readBuffer() is chosen to imply similarity to readBuffer() in
    * ChecksumFileSystem
    */
-  private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
+  private synchronized int readBuffer(ReaderStrategy reader, int len,
                                       CorruptedBlocks corruptedBlocks)
       throws IOException {
     IOException ioe;
@@ -892,7 +700,7 @@ public class DFSInputStream extends FSInputStream
     while (true) {
       // retry as many times as seekToNewSource allows.
       try {
-        return reader.doRead(blockReader, off, len);
+        return reader.readFromBlock(blockReader, len);
       } catch ( ChecksumException ce ) {
         DFSClient.LOG.warn("Found Checksum error for "
             + getCurrentBlock() + " from " + currentNode
@@ -927,13 +735,14 @@ public class DFSInputStream extends FSInputStream
     }
   }
 
-  protected synchronized int readWithStrategy(ReaderStrategy strategy, int off,
-      int len) throws IOException {
+  protected synchronized int readWithStrategy(ReaderStrategy strategy)
+      throws IOException {
     dfsClient.checkOpen();
     if (closed.get()) {
       throw new IOException("Stream closed");
     }
 
+    int len = strategy.getTargetLength();
     CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
     failures = 0;
     if (pos < getFileLength()) {
@@ -952,7 +761,7 @@ public class DFSInputStream extends FSInputStream
                   locatedBlocks.getFileLength() - pos);
             }
           }
-          int result = readBuffer(strategy, off, realLen, corruptedBlocks);
+          int result = readBuffer(strategy, realLen, corruptedBlocks);
 
           if (result >= 0) {
             pos += result;
@@ -994,11 +803,12 @@ public class DFSInputStream extends FSInputStream
     if (len == 0) {
       return 0;
     }
-    ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
+    ReaderStrategy byteArrayReader =
+        new ByteArrayStrategy(buf, off, len, readStatistics, dfsClient);
     try (TraceScope scope =
              dfsClient.newReaderTraceScope("DFSInputStream#byteArrayRead",
                  src, getPos(), len)) {
-      int retLen = readWithStrategy(byteArrayReader, off, len);
+      int retLen = readWithStrategy(byteArrayReader);
       if (retLen < len) {
         dfsClient.addRetLenToReaderScope(scope, retLen);
       }
@@ -1008,12 +818,13 @@ public class DFSInputStream extends FSInputStream
 
   @Override
   public synchronized int read(final ByteBuffer buf) throws IOException {
-    ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
+    ReaderStrategy byteBufferReader =
+        new ByteBufferStrategy(buf, readStatistics, dfsClient);
     int reqLen = buf.remaining();
     try (TraceScope scope =
              dfsClient.newReaderTraceScope("DFSInputStream#byteBufferRead",
                  src, getPos(), reqLen)){
-      int retLen = readWithStrategy(byteBufferReader, 0, reqLen);
+      int retLen = readWithStrategy(byteBufferReader);
       if (retLen < reqLen) {
         dfsClient.addRetLenToReaderScope(scope, retLen);
       }
@@ -1221,7 +1032,7 @@ public class DFSInputStream extends FSInputStream
         reader = getBlockReader(block, startInBlk, len, datanode.addr,
             datanode.storageType, datanode.info);
         int nread = reader.readAll(buf, offset, len);
-        updateReadStatistics(readStatistics, nread, reader);
+        IOUtilsClient.updateReadStatistics(readStatistics, nread, reader);
         dfsClient.updateFileSystemReadStats(
             reader.getNetworkDistance(), nread);
         if (nread != len) {
@@ -1721,18 +1532,14 @@ public class DFSInputStream extends FSInputStream
    * Get statistics about the reads which this DFSInputStream has done.
    */
   public ReadStatistics getReadStatistics() {
-    synchronized(infoLock) {
-      return new ReadStatistics(readStatistics);
-    }
+    return new ReadStatistics(readStatistics);
   }
 
   /**
    * Clear statistics about the reads which this DFSInputStream has done.
    */
   public void clearReadStatistics() {
-    synchronized(infoLock) {
-      readStatistics.clear();
-    }
+    readStatistics.clear();
   }
 
   public FileEncryptionInfo getFileEncryptionInfo() {
@@ -1759,7 +1566,8 @@ public class DFSInputStream extends FSInputStream
       throws IOException {
     synchronized (infoLock) {
       this.cachingStrategy =
-          new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
+          new CachingStrategy.Builder(this.cachingStrategy).
+              setReadahead(readahead).build();
     }
     closeCurrentBlockReaders();
   }
@@ -1769,7 +1577,8 @@ public class DFSInputStream extends FSInputStream
       throws IOException {
     synchronized (infoLock) {
       this.cachingStrategy =
-          new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
+          new CachingStrategy.Builder(this.cachingStrategy).
+              setDropBehind(dropBehind).build();
     }
     closeCurrentBlockReaders();
   }
@@ -1883,9 +1692,7 @@ public class DFSInputStream extends FSInputStream
       buffer.position((int)blockPos);
       buffer.limit((int)(blockPos + length));
       getExtendedReadBuffers().put(buffer, clientMmap);
-      synchronized (infoLock) {
-        readStatistics.addZeroCopyBytes(length);
-      }
+      readStatistics.addZeroCopyBytes(length);
       DFSClient.LOG.debug("readZeroCopy read {} bytes from offset {} via the "
           + "zero-copy read path.  blockEnd = {}", length, curPos, blockEnd);
       success = true;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/793447f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index d93863c..9ca8005 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -359,11 +359,11 @@ public class DFSStripedInputStream extends DFSInputStream {
       ExtendedBlock currentBlock,
       CorruptedBlocks corruptedBlocks)
       throws IOException {
-    final int targetLength = strategy.buf.remaining();
+    final int targetLength = strategy.getTargetLength();
     int length = 0;
     try {
       while (length < targetLength) {
-        int ret = strategy.doRead(blockReader, 0, 0);
+        int ret = strategy.readFromBlock(blockReader);
         if (ret < 0) {
           throw new IOException("Unexpected EOS from the reader");
         }
@@ -425,13 +425,14 @@ public class DFSStripedInputStream extends DFSInputStream {
   }
 
   @Override
-  protected synchronized int readWithStrategy(ReaderStrategy strategy,
-      int off, int len) throws IOException {
+  protected synchronized int readWithStrategy(ReaderStrategy strategy)
+      throws IOException {
     dfsClient.checkOpen();
     if (closed.get()) {
       throw new IOException("Stream closed");
     }
 
+    int len = strategy.getTargetLength();
     CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
     if (pos < getFileLength()) {
       try {
@@ -452,7 +453,7 @@ public class DFSStripedInputStream extends DFSInputStream {
           if (!curStripeRange.include(getOffsetInBlockGroup())) {
             readOneStripe(corruptedBlocks);
           }
-          int ret = copyToTargetBuf(strategy, off + result, realLen - result);
+          int ret = copyToTargetBuf(strategy, realLen - result);
           result += ret;
           pos += ret;
         }
@@ -470,16 +471,14 @@ public class DFSStripedInputStream extends DFSInputStream {
   /**
    * Copy the data from {@link #curStripeBuf} into the given buffer
    * @param strategy the ReaderStrategy containing the given buffer
-   * @param offset the offset of the given buffer. Used only when strategy is
-   *               a ByteArrayStrategy
    * @param length target length
    * @return number of bytes copied
    */
-  private int copyToTargetBuf(ReaderStrategy strategy, int offset, int length) {
+  private int copyToTargetBuf(ReaderStrategy strategy, int length) {
     final long offsetInBlk = getOffsetInBlockGroup();
     int bufOffset = getStripedBufOffset(offsetInBlk);
     curStripeBuf.position(bufOffset);
-    return strategy.copyFrom(curStripeBuf, offset,
+    return strategy.readFromBuffer(curStripeBuf,
         Math.min(length, curStripeBuf.remaining()));
   }
 
@@ -700,7 +699,8 @@ public class DFSStripedInputStream extends DFSInputStream {
 
     private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
       if (chunk.byteBuffer != null) {
-        ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer);
+        ByteBufferStrategy strategy =
+            new ByteBufferStrategy(chunk.byteBuffer, readStatistics, dfsClient);
         return new ByteBufferStrategy[]{strategy};
       } else {
         ByteBufferStrategy[] strategies =
@@ -708,7 +708,8 @@ public class DFSStripedInputStream extends DFSInputStream {
         for (int i = 0; i < strategies.length; i++) {
           ByteBuffer buffer = ByteBuffer.wrap(chunk.byteArray.buf(),
               chunk.byteArray.getOffsets()[i], chunk.byteArray.getLengths()[i]);
-          strategies[i] = new ByteBufferStrategy(buffer);
+          strategies[i] =
+              new ByteBufferStrategy(buffer, readStatistics, dfsClient);
         }
         return strategies;
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/793447f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReadStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReadStatistics.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReadStatistics.java
new file mode 100644
index 0000000..59b1418
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReadStatistics.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+/**
+ * A utility class that maintains statistics for reading.
+ * <p>
+ * Every accessor and mutator is {@code synchronized} on the instance, so a
+ * single ReadStatistics object can be updated and queried from multiple
+ * threads without external locking.
+ */
+public class ReadStatistics {
+  private long totalBytesRead;
+  private long totalLocalBytesRead;
+  private long totalShortCircuitBytesRead;
+  private long totalZeroCopyBytesRead;
+
+  /**
+   * Creates a statistics object with every counter set to zero.
+   */
+  public ReadStatistics() {
+    clear();
+  }
+
+  /**
+   * Creates a snapshot copy of {@code rhs}.
+   * <p>
+   * The monitor of {@code rhs} is held while all four counters are copied,
+   * so the snapshot is internally consistent even when another thread is
+   * updating {@code rhs}. Copying through the individual getters would
+   * take the lock four separate times and could observe an update only
+   * partially applied (e.g. more local bytes than total bytes, which makes
+   * {@link #getRemoteBytesRead()} negative).
+   *
+   * @param rhs the statistics to copy
+   */
+  public ReadStatistics(ReadStatistics rhs) {
+    synchronized (rhs) {
+      this.totalBytesRead = rhs.totalBytesRead;
+      this.totalLocalBytesRead = rhs.totalLocalBytesRead;
+      this.totalShortCircuitBytesRead = rhs.totalShortCircuitBytesRead;
+      this.totalZeroCopyBytesRead = rhs.totalZeroCopyBytesRead;
+    }
+  }
+
+  /**
+   * @return The total bytes read.  This will always be at least as
+   * high as the other numbers, since it includes all of them.
+   */
+  public synchronized long getTotalBytesRead() {
+    return totalBytesRead;
+  }
+
+  /**
+   * @return The total local bytes read.  This will always be at least
+   * as high as totalShortCircuitBytesRead, since all short-circuit
+   * reads are also local.
+   */
+  public synchronized long getTotalLocalBytesRead() {
+    return totalLocalBytesRead;
+  }
+
+  /**
+   * @return The total short-circuit local bytes read.
+   */
+  public synchronized long getTotalShortCircuitBytesRead() {
+    return totalShortCircuitBytesRead;
+  }
+
+  /**
+   * @return The total number of zero-copy bytes read.
+   */
+  public synchronized long getTotalZeroCopyBytesRead() {
+    return totalZeroCopyBytesRead;
+  }
+
+  /**
+   * @return The total number of bytes read which were not local.
+   */
+  public synchronized long getRemoteBytesRead() {
+    return totalBytesRead - totalLocalBytesRead;
+  }
+
+  /**
+   * Records bytes that were not read locally.
+   * @param amt number of bytes read
+   */
+  public synchronized void addRemoteBytes(long amt) {
+    this.totalBytesRead += amt;
+  }
+
+  /**
+   * Records bytes read locally, but not via short-circuit.
+   * @param amt number of bytes read
+   */
+  public synchronized void addLocalBytes(long amt) {
+    this.totalBytesRead += amt;
+    this.totalLocalBytesRead += amt;
+  }
+
+  /**
+   * Records short-circuit bytes; they also count as local bytes.
+   * @param amt number of bytes read
+   */
+  public synchronized void addShortCircuitBytes(long amt) {
+    this.totalBytesRead += amt;
+    this.totalLocalBytesRead += amt;
+    this.totalShortCircuitBytesRead += amt;
+  }
+
+  /**
+   * Records zero-copy bytes; they also count as short-circuit and local
+   * bytes.
+   * @param amt number of bytes read
+   */
+  public synchronized void addZeroCopyBytes(long amt) {
+    this.totalBytesRead += amt;
+    this.totalLocalBytesRead += amt;
+    this.totalShortCircuitBytesRead += amt;
+    this.totalZeroCopyBytesRead += amt;
+  }
+
+  /**
+   * Resets every counter to zero.
+   */
+  public synchronized void clear() {
+    this.totalBytesRead = 0;
+    this.totalLocalBytesRead = 0;
+    this.totalShortCircuitBytesRead = 0;
+    this.totalZeroCopyBytesRead = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/793447f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
new file mode 100644
index 0000000..d75a8ef
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
+
+/**
+ * Wraps different possible read implementations so that callers can be
+ * strategy-agnostic.
+ */
+interface ReaderStrategy {
+  /**
+   * Read from a block using the blockReader. How many bytes are requested
+   * is decided by the implementation.
+   * @param blockReader the block reader to read from
+   * @return number of bytes read; a negative value signals end of stream
+   * @throws IOException if the underlying read fails
+   */
+  int readFromBlock(BlockReader blockReader) throws IOException;
+
+  /**
+   * Read from a block using the blockReader with desired length to read.
+   * @param blockReader the block reader to read from
+   * @param length number of bytes desired to read, not ensured
+   * @return number of bytes read; a negative value signals end of stream
+   * @throws IOException if the underlying read fails
+   */
+  int readFromBlock(BlockReader blockReader, int length) throws IOException;
+
+  /**
+   * Read or copy from a src buffer.
+   * Note: the position of the src buffer is not changed after the call.
+   * @param src the buffer to copy from, starting at its current position
+   * @return number of bytes copied
+   */
+  int readFromBuffer(ByteBuffer src);
+
+  /**
+   * Read or copy at most length bytes from a src buffer.
+   * Note: the position of the src buffer is not changed after the call.
+   * @param src the buffer to copy from, starting at its current position
+   * @param length maximum number of bytes to copy
+   * @return number of bytes copied
+   */
+  int readFromBuffer(ByteBuffer src, int length);
+
+  /**
+   * @return the target read buffer that reads data into.
+   */
+  ByteBuffer getReadBuffer();
+
+  /**
+   * @return the target length to read.
+   */
+  int getTargetLength();
+}
+
+/**
+ * Used to read bytes into a byte array buffer. Note it's not thread-safe
+ * and the behavior is not defined if concurrently operated. Every
+ * successful read or copy advances the write offset into the array by the
+ * number of bytes transferred.
+ * <p>
+ * NOTE(review): targetLength is fixed at construction while offset
+ * advances, so getReadBuffer() and readFromBlock(BlockReader) still use
+ * the original request size after a partial read -- confirm callers only
+ * rely on them before any data has been transferred.
+ */
+class ByteArrayStrategy implements ReaderStrategy {
+  private final DFSClient dfsClient;
+  private final ReadStatistics readStatistics;
+  private final byte[] readBuf;
+  private int offset;
+  private final int targetLength;
+
+  /**
+   * The constructor.
+   * @param readBuf target buffer to read into
+   * @param offset offset into the buffer
+   * @param targetLength target length of data
+   * @param readStatistics statistics counter
+   * @param dfsClient client whose file system read stats are updated
+   */
+  public ByteArrayStrategy(byte[] readBuf, int offset, int targetLength,
+                           ReadStatistics readStatistics,
+                           DFSClient dfsClient) {
+    this.readBuf = readBuf;
+    this.offset = offset;
+    this.targetLength = targetLength;
+    this.readStatistics = readStatistics;
+    this.dfsClient = dfsClient;
+  }
+
+  @Override
+  public ByteBuffer getReadBuffer() {
+    return ByteBuffer.wrap(readBuf, offset, targetLength);
+  }
+
+  @Override
+  public int getTargetLength() {
+    return targetLength;
+  }
+
+  @Override
+  public int readFromBlock(BlockReader blockReader) throws IOException {
+    return readFromBlock(blockReader, targetLength);
+  }
+
+  @Override
+  public int readFromBlock(BlockReader blockReader,
+                           int length) throws IOException {
+    int nRead = blockReader.read(readBuf, offset, length);
+    if (nRead > 0) {
+      updateReadStatistics(readStatistics, nRead, blockReader);
+      dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
+          nRead);
+      offset += nRead;
+    }
+    return nRead;
+  }
+
+  @Override
+  public int readFromBuffer(ByteBuffer src) {
+    return readFromBuffer(src, src.remaining());
+  }
+
+  @Override
+  public int readFromBuffer(ByteBuffer src, int length) {
+    // Never copy more than src holds (as ByteBufferStrategy does); asking
+    // for more would throw BufferUnderflowException.
+    int toCopy = Math.min(length, src.remaining());
+    ByteBuffer dup = src.duplicate();
+    dup.get(readBuf, offset, toCopy);
+    offset += toCopy;
+    return toCopy;
+  }
+}
+
+/**
+ * Used to read bytes into a user-supplied ByteBuffer. Note it's not thread-safe
+ * and the behavior is not defined if concurrently operated. When read operation
+ * is performed, the position of the underlying byte buffer will move forward as
+ * stated in ByteBufferReadable#read(ByteBuffer buf) method.
+ */
+class ByteBufferStrategy implements ReaderStrategy {
+  private final DFSClient dfsClient;
+  private final ReadStatistics readStatistics;
+  private final ByteBuffer readBuf;
+  private final int targetLength;
+
+  /**
+   * The constructor.
+   * @param readBuf target buffer to read into; its remaining() at
+   *                construction time becomes the target length
+   * @param readStatistics statistics counter
+   * @param dfsClient client whose file system read stats are updated
+   */
+  ByteBufferStrategy(ByteBuffer readBuf,
+                     ReadStatistics readStatistics,
+                     DFSClient dfsClient) {
+    this.readBuf = readBuf;
+    this.targetLength = readBuf.remaining();
+    this.readStatistics = readStatistics;
+    this.dfsClient = dfsClient;
+  }
+
+  @Override
+  public ByteBuffer getReadBuffer() {
+    return readBuf;
+  }
+
+  @Override
+  public int readFromBlock(BlockReader blockReader) throws IOException {
+    return readFromBlock(blockReader, readBuf.remaining());
+  }
+
+  @Override
+  public int readFromBlock(BlockReader blockReader,
+                           int length) throws IOException {
+    // Read through a duplicate whose limit caps the request at length bytes.
+    // Callers may ask for less than readBuf.remaining() (e.g. when the read
+    // is trimmed to the file length), so the reader must not see it all.
+    int toRead = Math.min(length, readBuf.remaining());
+    ByteBuffer tmpBuf = readBuf.duplicate();
+    tmpBuf.limit(tmpBuf.position() + toRead);
+    int nRead = blockReader.read(tmpBuf);
+    // Only when data are read, update the position
+    if (nRead > 0) {
+      readBuf.position(readBuf.position() + nRead);
+      updateReadStatistics(readStatistics, nRead, blockReader);
+      dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
+          nRead);
+    }
+
+    return nRead;
+  }
+
+  @Override
+  public int getTargetLength() {
+    return targetLength;
+  }
+
+  @Override
+  public int readFromBuffer(ByteBuffer src) {
+    return readFromBuffer(src, src.remaining());
+  }
+
+  @Override
+  public int readFromBuffer(ByteBuffer src, int length) {
+    ByteBuffer dup = src.duplicate();
+    int newLen = Math.min(readBuf.remaining(), dup.remaining());
+    newLen = Math.min(newLen, length);
+    dup.limit(dup.position() + newLen);
+    readBuf.put(dup);
+    return newLen;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/793447f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java
index 8e95451..02e5deb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.crypto.CryptoInputStream;
 import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.ReadStatistics;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -103,7 +104,7 @@ public class HdfsDataInputStream extends FSDataInputStream {
    * be higher than you would expect just by adding up the number of
    * bytes read through HdfsDataInputStream.
    */
-  public DFSInputStream.ReadStatistics getReadStatistics() {
+  public ReadStatistics getReadStatistics() {
     return getDFSInputStream().getReadStatistics();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/793447f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
index 56f8ecc..71596f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.util;
 
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.ReadStatistics;
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -43,4 +45,25 @@ public class IOUtilsClient {
     }
   }
 
+  /**
+   * Attributes {@code nRead} bytes to one bucket of {@code readStatistics}:
+   * short-circuit when the reader is short-circuit, local when its network
+   * distance is 0, and remote otherwise. Non-positive counts are ignored.
+   *
+   * @param readStatistics the statistics object to update
+   * @param nRead number of bytes just read through {@code blockReader}
+   * @param blockReader the reader that produced the bytes
+   */
+  public static void updateReadStatistics(ReadStatistics readStatistics,
+                                      int nRead, BlockReader blockReader) {
+    if (nRead > 0) {
+      if (blockReader.isShortCircuit()) {
+        readStatistics.addShortCircuitBytes(nRead);
+      } else if (blockReader.getNetworkDistance() != 0) {
+        readStatistics.addRemoteBytes(nRead);
+      } else {
+        readStatistics.addLocalBytes(nRead);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/793447f7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
index 5c2b6da..8acf4bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -40,7 +39,8 @@ import java.util.LinkedList;
 import java.util.UUID;
 
 public class TestExternalBlockReader {
-  private static final Log LOG = LogFactory.getLog(TestExternalBlockReader.class);
+  private static final Log LOG =
+          LogFactory.getLog(TestExternalBlockReader.class);
 
   private static long SEED = 1234;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message