hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dran...@apache.org
Subject hadoop git commit: HDFS-8668. Erasure Coding: revisit buffer used for encoding and decoding. Contributed by Sammi Chen
Date Fri, 12 Aug 2016 05:57:25 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-3.0.0-alpha1 0bc0ea22a -> 8f6087775


HDFS-8668. Erasure Coding: revisit buffer used for encoding and decoding. Contributed by Sammi Chen


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8f608777
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8f608777
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8f608777

Branch: refs/heads/branch-3.0.0-alpha1
Commit: 8f60877754cc33a2745392e157011fd32df74866
Parents: 0bc0ea2
Author: Kai Zheng <kai.zheng@intel.com>
Authored: Sat Aug 13 13:54:12 2016 +0800
Committer: Kai Zheng <kai.zheng@intel.com>
Committed: Sat Aug 13 13:54:12 2016 +0800

----------------------------------------------------------------------
 .../apache/hadoop/io/ElasticByteBufferPool.java |  1 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 52 +++++++++++++++-----
 .../hadoop/hdfs/DFSStripedInputStream.java      | 18 ++++---
 .../hadoop/hdfs/DFSStripedOutputStream.java     | 29 +++++++++--
 .../StripedBlockChecksumReconstructor.java      |  3 +-
 .../erasurecode/StripedBlockReader.java         |  4 ++
 .../erasurecode/StripedBlockReconstructor.java  |  4 +-
 .../erasurecode/StripedBlockWriter.java         | 22 +++++++--
 .../datanode/erasurecode/StripedReader.java     | 11 ++++-
 .../erasurecode/StripedReconstructor.java       | 13 ++++-
 .../datanode/erasurecode/StripedWriter.java     |  8 +++
 .../hadoop/hdfs/TestDFSStripedInputStream.java  |  8 +++
 .../hadoop/hdfs/TestDFSStripedOutputStream.java |  8 +++
 .../TestDFSStripedOutputStreamWithFailure.java  | 11 ++++-
 .../hadoop/hdfs/TestReconstructStripedFile.java | 11 ++++-
 15 files changed, 169 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f608777/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
index 694fcbe..c35d608 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
@@ -101,6 +101,7 @@ public final class ElasticByteBufferPool implements ByteBufferPool {
 
   @Override
   public synchronized void putBuffer(ByteBuffer buffer) {
+    buffer.clear();
     TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
     while (true) {
       Key key = new Key(buffer.capacity(), System.nanoTime());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f608777/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index cc919da..93aee0e 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.EnumSet;
 import java.util.concurrent.atomic.AtomicReference;
@@ -393,11 +394,47 @@ public class DFSOutputStream extends FSOutputSummer
   @Override
   protected synchronized void writeChunk(byte[] b, int offset, int len,
       byte[] checksum, int ckoff, int cklen) throws IOException {
+    writeChunkPrepare(len, ckoff, cklen);
+
+    currentPacket.writeChecksum(checksum, ckoff, cklen);
+    currentPacket.writeData(b, offset, len);
+    currentPacket.incNumChunks();
+    getStreamer().incBytesCurBlock(len);
+
+    // If packet is full, enqueue it for transmission
+    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
+        getStreamer().getBytesCurBlock() == blockSize) {
+      enqueueCurrentPacketFull();
+    }
+  }
+
+  /* write the data chunk in <code>buffer</code> starting at
+  * <code>buffer.position</code> with
+  * a length of <code>len > 0</code>, and its checksum
+  */
+  protected synchronized void writeChunk(ByteBuffer buffer, int len,
+      byte[] checksum, int ckoff, int cklen) throws IOException {
+    writeChunkPrepare(len, ckoff, cklen);
+
+    currentPacket.writeChecksum(checksum, ckoff, cklen);
+    currentPacket.writeData(buffer, len);
+    currentPacket.incNumChunks();
+    getStreamer().incBytesCurBlock(len);
+
+    // If packet is full, enqueue it for transmission
+    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
+            getStreamer().getBytesCurBlock() == blockSize) {
+      enqueueCurrentPacketFull();
+    }
+  }
+
+  private synchronized void writeChunkPrepare(int buflen,
+      int ckoff, int cklen) throws IOException {
     dfsClient.checkOpen();
     checkClosed();
 
-    if (len > bytesPerChecksum) {
-      throw new IOException("writeChunk() buffer size is " + len +
+    if (buflen > bytesPerChecksum) {
+      throw new IOException("writeChunk() buffer size is " + buflen +
                             " is larger than supported  bytesPerChecksum " +
                             bytesPerChecksum);
     }
@@ -414,17 +451,6 @@ public class DFSOutputStream extends FSOutputSummer
           currentPacket.getSeqno(), src, packetSize, chunksPerPacket,
           getStreamer().getBytesCurBlock() + ", " + this);
     }
-
-    currentPacket.writeChecksum(checksum, ckoff, cklen);
-    currentPacket.writeData(b, offset, len);
-    currentPacket.incNumChunks();
-    getStreamer().incBytesCurBlock(len);
-
-    // If packet is full, enqueue it for transmission
-    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
-        getStreamer().getBytesCurBlock() == blockSize) {
-      enqueueCurrentPacketFull();
-    }
   }
 
   void enqueueCurrentPacket() throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f608777/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 1bdbc32..d93863c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -35,12 +35,12 @@ import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
 
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 
 import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
