hadoop-common-commits mailing list archives

From: whe...@apache.org
Subject: [4/4] hadoop git commit: HDFS-9168. Move client side unit test to hadoop-hdfs-client. Contributed by Haohui Mai.
Date: Wed, 28 Oct 2015 22:56:42 GMT
HDFS-9168. Move client side unit test to hadoop-hdfs-client. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/65f53f24
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/65f53f24
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/65f53f24

Branch: refs/heads/trunk
Commit: 65f53f246ab80e67017f5636f4b04191a631865f
Parents: 73bc65e
Author: Haohui Mai <wheat9@apache.org>
Authored: Wed Oct 28 15:54:37 2015 -0700
Committer: Haohui Mai <wheat9@apache.org>
Committed: Wed Oct 28 15:54:37 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs-client/pom.xml  |  21 +
 .../datatransfer/ReplaceDatanodeOnFailure.java  |  78 ++-
 .../hadoop/fs/TestUrlStreamHandlerFactory.java  |  80 +++
 .../java/org/apache/hadoop/fs/TestXAttr.java    |  94 +++
 .../org/apache/hadoop/hdfs/TestDFSPacket.java   |  94 +++
 .../hadoop/hdfs/TestDefaultNameNodePort.java    |  66 ++
 .../org/apache/hadoop/hdfs/TestPeerCache.java   | 292 +++++++++
 .../hdfs/client/impl/TestLeaseRenewer.java      | 207 ++++++
 .../hadoop/hdfs/protocol/TestExtendedBlock.java |  77 +++
 .../hdfs/shortcircuit/TestShortCircuitShm.java  | 109 ++++
 .../hadoop/hdfs/util/TestByteArrayManager.java  | 644 ++++++++++++++++++
 .../hdfs/util/TestExactSizeInputStream.java     | 129 ++++
 .../hdfs/web/TestByteRangeInputStream.java      | 292 +++++++++
 .../hdfs/web/TestOffsetUrlInputStream.java      |  64 ++
 .../apache/hadoop/hdfs/web/TestTokenAspect.java | 316 +++++++++
 .../hdfs/web/TestURLConnectionFactory.java      |  50 ++
 .../hadoop/hdfs/web/TestWebHDFSOAuth2.java      | 217 +++++++
 .../hdfs/web/TestWebHdfsContentLength.java      | 197 ++++++
 .../hdfs/web/oauth2/TestAccessTokenTimer.java   |  63 ++
 ...ClientCredentialTimeBasedTokenRefresher.java | 139 ++++
 ...TestRefreshTokenTimeBasedTokenRefresher.java | 138 ++++
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 hadoop-hdfs-project/hadoop-hdfs/pom.xml         |   6 -
 .../hadoop/fs/TestUrlStreamHandlerFactory.java  |  80 ---
 .../java/org/apache/hadoop/fs/TestXAttr.java    |  94 ---
 .../org/apache/hadoop/hdfs/TestDFSPacket.java   |  94 ---
 .../hadoop/hdfs/TestDefaultNameNodePort.java    |  68 --
 .../org/apache/hadoop/hdfs/TestPeerCache.java   | 293 ---------
 .../hdfs/client/impl/TestLeaseRenewer.java      | 209 ------
 .../hadoop/hdfs/protocol/TestExtendedBlock.java |  77 ---
 .../hdfs/server/namenode/TestAuditLogger.java   |   4 +-
 .../hdfs/shortcircuit/TestShortCircuitShm.java  | 109 ----
 .../hadoop/hdfs/util/TestByteArrayManager.java  | 645 -------------------
 .../hdfs/util/TestExactSizeInputStream.java     | 129 ----
 .../hdfs/web/TestByteRangeInputStream.java      | 292 ---------
 .../hdfs/web/TestOffsetUrlInputStream.java      |  64 --
 .../apache/hadoop/hdfs/web/TestTokenAspect.java | 316 ---------
 .../hdfs/web/TestURLConnectionFactory.java      |  50 --
 .../hadoop/hdfs/web/TestWebHDFSOAuth2.java      | 216 -------
 .../hdfs/web/TestWebHdfsContentLength.java      | 197 ------
 .../hdfs/web/oauth2/TestAccessTokenTimer.java   |  63 --
 ...ClientCredentialTimeBasedTokenRefresher.java | 139 ----
 ...TestRefreshTokenTimeBasedTokenRefresher.java | 138 ----
 hadoop-project/pom.xml                          |   5 +
 44 files changed, 3336 insertions(+), 3321 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/65f53f24/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
index 63c16d1..c70b890 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
@@ -51,6 +51,27 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mock-server</groupId>
+      <artifactId>mockserver-netty</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65f53f24/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
index c21a6a5..7099c28d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
@@ -30,16 +30,49 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class ReplaceDatanodeOnFailure {
+  /**
+   * DEFAULT condition:
+   *   Let r be the replication number.
+   *   Let n be the number of existing datanodes.
+   *   Add a new datanode only if r >= 3 and either
+   *   (1) floor(r/2) >= n or (2) the block is hflushed/appended.
+   */
+  private static final Condition CONDITION_DEFAULT = new Condition() {
+    @Override
+    public boolean satisfy(final short replication,
+        final DatanodeInfo[] existings, final int n, final boolean isAppend,
+        final boolean isHflushed) {
+      return replication >= 3 &&
+          (n <= (replication / 2) || isAppend || isHflushed);
+    }
+  };
+  /** Return false unconditionally. */
+  private static final Condition CONDITION_FALSE = new Condition() {
+    @Override
+    public boolean satisfy(short replication, DatanodeInfo[] existings,
+        int nExistings, boolean isAppend, boolean isHflushed) {
+      return false;
+    }
+  };
+  /** Return true unconditionally. */
+  private static final Condition CONDITION_TRUE = new Condition() {
+    @Override
+    public boolean satisfy(short replication, DatanodeInfo[] existings,
+        int nExistings, boolean isAppend, boolean isHflushed) {
+      return true;
+    }
+  };
+
   /** The replacement policies */
   public enum Policy {
     /** The feature is disabled in the entire site. */
-    DISABLE(Condition.FALSE),
+    DISABLE(CONDITION_FALSE),
     /** Never add a new datanode. */
-    NEVER(Condition.FALSE),
-    /** @see ReplaceDatanodeOnFailure.Condition#DEFAULT */
-    DEFAULT(Condition.DEFAULT),
+    NEVER(CONDITION_FALSE),
+    /** @see ReplaceDatanodeOnFailure#CONDITION_DEFAULT */
+    DEFAULT(CONDITION_DEFAULT),
     /** Always add a new datanode when an existing datanode is removed. */
-    ALWAYS(Condition.TRUE);
+    ALWAYS(CONDITION_TRUE);
 
     private final Condition condition;
 
@@ -54,41 +87,6 @@ public class ReplaceDatanodeOnFailure {
 
   /** Datanode replacement condition */
   private interface Condition {
-    /** Return true unconditionally. */
-    Condition TRUE = new Condition() {
-      @Override
-      public boolean satisfy(short replication, DatanodeInfo[] existings,
-          int nExistings, boolean isAppend, boolean isHflushed) {
-        return true;
-      }
-    };
-
-    /** Return false unconditionally. */
-    Condition FALSE = new Condition() {
-      @Override
-      public boolean satisfy(short replication, DatanodeInfo[] existings,
-          int nExistings, boolean isAppend, boolean isHflushed) {
-        return false;
-      }
-    };
-
-    /**
-     * DEFAULT condition:
-     *   Let r be the replication number.
-     *   Let n be the number of existing datanodes.
-     *   Add a new datanode only if r >= 3 and either
-     *   (1) floor(r/2) >= n; or
-     *   (2) r > n and the block is hflushed/appended.
-     */
-    Condition DEFAULT = new Condition() {
-      @Override
-      public boolean satisfy(final short replication,
-          final DatanodeInfo[] existings, final int n, final boolean isAppend,
-          final boolean isHflushed) {
-        return replication >= 3 &&
-            (n <= (replication / 2) || isAppend || isHflushed);
-      }
-    };
 
     /** Is the condition satisfied? */
     boolean satisfy(short replication, DatanodeInfo[] existings, int nExistings,
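
Note on the hunk above: it only moves the TRUE/FALSE/DEFAULT conditions out of the
Condition interface and into private static fields of ReplaceDatanodeOnFailure; the
DEFAULT predicate itself is unchanged. As a standalone sketch of that predicate
(illustration only, not part of the patch; the Condition interface and the
DatanodeInfo[] argument are omitted here, and the method name is made up):

    // r = replication factor, n = number of datanodes still in the pipeline.
    static boolean shouldAddReplacement(short r, int n,
                                        boolean isAppend, boolean isHflushed) {
      // Add a replacement only if r >= 3 and either
      // (1) n <= floor(r/2), or (2) the block is appended/hflushed.
      return r >= 3 && (n <= (r / 2) || isAppend || isHflushed);
    }

    // Examples: r=3, n=1               -> true  (half the pipeline is gone)
    //           r=3, n=2, plain write  -> false
    //           r=3, n=2, hflushed     -> true
    //           r=2, n=1               -> false (r < 3 never triggers replacement)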

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65f53f24/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/fs/TestUrlStreamHandlerFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/fs/TestUrlStreamHandlerFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/fs/TestUrlStreamHandlerFactory.java
new file mode 100644
index 0000000..910fee2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/fs/TestUrlStreamHandlerFactory.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test of the URL stream handler factory.
+ */
+public class TestUrlStreamHandlerFactory {
+
+  private static final int RUNS = 20;
+  private static final int THREADS = 10;
+  private static final int TASKS = 200;
+  private static final int TIMEOUT = 30;
+
+  @Test
+  public void testConcurrency() throws Exception {
+    for (int i = 0; i < RUNS; i++) {
+      singleRun();
+    }
+  }
+
+  private void singleRun() throws Exception {
+    final FsUrlStreamHandlerFactory factory = new FsUrlStreamHandlerFactory();
+    final Random random = new Random();
+    ExecutorService executor = Executors.newFixedThreadPool(THREADS);
+    ArrayList<Future<?>> futures = new ArrayList<Future<?>>(TASKS);
+
+    for (int i = 0; i < TASKS ; i++) {
+      final int aux = i;
+      futures.add(executor.submit(new Runnable() {
+        @Override
+        public void run() {
+          int rand = aux + random.nextInt(3);
+          factory.createURLStreamHandler(String.valueOf(rand));
+        }
+      }));
+    }
+
+    executor.shutdown();
+    try {
+      executor.awaitTermination(TIMEOUT, TimeUnit.SECONDS);
+      executor.shutdownNow();
+    } catch (InterruptedException e) {
+      // pass
+    }
+
+    // check for exceptions
+    for (Future future : futures) {
+      if (!future.isDone()) {
+        break; // timed out
+      }
+      future.get();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65f53f24/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/fs/TestXAttr.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/fs/TestXAttr.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/fs/TestXAttr.java
new file mode 100644
index 0000000..e47658d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/fs/TestXAttr.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for <code>XAttr</code> objects.
+ */
+public class TestXAttr {
+  private static XAttr XATTR, XATTR1, XATTR2, XATTR3, XATTR4, XATTR5;
+  
+  @BeforeClass
+  public static void setUp() throws Exception {
+    byte[] value = {0x31, 0x32, 0x33};
+    XATTR = new XAttr.Builder()
+      .setName("name")
+      .setValue(value)
+      .build();
+    XATTR1 = new XAttr.Builder()
+      .setNameSpace(XAttr.NameSpace.USER)
+      .setName("name")
+      .setValue(value)
+      .build();
+    XATTR2 = new XAttr.Builder()
+      .setNameSpace(XAttr.NameSpace.TRUSTED)
+      .setName("name")
+      .setValue(value)
+      .build();
+    XATTR3 = new XAttr.Builder()
+      .setNameSpace(XAttr.NameSpace.SYSTEM)
+      .setName("name")
+      .setValue(value)
+      .build();
+    XATTR4 = new XAttr.Builder()
+      .setNameSpace(XAttr.NameSpace.SECURITY)
+      .setName("name")
+      .setValue(value)
+      .build();
+    XATTR5 = new XAttr.Builder()
+      .setNameSpace(XAttr.NameSpace.RAW)
+      .setName("name")
+      .setValue(value)
+      .build();
+  }
+  
+  @Test
+  public void testXAttrEquals() {
+    assertNotSame(XATTR1, XATTR2);
+    assertNotSame(XATTR2, XATTR3);
+    assertNotSame(XATTR3, XATTR4);
+    assertNotSame(XATTR4, XATTR5);
+    assertEquals(XATTR, XATTR1);
+    assertEquals(XATTR1, XATTR1);
+    assertEquals(XATTR2, XATTR2);
+    assertEquals(XATTR3, XATTR3);
+    assertEquals(XATTR4, XATTR4);
+    assertEquals(XATTR5, XATTR5);
+    assertFalse(XATTR1.equals(XATTR2));
+    assertFalse(XATTR2.equals(XATTR3));
+    assertFalse(XATTR3.equals(XATTR4));
+    assertFalse(XATTR4.equals(XATTR5));
+  }
+  
+  @Test
+  public void testXAttrHashCode() {
+    assertEquals(XATTR.hashCode(), XATTR1.hashCode());
+    assertFalse(XATTR1.hashCode() == XATTR2.hashCode());
+    assertFalse(XATTR2.hashCode() == XATTR3.hashCode());
+    assertFalse(XATTR3.hashCode() == XATTR4.hashCode());
+    assertFalse(XATTR4.hashCode() == XATTR5.hashCode());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65f53f24/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java
new file mode 100755
index 0000000..77957bc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.Random;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.htrace.core.SpanId;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDFSPacket {
+  private static final int chunkSize = 512;
+  private static final int checksumSize = 4;
+  private static final int maxChunksPerPacket = 4;
+
+  @Test
+  public void testPacket() throws Exception {
+    Random r = new Random(12345L);
+    byte[] data =  new byte[chunkSize];
+    r.nextBytes(data);
+    byte[] checksum = new byte[checksumSize];
+    r.nextBytes(checksum);
+
+    DataOutputBuffer os =  new DataOutputBuffer(data.length * 2);
+
+    byte[] packetBuf = new byte[data.length * 2];
+    DFSPacket p = new DFSPacket(packetBuf, maxChunksPerPacket,
+                                0, 0, checksumSize, false);
+    p.setSyncBlock(true);
+    p.writeData(data, 0, data.length);
+    p.writeChecksum(checksum, 0, checksum.length);
+    p.writeTo(os);
+
+    //we have set syncBlock to true, so the header has the maximum length
+    int headerLen = PacketHeader.PKT_MAX_HEADER_LEN;
+    byte[] readBuf = os.getData();
+
+    assertArrayRegionsEqual(readBuf, headerLen, checksum, 0, checksum.length);
+    assertArrayRegionsEqual(readBuf, headerLen + checksum.length, data, 0, data.length);
+
+  }
+
+  public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
+                                             int off2, int len) {
+    for (int i = 0; i < len; i++) {
+      if (buf1[off1 + i] != buf2[off2 + i]) {
+        Assert.fail("arrays differ at byte " + i + ". " +
+            "The first array has " + (int) buf1[off1 + i] +
+            ", but the second array has " + (int) buf2[off2 + i]);
+      }
+    }
+  }
+
+  @Test
+  public void testAddParentsGetParents() throws Exception {
+    DFSPacket p = new DFSPacket(null, maxChunksPerPacket,
+                                0, 0, checksumSize, false);
+    SpanId parents[] = p.getTraceParents();
+    Assert.assertEquals(0, parents.length);
+    p.addTraceParent(new SpanId(0, 123));
+    p.addTraceParent(new SpanId(0, 123));
+    parents = p.getTraceParents();
+    Assert.assertEquals(1, parents.length);
+    Assert.assertEquals(new SpanId(0, 123), parents[0]);
+    parents = p.getTraceParents(); // test calling 'get' again.
+    Assert.assertEquals(1, parents.length);
+    Assert.assertEquals(new SpanId(0, 123), parents[0]);
+    p.addTraceParent(new SpanId(0, 1));
+    p.addTraceParent(new SpanId(0, 456));
+    p.addTraceParent(new SpanId(0, 789));
+    parents = p.getTraceParents();
+    Assert.assertEquals(4, parents.length);
+    Assert.assertEquals(new SpanId(0, 1), parents[0]);
+    Assert.assertEquals(new SpanId(0, 123), parents[1]);
+    Assert.assertEquals(new SpanId(0, 456), parents[2]);
+    Assert.assertEquals(new SpanId(0, 789), parents[3]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65f53f24/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestDefaultNameNodePort.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestDefaultNameNodePort.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestDefaultNameNodePort.java
new file mode 100644
index 0000000..fc76a07
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestDefaultNameNodePort.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test NameNode port defaulting code. */
+public class TestDefaultNameNodePort {
+
+  @Test
+  public void testGetAddressFromString() throws Exception {
+    assertEquals(DFSUtilClient.getNNAddress("foo").getPort(),
+                 HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
+    assertEquals(DFSUtilClient.getNNAddress("hdfs://foo/").getPort(),
+                 HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
+    assertEquals(DFSUtilClient.getNNAddress("hdfs://foo:555").getPort(),
+                 555);
+    assertEquals(DFSUtilClient.getNNAddress("foo:555").getPort(),
+                 555);
+  }
+
+  @Test
+  public void testGetAddressFromConf() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    FileSystem.setDefaultUri(conf, "hdfs://foo/");
+    assertEquals(DFSUtilClient.getNNAddress(conf).getPort(),
+        HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
+    FileSystem.setDefaultUri(conf, "hdfs://foo:555/");
+    assertEquals(DFSUtilClient.getNNAddress(conf).getPort(), 555);
+    FileSystem.setDefaultUri(conf, "foo");
+    assertEquals(DFSUtilClient.getNNAddress(conf).getPort(),
+        HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
+  }
+
+  @Test
+  public void testGetUri() {
+    assertEquals(DFSUtilClient.getNNUri(new InetSocketAddress("foo", 555)),
+                 URI.create("hdfs://foo:555"));
+    assertEquals(DFSUtilClient.getNNUri(new InetSocketAddress("foo",
+            HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT)),
+        URI.create("hdfs://foo"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65f53f24/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
new file mode 100644
index 0000000..b24df2b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import com.google.common.collect.HashMultiset;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+public class TestPeerCache {
+  static final Logger LOG = LoggerFactory.getLogger(TestPeerCache.class);
+
+  private static class FakePeer implements Peer {
+    private boolean closed = false;
+    private final boolean hasDomain;
+
+    private final DatanodeID dnId;
+
+    public FakePeer(DatanodeID dnId, boolean hasDomain) {
+      this.dnId = dnId;
+      this.hasDomain = hasDomain;
+    }
+
+    @Override
+    public ReadableByteChannel getInputStreamChannel() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setReadTimeout(int timeoutMs) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getReceiveBufferSize() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean getTcpNoDelay() throws IOException {
+      return false;
+    }
+
+    @Override
+    public void setWriteTimeout(int timeoutMs) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isClosed() {
+      return closed;
+    }
+  
+    @Override
+    public void close() throws IOException {
+      closed = true;
+    }
+
+    @Override
+    public String getRemoteAddressString() {
+      return dnId.getInfoAddr();
+    }
+
+    @Override
+    public String getLocalAddressString() {
+      return "127.0.0.1:123";
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  
+    @Override
+    public boolean isLocal() {
+      return true;
+    }
+  
+    @Override
+    public String toString() {
+      return "FakePeer(dnId=" + dnId + ")";
+    }
+
+    @Override
+    public DomainSocket getDomainSocket() {
+      if (!hasDomain) return null;
+      // Return a mock which throws an exception whenever any function is
+      // called.
+      return Mockito.mock(DomainSocket.class,
+          new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation)
+                throws Throwable {
+              throw new RuntimeException("injected fault.");
+          } });
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof FakePeer)) return false;
+      FakePeer other = (FakePeer)o;
+      return hasDomain == other.hasDomain &&
+          dnId.equals(other.dnId);
+    }
+
+    @Override
+    public int hashCode() {
+      return dnId.hashCode() ^ (hasDomain ? 1 : 0);
+    }
+
+    @Override
+    public boolean hasSecureChannel() {
+      return false;
+    }
+  }
+
+  @Test
+  public void testAddAndRetrieve() throws Exception {
+    PeerCache cache = new PeerCache(3, 100000);
+    DatanodeID dnId = new DatanodeID("192.168.0.1",
+          "fakehostname", "fake_datanode_id",
+          100, 101, 102, 103);
+    FakePeer peer = new FakePeer(dnId, false);
+    cache.put(dnId, peer);
+    assertTrue(!peer.isClosed());
+    assertEquals(1, cache.size());
+    assertEquals(peer, cache.get(dnId, false));
+    assertEquals(0, cache.size());
+    cache.close();
+  }
+
+  @Test
+  public void testExpiry() throws Exception {
+    final int CAPACITY = 3;
+    final int EXPIRY_PERIOD = 10;
+    PeerCache cache = new PeerCache(CAPACITY, EXPIRY_PERIOD);
+    DatanodeID dnIds[] = new DatanodeID[CAPACITY];
+    FakePeer peers[] = new FakePeer[CAPACITY];
+    for (int i = 0; i < CAPACITY; ++i) {
+      dnIds[i] = new DatanodeID("192.168.0.1",
+          "fakehostname_" + i, "fake_datanode_id",
+          100, 101, 102, 103);
+      peers[i] = new FakePeer(dnIds[i], false);
+    }
+    for (int i = 0; i < CAPACITY; ++i) {
+      cache.put(dnIds[i], peers[i]);
+    }
+
+    // Wait for the peers to expire
+    Thread.sleep(EXPIRY_PERIOD * 50);
+    assertEquals(0, cache.size());
+
+    // make sure that the peers were closed when they were expired
+    for (int i = 0; i < CAPACITY; ++i) {
+      assertTrue(peers[i].isClosed());
+    }
+
+    // sleep for another second and see if 
+    // the daemon thread runs fine on empty cache
+    Thread.sleep(EXPIRY_PERIOD * 50);
+    cache.close();
+  }
+
+  @Test
+  public void testEviction() throws Exception {
+    final int CAPACITY = 3;
+    PeerCache cache = new PeerCache(CAPACITY, 100000);
+    DatanodeID dnIds[] = new DatanodeID[CAPACITY + 1];
+    FakePeer peers[] = new FakePeer[CAPACITY + 1];
+    for (int i = 0; i < dnIds.length; ++i) {
+      dnIds[i] = new DatanodeID("192.168.0.1",
+          "fakehostname_" + i, "fake_datanode_id_" + i,
+          100, 101, 102, 103);
+      peers[i] = new FakePeer(dnIds[i], false);
+    }
+    for (int i = 0; i < CAPACITY; ++i) {
+      cache.put(dnIds[i], peers[i]);
+    }
+    // Check that the peers are cached
+    assertEquals(CAPACITY, cache.size());
+
+    // Add another entry and check that the first entry was evicted
+    cache.put(dnIds[CAPACITY], peers[CAPACITY]);
+    assertEquals(CAPACITY, cache.size());
+    assertSame(null, cache.get(dnIds[0], false));
+
+    // Make sure that the other entries are still there
+    for (int i = 1; i < CAPACITY; ++i) {
+      Peer peer = cache.get(dnIds[i], false);
+      assertSame(peers[i], peer);
+      assertTrue(!peer.isClosed());
+      peer.close();
+    }
+    assertEquals(1, cache.size());
+    cache.close();
+  }
+
+  @Test
+  public void testMultiplePeersWithSameKey() throws Exception {
+    final int CAPACITY = 3;
+    PeerCache cache = new PeerCache(CAPACITY, 100000);
+    DatanodeID dnId = new DatanodeID("192.168.0.1",
+          "fakehostname", "fake_datanode_id",
+          100, 101, 102, 103);
+    HashMultiset<FakePeer> peers = HashMultiset.create(CAPACITY);
+    for (int i = 0; i < CAPACITY; ++i) {
+      FakePeer peer = new FakePeer(dnId, false);
+      peers.add(peer);
+      cache.put(dnId, peer);
+    }
+    // Check that all of the peers ended up in the cache
+    assertEquals(CAPACITY, cache.size());
+    while (!peers.isEmpty()) {
+      Peer peer = cache.get(dnId, false);
+      assertTrue(peer != null);
+      assertTrue(!peer.isClosed());
+      peers.remove(peer);
+    }
+    assertEquals(0, cache.size());
+    cache.close();
+  }
+
+  @Test
+  public void testDomainSocketPeers() throws Exception {
+    final int CAPACITY = 3;
+    PeerCache cache = new PeerCache(CAPACITY, 100000);
+    DatanodeID dnId = new DatanodeID("192.168.0.1",
+          "fakehostname", "fake_datanode_id",
+          100, 101, 102, 103);
+    HashMultiset<FakePeer> peers = HashMultiset.create(CAPACITY);
+    for (int i = 0; i < CAPACITY; ++i) {
+      FakePeer peer = new FakePeer(dnId, i == CAPACITY - 1);
+      peers.add(peer);
+      cache.put(dnId, peer);
+    }
+    // Check that all of the peers ended up in the cache
+    assertEquals(CAPACITY, cache.size());
+    // Test that get(requireDomainPeer=true) finds the peer with the 
+    // domain socket.
+    Peer peer = cache.get(dnId, true);
+    assertTrue(peer.getDomainSocket() != null);
+    peers.remove(peer);
+    // Test that get(requireDomainPeer=true) returns null when there are
+    // no more peers with domain sockets.
+    peer = cache.get(dnId, true);
+    assertTrue(peer == null);
+    // Check that all of the other peers ended up in the cache.
+    while (!peers.isEmpty()) {
+      peer = cache.get(dnId, false);
+      assertTrue(peer != null);
+      assertTrue(!peer.isClosed());
+      peers.remove(peer);
+    }
+    assertEquals(0, cache.size());
+    cache.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65f53f24/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
new file mode 100644
index 0000000..eb10e96
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertSame;
+
+public class TestLeaseRenewer {
+  private final String FAKE_AUTHORITY="hdfs://nn1/";
+  private final UserGroupInformation FAKE_UGI_A =
+    UserGroupInformation.createUserForTesting(
+      "myuser", new String[]{"group1"});
+  private final UserGroupInformation FAKE_UGI_B =
+    UserGroupInformation.createUserForTesting(
+      "myuser", new String[]{"group1"});
+
+  private DFSClient MOCK_DFSCLIENT;
+  private LeaseRenewer renewer;
+
+  /** Cause renewals often so test runs quickly. */
+  private static final long FAST_GRACE_PERIOD = 100L;
+
+  @Before
+  public void setupMocksAndRenewer() throws IOException {
+    MOCK_DFSCLIENT = createMockClient();
+
+    renewer = LeaseRenewer.getInstance(
+        FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
+    renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
+}
+
+  private DFSClient createMockClient() {
+    final DfsClientConf mockConf = Mockito.mock(DfsClientConf.class);
+    Mockito.doReturn((int)FAST_GRACE_PERIOD).when(mockConf).getHdfsTimeout();
+
+    DFSClient mock = Mockito.mock(DFSClient.class);
+    Mockito.doReturn(true).when(mock).isClientRunning();
+    Mockito.doReturn(mockConf).when(mock).getConf();
+    Mockito.doReturn("myclient").when(mock).getClientName();
+    return mock;
+  }
+
+  @Test
+  public void testInstanceSharing() throws IOException {
+    // Two lease renewers with the same UGI should return
+    // the same instance
+    LeaseRenewer lr = LeaseRenewer.getInstance(
+        FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
+    LeaseRenewer lr2 = LeaseRenewer.getInstance(
+        FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
+    Assert.assertSame(lr, lr2);
+
+    // But a different UGI should return a different instance
+    LeaseRenewer lr3 = LeaseRenewer.getInstance(
+        FAKE_AUTHORITY, FAKE_UGI_B, MOCK_DFSCLIENT);
+    Assert.assertNotSame(lr, lr3);
+
+    // A different authority with same UGI should also be a different
+    // instance.
+    LeaseRenewer lr4 = LeaseRenewer.getInstance(
+        "someOtherAuthority", FAKE_UGI_B, MOCK_DFSCLIENT);
+    Assert.assertNotSame(lr, lr4);
+    Assert.assertNotSame(lr3, lr4);
+  }
+
+  @Test
+  public void testRenewal() throws Exception {
+    // Keep track of how many times the lease gets renewed
+    final AtomicInteger leaseRenewalCount = new AtomicInteger();
+    Mockito.doAnswer(new Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        leaseRenewalCount.incrementAndGet();
+        return true;
+      }
+    }).when(MOCK_DFSCLIENT).renewLease();
+
+
+    // Set up a file so that we start renewing our lease.
+    DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
+    long fileId = 123L;
+    renewer.put(fileId, mockStream, MOCK_DFSCLIENT);
+
+    // Wait for lease to get renewed
+    long failTime = Time.monotonicNow() + 5000;
+    while (Time.monotonicNow() < failTime &&
+        leaseRenewalCount.get() == 0) {
+      Thread.sleep(50);
+    }
+    if (leaseRenewalCount.get() == 0) {
+      Assert.fail("Did not renew lease at all!");
+    }
+
+    renewer.closeFile(fileId, MOCK_DFSCLIENT);
+  }
+
+  /**
+   * Regression test for HDFS-2810. In this bug, the LeaseRenewer has handles
+   * to several DFSClients with the same name, the first of which has no files
+   * open. Previously, this was causing the lease to not get renewed.
+   */
+  @Test
+  public void testManyDfsClientsWhereSomeNotOpen() throws Exception {
+    // First DFSClient has no files open so doesn't renew leases.
+    final DFSClient mockClient1 = createMockClient();
+    Mockito.doReturn(false).when(mockClient1).renewLease();
+    assertSame(renewer, LeaseRenewer.getInstance(
+        FAKE_AUTHORITY, FAKE_UGI_A, mockClient1));
+
+    // Set up a file so that we start renewing our lease.
+    DFSOutputStream mockStream1 = Mockito.mock(DFSOutputStream.class);
+    long fileId = 456L;
+    renewer.put(fileId, mockStream1, mockClient1);
+
+    // Second DFSClient does renew lease
+    final DFSClient mockClient2 = createMockClient();
+    Mockito.doReturn(true).when(mockClient2).renewLease();
+    assertSame(renewer, LeaseRenewer.getInstance(
+        FAKE_AUTHORITY, FAKE_UGI_A, mockClient2));
+
+    // Set up a file so that we start renewing our lease.
+    DFSOutputStream mockStream2 = Mockito.mock(DFSOutputStream.class);
+    renewer.put(fileId, mockStream2, mockClient2);
+
+
+    // Wait for lease to get renewed
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          Mockito.verify(mockClient1, Mockito.atLeastOnce()).renewLease();
+          Mockito.verify(mockClient2, Mockito.atLeastOnce()).renewLease();
+          return true;
+        } catch (AssertionError err) {
+          LeaseRenewer.LOG.warn("Not yet satisfied", err);
+          return false;
+        } catch (IOException e) {
+          // should not throw!
+          throw new RuntimeException(e);
+        }
+      }
+    }, 100, 10000);
+
+    renewer.closeFile(fileId, mockClient1);
+    renewer.closeFile(fileId, mockClient2);
+  }
+
+  @Test
+  public void testThreadName() throws Exception {
+    DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
+    long fileId = 789L;
+    Assert.assertFalse("Renewer not initially running",
+        renewer.isRunning());
+
+    // Pretend to open a file
+    renewer.put(fileId, mockStream, MOCK_DFSCLIENT);
+
+    Assert.assertTrue("Renewer should have started running",
+        renewer.isRunning());
+
+    // Check the thread name is reasonable
+    String threadName = renewer.getDaemonName();
+    Assert.assertEquals("LeaseRenewer:myuser@hdfs://nn1/", threadName);
+
+    // Pretend to close the file
+    renewer.closeFile(fileId, MOCK_DFSCLIENT);
+    renewer.setEmptyTime(Time.monotonicNow());
+
+    // Should stop the renewer running within a few seconds
+    long failTime = Time.monotonicNow() + 5000;
+    while (renewer.isRunning() && Time.monotonicNow() < failTime) {
+      Thread.sleep(50);
+    }
+    Assert.assertFalse(renewer.isRunning());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65f53f24/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java
new file mode 100644
index 0000000..10c1671
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import org.junit.Test;
+
+
+public class TestExtendedBlock {
+  static final String POOL_A = "blockpool-a";
+  static final String POOL_B = "blockpool-b";
+  static final Block BLOCK_1_GS1 = new Block(1L, 100L, 1L);
+  static final Block BLOCK_1_GS2 = new Block(1L, 100L, 2L);
+  static final Block BLOCK_2_GS1 = new Block(2L, 100L, 1L);
+  
+  @Test
+  public void testEquals() {
+    // Same block -> equal
+    assertEquals(
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1),
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1));
+    // Different pools, same block id -> not equal
+    assertNotEquals(
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1),
+        new ExtendedBlock(POOL_B, BLOCK_1_GS1));
+    // Same pool, different block id -> not equal
+    assertNotEquals(
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1),
+        new ExtendedBlock(POOL_A, BLOCK_2_GS1));
+    // Same block, different genstamps -> equal
+    assertEquals(
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1),
+        new ExtendedBlock(POOL_A, BLOCK_1_GS2));
+  }
+  
+  @Test
+  public void testHashcode() {
+    
+    // Different pools, same block id -> different hashcode
+    assertNotEquals(
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1).hashCode(),
+        new ExtendedBlock(POOL_B, BLOCK_1_GS1).hashCode());
+    
+    // Same pool, different block id -> different hashcode
+    assertNotEquals(
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1).hashCode(),
+        new ExtendedBlock(POOL_A, BLOCK_2_GS1).hashCode());
+    
+    // Same block -> same hashcode
+    assertEquals(
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1).hashCode(),
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1).hashCode());
+
+  }
+
+  private static void assertNotEquals(Object a, Object b) {
+    assertFalse("expected not equal: '" + a + "' and '" + b + "'",
+        a.equals(b));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65f53f24/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitShm.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitShm.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitShm.java
new file mode 100644
index 0000000..9d48444
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitShm.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
+import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+public class TestShortCircuitShm {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TestShortCircuitShm.class);
+  
+  private static final File TEST_BASE =
+      new File(System.getProperty("test.build.data", "/tmp"));
+
+  @Before
+  public void before() {
+    Assume.assumeTrue(null == 
+        SharedFileDescriptorFactory.getLoadingFailureReason());
+  }
+
+  @Test(timeout=60000)
+  public void testStartupShutdown() throws Exception {
+    File path = new File(TEST_BASE, "testStartupShutdown");
+    path.mkdirs();
+    SharedFileDescriptorFactory factory =
+        SharedFileDescriptorFactory.create("shm_",
+            new String[] { path.getAbsolutePath() } );
+    FileInputStream stream =
+        factory.createDescriptor("testStartupShutdown", 4096);
+    ShortCircuitShm shm = new ShortCircuitShm(ShmId.createRandom(), stream);
+    shm.free();
+    stream.close();
+    FileUtil.fullyDelete(path);
+  }
+
+  @Test(timeout=60000)
+  public void testAllocateSlots() throws Exception {
+    File path = new File(TEST_BASE, "testAllocateSlots");
+    path.mkdirs();
+    SharedFileDescriptorFactory factory =
+        SharedFileDescriptorFactory.create("shm_", 
+            new String[] { path.getAbsolutePath() });
+    FileInputStream stream =
+        factory.createDescriptor("testAllocateSlots", 4096);
+    ShortCircuitShm shm = new ShortCircuitShm(ShmId.createRandom(), stream);
+    int numSlots = 0;
+    ArrayList<Slot> slots = new ArrayList<Slot>();
+    while (!shm.isFull()) {
+      Slot slot = shm.allocAndRegisterSlot(new ExtendedBlockId(123L, "test_bp1"));
+      slots.add(slot);
+      numSlots++;
+    }
+    LOG.info("allocated " + numSlots + " slots before running out.");
+    int slotIdx = 0;
+    for (Iterator<Slot> iter = shm.slotIterator();
+        iter.hasNext(); ) {
+      Assert.assertTrue(slots.contains(iter.next()));
+    }
+    for (Slot slot : slots) {
+      Assert.assertFalse(slot.addAnchor());
+      Assert.assertEquals(slotIdx++, slot.getSlotIdx());
+    }
+    for (Slot slot : slots) {
+      slot.makeAnchorable();
+    }
+    for (Slot slot : slots) {
+      Assert.assertTrue(slot.addAnchor());
+    }
+    for (Slot slot : slots) {
+      slot.removeAnchor();
+    }
+    for (Slot slot : slots) {
+      shm.unregisterSlot(slot.getSlotIdx());
+      slot.makeInvalid();
+    }
+    shm.free();
+    stream.close();
+    FileUtil.fullyDelete(path);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65f53f24/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java
new file mode 100644
index 0000000..f5dd883
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java
@@ -0,0 +1,644 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.util.ByteArrayManager.Counter;
+import org.apache.hadoop.hdfs.util.ByteArrayManager.CounterMap;
+import org.apache.hadoop.hdfs.util.ByteArrayManager.FixedLengthManager;
+import org.apache.hadoop.hdfs.util.ByteArrayManager.ManagerMap;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test {@link ByteArrayManager}.
+ */
+public class TestByteArrayManager {
+  static {
+    GenericTestUtils.setLogLevel(
+        LoggerFactory.getLogger(ByteArrayManager.class), Level.ALL);
+  }
+
+  static final Logger LOG = LoggerFactory.getLogger(TestByteArrayManager.class);
+
+  private static final Comparator<Future<Integer>> CMP = new Comparator<Future<Integer>>() {
+    @Override
+    public int compare(Future<Integer> left, Future<Integer> right) {
+      try {
+        return left.get().intValue() - right.get().intValue();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  };
+
+  @Test
+  public void testCounter() throws Exception {
+    final long countResetTimePeriodMs = 200L;
+    final Counter c = new Counter(countResetTimePeriodMs);
+
+    final int n = ThreadLocalRandom.current().nextInt(512) + 512;
+    final List<Future<Integer>> futures = new ArrayList<Future<Integer>>(n);
+    
+    final ExecutorService pool = Executors.newFixedThreadPool(32);
+    try {
+      // increment
+      for(int i = 0; i < n; i++) {
+        futures.add(pool.submit(new Callable<Integer>() {
+          @Override
+          public Integer call() throws Exception {
+            return (int)c.increment();
+          }
+        }));
+      }
+  
+      // sort and wait for the futures
+      Collections.sort(futures, CMP);
+    } finally {
+      pool.shutdown();
+    }
+
+    // check futures
+    Assert.assertEquals(n, futures.size());
+    for(int i = 0; i < n; i++) {
+      Assert.assertEquals(i + 1, futures.get(i).get().intValue());
+    }
+    Assert.assertEquals(n, c.getCount());
+
+    // test auto-reset
+    Thread.sleep(countResetTimePeriodMs + 100);
+    Assert.assertEquals(1, c.increment());
+  }
+
+  
+
+  @Test
+  public void testAllocateRecycle() throws Exception {
+    final int countThreshold = 4;
+    final int countLimit = 8;
+    final long countResetTimePeriodMs = 200L;
+    final ByteArrayManager.Impl bam = new ByteArrayManager.Impl(
+        new ByteArrayManager.Conf(
+            countThreshold, countLimit, countResetTimePeriodMs));
+    
+    final CounterMap counters = bam.getCounters();
+    final ManagerMap managers = bam.getManagers();
+    
+    final int[] uncommonArrays = {0, 1, 2, 4, 8, 16, 32, 64};
+    final int arrayLength = 1024;
+
+
+    final Allocator allocator = new Allocator(bam);
+    final Recycler recycler = new Recycler(bam);
+    try {
+      { // allocate within threshold
+        for(int i = 0; i < countThreshold; i++) {
+          allocator.submit(arrayLength);
+        }        
+        waitForAll(allocator.futures);
+  
+        Assert.assertEquals(countThreshold,
+            counters.get(arrayLength, false).getCount());
+        Assert.assertNull(managers.get(arrayLength, false));
+        for(int n : uncommonArrays) {
+          Assert.assertNull(counters.get(n, false));
+          Assert.assertNull(managers.get(n, false));
+        }
+      }
+
+      { // recycle half of the arrays
+        for(int i = 0; i < countThreshold/2; i++) {
+          recycler.submit(removeLast(allocator.futures).get());
+        }
+
+        for(Future<Integer> f : recycler.furtures) {
+          Assert.assertEquals(-1, f.get().intValue());
+        }
+        recycler.furtures.clear();
+      }
+
+      { // allocate one more
+        allocator.submit(arrayLength).get();
+
+        Assert.assertEquals(countThreshold + 1, counters.get(arrayLength, false).getCount());
+        Assert.assertNotNull(managers.get(arrayLength, false));
+      }
+
+      { // recycle the remaining arrays
+        final int n = allocator.recycleAll(recycler);
+
+        recycler.verify(n);
+      }
+        
+      {
+        // allocate until the maximum.
+        for(int i = 0; i < countLimit; i++) {
+          allocator.submit(arrayLength);
+        }
+        waitForAll(allocator.futures);
+
+        // allocate one more should be blocked
+        final AllocatorThread t = new AllocatorThread(arrayLength, bam);
+        t.start();
+        
+        // check if the thread is waiting, timed wait or runnable.
+        for(int i = 0; i < 5; i++) {
+          Thread.sleep(100);
+          final Thread.State threadState = t.getState();
+          if (threadState != Thread.State.RUNNABLE
+              && threadState != Thread.State.WAITING
+              && threadState != Thread.State.TIMED_WAITING) {
+            Assert.fail("threadState = " + threadState);
+          }
+        }
+
+        // recycle an array
+        recycler.submit(removeLast(allocator.futures).get());
+        Assert.assertEquals(1, removeLast(recycler.futures).get().intValue());
+
+        // check if the thread is unblocked
+        Thread.sleep(100);
+        Assert.assertEquals(Thread.State.TERMINATED, t.getState());
+            
+        // recycle the remaining arrays; the free queue should now be full
+        Assert.assertEquals(countLimit-1, allocator.recycleAll(recycler));
+        recycler.submit(t.array);
+        recycler.verify(countLimit);
+
+        // recycle one more; it should not increase the free queue size
+        Assert.assertEquals(countLimit, bam.release(new byte[arrayLength]));
+      }
+    } finally {
+      allocator.pool.shutdown();
+      recycler.pool.shutdown();
+    }
+  }
+
+  static <T> Future<T> removeLast(List<Future<T>> futures) throws Exception {
+    return remove(futures, futures.size() - 1);
+  }
+  static <T> Future<T> remove(List<Future<T>> futures, int i) throws Exception {
+    return futures.isEmpty()? null : futures.remove(i);
+  }
+
+  static <T> void waitForAll(List<Future<T>> futures) throws Exception {
+    for(Future<T> f : futures) {
+      f.get();
+    }
+  }
+
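+  /** Allocates a single array on a separate thread so the test can observe blocking. */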
+  static class AllocatorThread extends Thread {
+    private final ByteArrayManager bam;
+    private final int arrayLength;
+    private byte[] array;
+    
+    AllocatorThread(int arrayLength, ByteArrayManager bam) {
+      this.bam = bam;
+      this.arrayLength = arrayLength;
+    }
+
+    @Override
+    public void run() {
+      try {
+        array = bam.newByteArray(arrayLength);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
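+  /** Submits newByteArray calls to a small thread pool and records the resulting futures. */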
+  static class Allocator {
+    private final ByteArrayManager bam;
+    final ExecutorService pool = Executors.newFixedThreadPool(8);
+    final List<Future<byte[]>> futures = new LinkedList<Future<byte[]>>();
+
+    Allocator(ByteArrayManager bam) {
+      this.bam = bam;
+    }
+    
+    Future<byte[]> submit(final int arrayLength) {
+      final Future<byte[]> f = pool.submit(new Callable<byte[]>() {
+        @Override
+        public byte[] call() throws Exception {
+          final byte[] array = bam.newByteArray(arrayLength);
+          Assert.assertEquals(arrayLength, array.length);
+          return array;
+        }
+      });
+      futures.add(f);
+      return f;
+    }
+    
+    int recycleAll(Recycler recycler) throws Exception {
+      final int n = futures.size();
+      for(Future<byte[]> f : futures) {
+        recycler.submit(f.get());
+      }
+      futures.clear();
+      return n;
+    }
+  }
+
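+  /** Submits release calls to a small thread pool and records the values they return. */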
+  static class Recycler {
+    private final ByteArrayManager bam;
+    final ExecutorService pool = Executors.newFixedThreadPool(8);
+    final List<Future<Integer>> futures = new LinkedList<Future<Integer>>();
+
+    Recycler(ByteArrayManager bam) {
+      this.bam = bam;
+    }
+
+    Future<Integer> submit(final byte[] array) {
+      final Future<Integer> f = pool.submit(new Callable<Integer>() {
+        @Override
+        public Integer call() throws Exception {
+          return bam.release(array);
+        }
+      });
+      futures.add(f);
+      return f;
+    }
+
+    void verify(final int expectedSize) throws Exception {
+      Assert.assertEquals(expectedSize, futures.size());
+      Collections.sort(futures, CMP);
+      for(int i = 0; i < futures.size(); i++) {
+        Assert.assertEquals(i + 1, futures.get(i).get().intValue());
+      }
+      futures.clear();
+    }
+  }
+
+  @Test
+  public void testByteArrayManager() throws Exception {
+    final int countThreshold = 32;
+    final int countLimit = 64;
+    final long countResetTimePeriodMs = 1000L;
+    final ByteArrayManager.Impl bam = new ByteArrayManager.Impl(
+        new ByteArrayManager.Conf(
+            countThreshold, countLimit, countResetTimePeriodMs));
+  
+    final CounterMap counters = bam.getCounters();
+    final ManagerMap managers = bam.getManagers();
+
+    final ExecutorService pool = Executors.newFixedThreadPool(128);
+    
+    final Runner[] runners = new Runner[Runner.NUM_RUNNERS];
+    final Thread[] threads = new Thread[runners.length];
+
+    final int num = 1 << 10;
+    for(int i = 0; i < runners.length; i++) {
+      runners[i] = new Runner(i, countThreshold, countLimit, pool, i, bam);
+      threads[i] = runners[i].start(num);
+    }
+    
+    final List<Exception> exceptions = new ArrayList<Exception>();
+    final Thread randomRecycler = new Thread() {
+      @Override
+      public void run() {
+        LOG.info("randomRecycler start");
+        for(int i = 0; shouldRun(); i++) {
+          final int j = ThreadLocalRandom.current().nextInt(runners.length);
+          try {
+            runners[j].recycle();
+          } catch (Exception e) {
+            e.printStackTrace();
+            exceptions.add(new Exception(this + " has an exception", e));
+          }
+
+          if ((i & 0xFF) == 0) {
+            LOG.info("randomRecycler sleep, i=" + i);
+            sleepMs(100);
+          }
+        }
+        LOG.info("randomRecycler done");
+      }
+      
+      boolean shouldRun() {
+        for(int i = 0; i < runners.length; i++) {
+          if (threads[i].isAlive()) {
+            return true;
+          }
+          if (!runners[i].isEmpty()) {
+            return true;
+          }
+        }
+        return false;
+      }
+    };
+    randomRecycler.start();
+    
+    randomRecycler.join();
+    Assert.assertTrue(exceptions.isEmpty());
+
+    Assert.assertNull(counters.get(0, false));
+    for(int i = 1; i < runners.length; i++) {
+      if (!runners[i].assertionErrors.isEmpty()) {
+        for(AssertionError e : runners[i].assertionErrors) {
+          LOG.error("AssertionError " + i, e);
+        }
+        Assert.fail(runners[i].assertionErrors.size() + " AssertionError(s)");
+      }
+      
+      final int arrayLength = Runner.index2arrayLength(i);
+      final boolean exceedCountThreshold = counters.get(arrayLength, false).getCount() > countThreshold; 
+      final FixedLengthManager m = managers.get(arrayLength, false);
+      if (exceedCountThreshold) {
+        Assert.assertNotNull(m);
+      } else {
+        Assert.assertNull(m);
+      }
+    }
+  }
+
+  static void sleepMs(long ms) {
+    try {
+      Thread.sleep(ms);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      Assert.fail("Sleep is interrupted: " + e);
+    }
+  }
+
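+  /**
+   * A worker for testByteArrayManager: each runner allocates and recycles arrays
+   * of a single power-of-two length, with allocation probability p/NUM_RUNNERS.
+   */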
+  static class Runner implements Runnable {
+    static final int NUM_RUNNERS = 5;
+
+    static int index2arrayLength(int index) {
+      return ByteArrayManager.MIN_ARRAY_LENGTH << (index - 1);
+    }
+
+    private final ByteArrayManager bam;
+    final int maxArrayLength;
+    final int countThreshold;
+    final int maxArrays;
+    final ExecutorService pool;
+    final List<Future<byte[]>> arrays = new ArrayList<Future<byte[]>>();
+
+    final AtomicInteger count = new AtomicInteger();
+    final int p;
+    private int n;
+    
+    final List<AssertionError> assertionErrors = new ArrayList<AssertionError>();
+
+    Runner(int index, int countThreshold, int maxArrays,
+        ExecutorService pool, int p, ByteArrayManager bam) {
+      this.maxArrayLength = index2arrayLength(index);
+      this.countThreshold = countThreshold;
+      this.maxArrays = maxArrays;
+      this.pool = pool;
+      this.p = p;
+      this.bam = bam;
+    }
+
+    boolean isEmpty() {
+      synchronized (arrays) {
+        return arrays.isEmpty();
+      }
+    }
+ 
+    Future<byte[]> submitAllocate() {
+      count.incrementAndGet();
+
+      final Future<byte[]> f = pool.submit(new Callable<byte[]>() {
+        @Override
+        public byte[] call() throws Exception {
+          final int lower = maxArrayLength == ByteArrayManager.MIN_ARRAY_LENGTH?
+              0: maxArrayLength >> 1;
+          final int arrayLength = ThreadLocalRandom.current().nextInt(
+              maxArrayLength - lower) + lower + 1;
+          final byte[] array = bam.newByteArray(arrayLength);
+          try {
+            Assert.assertEquals("arrayLength=" + arrayLength + ", lower=" + lower,
+                maxArrayLength, array.length);
+          } catch(AssertionError e) {
+            assertionErrors.add(e);
+          }
+          return array;
+        }
+      });
+      synchronized (arrays) {
+        arrays.add(f);
+      }
+      return f;
+    }
+
+    Future<byte[]> removeFirst() throws Exception {
+      synchronized (arrays) {
+        return remove(arrays, 0);
+      }
+    }
+
+    void recycle() throws Exception {
+      final Future<byte[]> f = removeFirst();
+      if (f != null) {
+        printf("randomRecycler: ");
+        try {
+          recycle(f.get(10, TimeUnit.MILLISECONDS));
+        } catch(TimeoutException e) {
+          recycle(new byte[maxArrayLength]);
+          printf("timeout, new byte[%d]\n", maxArrayLength);
+        }
+      }
+    }
+
+    int recycle(final byte[] array) {
+      return bam.release(array);
+    }
+
+    Future<Integer> submitRecycle(final byte[] array) {
+      count.decrementAndGet();
+
+      final Future<Integer> f = pool.submit(new Callable<Integer>() {
+        @Override
+        public Integer call() throws Exception {
+          return recycle(array);
+        }
+      });
+      return f;
+    }
+
+    @Override
+    public void run() {
+      for(int i = 0; i < n; i++) {
+        final boolean isAllocate = ThreadLocalRandom.current()
+            .nextInt(NUM_RUNNERS) < p;
+        if (isAllocate) {
+          submitAllocate();
+        } else {
+          try {
+            final Future<byte[]> f = removeFirst();
+            if (f != null) {
+              submitRecycle(f.get());
+            }
+          } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(this + " has " + e);
+          }
+        }
+
+        if ((i & 0xFF) == 0) {
+          sleepMs(100);
+        }
+      }
+    }
+    
+    Thread start(int n) {
+      this.n = n;
+      final Thread t = new Thread(this);
+      t.start();
+      return t;
+    }
+
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + ": max=" + maxArrayLength
+          + ", count=" + count;
+    }
+  }
+
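+  /**
+   * A baseline for the benchmark in main(): allocation is capped at maxCount
+   * outstanding arrays, but arrays are never pooled or reused.
+   */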
+  static class NewByteArrayWithLimit extends ByteArrayManager {
+    private final int maxCount;
+    private int count = 0;
+    
+    NewByteArrayWithLimit(int maxCount) {
+      this.maxCount = maxCount;
+    }
+
+    @Override
+    public synchronized byte[] newByteArray(int size) throws InterruptedException {
+      while (count >= maxCount) {
+        wait();
+      }
+      count++;
+      return new byte[size];
+    }
+    
+    @Override
+    public synchronized int release(byte[] array) {
+      if (count == maxCount) {
+        notifyAll();
+      }
+      count--;
+      return 0;
+    }
+  }
+  
+  public static void main(String[] args) throws Exception {
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger(ByteArrayManager.class),
+                                 Level.OFF);
+    final int arrayLength = 64 * 1024; //64k
+    final int nThreads = 512;
+    final int nAllocations = 1 << 15;
+    final int maxArrays = 1 << 10;
+    final int nTrials = 5;
+
+    System.out.println("arrayLength=" + arrayLength
+        + ", nThreads=" + nThreads
+        + ", nAllocations=" + nAllocations
+        + ", maxArrays=" + maxArrays);
+    
+    final ByteArrayManager[] impls = {
+        new ByteArrayManager.NewByteArrayWithoutLimit(),
+        new NewByteArrayWithLimit(maxArrays),
+        new ByteArrayManager.Impl(new ByteArrayManager.Conf(
+            HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT,
+            maxArrays,
+            HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT))
+    };
+    final double[] avg = new double[impls.length];
+
+    for(int i = 0; i < impls.length; i++) {
+      double duration = 0;
+      printf("%26s:", impls[i].getClass().getSimpleName());
+      for(int j = 0; j < nTrials; j++) {
+        final int[] sleepTime = new int[nAllocations];
+        for(int k = 0; k < sleepTime.length; k++) {
+          sleepTime[k] = ThreadLocalRandom.current().nextInt(100);
+        }
+      
+        final long elapsed = performanceTest(arrayLength, maxArrays, nThreads,
+            sleepTime, impls[i]);
+        duration += elapsed;
+        printf("%5d, ", elapsed);
+      }
+      avg[i] = duration/nTrials;
+      printf("avg=%6.3fs", avg[i]/1000);
+      for(int j = 0; j < i; j++) {
+        printf(" (%6.2f%%)", percentageDiff(avg[j], avg[i]));
+      }
+      printf("\n");
+    }
+  }
+  
+  static double percentageDiff(double original, double newValue) {
+    return (newValue - original)/original*100;
+  }
+  
+  static void printf(String format, Object... args) {
+    System.out.printf(format, args);
+    System.out.flush();
+  }
+  
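+  /**
+   * Allocates one array per entry in sleepTimeMSs, holds it for that many
+   * milliseconds, releases it, and returns the elapsed wall-clock time in ms.
+   */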
+  static long performanceTest(final int arrayLength, final int maxArrays,
+      final int nThreads, final int[] sleepTimeMSs, final ByteArrayManager impl)
+          throws Exception {
+    final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
+    final List<Future<Void>> futures = new ArrayList<Future<Void>>(sleepTimeMSs.length);
+    final long startTime = Time.monotonicNow();
+
+    for(int i = 0; i < sleepTimeMSs.length; i++) {
+      final long sleepTime = sleepTimeMSs[i];
+      futures.add(pool.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          byte[] array = impl.newByteArray(arrayLength);
+          sleepMs(sleepTime);
+          impl.release(array);
+          return null;
+        }
+      }));
+    }
+    for(Future<Void> f : futures) {
+      f.get();
+    }
+
+    final long endTime = Time.monotonicNow();
+    pool.shutdown();
+    return endTime - startTime;
+  }
+}
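
The tests above pin down the ByteArrayManager contract: allocations are counted per array length, pooling starts only once a length's count exceeds countThreshold, newByteArray blocks once countLimit arrays are outstanding, and the counters auto-reset after countResetTimePeriodMs of inactivity; main() also benchmarks the pooled implementation against two simpler allocators. A minimal usage sketch of the allocate/release cycle (the class name below is illustrative, the Conf values are arbitrary rather than defaults, and the sketch sits in the same package in case Impl and Conf are not public):

    package org.apache.hadoop.hdfs.util;

    public class ByteArrayManagerExample {
      public static void main(String[] args) throws InterruptedException {
        // countThreshold=4, countLimit=8, countResetTimePeriodMs=200 -- illustrative values
        final ByteArrayManager bam = new ByteArrayManager.Impl(
            new ByteArrayManager.Conf(4, 8, 200L));

        // May block if 8 arrays of this length are already outstanding.
        final byte[] buf = bam.newByteArray(1024);
        try {
          // ... fill and write buf ...
        } finally {
          bam.release(buf);  // return the array so it can be pooled and reused
        }
      }
    }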

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65f53f24/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/util/TestExactSizeInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/util/TestExactSizeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/util/TestExactSizeInputStream.java
new file mode 100644
index 0000000..02787be
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/util/TestExactSizeInputStream.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Test;
+
+public class TestExactSizeInputStream {
+  @Test
+  public void testBasicsReadSingle() throws IOException {
+    ExactSizeInputStream s = new ExactSizeInputStream(byteStream("hello"), 3);
+    assertEquals(3, s.available());
+    
+    assertEquals((int)'h', s.read());
+    assertEquals((int)'e', s.read());
+    assertEquals((int)'l', s.read());
+    assertEquals(-1, s.read());
+    assertEquals(0, s.available());
+  }
+  
+  @Test
+  public void testBasicsReadArray() throws IOException {
+    ExactSizeInputStream s = new ExactSizeInputStream(byteStream("hello"), 3);
+    assertEquals(3, s.available());
+    
+    byte[] buf = new byte[10];
+    
+    assertEquals(2, s.read(buf, 0, 2));
+    assertEquals('h', buf[0]);
+    assertEquals('e', buf[1]);
+    
+    assertEquals(1, s.read(buf, 0, 2));
+    assertEquals('l', buf[0]);
+    
+    assertEquals(-1, s.read(buf, 0, 2));
+  }
+  
+  @Test
+  public void testBasicsSkip() throws IOException {
+    ExactSizeInputStream s = new ExactSizeInputStream(byteStream("hello"), 3);
+    assertEquals(3, s.available());
+    
+    assertEquals(2, s.skip(2));
+    assertEquals(1, s.skip(2));
+    assertEquals(0, s.skip(2));
+  }
+  
+  @Test
+  public void testReadNotEnough() throws IOException {
+    // Ask for 5 bytes, only has 2
+    ExactSizeInputStream s = new ExactSizeInputStream(byteStream("he"), 5);
+    assertEquals(2, s.available());
+    
+    assertEquals((int)'h', s.read());
+    assertEquals((int)'e', s.read());
+    try {
+      s.read();
+      fail("Read when should be out of data");
+    } catch (EOFException e) {
+      // expected
+    }
+  }
+  
+  @Test
+  public void testSkipNotEnough() throws IOException {
+    // Ask for 5 bytes, only has 2
+    ExactSizeInputStream s = new ExactSizeInputStream(byteStream("he"), 5);
+    assertEquals(2, s.skip(3));
+    try {
+      s.skip(1);
+      fail("Skip when should be out of data");
+    } catch (EOFException e) {
+      // expected
+    }
+  }
+  
+  @Test
+  public void testReadArrayNotEnough() throws IOException {
+    // Ask for 5 bytes, only has 2
+    ExactSizeInputStream s = new ExactSizeInputStream(byteStream("he"), 5);
+    byte[] buf = new byte[10];
+    assertEquals(2, s.read(buf, 0, 5));
+    try {
+      s.read(buf, 2, 3);
+      fail("Read buf when should be out of data");
+    } catch (EOFException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testMark() throws IOException {
+    ExactSizeInputStream s = new ExactSizeInputStream(byteStream("he"), 5);
+    assertFalse(s.markSupported());
+    try {
+      s.mark(1);
+      fail("Mark should not succeed");
+    } catch (UnsupportedOperationException uoe) {
+      // expected
+    }
+  }
+  
+  private static InputStream byteStream(String data) {
+    return new ByteArrayInputStream(data.getBytes());
+  }
+}
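
ExactSizeInputStream, as exercised above, clamps a wrapped stream to a declared byte count: available(), read() and skip() never go past that count, mark() is unsupported, and running out of underlying data before the declared size yields EOFException. A minimal usage sketch mirroring those assertions (the class name below is illustrative; it sits in the same package in case the wrapper is not public):

    package org.apache.hadoop.hdfs.util;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;

    public class ExactSizeInputStreamExample {
      public static void main(String[] args) throws IOException {
        // Wrap a 5-byte stream but declare only 3 bytes.
        final ExactSizeInputStream in =
            new ExactSizeInputStream(new ByteArrayInputStream("hello".getBytes()), 3);
        System.out.println(in.available());      // 3, not 5
        final byte[] buf = new byte[8];
        System.out.println(in.read(buf, 0, 8));  // 3 -- reads "hel" and stops
        System.out.println(in.read());           // -1 -- declared size exhausted
      }
    }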

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65f53f24/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java
new file mode 100644
index 0000000..7f1f00f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import com.google.common.net.HttpHeaders;
+import org.apache.hadoop.hdfs.web.ByteRangeInputStream.InputStreamAndFileLength;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+
+public class TestByteRangeInputStream {
+  private class ByteRangeInputStreamImpl extends ByteRangeInputStream {
+    public ByteRangeInputStreamImpl(URLOpener o, URLOpener r)
+        throws IOException {
+      super(o, r);
+    }
+
+    @Override
+    protected URL getResolvedUrl(HttpURLConnection connection)
+        throws IOException {
+      return new URL("http://resolvedurl/");
+    }
+  }
+
+  private ByteRangeInputStream.URLOpener getMockURLOpener(URL url)
+      throws IOException {
+    ByteRangeInputStream.URLOpener opener =
+        mock(ByteRangeInputStream.URLOpener.class, CALLS_REAL_METHODS);
+    opener.setURL(url);
+    doReturn(getMockConnection("65535"))
+        .when(opener).connect(anyLong(), anyBoolean());
+    return opener;
+  }
+
+  private HttpURLConnection getMockConnection(String length)
+      throws IOException {
+    HttpURLConnection mockConnection = mock(HttpURLConnection.class);
+    doReturn(new ByteArrayInputStream("asdf".getBytes()))
+        .when(mockConnection).getInputStream();
+    doReturn(length).when(mockConnection)
+        .getHeaderField(HttpHeaders.CONTENT_LENGTH);
+    return mockConnection;
+  }
+
+  @Test
+  public void testByteRange() throws IOException {
+    ByteRangeInputStream.URLOpener oMock = getMockURLOpener(
+        new URL("http://test"));
+    ByteRangeInputStream.URLOpener rMock = getMockURLOpener(null);
+    ByteRangeInputStream bris = new ByteRangeInputStreamImpl(oMock, rMock);
+
+    bris.seek(0);
+
+    assertEquals("getPos wrong", 0, bris.getPos());
+
+    bris.read();
+
+    assertEquals("Initial call made incorrectly (offset check)",
+        0, bris.startPos);
+    assertEquals("getPos should return 1 after reading one byte", 1,
+        bris.getPos());
+    verify(oMock, times(1)).connect(0, false);
+
+    bris.read();
+
+    assertEquals("getPos should return 2 after reading two bytes", 2,
+        bris.getPos());
+    // No additional connections should have been made (no seek)
+    verify(oMock, times(1)).connect(0, false);
+
+    rMock.setURL(new URL("http://resolvedurl/"));
+
+    bris.seek(100);
+    bris.read();
+
+    assertEquals("Seek to 100 bytes made incorrectly (offset Check)",
+        100, bris.startPos);
+    assertEquals("getPos should return 101 after reading one byte", 101,
+        bris.getPos());
+    verify(rMock, times(1)).connect(100, true);
+
+    bris.seek(101);
+    bris.read();
+
+    // Seek to 101 should not result in another request
+    verify(rMock, times(1)).connect(100, true);
+    verify(rMock, times(0)).connect(101, true);
+
+    bris.seek(2500);
+    bris.read();
+
+    assertEquals("Seek to 2500 bytes made incorrectly (offset Check)",
+        2500, bris.startPos);
+
+    doReturn(getMockConnection(null))
+        .when(rMock).connect(anyLong(), anyBoolean());
+    bris.seek(500);
+    try {
+      bris.read();
+      fail("Exception should be thrown when content-length is not given");
+    } catch (IOException e) {
+      assertTrue("Incorrect response message: " + e.getMessage(),
+          e.getMessage().startsWith(HttpHeaders.CONTENT_LENGTH +
+                                    " is missing: "));
+    }
+    bris.close();
+  }
+
+  @Test
+  public void testPropagatedClose() throws IOException {
+    ByteRangeInputStream bris =
+        mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
+    InputStreamAndFileLength mockStream = new InputStreamAndFileLength(1L,
+        mock(InputStream.class));
+    doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong());
+    Whitebox.setInternalState(bris, "status",
+                              ByteRangeInputStream.StreamStatus.SEEK);
+
+    int brisOpens = 0;
+    int brisCloses = 0;
+    int isCloses = 0;
+
+    // first open, shouldn't close underlying stream
+    bris.getInputStream();
+    verify(bris, times(++brisOpens)).openInputStream(Mockito.anyLong());
+    verify(bris, times(brisCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
+
+    // stream is open, shouldn't close underlying stream
+    bris.getInputStream();
+    verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
+    verify(bris, times(brisCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
+
+    // seek forces a reopen, should close underlying stream
+    bris.seek(1);
+    bris.getInputStream();
+    verify(bris, times(++brisOpens)).openInputStream(Mockito.anyLong());
+    verify(bris, times(brisCloses)).close();
+    verify(mockStream.in, times(++isCloses)).close();
+
+    // verify that the underlying stream isn't closed after a seek
+    // ie. the state was correctly updated
+    bris.getInputStream();
+    verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
+    verify(bris, times(brisCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
+
+    // seeking to same location should be a no-op
+    bris.seek(1);
+    bris.getInputStream();
+    verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
+    verify(bris, times(brisCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
+
+    // close should of course close
+    bris.close();
+    verify(bris, times(++brisCloses)).close();
+    verify(mockStream.in, times(++isCloses)).close();
+
+    // it's already closed, underlying stream should not close
+    bris.close();
+    verify(bris, times(++brisCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
+
+    // it's closed, don't reopen it
+    boolean errored = false;
+    try {
+      bris.getInputStream();
+    } catch (IOException e) {
+      errored = true;
+      assertEquals("Stream closed", e.getMessage());
+    } finally {
+      assertTrue("Read a closed steam", errored);
+    }
+    verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
+    verify(bris, times(brisCloses)).close();
+
+    verify(mockStream.in, times(isCloses)).close();
+  }
+
+  @Test
+  public void testAvailable() throws IOException {
+    ByteRangeInputStream bris =
+            mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
+    InputStreamAndFileLength mockStream = new InputStreamAndFileLength(65535L,
+            mock(InputStream.class));
+    doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong());
+    Whitebox.setInternalState(bris, "status",
+            ByteRangeInputStream.StreamStatus.SEEK);
+
+    assertEquals("Before read or seek, available should be same as filelength",
+            65535, bris.available());
+    verify(bris, times(1)).openInputStream(Mockito.anyLong());
+
+    bris.seek(10);
+    assertEquals("Seek 10 bytes, available should return filelength - 10"
+            , 65525,
+            bris.available());
+
+    //no more bytes available
+    bris.seek(65535);
+    assertEquals("Seek till end of file, available should return 0 bytes", 0,
+            bris.available());
+
+    //test reads, seek back to 0 and start reading
+    bris.seek(0);
+    bris.read();
+    assertEquals("Read 1 byte, available must return  filelength - 1",
+            65534, bris.available());
+
+    bris.read();
+    assertEquals("Read another 1 byte, available must return  filelength - 2",
+            65533, bris.available());
+
+    //seek and read
+    bris.seek(100);
+    bris.read();
+    assertEquals("Seek to offset 100 and read 1 byte, available should return filelength - 101",
+            65434, bris.available());
+    bris.close();
+  }
+
+  @Test
+  public void testAvailableLengthNotKnown() throws IOException {
+    ByteRangeInputStream bris =
+            mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
+    //Length is null for chunked transfer-encoding
+    InputStreamAndFileLength mockStream = new InputStreamAndFileLength(null,
+            mock(InputStream.class));
+    doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong());
+    Whitebox.setInternalState(bris, "status",
+            ByteRangeInputStream.StreamStatus.SEEK);
+
+    assertEquals(Integer.MAX_VALUE, bris.available());
+  }
+
+  @Test
+  public void testAvailableStreamClosed() throws IOException {
+    ByteRangeInputStream bris =
+            mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
+    InputStreamAndFileLength mockStream = new InputStreamAndFileLength(null,
+            mock(InputStream.class));
+    doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong());
+    Whitebox.setInternalState(bris, "status",
+            ByteRangeInputStream.StreamStatus.SEEK);
+
+    bris.close();
+    try {
+      bris.available();
+      fail("Exception should be thrown when stream is closed");
+    } catch (IOException e) {
+      assertEquals("Stream closed", e.getMessage());
+    }
+  }
+
+}
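
These tests drive the real seek/read/close logic of ByteRangeInputStream while stubbing only the network-facing pieces, using Mockito partial mocks (CALLS_REAL_METHODS combined with doReturn on connect or openInputStream). A stripped-down sketch of that pattern with a hypothetical class (Source and fetch() are illustrative, not part of the patch):

    import static org.mockito.Mockito.CALLS_REAL_METHODS;
    import static org.mockito.Mockito.doReturn;
    import static org.mockito.Mockito.mock;

    public class PartialMockSketch {
      // Hypothetical stand-in for ByteRangeInputStream.
      static abstract class Source {
        abstract String fetch();                        // expensive call to stub
        String describe() { return "got " + fetch(); }  // real logic under test
      }

      public static void main(String[] args) {
        // CALLS_REAL_METHODS keeps the concrete methods; only fetch() is stubbed.
        final Source s = mock(Source.class, CALLS_REAL_METHODS);
        doReturn("data").when(s).fetch();
        System.out.println(s.describe());               // prints "got data"
      }
    }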

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65f53f24/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java
new file mode 100644
index 0000000..1a95fc8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.URL;
+
+import org.junit.Test;
+
+public class TestOffsetUrlInputStream {
+  @Test
+  public void testRemoveOffset() throws IOException {
+    { //no offset
+      String s = "http://test/Abc?Length=99";
+      assertEquals(s, WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //no parameters
+      String s = "http://test/Abc";
+      assertEquals(s, WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as first parameter
+      String s = "http://test/Abc?offset=10&Length=99";
+      assertEquals("http://test/Abc?Length=99",
+          WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as second parameter
+      String s = "http://test/Abc?op=read&OFFset=10&Length=99";
+      assertEquals("http://test/Abc?op=read&Length=99",
+          WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as last parameter
+      String s = "http://test/Abc?Length=99&offset=10";
+      assertEquals("http://test/Abc?Length=99",
+          WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as the only parameter
+      String s = "http://test/Abc?offset=10";
+      assertEquals("http://test/Abc",
+          WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString());
+    }
+  }
+}
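
The test above checks that WebHdfsFileSystem.removeOffsetParam strips the offset query parameter case-insensitively, wherever it appears in the query string, while leaving the other parameters intact. A minimal sketch of that behaviour (the class name below is illustrative; it sits in the same package in case the helper is not public):

    package org.apache.hadoop.hdfs.web;

    import java.net.URL;

    public class RemoveOffsetParamExample {
      public static void main(String[] args) throws Exception {
        final URL url = new URL("http://test/Abc?op=read&OFFset=10&Length=99");
        // The offset parameter is removed regardless of case and position.
        System.out.println(WebHdfsFileSystem.removeOffsetParam(url));
        // prints: http://test/Abc?op=read&Length=99
      }
    }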

