hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject git commit: HDFS-7276. Limit the number of byte arrays used by DFSOutputStream and provide a mechanism for recycling arrays.
Date Sat, 01 Nov 2014 18:30:02 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 5d4d11400 -> a4dca4867


HDFS-7276. Limit the number of byte arrays used by DFSOutputStream and provide a mechanism for recycling arrays.

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a4dca486
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a4dca486
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a4dca486

Branch: refs/heads/branch-2.6
Commit: a4dca48676341df423fb083128ffc4d6a71771df
Parents: 5d4d114
Author: Tsz-Wo Nicholas Sze <szetszwo@hortonworks.com>
Authored: Sat Nov 1 11:22:13 2014 -0700
Committer: Tsz-Wo Nicholas Sze <szetszwo@hortonworks.com>
Committed: Sat Nov 1 11:29:23 2014 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/ClientContext.java   |  10 +
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |  30 +-
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  21 +-
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  80 ++-
 .../hadoop/hdfs/util/ByteArrayManager.java      | 419 ++++++++++++
 .../hadoop/hdfs/util/TestByteArrayManager.java  | 635 +++++++++++++++++++
 7 files changed, 1166 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4dca486/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index af892d9..98a8aaf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -300,6 +300,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-7313. Support optional configuration of AES cipher suite on
     DataTransferProtocol. (cnauroth)
 