-import org.apache.hadoop.util.DirectBufferPool;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -139,7 +139,7 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
   }
 
-  private static final DirectBufferPool bufferPool = new DirectBufferPool();
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
 
   private final BlockReaderInfo[] blockReaders;
   private final int cellSize;
@@ -194,9 +194,14 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
   }
 
+  private boolean useDirectBuffer() {
+    return decoder.preferDirectBuffer();
+  }
+
   private void resetCurStripeBuffer() {
     if (curStripeBuf == null) {
-      curStripeBuf = bufferPool.getBuffer(cellSize * dataBlkNum);
+      curStripeBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
+          cellSize * dataBlkNum);
     }
     curStripeBuf.clear();
     curStripeRange = new StripeRange(0, 0);
@@ -204,7 +209,8 @@ public class DFSStripedInputStream extends DFSInputStream {
 
   private ByteBuffer getParityBuffer() {
     if (parityBuf == null) {
-      parityBuf = bufferPool.getBuffer(cellSize * parityBlkNum);
+      parityBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
+          cellSize * parityBlkNum);
     }
     parityBuf.clear();
     return parityBuf;
@@ -235,11 +241,11 @@ public class DFSStripedInputStream extends DFSInputStream {
   public synchronized void close() throws IOException {
     super.close();
     if (curStripeBuf != null) {
-      bufferPool.returnBuffer(curStripeBuf);
+      BUFFER_POOL.putBuffer(curStripeBuf);
       curStripeBuf = null;
     }
     if (parityBuf != null) {
-      bufferPool.returnBuffer(parityBuf);
+      BUFFER_POOL.putBuffer(parityBuf);
       parityBuf = null;
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f608777/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 85dc749..502e0a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -75,6 +77,9 @@ import org.apache.htrace.core.TraceScope;
  */
 @InterfaceAudience.Private
 public class DFSStripedOutputStream extends DFSOutputStream {
+
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+
   static class MultipleBlockingQueue<T> {
     private final List<BlockingQueue<T>> queues;
 
@@ -208,7 +213,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
 
       buffers = new ByteBuffer[numAllBlocks];
       for (int i = 0; i < buffers.length; i++) {
-        buffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
+        buffers[i] = BUFFER_POOL.getBuffer(useDirectBuffer(), cellSize);
       }
     }
 
@@ -236,7 +241,10 @@ public class DFSStripedOutputStream extends DFSOutputStream {
 
     private void release() {
       for (int i = 0; i < numAllBlocks; i++) {
-        byteArrayManager.release(buffers[i].array());
+        if (buffers[i] != null) {
+          BUFFER_POOL.putBuffer(buffers[i]);
+          buffers[i] = null;
+        }
       }
     }
 
@@ -311,6 +319,10 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     setCurrentStreamer(0);
   }
 
+  private boolean useDirectBuffer() {
+    return encoder.preferDirectBuffer();
+  }
+
   StripedDataStreamer getStripedDataStreamer(int i) {
     return streamers.get(i);
   }
@@ -907,11 +919,20 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     if (current.isHealthy()) {
       try {
         DataChecksum sum = getDataChecksum();
-        sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
+        if (buffer.isDirect()) {
+          ByteBuffer directCheckSumBuf =
+              BUFFER_POOL.getBuffer(true, checksumBuf.length);
+          sum.calculateChunkedSums(buffer, directCheckSumBuf);
+          directCheckSumBuf.get(checksumBuf);
+          BUFFER_POOL.putBuffer(directCheckSumBuf);
+        } else {
+          sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
+        }
+
         for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
           int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
           int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
-          super.writeChunk(buffer.array(), i, chunkLen, checksumBuf, ckOffset,
+          super.writeChunk(buffer, chunkLen, checksumBuf, ckOffset,
               getChecksumSize());
         }
       } catch(Exception e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f608777/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
index c7294c7..944ed9d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
@@ -57,6 +57,7 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
   }
 
   private void init() throws IOException {
+    initDecoderIfNecessary();
     getStripedReader().init();
     // allocate buffer to keep the reconstructed block data
     targetBuffer = allocateBuffer(getBufferSize());
@@ -150,8 +151,6 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
   }
 
   private void reconstructTargets(int toReconstructLen) {
-    initDecoderIfNecessary();
-
     ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen);
 
     ByteBuffer[] outputs = new ByteBuffer[1];

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f608777/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index 140c658..8f976c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -90,6 +90,10 @@ class StripedBlockReader {
     return buffer;
   }
 
+  void freeReadBuffer() {
+    buffer = null;
+  }
+
   void resetBlockReader(long offsetInBlock) {
     this.blockReader = createBlockReader(offsetInBlock);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f608777/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
index b800bef..9f9f15d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
@@ -49,6 +49,8 @@ class StripedBlockReconstructor extends StripedReconstructor
   public void run() {
     getDatanode().incrementXmitsInProgress();
     try {
+      initDecoderIfNecessary();
+
       getStripedReader().init();
 
       stripedWriter.init();
@@ -96,8 +98,6 @@ class StripedBlockReconstructor extends StripedReconstructor
   }
 
   private void reconstructTargets(int toReconstructLen) {
-    initDecoderIfNecessary();
-
     ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen);
 
     int[] erasedIndices = stripedWriter.getRealTargetIndices();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f608777/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
index 32e8843..11551e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
@@ -65,6 +67,7 @@ class StripedBlockWriter {
   private ByteBuffer targetBuffer;
   private long blockOffset4Target = 0;
   private long seqNo4Target = 0;
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
 
   StripedBlockWriter(StripedWriter stripedWriter, DataNode datanode,
                      Configuration conf, ExtendedBlock block,
@@ -87,6 +90,10 @@ class StripedBlockWriter {
     return targetBuffer;
   }
 
+  void freeTargetBuffer() {
+    targetBuffer = null;
+  }
+
   /**
    * Initialize  output/input streams for transferring data to target
    * and send create block request.
@@ -154,9 +161,18 @@ class StripedBlockWriter {
       return;
     }
 
-    stripedWriter.getChecksum().calculateChunkedSums(
-        targetBuffer.array(), 0, targetBuffer.remaining(),
-        stripedWriter.getChecksumBuf(), 0);
+    if (targetBuffer.isDirect()) {
+      ByteBuffer directCheckSumBuf =
+          BUFFER_POOL.getBuffer(true, stripedWriter.getChecksumBuf().length);
+      stripedWriter.getChecksum().calculateChunkedSums(
+          targetBuffer, directCheckSumBuf);
+      directCheckSumBuf.get(stripedWriter.getChecksumBuf());
+      BUFFER_POOL.putBuffer(directCheckSumBuf);
+    } else {
+      stripedWriter.getChecksum().calculateChunkedSums(
+          targetBuffer.array(), 0, targetBuffer.remaining(),
+          stripedWriter.getChecksumBuf(), 0);
+    }
 
     int ckOff = 0;
     while (targetBuffer.remaining() > 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f608777/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
index e6d4ceb..238c628 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
@@ -180,7 +180,7 @@ class StripedReader {
   }
 
   protected ByteBuffer allocateReadBuffer() {
-    return ByteBuffer.allocate(getBufferSize());
+    return reconstructor.allocateBuffer(getBufferSize());
   }
 
   private void initZeroStrip() {
@@ -421,7 +421,16 @@ class StripedReader {
   }
 
   void close() {
+    if (zeroStripeBuffers != null) {
+      for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) {
+        reconstructor.freeBuffer(zeroStripeBuffer);
+      }
+    }
+    zeroStripeBuffers = null;
+
     for (StripedBlockReader reader : readers) {
+      reconstructor.freeBuffer(reader.getReadBuffer());
+      reader.freeReadBuffer();
       reader.closeBlockReader();
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f608777/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index 782d091..5641c35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
@@ -102,6 +104,7 @@ abstract class StripedReconstructor {
   private final ErasureCodingPolicy ecPolicy;
   private RawErasureDecoder decoder;
   private final ExtendedBlock blockGroup;
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
 
   // position in striped internal block
   private long positionInBlock;
@@ -139,8 +142,16 @@ abstract class StripedReconstructor {
    */
   abstract void reconstruct() throws IOException;
 
+  boolean useDirectBuffer() {
+    return decoder.preferDirectBuffer();
+  }
+
   ByteBuffer allocateBuffer(int length) {
-    return ByteBuffer.allocate(length);
+    return BUFFER_POOL.getBuffer(useDirectBuffer(), length);
+  }
+
+  void freeBuffer(ByteBuffer buffer) {
+    BUFFER_POOL.putBuffer(buffer);
   }
 
   ExtendedBlock getBlock(int i) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f608777/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
index ca7a3a8..c099bc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
@@ -297,6 +297,14 @@ class StripedWriter {
   }
 
   void close() {
+    for (StripedBlockWriter writer : writers) {
+      ByteBuffer targetBuffer = writer.getTargetBuffer();
+      if (targetBuffer != null) {
+        reconstructor.freeBuffer(targetBuffer);
+        writer.freeTargetBuffer();
+      }
+    }
+
     for (int i = 0; i < targets.length; i++) {
       writers[i].close();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f608777/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index a02a8d6..18c2de9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -32,7 +33,9 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
 import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
+import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 import org.junit.After;
 import org.junit.Assert;
@@ -77,6 +80,11 @@ public class TestDFSStripedInputStream {
   public void setup() throws IOException {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, INTERNAL_BLOCK_SIZE);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+    if (ErasureCodeNative.isNativeCodeLoaded()) {
+      conf.set(
+          CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_DEFAULT_RAWCODER_KEY,
+          NativeRSRawErasureCoderFactory.class.getCanonicalName());
+    }
     SimulatedFSDataset.setFactory(conf);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
         DATA_BLK_NUM + PARITY_BLK_NUM).build();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f608777/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index a12a8ce..8d54f08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -25,8 +25,11 @@ import java.util.Collections;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
+import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
@@ -65,6 +68,11 @@ public class TestDFSStripedOutputStream {
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
         false);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+    if (ErasureCodeNative.isNativeCodeLoaded()) {
+      conf.set(
+          CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_DEFAULT_RAWCODER_KEY,
+          NativeRSRawErasureCoderFactory.class.getCanonicalName());
+    }
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
     fs = cluster.getFileSystem();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f608777/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index 83b6c58..11036a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
@@ -36,6 +37,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
+import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -183,6 +186,11 @@ public class TestDFSStripedOutputStreamWithFailure {
 
   private void setup(Configuration conf) throws IOException {
     final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
+    if (ErasureCodeNative.isNativeCodeLoaded()) {
+      conf.set(
+          CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_DEFAULT_RAWCODER_KEY,
+          NativeRSRawErasureCoderFactory.class.getCanonicalName());
+    }
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.waitActive();
     dfs = cluster.getFileSystem();
@@ -229,7 +237,8 @@ public class TestDFSStripedOutputStreamWithFailure {
     final HdfsConfiguration conf = newHdfsConfiguration();
 
     conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
-    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
     // Set short retry timeouts so this test runs faster
     conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
     for (int dn = 0; dn < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; dn += 2) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f608777/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
index 36d2dbd..59e9f87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
@@ -33,6 +33,7 @@ import java.util.Random;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -47,6 +48,8 @@ import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
+import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
@@ -86,11 +89,17 @@ public class TestReconstructStripedFile {
   public void setup() throws IOException {
     final Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
-    conf.setInt(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
+    conf.setInt(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
         cellSize - 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
         false);
+    if (ErasureCodeNative.isNativeCodeLoaded()) {
+      conf.set(
+          CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_DEFAULT_RAWCODER_KEY,
+          NativeRSRawErasureCoderFactory.class.getCanonicalName());
+    }
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();
     cluster.waitActive();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message