+    HDFS-7276. Limit the number of byte arrays used by DFSOutputStream and
+    provide a mechanism for recycling arrays. (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4dca486/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index f55aff5..e106fca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -84,6 +85,9 @@ public class ClientContext {
    */
   private volatile boolean disableLegacyBlockReaderLocal = false;
 
+  /** Creating byte[] for {@link DFSOutputStream}. */
+  private final ByteArrayManager byteArrayManager;  
+
   /**
    * Whether or not we complained about a DFSClient fetching a CacheContext that
    * didn't match its config values yet.
@@ -105,6 +109,8 @@ public class ClientContext {
           new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry);
     this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
     this.domainSocketFactory = new DomainSocketFactory(conf);
+
+    this.byteArrayManager = ByteArrayManager.newInstance(conf.writeByteArrayManagerConf);
   }
 
   public static String confAsString(Conf conf) {
@@ -204,4 +210,8 @@ public class ClientContext {
   public DomainSocketFactory getDomainSocketFactory() {
     return domainSocketFactory;
   }
+
+  public ByteArrayManager getByteArrayManager() {
+    return byteArrayManager;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4dca486/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 8915dec..ad24a0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -56,8 +56,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
@@ -193,6 +191,7 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
@@ -288,6 +287,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     final ChecksumOpt defaultChecksumOpt;
     final int writePacketSize;
     final int writeMaxPackets;
+    final ByteArrayManager.Conf writeByteArrayManagerConf;
     final int socketTimeout;
     final int socketCacheCapacity;
     final long socketCacheExpiry;
@@ -358,8 +358,30 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       /** dfs.write.packet.size is an internal config variable */
       writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
           DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
-      writeMaxPackets = conf.getInt(DFS_CLIENT_WRITE_MAX_PACKETS_KEY,
-          DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT);
+      writeMaxPackets = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY,
+          DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT);
+      
+      final boolean byteArrayManagerEnabled = conf.getBoolean(
+          DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY,
+          DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT);
+      if (!byteArrayManagerEnabled) {
+        writeByteArrayManagerConf = null;
+      } else {
+        final int countThreshold = conf.getInt(
+            DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY,
+            DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT);
+        final int countLimit = conf.getInt(
+            DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY,
+            DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT);
+        final long countResetTimePeriodMs = conf.getLong(
+            DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY,
+            DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
+        writeByteArrayManagerConf = new ByteArrayManager.Conf(
+            countThreshold, countLimit, countResetTimePeriodMs); 
+      }
+      
+      
       defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
           DFS_BLOCK_SIZE_DEFAULT);
       defaultReplication = (short) conf.getInt(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4dca486/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 4982c71..fd313bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -50,10 +50,27 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,... 
   public static final String  DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
   public static final String  DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
-  public static final String  DFS_CLIENT_WRITE_MAX_PACKETS_KEY = "dfs.client.write.max-packets";
-  public static final int     DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT = 80;
+  public static final String  DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY = "dfs.client.write.max-packets-in-flight";
+  public static final int     DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT = 80;
   public static final String  DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
   public static final int     DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
+  public static final String  DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY
+      = "dfs.client.write.byte-array-manager.enabled";
+  public static final boolean DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT
+      = false;
+  public static final String  DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY
+      = "dfs.client.write.byte-array-manager.count-threshold";
+  public static final int     DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT
+      = 128;
+  public static final String  DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY
+      = "dfs.client.write.byte-array-manager.count-limit";
+  public static final int     DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT
+      = 2048;
+  public static final String  DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY
+      = "dfs.client.write.byte-array-manager.count-reset-time-period-ms";
+  public static final long    DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT
+      = 10L * 1000;
+
   public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";
   public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
   public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4dca486/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index a83c854..6cbf27a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -41,10 +41,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.base.Preconditions;
-
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSOutputSummer;
@@ -55,7 +54,6 @@ import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
-import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -83,6 +81,7 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
@@ -99,6 +98,7 @@ import org.htrace.Trace;
 import org.htrace.TraceScope;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -143,6 +143,7 @@ public class DFSOutputStream extends FSOutputSummer
       CryptoProtocolVersion.supported();
 
   private final DFSClient dfsClient;
+  private final ByteArrayManager byteArrayManager;
   private Socket s;
   // closed is accessed by different threads under different locks.
   private volatile boolean closed = false;
@@ -181,6 +182,33 @@ public class DFSOutputStream extends FSOutputSummer
   private static final BlockStoragePolicySuite blockStoragePolicySuite =
       BlockStoragePolicySuite.createDefaultSuite();
 
+  /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
+  private Packet createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
+      long seqno) throws InterruptedIOException {
+    final byte[] buf;
+    final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
+
+    try {
+      buf = byteArrayManager.newByteArray(bufferSize);
+    } catch (InterruptedException ie) {
+      final InterruptedIOException iioe = new InterruptedIOException(
+          "seqno=" + seqno);
+      iioe.initCause(ie);
+      throw iioe;
+    }
+
+    return new Packet(buf, chunksPerPkt, offsetInBlock, seqno, getChecksumSize());
+  }
+
+  /**
+   * For heartbeat packets, create buffer directly by new byte[]
+   * since heartbeats should not be blocked.
+   */
+  private Packet createHeartbeatPacket() throws InterruptedIOException {
+    final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
+    return new Packet(buf, 0, 0, Packet.HEART_BEAT_SEQNO, getChecksumSize());
+  }
+
   private static class Packet {
     private static final long HEART_BEAT_SEQNO = -1L;
     long seqno; // sequencenumber of buffer in block
@@ -188,7 +216,7 @@ public class DFSOutputStream extends FSOutputSummer
     boolean syncBlock; // this packet forces the current block to disk
     int numChunks; // number of chunks currently in packet
     final int maxChunks; // max chunks in packet
-    final byte[]  buf;
+    private byte[] buf;
     private boolean lastPacketInBlock; // is this the last packet in block?
 
     /**
@@ -211,13 +239,6 @@ public class DFSOutputStream extends FSOutputSummer
     int dataPos;
 
     /**
-     * Create a heartbeat packet.
-     */
-    Packet(int checksumSize) {
-      this(0, 0, 0, HEART_BEAT_SEQNO, checksumSize);
-    }
-    
-    /**
      * Create a new packet.
      * 
      * @param pktSize maximum size of the packet, 
@@ -225,15 +246,15 @@ public class DFSOutputStream extends FSOutputSummer
      * @param chunksPerPkt maximum number of chunks per packet.
      * @param offsetInBlock offset in bytes into the HDFS block.
      */
-    Packet(int pktSize, int chunksPerPkt, long offsetInBlock, 
-                              long seqno, int checksumSize) {
+    private Packet(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
+        int checksumSize) {
       this.lastPacketInBlock = false;
       this.numChunks = 0;
       this.offsetInBlock = offsetInBlock;
       this.seqno = seqno;
-      
-      buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
-      
+
+      this.buf = buf;
+
       checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
       checksumPos = checksumStart;
       dataStart = checksumStart + (chunksPerPkt * checksumSize);
@@ -304,6 +325,11 @@ public class DFSOutputStream extends FSOutputSummer
         buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
       }
     }
+
+    private void releaseBuffer(ByteArrayManager bam) {
+      bam.release(buf);
+      buf = null;
+    }
     
     // get the packet's last byte's offset in the block
     long getLastByteOffsetBlock() {
@@ -547,7 +573,7 @@ public class DFSOutputStream extends FSOutputSummer
             }
             // get packet to be sent.
             if (dataQueue.isEmpty()) {
-              one = new Packet(getChecksumSize());  // heartbeat packet
+              one = createHeartbeatPacket();
             } else {
               one = dataQueue.getFirst(); // regular data packet
             }
@@ -907,6 +933,8 @@ public class DFSOutputStream extends FSOutputSummer
               lastAckedSeqno = seqno;
               ackQueue.removeFirst();
               dataQueue.notifyAll();
+
+              one.releaseBuffer(byteArrayManager);
             }
           } catch (Exception e) {
             if (!responderClosed) {
@@ -1657,6 +1685,7 @@ public class DFSOutputStream extends FSOutputSummer
 
     this.dfsclientSlowLogThresholdMs =
       dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
+    this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
   }
 
   /** Construct a new output stream for creating a file. */
@@ -1836,8 +1865,8 @@ public class DFSOutputStream extends FSOutputSummer
     }
 
     if (currentPacket == null) {
-      currentPacket = new Packet(packetSize, chunksPerPacket, 
-          bytesCurBlock, currentSeqno++, getChecksumSize());
+      currentPacket = createPacket(packetSize, chunksPerPacket, 
+          bytesCurBlock, currentSeqno++);
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
             currentPacket.seqno +
@@ -1884,8 +1913,7 @@ public class DFSOutputStream extends FSOutputSummer
       // indicate the end of block and reset bytesCurBlock.
       //
       if (bytesCurBlock == blockSize) {
-        currentPacket = new Packet(0, 0, bytesCurBlock, 
-            currentSeqno++, getChecksumSize());
+        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
         waitAndQueueCurrentPacket();
@@ -1978,8 +2006,8 @@ public class DFSOutputStream extends FSOutputSummer
             // Nothing to send right now,
             // but sync was requested.
             // Send an empty packet
-            currentPacket = new Packet(packetSize, chunksPerPacket,
-                bytesCurBlock, currentSeqno++, getChecksumSize());
+            currentPacket = createPacket(packetSize, chunksPerPacket,
+                bytesCurBlock, currentSeqno++);
           }
         } else {
           if (isSync && bytesCurBlock > 0) {
@@ -1987,8 +2015,8 @@ public class DFSOutputStream extends FSOutputSummer
             // and the block was partially written,
             // and sync was requested.
             // So send an empty sync packet.
-            currentPacket = new Packet(packetSize, chunksPerPacket,
-                bytesCurBlock, currentSeqno++, getChecksumSize());
+            currentPacket = createPacket(packetSize, chunksPerPacket,
+                bytesCurBlock, currentSeqno++);
           } else {
             // just discard the current packet since it is already been sent.
             currentPacket = null;
@@ -2192,7 +2220,7 @@ public class DFSOutputStream extends FSOutputSummer
 
       if (bytesCurBlock != 0) {
         // send an empty packet to mark the end of the block
-        currentPacket = new Packet(0, 0, bytesCurBlock, currentSeqno++, getChecksumSize());
+        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4dca486/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java
new file mode 100644
index 0000000..4751e72
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Manage byte array creation and release. 
+ */
+@InterfaceAudience.Private
+public abstract class ByteArrayManager {
+  static final Log LOG = LogFactory.getLog(ByteArrayManager.class);
+  private static final ThreadLocal<StringBuilder> debugMessage = new ThreadLocal<StringBuilder>() {
+    protected StringBuilder initialValue() {
+      return new StringBuilder();
+    }
+  };
+
+  private static void logDebugMessage() {
+    final StringBuilder b = debugMessage.get();
+    LOG.debug(b);
+    b.setLength(0);
+  }
+
+  static final int MIN_ARRAY_LENGTH = 32;
+  static final byte[] EMPTY_BYTE_ARRAY = {};
+
+  /**
+   * @return the least power of two greater than or equal to n, i.e. return
+   *         the least integer x with x >= n and x a power of two.
+   *
+   * @throws HadoopIllegalArgumentException
+   *           if n <= 0.
+   */
+  public static int leastPowerOfTwo(final int n) {
+    if (n <= 0) {
+      throw new HadoopIllegalArgumentException("n = " + n + " <= 0");
+    }
+
+    final int highestOne = Integer.highestOneBit(n);
+    if (highestOne == n) {
+      return n; // n is a power of two.
+    }
+    final int roundUp = highestOne << 1;
+    if (roundUp < 0) {
+      final long overflow = ((long) highestOne) << 1;
+      throw new ArithmeticException(
+          "Overflow: for n = " + n + ", the least power of two (the least"
+          + " integer x with x >= n and x a power of two) = "
+          + overflow + " > Integer.MAX_VALUE = " + Integer.MAX_VALUE);
+    }
+    return roundUp;
+  }
+
+  /**
+   * A counter with a time stamp so that it is reset automatically
+   * if there is no increment for the time period.
+   */
+  static class Counter {
+    private final long countResetTimePeriodMs;
+    private long count = 0L;
+    private long timestamp = Time.monotonicNow();
+
+    Counter(long countResetTimePeriodMs) {
+      this.countResetTimePeriodMs = countResetTimePeriodMs;
+    }
+
+    synchronized long getCount() {
+      return count;
+    }
+
+    /**
+     * Increment the counter, and reset it if there is no increment
+     * for a certain time period.
+     *
+     * @return the new count.
+     */
+    synchronized long increment() {
+      final long now = Time.monotonicNow();
+      if (now - timestamp > countResetTimePeriodMs) {
+        count = 0; // reset the counter
+      }
+      timestamp = now;
+      return ++count;
+    }
+  }
+
+  /** A map from integers to counters. */
+  static class CounterMap {
+    /** @see ByteArrayManager.Conf#countResetTimePeriodMs */
+    private final long countResetTimePeriodMs;
+    private final Map<Integer, Counter> map = new HashMap<Integer, Counter>();
+
+    private CounterMap(long countResetTimePeriodMs) {
+      this.countResetTimePeriodMs = countResetTimePeriodMs;
+    }
+
+    /**
+     * @return the counter for the given key;
+     *         and create a new counter if it does not exist.
+     */
+    synchronized Counter get(final Integer key, final boolean createIfNotExist) {
+      Counter count = map.get(key);
+      if (count == null && createIfNotExist) {
+        count = new Counter(countResetTimePeriodMs);
+        map.put(key, count);
+      }
+      return count;
+    }
+
+    synchronized void clear() {
+      map.clear();
+    }
+  }
+
+  /** Manage byte arrays with the same fixed length. */
+  static class FixedLengthManager {
+    private final int byteArrayLength;
+    private final int maxAllocated;
+    private final Queue<byte[]> freeQueue = new LinkedList<byte[]>();
+
+    private int numAllocated = 0;
+
+    FixedLengthManager(int arrayLength, int maxAllocated) {
+      this.byteArrayLength = arrayLength;
+      this.maxAllocated = maxAllocated;
+    }
+
+    /**
+     * Allocate a byte array.
+     *
+     * If the number of allocated arrays >= maximum, the current thread is
+     * blocked until the number of allocated arrays drops to below the maximum.
+     * 
+     * The byte array allocated by this method must be returned for recycling
+     * via the {@link FixedLengthManager#recycle(byte[])} method.
+     */
+    synchronized byte[] allocate() throws InterruptedException {
+      if (LOG.isDebugEnabled()) {
+        debugMessage.get().append(", ").append(this);
+      }
+      for(; numAllocated >= maxAllocated;) {
+        if (LOG.isDebugEnabled()) {
+          debugMessage.get().append(": wait ...");
+          logDebugMessage();
+        }
+
+        wait();
+
+        if (LOG.isDebugEnabled()) {
+          debugMessage.get().append("wake up: ").append(this);
+        }
+      }
+      numAllocated++;
+
+      final byte[] array = freeQueue.poll();
+      if (LOG.isDebugEnabled()) {
+        debugMessage.get().append(", recycled? ").append(array != null);
+      }
+      return array != null? array : new byte[byteArrayLength];
+    }
+
+    /**
+     * Recycle the given byte array, which must have the same length as the
+     * array length managed by this object.
+     *
+     * The byte array may or may not be allocated
+     * by the {@link FixedLengthManager#allocate()} method.
+     */
+    synchronized int recycle(byte[] array) {
+      Preconditions.checkNotNull(array);
+      Preconditions.checkArgument(array.length == byteArrayLength);
+      if (LOG.isDebugEnabled()) {
+        debugMessage.get().append(", ").append(this);
+      }
+
+      if (numAllocated == maxAllocated) {
+        if (LOG.isDebugEnabled()) {
+          debugMessage.get().append(", notifyAll");
+        }
+        notifyAll();
+      }
+      numAllocated--;
+      if (numAllocated < 0) {
+        // it is possible to drop below 0 since
+        // some byte arrays may not be created by the allocate() method.
+        numAllocated = 0;
+      }
+
+      if (freeQueue.size() < maxAllocated - numAllocated) {
+        if (LOG.isDebugEnabled()) {
+          debugMessage.get().append(", freeQueue.offer");
+        }
+        freeQueue.offer(array);
+      }
+      return freeQueue.size();
+    }
+
+    @Override
+    public synchronized String toString() {
+      return "[" + byteArrayLength + ": " + numAllocated + "/"
+          + maxAllocated + ", free=" + freeQueue.size() + "]";
+    }
+  }
+
+  /** A map from array lengths to byte array managers. */
+  static class ManagerMap {
+    private final int countLimit;
+    private final Map<Integer, FixedLengthManager> map = new HashMap<Integer, FixedLengthManager>();
+
+    ManagerMap(int countLimit) {
+      this.countLimit = countLimit;
+    }
+
+    /** @return the manager for the given array length. */
+    synchronized FixedLengthManager get(final Integer arrayLength,
+        final boolean createIfNotExist) {
+      FixedLengthManager manager = map.get(arrayLength);
+      if (manager == null && createIfNotExist) {
+        manager = new FixedLengthManager(arrayLength, countLimit);
+        map.put(arrayLength, manager);
+      }
+      return manager;
+    }
+
+    synchronized void clear() {
+      map.clear();
+    }
+  }
+
+  public static class Conf {
+    /**
+     * The count threshold for each array length so that a manager is created
+     * only after the allocation count exceeds the threshold.
+     */
+    private final int countThreshold;
+    /**
+     * The maximum number of arrays allowed for each array length.
+     */
+    private final int countLimit;
+    /**
+     * The time period in milliseconds that the allocation count for each array
+     * length is reset to zero if there is no increment.
+     */
+    private final long countResetTimePeriodMs;
+
+    public Conf(int countThreshold, int countLimit, long countResetTimePeriodMs) {
+      this.countThreshold = countThreshold;
+      this.countLimit = countLimit;
+      this.countResetTimePeriodMs = countResetTimePeriodMs;
+    }
+  }
+
+  /**
+   * Create a byte array for the given length, where the length of
+   * the returned array is larger than or equal to the given length.
+   *
+   * The current thread may be blocked if some resource is unavailable.
+   * 
+   * The byte array created by this method must be released
+   * via the {@link ByteArrayManager#release(byte[])} method.
+   *
+   * @return a byte array with length larger than or equal to the given length.
+   */
+  public abstract byte[] newByteArray(int size) throws InterruptedException;
+  
+  /**
+   * Release the given byte array.
+   * 
+   * The byte array may or may not be created
+   * by the {@link ByteArrayManager#newByteArray(int)} method.
+   * 
+   * @return the number of free arrays.
+   */
+  public abstract int release(byte[] array);
+
+  public static ByteArrayManager newInstance(Conf conf) {
+    return conf == null? new NewByteArrayWithoutLimit(): new Impl(conf);
+  }
+
+  /**
+   * A dummy implementation which simply calls new byte[].
+   */
+  static class NewByteArrayWithoutLimit extends ByteArrayManager {
+    @Override
+    public byte[] newByteArray(int size) throws InterruptedException {
+      return new byte[size];
+    }
+    
+    @Override
+    public int release(byte[] array) {
+      return 0;
+    }
+  }
+
  /**
   * Manage byte array allocation and provide a mechanism for recycling the byte
   * array objects.
   */
  static class Impl extends ByteArrayManager {
    private final Conf conf;
  
    // Allocation counters, keyed by (rounded) array length.
    private final CounterMap counters;
    // Recycling managers, keyed by (rounded) array length; a manager is
    // created lazily once the allocation count exceeds the threshold.
    private final ManagerMap managers;
  
    Impl(Conf conf) {
      this.conf = conf;
      this.counters = new CounterMap(conf.countResetTimePeriodMs);
      this.managers = new ManagerMap(conf.countLimit);
    }
  
    /**
     * Allocate a byte array, where the length of the allocated array
     * is the least power of two of the given length
     * unless the given length is less than {@link #MIN_ARRAY_LENGTH}.
     * In such case, the returned array length is equal to {@link #MIN_ARRAY_LENGTH}.
     *
     * If the number of allocated arrays exceeds the capacity,
     * the current thread is blocked until
     * the number of allocated arrays drops to below the capacity.
     * 
     * The byte array allocated by this method must be returned for recycling
     * via the {@link ByteArrayManager#release(byte[])} method.
     *
     * @return a byte array with length larger than or equal to the given length.
     */
    @Override
    public byte[] newByteArray(final int arrayLength) throws InterruptedException {
      if (LOG.isDebugEnabled()) {
        debugMessage.get().append("allocate(").append(arrayLength).append(")");
      }
  
      final byte[] array;
      if (arrayLength == 0) {
        // zero-length requests share a single empty array.
        array = EMPTY_BYTE_ARRAY;
      } else {
        // Round the requested length up to a power of two (at least
        // MIN_ARRAY_LENGTH) so that pooling buckets stay bounded.
        final int powerOfTwo = arrayLength <= MIN_ARRAY_LENGTH?
            MIN_ARRAY_LENGTH: leastPowerOfTwo(arrayLength);
        final long count = counters.get(powerOfTwo, true).increment();
        final boolean aboveThreshold = count > conf.countThreshold;
        // create a new manager only if the count is above threshold.
        final FixedLengthManager manager = managers.get(powerOfTwo, aboveThreshold);
  
        if (LOG.isDebugEnabled()) {
          debugMessage.get().append(": count=").append(count)
              .append(aboveThreshold? ", aboveThreshold": ", belowThreshold");
        }
        // Below threshold there is no manager yet: fall back to plain new byte[].
        array = manager != null? manager.allocate(): new byte[powerOfTwo];
      }
  
      if (LOG.isDebugEnabled()) {
        logDebugMessage();
      }
      return array;
    }
  
    /**
     * Recycle the given byte array.
     * 
     * The byte array may or may not be allocated
     * by the {@link ByteArrayManager#newByteArray(int)} method.
     *
     * @return the size of the manager's free queue after recycling,
     *         or -1 if the array length is not managed (no manager exists
     *         for the length, or the array is empty).
     */
    @Override
    public int release(final byte[] array) {
      Preconditions.checkNotNull(array);
      if (LOG.isDebugEnabled()) {
        debugMessage.get().append("recycle: array.length=").append(array.length);
      }
  
      final int freeQueueSize;
      if (array.length == 0) {
        freeQueueSize = -1;
      } else {
        // Only recycle if a manager already exists for this exact length;
        // never create one here.
        final FixedLengthManager manager = managers.get(array.length, false);
        freeQueueSize = manager == null? -1: manager.recycle(array);
      }
  
      if (LOG.isDebugEnabled()) {
        debugMessage.get().append(", freeQueueSize=").append(freeQueueSize);
        logDebugMessage();
      }
      return freeQueueSize;
    }
  
    /** Expose the counters for testing. */
    CounterMap getCounters() {
      return counters;
    }
  
    /** Expose the managers for testing. */
    ManagerMap getManagers() {
      return managers;
    }
  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4dca486/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java
new file mode 100644
index 0000000..289617a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java
@@ -0,0 +1,635 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.util.ByteArrayManager.Counter;
+import org.apache.hadoop.hdfs.util.ByteArrayManager.CounterMap;
+import org.apache.hadoop.hdfs.util.ByteArrayManager.FixedLengthManager;
+import org.apache.hadoop.hdfs.util.ByteArrayManager.ManagerMap;
+import org.apache.hadoop.util.Time;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
/**
 * Test {@link ByteArrayManager}.
 */
public class TestByteArrayManager {
  static {
    // Enable full logging for the class under test.
    ((Log4JLogger)LogFactory.getLog(ByteArrayManager.class)
        ).getLogger().setLevel(Level.ALL);
  }

  static final Log LOG = LogFactory.getLog(TestByteArrayManager.class);

  // Order futures by their resolved Integer values.
  // Note: Future.get() blocks, so sorting with this comparator also waits
  // for all compared futures to complete.
  private static final Comparator<Future<Integer>> CMP = new Comparator<Future<Integer>>() {
    @Override
    public int compare(Future<Integer> left, Future<Integer> right) {
      try {
        return left.get().intValue() - right.get().intValue();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  };

  /**
   * Increment a {@link Counter} from many threads and verify the increments
   * form the exact sequence 1..n, then verify the count auto-resets after
   * the reset time period elapses without an increment.
   */
  @Test
  public void testCounter() throws Exception {
    final long countResetTimePeriodMs = 200L;
    final Counter c = new Counter(countResetTimePeriodMs);

    final int n = DFSUtil.getRandom().nextInt(512) + 512;
    final List<Future<Integer>> futures = new ArrayList<Future<Integer>>(n);
    
    final ExecutorService pool = Executors.newFixedThreadPool(32);
    try {
      // increment
      for(int i = 0; i < n; i++) {
        futures.add(pool.submit(new Callable<Integer>() {
          @Override
          public Integer call() throws Exception {
            return (int)c.increment();
          }
        }));
      }
  
      // sort and wait for the futures
      Collections.sort(futures, CMP);
    } finally {
      pool.shutdown();
    }

    // check futures: each value 1..n must appear exactly once.
    Assert.assertEquals(n, futures.size());
    for(int i = 0; i < n; i++) {
      Assert.assertEquals(i + 1, futures.get(i).get().intValue());
    }
    Assert.assertEquals(n, c.getCount());

    // test auto-reset
    Thread.sleep(countResetTimePeriodMs + 100);
    Assert.assertEquals(1, c.increment());
  }

  

  /**
   * Exercise allocate/recycle: no manager below the count threshold, a
   * manager appears once the threshold is exceeded, allocation blocks at
   * the count limit, and recycling unblocks a waiting allocator.
   */
  @Test
  public void testAllocateRecycle() throws Exception {
    final int countThreshold = 4;
    final int countLimit = 8;
    final long countResetTimePeriodMs = 200L;
    final ByteArrayManager.Impl bam = new ByteArrayManager.Impl(
        new ByteArrayManager.Conf(
            countThreshold, countLimit, countResetTimePeriodMs));
    
    final CounterMap counters = bam.getCounters();
    final ManagerMap managers = bam.getManagers();
    
    final int[] uncommonArrays = {0, 1, 2, 4, 8, 16, 32, 64};
    final int arrayLength = 1024;


    final Allocator allocator = new Allocator(bam);
    final Recycler recycler = new Recycler(bam);
    try {
      { // allocate within threshold
        for(int i = 0; i < countThreshold; i++) {
          allocator.submit(arrayLength);
        }        
        waitForAll(allocator.futures);
  
        Assert.assertEquals(countThreshold,
            counters.get(arrayLength, false).getCount());
        Assert.assertNull(managers.get(arrayLength, false));
        for(int n : uncommonArrays) {
          Assert.assertNull(counters.get(n, false));
          Assert.assertNull(managers.get(n, false));
        }
      }

      { // recycle half of the arrays
        for(int i = 0; i < countThreshold/2; i++) {
          recycler.submit(removeLast(allocator.futures));
        }

        // no manager exists yet, so release() returns -1 for each array.
        for(Future<Integer> f : recycler.furtures) {
          Assert.assertEquals(-1, f.get().intValue());
        }
        recycler.furtures.clear();
      }

      { // allocate one more: this pushes the count over the threshold,
        // so a manager must now exist.
        allocator.submit(arrayLength).get();

        Assert.assertEquals(countThreshold + 1, counters.get(arrayLength, false).getCount());
        Assert.assertNotNull(managers.get(arrayLength, false));
      }

      { // recycle the remaining arrays
        final int n = allocator.recycleAll(recycler);

        recycler.verify(n);
      }
        
      {
        // allocate until the maximum.
        for(int i = 0; i < countLimit; i++) {
          allocator.submit(arrayLength);
        }
        waitForAll(allocator.futures);

        // allocate one more should be blocked
        final AllocatorThread t = new AllocatorThread(arrayLength, bam);
        t.start();
        
        // check if the thread is waiting, timed wait or runnable.
        for(int i = 0; i < 5; i++) {
          Thread.sleep(100);
          final Thread.State threadState = t.getState();
          if (threadState != Thread.State.RUNNABLE
              && threadState != Thread.State.WAITING
              && threadState != Thread.State.TIMED_WAITING) {
            Assert.fail("threadState = " + threadState);
          }
        }

        // recycle an array
        recycler.submit(removeLast(allocator.futures));
        Assert.assertEquals(1, removeLast(recycler.furtures).intValue());

        // check if the thread is unblocked
        Thread.sleep(100);
        Assert.assertEquals(Thread.State.TERMINATED, t.getState());
            
        // recycle the remaining, the recycle should be full.
        Assert.assertEquals(countLimit-1, allocator.recycleAll(recycler));
        recycler.submit(t.array);
        recycler.verify(countLimit);

        // recycle one more; it should not increase the free queue size
        Assert.assertEquals(countLimit, bam.release(new byte[arrayLength]));
      }
    } finally {
      allocator.pool.shutdown();
      recycler.pool.shutdown();
    }
  }

  /** Remove the last future from the list and return its resolved value. */
  static <T> T removeLast(List<Future<T>> furtures) throws Exception {
    return remove(furtures, furtures.size() - 1);
  }
  /** Remove the i-th future and resolve it; null if the list is empty. */
  static <T> T remove(List<Future<T>> furtures, int i) throws Exception {
    return furtures.isEmpty()? null: furtures.remove(i).get();
  }
  
  /** Block until every future in the list has completed. */
  static <T> void waitForAll(List<Future<T>> furtures) throws Exception {
    for(Future<T> f : furtures) {
      f.get();
    }
  }

  /** A thread that performs a single allocation, blocking if necessary. */
  static class AllocatorThread extends Thread {
    private final ByteArrayManager bam;
    private final int arrayLength;
    // The allocated array; remains null until newByteArray returns.
    private byte[] array;
    
    AllocatorThread(int arrayLength, ByteArrayManager bam) {
      this.bam = bam;
      this.arrayLength = arrayLength;
    }

    @Override
    public void run() {
      try {
        array = bam.newByteArray(arrayLength);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  /** Submits allocations to a thread pool and keeps the resulting futures. */
  static class Allocator {
    private final ByteArrayManager bam;
    final ExecutorService pool = Executors.newFixedThreadPool(8);
    final List<Future<byte[]>> futures = new LinkedList<Future<byte[]>>();

    Allocator(ByteArrayManager bam) {
      this.bam = bam;
    }
    
    /** Asynchronously allocate an array of exactly arrayLength bytes. */
    Future<byte[]> submit(final int arrayLength) {
      final Future<byte[]> f = pool.submit(new Callable<byte[]>() {
        @Override
        public byte[] call() throws Exception {
          final byte[] array = bam.newByteArray(arrayLength);
          Assert.assertEquals(arrayLength, array.length);
          return array;
        }
      });
      futures.add(f);
      return f;
    }
    
    /** Hand every outstanding array to the recycler; returns how many. */
    int recycleAll(Recycler recycler) throws Exception {
      final int n = futures.size();
      for(Future<byte[]> f : futures) {
        recycler.submit(f.get());
      }
      futures.clear();
      return n;
    }
  }

  /** Submits releases to a thread pool and keeps the resulting futures. */
  static class Recycler {
    private final ByteArrayManager bam;
    final ExecutorService pool = Executors.newFixedThreadPool(8);
    final List<Future<Integer>> furtures = new LinkedList<Future<Integer>>();

    Recycler(ByteArrayManager bam) {
      this.bam = bam;
    }

    /** Asynchronously release the array; the future holds the queue size. */
    Future<Integer> submit(final byte[] array) {
      final Future<Integer> f = pool.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
          return bam.release(array);
        }
      });
      furtures.add(f);
      return f;
    }

    /**
     * After sorting, the reported free-queue sizes must be exactly
     * 1..expectedSize: each release grew the queue by one.
     */
    void verify(final int expectedSize) throws Exception {
      Assert.assertEquals(expectedSize, furtures.size());
      Collections.sort(furtures, CMP);
      for(int i = 0; i < furtures.size(); i++) {
        Assert.assertEquals(i+1, furtures.get(i).get().intValue());
      }
      furtures.clear();
    }
  }


  /**
   * Stress test: several runners allocate/recycle concurrently while a
   * recycler thread randomly drains them; afterwards verify that a manager
   * exists exactly for the lengths whose count exceeded the threshold.
   */
  @Test
  public void testByteArrayManager() throws Exception {
    final int countThreshold = 32;
    final int countLimit = 64;
    final long countResetTimePeriodMs = 1000L;
    final ByteArrayManager.Impl bam = new ByteArrayManager.Impl(
        new ByteArrayManager.Conf(
            countThreshold, countLimit, countResetTimePeriodMs));
  
    final CounterMap counters = bam.getCounters();
    final ManagerMap managers = bam.getManagers();

    final ExecutorService pool = Executors.newFixedThreadPool(128);
    
    final Runner[] runners = new Runner[Runner.NUM_RUNNERS];
    final Thread[] threads = new Thread[runners.length];

    final int num = 1 << 8;
    for(int i = 0; i < runners.length; i++) {
      // runner i uses p = i, so higher-indexed runners allocate more often.
      runners[i] = new Runner(i, countThreshold, countLimit, pool, i, bam);
      threads[i] = runners[i].start(num);
    }
    
    final Thread randomRecycler = new Thread() {
      @Override
      public void run() {
        LOG.info("randomRecycler start");
        for(int i = 0; shouldRun(); i++) {
          final int j = DFSUtil.getRandom().nextInt(runners.length);
          try {
            runners[j].recycle();
          } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(this + " has " + e);
          }

          if ((i & 0xFF) == 0) {
            sleepMs(100);
          }
        }
        LOG.info("randomRecycler done");
      }
      
      // Keep recycling while any runner thread is alive or holds arrays.
      boolean shouldRun() {
        for(int i = 0; i < runners.length; i++) {
          if (threads[i].isAlive()) {
            return true;
          }
          if (!runners[i].isEmpty()) {
            return true;
          }
        }
        return false;
      }
    };
    randomRecycler.start();
    
    randomRecycler.join();

    Assert.assertNull(counters.get(0, false));
    // NOTE(review): verification starts at i = 1, skipping runner 0
    // (index2arrayLength(0) shifts by -1) -- confirm this is intended.
    for(int i = 1; i < runners.length; i++) {
      if (!runners[i].assertionErrors.isEmpty()) {
        for(AssertionError e : runners[i].assertionErrors) {
          LOG.error("AssertionError " + i, e);
        }
        Assert.fail(runners[i].assertionErrors.size() + " AssertionError(s)");
      }
      
      final int arrayLength = Runner.index2arrayLength(i);
      final boolean exceedCountThreshold = counters.get(arrayLength, false).getCount() > countThreshold; 
      final FixedLengthManager m = managers.get(arrayLength, false);
      if (exceedCountThreshold) {
        Assert.assertNotNull(m);
      } else {
        Assert.assertNull(m);
      }
    }
  }

  /** Sleep, failing the test if interrupted. */
  static void sleepMs(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException e) {
      e.printStackTrace();
      Assert.fail("Sleep is interrupted: " + e);
    }
  }

  /**
   * A worker that randomly allocates or recycles arrays of a fixed
   * maximum length determined by its index.
   */
  static class Runner implements Runnable {
    static final int NUM_RUNNERS = 4;

    static int index2arrayLength(int index) {
      return ByteArrayManager.MIN_ARRAY_LENGTH << (index - 1);
    }

    private final ByteArrayManager bam;
    final int maxArrayLength;
    final int countThreshold;
    final int maxArrays;
    final ExecutorService pool;
    // Arrays allocated but not yet recycled; guarded by itself.
    final List<Future<byte[]>> arrays = new ArrayList<Future<byte[]>>();

    final AtomicInteger count = new AtomicInteger();
    // Allocation bias: an iteration allocates with probability p/NUM_RUNNERS.
    final int p;
    private int n;
    
    final List<AssertionError> assertionErrors = new ArrayList<AssertionError>();

    Runner(int index, int countThreshold, int maxArrays,
        ExecutorService pool, int p, ByteArrayManager bam) {
      this.maxArrayLength = index2arrayLength(index);
      this.countThreshold = countThreshold;
      this.maxArrays = maxArrays;
      this.pool = pool;
      this.p = p;
      this.bam = bam;
    }

    boolean isEmpty() {
      synchronized (arrays) {
        return arrays.isEmpty();
      }
    }
 
    /** Allocate a random length that rounds up to exactly maxArrayLength. */
    Future<byte[]> submitAllocate() {
      count.incrementAndGet();

      final Future<byte[]> f = pool.submit(new Callable<byte[]>() {
        @Override
        public byte[] call() throws Exception {
          final int lower = maxArrayLength == ByteArrayManager.MIN_ARRAY_LENGTH?
              0: maxArrayLength >> 1;
          final int arrayLength = DFSUtil.getRandom().nextInt(
              maxArrayLength - lower) + lower + 1;
          final byte[] array = bam.newByteArray(arrayLength);
          try {
            Assert.assertEquals("arrayLength=" + arrayLength + ", lower=" + lower,
                maxArrayLength, array.length);
          } catch(AssertionError e) {
            assertionErrors.add(e);
          }
          return array;
        }
      });
      synchronized (arrays) {
        arrays.add(f);
      }
      return f;
    }

    byte[] removeFirst() throws Exception {
      synchronized (arrays) {
        return remove(arrays, 0);
      }
    }

    /** Recycle the oldest outstanding array, if any. */
    void recycle() throws Exception {
      final byte[] a = removeFirst();
      if (a != null) {
        recycle(a);
      }
    }

    int recycle(final byte[] array) {
      return bam.release(array);
    }

    Future<Integer> submitRecycle(final byte[] array) {
      count.decrementAndGet();

      final Future<Integer> f = pool.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
          return recycle(array);
        }
      });
      return f;
    }

    @Override
    public void run() {
      for(int i = 0; i < n; i++) {
        final boolean isAllocate = DFSUtil.getRandom().nextInt(NUM_RUNNERS) < p;
        if (isAllocate) {
          submitAllocate();
        } else {
          try {
            final byte[] a = removeFirst();
            if (a != null) {
              submitRecycle(a);
            }
          } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(this + " has " + e);
          }
        }

        if ((i & 0xFF) == 0) {
          sleepMs(100);
        }
      }
    }
    
    /** Start this runner for n iterations on a new thread. */
    Thread start(int n) {
      this.n = n;
      final Thread t = new Thread(this);
      t.start();
      return t;
    }

    @Override
    public String toString() {
      return getClass().getSimpleName() + ": max=" + maxArrayLength
          + ", count=" + count;
    }
  }

  /**
   * A baseline {@link ByteArrayManager} for the performance comparison in
   * {@link #main(String[])}: bounds outstanding arrays with wait/notify but
   * does not pool them.
   */
  static class NewByteArrayWithLimit extends ByteArrayManager {
    private final int maxCount;
    private int count = 0;
    
    NewByteArrayWithLimit(int maxCount) {
      this.maxCount = maxCount;
    }

    @Override
    public synchronized byte[] newByteArray(int size) throws InterruptedException {
      for(; count >= maxCount; ) {
        wait();
      }
      count++;
      return new byte[size];
    }
    
    @Override
    public synchronized int release(byte[] array) {
      // NOTE(review): notifyAll() fires while count still equals maxCount;
      // the decrement follows inside the same monitor, so waiters observe
      // the updated count -- confirm the ordering is intentional.
      if (count == maxCount) {
        notifyAll();
      }
      count--;
      return 0;
    }
  }
  
  /** Benchmark driver comparing the ByteArrayManager implementations. */
  public static void main(String[] args) throws Exception {
    ((Log4JLogger)LogFactory.getLog(ByteArrayManager.class)
        ).getLogger().setLevel(Level.OFF);

    final int arrayLength = 64 * 1024; //64k
    final int nThreads = 512;
    final int nAllocations = 1 << 15;
    final int maxArrays = 1 << 10;
    final int nTrials = 5;

    System.out.println("arrayLength=" + arrayLength
        + ", nThreads=" + nThreads
        + ", nAllocations=" + nAllocations
        + ", maxArrays=" + maxArrays);
    
    final Random ran = DFSUtil.getRandom();
    final ByteArrayManager[] impls = {
        new ByteArrayManager.NewByteArrayWithoutLimit(),
        new NewByteArrayWithLimit(maxArrays),
        new ByteArrayManager.Impl(new ByteArrayManager.Conf(
            DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT,
            maxArrays,
            DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT))
    };
    final double[] avg = new double[impls.length];

    for(int i = 0; i < impls.length; i++) {
      double duration = 0;
      printf("%26s:", impls[i].getClass().getSimpleName());
      for(int j = 0; j < nTrials; j++) {
        final int[] sleepTime = new int[nAllocations];
        for(int k = 0; k < sleepTime.length; k++) {
          sleepTime[k] = ran.nextInt(100);
        }
      
        final long elapsed = performanceTest(arrayLength, maxArrays, nThreads,
            sleepTime, impls[i]);
        duration += elapsed;
        printf("%5d, ", elapsed);
      }
      avg[i] = duration/nTrials;
      printf("avg=%6.3fs", avg[i]/1000);
      // Print the percentage difference against every earlier implementation.
      for(int j = 0; j < i; j++) {
        printf(" (%6.2f%%)", percentageDiff(avg[j], avg[i]));
      }
      printf("\n");
    }
  }
  
  /** @return the percentage change from original to newValue. */
  static double percentageDiff(double original, double newValue) {
    return (newValue - original)/original*100;
  }
  
  /** printf to stdout with an immediate flush. */
  static void printf(String format, Object... args) {
    System.out.printf(format, args);
    System.out.flush();
  }
  
  /**
   * Run nAllocations allocate-sleep-release tasks on nThreads threads
   * against the given implementation.
   *
   * @return the elapsed wall-clock time in milliseconds.
   */
  static long performanceTest(final int arrayLength, final int maxArrays,
      final int nThreads, final int[] sleepTimeMSs, final ByteArrayManager impl)
          throws Exception {
    final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
    final List<Future<Void>> futures = new ArrayList<Future<Void>>(sleepTimeMSs.length);
    final long startTime = Time.monotonicNow();

    for(int i = 0; i < sleepTimeMSs.length; i++) {
      final long sleepTime = sleepTimeMSs[i];
      futures.add(pool.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          byte[] array = impl.newByteArray(arrayLength);
          sleepMs(sleepTime);
          impl.release(array);
          return null;
        }
      }));
    }
    for(Future<Void> f : futures) {
      f.get();
    }

    final long endTime = Time.monotonicNow();
    pool.shutdown();
    return endTime - startTime;
  }
}


Mime
View raw message