hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject [22/37] hadoop git commit: HDFS-13215. RBF: Move Router to its own module.
Date Tue, 20 Mar 2018 06:35:59 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/73be4292/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java
new file mode 100644
index 0000000..cb50515
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import static org.apache.hadoop.hdfs.server.federation.resolver.order.HashResolver.extractTempFileName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the multiple destination resolver.
+ */
+public class TestMultipleDestinationResolver {
+
+  private MultipleDestinationMountTableResolver resolver;
+
+  @Before
+  public void setup() throws IOException {
+    Configuration conf = new Configuration();
+    resolver = new MultipleDestinationMountTableResolver(conf, null);
+
+    // We manually point /tmp to only subcluster0
+    Map<String, String> map1 = new HashMap<>();
+    map1.put("subcluster0", "/tmp");
+    resolver.addEntry(MountTable.newInstance("/tmp", map1));
+
+    // We manually point / to subcluster0,1,2 with default order (hash)
+    Map<String, String> mapDefault = new HashMap<>();
+    mapDefault.put("subcluster0", "/");
+    mapDefault.put("subcluster1", "/");
+    mapDefault.put("subcluster2", "/");
+    MountTable defaultEntry = MountTable.newInstance("/", mapDefault);
+    resolver.addEntry(defaultEntry);
+
+    // We manually point /hash to subcluster0,1,2 with hashing
+    Map<String, String> mapHash = new HashMap<>();
+    mapHash.put("subcluster0", "/hash");
+    mapHash.put("subcluster1", "/hash");
+    mapHash.put("subcluster2", "/hash");
+    MountTable hashEntry = MountTable.newInstance("/hash", mapHash);
+    hashEntry.setDestOrder(DestinationOrder.HASH);
+    resolver.addEntry(hashEntry);
+
+    // We manually point /hashall to subcluster0,1,2 with hashing (full tree)
+    Map<String, String> mapHashAll = new HashMap<>();
+    mapHashAll.put("subcluster0", "/hashall");
+    mapHashAll.put("subcluster1", "/hashall");
+    mapHashAll.put("subcluster2", "/hashall");
+    MountTable hashEntryAll = MountTable.newInstance("/hashall", mapHashAll);
+    hashEntryAll.setDestOrder(DestinationOrder.HASH_ALL);
+    resolver.addEntry(hashEntryAll);
+
+    // We point /local to subclusters 0, 1, 2 with the local order
+    Map<String, String> mapLocal = new HashMap<>();
+    mapLocal.put("subcluster0", "/local");
+    mapLocal.put("subcluster1", "/local");
+    mapLocal.put("subcluster2", "/local");
+    MountTable localEntry = MountTable.newInstance("/local", mapLocal);
+    localEntry.setDestOrder(DestinationOrder.LOCAL);
+    resolver.addEntry(localEntry);
+
+    // We point /random to subclusters 0, 1, 2 with the random order
+    Map<String, String> mapRandom = new HashMap<>();
+    mapRandom.put("subcluster0", "/random");
+    mapRandom.put("subcluster1", "/random");
+    mapRandom.put("subcluster2", "/random");
+    MountTable randomEntry = MountTable.newInstance("/random", mapRandom);
+    randomEntry.setDestOrder(DestinationOrder.RANDOM);
+    resolver.addEntry(randomEntry);
+
+    // Read only mount point
+    Map<String, String> mapReadOnly = new HashMap<>();
+    mapReadOnly.put("subcluster0", "/readonly");
+    mapReadOnly.put("subcluster1", "/readonly");
+    mapReadOnly.put("subcluster2", "/readonly");
+    MountTable readOnlyEntry = MountTable.newInstance("/readonly", mapReadOnly);
+    readOnlyEntry.setReadOnly(true);
+    resolver.addEntry(readOnlyEntry);
+  }
+
+  @Test
+  public void testHashEqualDistribution() throws IOException {
+    // First level: /hash is spread evenly, /hash/folder0 stays together
+    testEvenDistribution("/hash");
+    testEvenDistribution("/hash/folder0", false);
+
+    // All levels: both /hashall and /hashall/folder0 are spread evenly
+    testEvenDistribution("/hashall");
+    testEvenDistribution("/hashall/folder0");
+  }
+
+  @Test
+  public void testHashAll() throws IOException {
+    // Files should be spread across subclusters
+    PathLocation dest0 = resolver.getDestinationForPath("/hashall/file0.txt");
+    assertDest("subcluster0", dest0);
+    PathLocation dest1 = resolver.getDestinationForPath("/hashall/file1.txt");
+    assertDest("subcluster1", dest1);
+
+    // Files within folder should be spread across subclusters
+    PathLocation dest2 = resolver.getDestinationForPath("/hashall/folder0");
+    assertDest("subcluster2", dest2);
+    PathLocation dest3 = resolver.getDestinationForPath(
+        "/hashall/folder0/file0.txt");
+    assertDest("subcluster1", dest3);
+    PathLocation dest4 = resolver.getDestinationForPath(
+        "/hashall/folder0/file1.txt");
+    assertDest("subcluster0", dest4);
+
+    PathLocation dest5 = resolver.getDestinationForPath(
+        "/hashall/folder0/folder0/file0.txt");
+    assertDest("subcluster1", dest5);
+    PathLocation dest6 = resolver.getDestinationForPath(
+        "/hashall/folder0/folder0/file1.txt");
+    assertDest("subcluster1", dest6);
+    PathLocation dest7 = resolver.getDestinationForPath(
+        "/hashall/folder0/folder0/file2.txt");
+    assertDest("subcluster0", dest7);
+
+    PathLocation dest8 = resolver.getDestinationForPath("/hashall/folder1");
+    assertDest("subcluster1", dest8);
+    PathLocation dest9 = resolver.getDestinationForPath(
+        "/hashall/folder1/file0.txt");
+    assertDest("subcluster0", dest9);
+    PathLocation dest10 = resolver.getDestinationForPath(
+        "/hashall/folder1/file1.txt");
+    assertDest("subcluster1", dest10);
+
+    PathLocation dest11 = resolver.getDestinationForPath("/hashall/folder2");
+    assertDest("subcluster2", dest11);
+    PathLocation dest12 = resolver.getDestinationForPath(
+        "/hashall/folder2/file0.txt");
+    assertDest("subcluster0", dest12);
+    PathLocation dest13 = resolver.getDestinationForPath(
+        "/hashall/folder2/file1.txt");
+    assertDest("subcluster0", dest13);
+    PathLocation dest14 = resolver.getDestinationForPath(
+        "/hashall/folder2/file2.txt");
+    assertDest("subcluster1", dest14);
+  }
+
+  @Test
+  public void testHashFirst() throws IOException {
+    PathLocation dest0 = resolver.getDestinationForPath("/hashall/file0.txt");
+    assertDest("subcluster0", dest0);
+    PathLocation dest1 = resolver.getDestinationForPath("/hashall/file1.txt");
+    assertDest("subcluster1", dest1);
+
+    // All these must be in the same location: subcluster0
+    PathLocation dest2 = resolver.getDestinationForPath("/hash/folder0");
+    assertDest("subcluster0", dest2);
+    PathLocation dest3 = resolver.getDestinationForPath(
+        "/hash/folder0/file0.txt");
+    assertDest("subcluster0", dest3);
+    PathLocation dest4 = resolver.getDestinationForPath(
+        "/hash/folder0/file1.txt");
+    assertDest("subcluster0", dest4);
+
+    PathLocation dest5 = resolver.getDestinationForPath(
+        "/hash/folder0/folder0/file0.txt");
+    assertDest("subcluster0", dest5);
+    PathLocation dest6 = resolver.getDestinationForPath(
+        "/hash/folder0/folder0/file1.txt");
+    assertDest("subcluster0", dest6);
+
+    // All these must be in the same location: subcluster2
+    PathLocation dest7 = resolver.getDestinationForPath("/hash/folder1");
+    assertDest("subcluster2", dest7);
+    PathLocation dest8 = resolver.getDestinationForPath(
+        "/hash/folder1/file0.txt");
+    assertDest("subcluster2", dest8);
+    PathLocation dest9 = resolver.getDestinationForPath(
+        "/hash/folder1/file1.txt");
+    assertDest("subcluster2", dest9);
+
+    // All these must be in the same location: subcluster2
+    PathLocation dest10 = resolver.getDestinationForPath("/hash/folder2");
+    assertDest("subcluster2", dest10);
+    PathLocation dest11 = resolver.getDestinationForPath(
+        "/hash/folder2/file0.txt");
+    assertDest("subcluster2", dest11);
+    PathLocation dest12 = resolver.getDestinationForPath(
+        "/hash/folder2/file1.txt");
+    assertDest("subcluster2", dest12);
+  }
+
+  @Test
+  public void testRandomEqualDistribution() throws IOException {
+    testEvenDistribution("/random");
+  }
+
+  @Test
+  public void testSingleDestination() throws IOException {
+    // All the files in /tmp should be in subcluster0
+    for (int f = 0; f < 100; f++) {
+      String filename = "/tmp/b/c/file" + f + ".txt";
+      PathLocation destination = resolver.getDestinationForPath(filename);
+      RemoteLocation loc = destination.getDefaultLocation();
+      assertEquals("subcluster0", loc.getNameserviceId());
+      assertEquals(filename, loc.getDest());
+    }
+  }
+
+  @Test
+  public void testResolveSubdirectories() throws Exception {
+    // Simulate a testdir under a multi-destination mount.
+    Random r = new Random();
+    String testDir = "/sort/testdir" + r.nextInt();
+    String file1 = testDir + "/file1" + r.nextInt();
+    String file2 = testDir + "/file2" + r.nextInt();
+
+    // Verify both files resolve to the same namespace as the parent dir.
+    PathLocation testDirLocation = resolver.getDestinationForPath(testDir);
+    RemoteLocation defaultLoc = testDirLocation.getDefaultLocation();
+    String testDirNamespace = defaultLoc.getNameserviceId();
+
+    PathLocation file1Location = resolver.getDestinationForPath(file1);
+    RemoteLocation defaultLoc1 = file1Location.getDefaultLocation();
+    assertEquals(testDirNamespace, defaultLoc1.getNameserviceId());
+
+    PathLocation file2Location = resolver.getDestinationForPath(file2);
+    RemoteLocation defaultLoc2 = file2Location.getDefaultLocation();
+    assertEquals(testDirNamespace, defaultLoc2.getNameserviceId());
+  }
+
+  @Test
+  public void testExtractTempFileName() {
+    for (String teststring : new String[] {
+        "testfile1.txt.COPYING",
+        "testfile1.txt._COPYING_",
+        "testfile1.txt._COPYING_.attempt_1486662804109_0055_m_000042_0",
+        "testfile1.txt.tmp",
+        "_temp/testfile1.txt",
+        "_temporary/testfile1.txt.af77e2ab-4bc5-4959-ae08-299c880ee6b8",
+        "_temporary/0/_temporary/attempt_201706281636_0007_m_000003_46/" +
+          "testfile1.txt" }) {
+      String finalName = extractTempFileName(teststring);
+      assertEquals("testfile1.txt", finalName);
+    }
+
+    // False cases
+    assertEquals(
+        "file1.txt.COPYING1", extractTempFileName("file1.txt.COPYING1"));
+    assertEquals("file1.txt.tmp2", extractTempFileName("file1.txt.tmp2"));
+
+    // Speculation patterns
+    String finalName = extractTempFileName(
+        "_temporary/part-00007.af77e2ab-4bc5-4959-ae08-299c880ee6b8");
+    assertEquals("part-00007", finalName);
+    finalName = extractTempFileName(
+        "_temporary/0/_temporary/attempt_201706281636_0007_m_000003_46/" +
+          "part-00003");
+    assertEquals("part-00003", finalName);
+
+    // Subfolders
+    finalName = extractTempFileName("folder0/testfile1.txt._COPYING_");
+    assertEquals("folder0/testfile1.txt", finalName);
+    finalName = extractTempFileName(
+        "folder0/folder1/testfile1.txt._COPYING_");
+    assertEquals("folder0/folder1/testfile1.txt", finalName);
+    finalName = extractTempFileName(
+        "processedHrsData.txt/_temporary/0/_temporary/" +
+        "attempt_201706281636_0007_m_000003_46/part-00003");
+    assertEquals("processedHrsData.txt/part-00003", finalName);
+  }
+
+  @Test
+  public void testReadOnly() throws IOException {
+    MountTable mount = resolver.getMountPoint("/readonly");
+    assertTrue(mount.isReadOnly());
+
+    PathLocation dest0 = resolver.getDestinationForPath("/readonly/file0.txt");
+    assertDest("subcluster1", dest0);
+    PathLocation dest1 = resolver.getDestinationForPath("/readonly/file1.txt");
+    assertDest("subcluster2", dest1);
+
+    // All these must be in the same location: subcluster1
+    PathLocation dest2 = resolver.getDestinationForPath("/readonly/folder0");
+    assertDest("subcluster1", dest2);
+    PathLocation dest3 = resolver.getDestinationForPath(
+        "/readonly/folder0/file0.txt");
+    assertDest("subcluster1", dest3);
+    PathLocation dest4 = resolver.getDestinationForPath(
+        "/readonly/folder0/file1.txt");
+    assertDest("subcluster1", dest4);
+
+    PathLocation dest5 = resolver.getDestinationForPath(
+        "/readonly/folder0/folder0/file0.txt");
+    assertDest("subcluster1", dest5);
+    PathLocation dest6 = resolver.getDestinationForPath(
+        "/readonly/folder0/folder0/file1.txt");
+    assertDest("subcluster1", dest6);
+
+    // All these must be in the same location: subcluster2
+    PathLocation dest7 = resolver.getDestinationForPath("/readonly/folder1");
+    assertDest("subcluster2", dest7);
+    PathLocation dest8 = resolver.getDestinationForPath(
+        "/readonly/folder1/file0.txt");
+    assertDest("subcluster2", dest8);
+    PathLocation dest9 = resolver.getDestinationForPath(
+        "/readonly/folder1/file1.txt");
+    assertDest("subcluster2", dest9);
+
+    // All these must be in the same location: subcluster1
+    PathLocation dest10 = resolver.getDestinationForPath("/readonly/folder2");
+    assertDest("subcluster1", dest10);
+    PathLocation dest11 = resolver.getDestinationForPath(
+        "/readonly/folder2/file0.txt");
+    assertDest("subcluster1", dest11);
+    PathLocation dest12 = resolver.getDestinationForPath(
+        "/readonly/folder2/file1.txt");
+    assertDest("subcluster1", dest12);
+  }
+
+  @Test
+  public void testLocalResolver() throws IOException {
+    PathLocation dest0 =
+        resolver.getDestinationForPath("/local/folder0/file0.txt");
+    assertDest("subcluster0", dest0);
+  }
+
+  @Test
+  public void testRandomResolver() throws IOException {
+    Set<String> destinations = new HashSet<>();
+    for (int i = 0; i < 30; i++) {
+      PathLocation dest =
+          resolver.getDestinationForPath("/random/folder0/file0.txt");
+      RemoteLocation firstDest = dest.getDestinations().get(0);
+      String nsId = firstDest.getNameserviceId();
+      destinations.add(nsId);
+    }
+    assertEquals(3, destinations.size());
+  }
+
+  /**
+   * Test that a path has files evenly distributed (delegates with even=true).
+   * @param path Path to check.
+   * @throws IOException If it cannot check it.
+   */
+  private void testEvenDistribution(final String path) throws IOException {
+    testEvenDistribution(path, true);
+  }
+
+  /**
+   * Test that a path has files distributed across destinations evenly or not.
+   * @param path Path to check.
+   * @param even If the distribution should be even or not.
+   * @throws IOException If it cannot check it.
+   */
+  private void testEvenDistribution(final String path, final boolean even)
+      throws IOException {
+
+    // Subcluster -> Files
+    Map<String, Set<String>> results = new HashMap<>();
+    for (int f = 0; f < 10000; f++) {
+      String filename = path + "/file" + f + ".txt";
+      PathLocation destination = resolver.getDestinationForPath(filename);
+      RemoteLocation loc = destination.getDefaultLocation();
+      assertEquals(filename, loc.getDest());
+
+      String nsId = loc.getNameserviceId();
+      if (!results.containsKey(nsId)) {
+        results.put(nsId, new TreeSet<String>());
+      }
+      results.get(nsId).add(filename);
+    }
+
+    if (!even) {
+      // All files should be in one subcluster
+      assertEquals(1, results.size());
+    } else {
+      // Files should be distributed somewhat evenly
+      assertEquals(3, results.size());
+      int count = 0;
+      for (Set<String> files : results.values()) {
+        count = count + files.size();
+      }
+      int avg = count / results.keySet().size();
+      for (Set<String> files : results.values()) {
+        int filesCount = files.size();
+        // Check that the count in each namespace is within 20% of avg
+        assertTrue(filesCount > 0);
+        assertTrue(Math.abs(filesCount - avg) < (avg / 5));
+      }
+    }
+  }
+
+  private static void assertDest(String expectedDest, PathLocation loc) {
+    assertEquals(expectedDest, loc.getDestinations().get(0).getNameserviceId());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/73be4292/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java
new file mode 100644
index 0000000..00c2c13
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.ROUTERS;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.newStateStore;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the basic {@link ActiveNamenodeResolver} functionality.
+ */
+public class TestNamenodeResolver {
+
+  private static StateStoreService stateStore;
+  private static ActiveNamenodeResolver namenodeResolver;
+
+  @BeforeClass
+  public static void create() throws Exception {
+
+    Configuration conf = getStateStoreConfiguration();
+
+    // Reduce expirations to 5 seconds
+    conf.setLong(
+        RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
+        TimeUnit.SECONDS.toMillis(5));
+
+    stateStore = newStateStore(conf);
+    assertNotNull(stateStore);
+
+    namenodeResolver = new MembershipNamenodeResolver(conf, stateStore);
+    namenodeResolver.setRouterId(ROUTERS[0]);
+  }
+
+  @AfterClass
+  public static void destroy() throws Exception {
+    stateStore.stop();
+    stateStore.close();
+  }
+
+  @Before
+  public void setup() throws IOException, InterruptedException {
+    // Wait for state store to connect
+    stateStore.loadDriver();
+    waitStateStore(stateStore, 10000);
+
+    // Clear NN registrations
+    boolean cleared = clearRecords(stateStore, MembershipState.class);
+    assertTrue(cleared);
+  }
+
+  @Test
+  public void testStateStoreDisconnected() throws Exception {
+
+    // Add an entry to the store
+    NamenodeStatusReport report = createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE);
+    assertTrue(namenodeResolver.registerNamenode(report));
+
+    // Close the data store driver
+    stateStore.closeDriver();
+    assertFalse(stateStore.isDriverReady());
+
+    // Flush the caches
+    stateStore.refreshCaches(true);
+
+    // Verify commands fail due to no cached data and no state store
+    // connectivity.
+    List<? extends FederationNamenodeContext> nns =
+        namenodeResolver.getNamenodesForBlockPoolId(NAMESERVICES[0]);
+    assertNull(nns);
+
+    verifyException(namenodeResolver, "registerNamenode",
+        StateStoreUnavailableException.class,
+        new Class[] {NamenodeStatusReport.class}, new Object[] {report});
+  }
+
+  /**
+   * Verify the first registration on the resolver.
+   *
+   * @param nsId Nameservice identifier.
+   * @param nnId Namenode identifier within the nameservice.
+   * @param resultsCount Number of results expected; 0 expects a null list.
+   * @param state Expected state for the first one.
+   * @throws IOException If we cannot get the namenodes.
+   */
+  private void verifyFirstRegistration(String nsId, String nnId,
+      int resultsCount, FederationNamenodeServiceState state)
+          throws IOException {
+    List<? extends FederationNamenodeContext> namenodes =
+        namenodeResolver.getNamenodesForNameserviceId(nsId);
+    if (resultsCount == 0) {
+      assertNull(namenodes);
+    } else {
+      assertEquals(resultsCount, namenodes.size());
+      if (namenodes.size() > 0) {
+        FederationNamenodeContext namenode = namenodes.get(0);
+        assertEquals(state, namenode.getState());
+        assertEquals(nnId, namenode.getNamenodeId());
+      }
+    }
+  }
+
+  @Test
+  public void testRegistrationExpired()
+      throws InterruptedException, IOException {
+
+    // Populate the state store with a single NN element
+    // 1) ns0:nn0 - Active
+    // Wait for the entry to expire without heartbeating
+    // Verify the NN entry is not accessible once expired.
+    NamenodeStatusReport report = createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE);
+    assertTrue(namenodeResolver.registerNamenode(report));
+
+    // Load cache
+    stateStore.refreshCaches(true);
+
+    // Verify
+    verifyFirstRegistration(
+        NAMESERVICES[0], NAMENODES[0], 1,
+        FederationNamenodeServiceState.ACTIVE);
+
+    // Wait past expiration (set in conf to 5 seconds)
+    Thread.sleep(6000);
+    // Reload cache
+    stateStore.refreshCaches(true);
+
+    // Verify entry is now expired and is no longer in the cache
+    verifyFirstRegistration(
+        NAMESERVICES[0], NAMENODES[0], 0,
+        FederationNamenodeServiceState.ACTIVE);
+
+    // Heartbeat again, updates dateModified
+    assertTrue(namenodeResolver.registerNamenode(report));
+    // Reload cache
+    stateStore.refreshCaches(true);
+
+    // Verify updated entry is marked active again and accessible to RPC server
+    verifyFirstRegistration(
+        NAMESERVICES[0], NAMENODES[0], 1,
+        FederationNamenodeServiceState.ACTIVE);
+  }
+
+  @Test
+  public void testRegistrationNamenodeSelection()
+      throws InterruptedException, IOException {
+
+    // 1) ns0:nn0 - Active
+    // 2) ns0:nn1 - Standby (newest)
+    // Verify the selected entry is the active entry
+    assertTrue(namenodeResolver.registerNamenode(
+        createNamenodeReport(
+            NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE)));
+    Thread.sleep(100);
+    assertTrue(namenodeResolver.registerNamenode(
+        createNamenodeReport(
+            NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));
+
+    stateStore.refreshCaches(true);
+
+    verifyFirstRegistration(
+        NAMESERVICES[0], NAMENODES[0], 2,
+        FederationNamenodeServiceState.ACTIVE);
+
+    // 1) ns0:nn0 - Expired (stale)
+    // 2) ns0:nn1 - Standby (newest)
+    // Verify the selected entry is the standby entry as the active entry is
+    // stale
+    assertTrue(namenodeResolver.registerNamenode(
+        createNamenodeReport(
+            NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE)));
+
+    // Expire active registration
+    Thread.sleep(6000);
+
+    // Refresh standby registration
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));
+
+    // Verify that standby is selected (active is now expired)
+    stateStore.refreshCaches(true);
+    verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 1,
+        FederationNamenodeServiceState.STANDBY);
+
+    // 1) ns0:nn0 - Active
+    // 2) ns0:nn1 - Unavailable (newest)
+    // Verify the selected entry is the active entry
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE)));
+    Thread.sleep(100);
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[1], null)));
+    stateStore.refreshCaches(true);
+    verifyFirstRegistration(NAMESERVICES[0], NAMENODES[0], 2,
+        FederationNamenodeServiceState.ACTIVE);
+
+    // 1) ns0:nn0 - Unavailable (newest)
+    // 2) ns0:nn1 - Standby
+    // Verify the selected entry is the standby entry
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));
+    Thread.sleep(1000);
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[0], null)));
+
+    stateStore.refreshCaches(true);
+    verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 2,
+        FederationNamenodeServiceState.STANDBY);
+
+    // 1) ns0:nn0 - Unavailable (oldest); NOTE(review): maybe meant ACTIVE
+    // 2) ns0:nn1 - Standby
+    // 3) ns0:nn2 - Active (newest)
+    // Verify the selected entry is the newest active entry
+    assertTrue(namenodeResolver.registerNamenode(
+        createNamenodeReport(NAMESERVICES[0], NAMENODES[0], null)));
+    Thread.sleep(100);
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));
+    Thread.sleep(100);
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[2], HAServiceState.ACTIVE)));
+
+    stateStore.refreshCaches(true);
+    verifyFirstRegistration(NAMESERVICES[0], NAMENODES[2], 3,
+        FederationNamenodeServiceState.ACTIVE);
+
+    // 1) ns0:nn0 - Standby (oldest)
+    // 2) ns0:nn1 - Standby (newest)
+    // 3) ns0:nn2 - Standby
+    // Verify the selected entry is the newest standby entry
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[0], HAServiceState.STANDBY)));
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[2], HAServiceState.STANDBY)));
+    Thread.sleep(1500);
+    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
+        NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));
+
+    stateStore.refreshCaches(true);
+    verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 3,
+        FederationNamenodeServiceState.STANDBY);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/73be4292/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java
new file mode 100644
index 0000000..cd35816
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test the {@link LocalResolver}.
+ */
+public class TestLocalResolver {
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testLocalResolver() throws IOException {
+
+    // Mock the subcluster mapping
+    Configuration conf = new Configuration();
+    Router router = mock(Router.class);
+    StateStoreService stateStore = mock(StateStoreService.class);
+    MembershipStore membership = mock(MembershipStore.class);
+    when(router.getStateStore()).thenReturn(stateStore);
+    when(stateStore.getRegisteredRecordStore(any(Class.class)))
+        .thenReturn(membership);
+    GetNamenodeRegistrationsResponse response =
+        GetNamenodeRegistrationsResponse.newInstance();
+    // Set the mapping for each client
+    List<MembershipState> records = new LinkedList<>();
+    records.add(newMembershipState("client0", "subcluster0"));
+    records.add(newMembershipState("client1", "subcluster1"));
+    records.add(newMembershipState("client2", "subcluster2"));
+    response.setNamenodeMemberships(records);
+    when(membership.getNamenodeRegistrations(
+        any(GetNamenodeRegistrationsRequest.class))).thenReturn(response);
+
+    // Mock the client resolution: it will be anything in sb
+    final StringBuilder sb = new StringBuilder("clientX");
+    LocalResolver localResolver = new LocalResolver(conf, router);
+    LocalResolver spyLocalResolver = spy(localResolver);
+    doAnswer(new Answer<String>() {
+      @Override
+      public String answer(InvocationOnMock invocation) throws Throwable {
+        return sb.toString();
+      }
+    }).when(spyLocalResolver).getClientAddr();
+
+    // Add the mocks to the resolver
+    MultipleDestinationMountTableResolver resolver =
+        new MultipleDestinationMountTableResolver(conf, router);
+    resolver.addResolver(DestinationOrder.LOCAL, spyLocalResolver);
+
+
+    // We point /local to subclusters 0, 1, 2 with the local order
+    Map<String, String> mapLocal = new HashMap<>();
+    mapLocal.put("subcluster0", "/local");
+    mapLocal.put("subcluster1", "/local");
+    mapLocal.put("subcluster2", "/local");
+    MountTable localEntry = MountTable.newInstance("/local", mapLocal);
+    localEntry.setDestOrder(DestinationOrder.LOCAL);
+    resolver.addEntry(localEntry);
+
+    // Test first with the default destination
+    PathLocation dest = resolver.getDestinationForPath("/local/file0.txt");
+    assertDestination("subcluster0", dest);
+
+    // We change the client location and verify
+    setClient(sb, "client2");
+    dest = resolver.getDestinationForPath("/local/file0.txt");
+    assertDestination("subcluster2", dest);
+
+    setClient(sb, "client1");
+    dest = resolver.getDestinationForPath("/local/file0.txt");
+    assertDestination("subcluster1", dest);
+
+    setClient(sb, "client0");
+    dest = resolver.getDestinationForPath("/local/file0.txt");
+    assertDestination("subcluster0", dest);
+  }
+
+  private void assertDestination(String expectedNsId, PathLocation loc) {
+    List<RemoteLocation> dests = loc.getDestinations();
+    RemoteLocation dest = dests.get(0);
+    assertEquals(expectedNsId, dest.getNameserviceId());
+  }
+
+  private MembershipState newMembershipState(String addr, String nsId) {
+    return MembershipState.newInstance(
+        "routerId", nsId, "nn0", "cluster0", "blockPool0",
+        addr + ":8001", addr + ":8002", addr + ":8003", addr + ":8004",
+        FederationNamenodeServiceState.ACTIVE, false);
+  }
+
+  /**
+   * Set the address of the client issuing the request. We use a StringBuilder
+   * to modify the value in place for the mock.
+   * @param sb StringBuilder to set the client string.
+   * @param client Address of the client.
+   */
+  private static void setClient(StringBuilder sb, String client) {
+    sb.replace(0, sb.length(), client);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/73be4292/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
new file mode 100644
index 0000000..741d1f6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test functionalities of {@link ConnectionManager}, which manages a pool
+ * of connections to NameNodes.
+ */
+public class TestConnectionManager {
+  private Configuration conf;
+  private ConnectionManager connManager;
+  private static final String[] TEST_GROUP = new String[]{"TEST_GROUP"};
+  private static final UserGroupInformation TEST_USER1 =
+      UserGroupInformation.createUserForTesting("user1", TEST_GROUP);
+  private static final UserGroupInformation TEST_USER2 =
+      UserGroupInformation.createUserForTesting("user2", TEST_GROUP);
+  private static final UserGroupInformation TEST_USER3 =
+      UserGroupInformation.createUserForTesting("user3", TEST_GROUP);
+  private static final String TEST_NN_ADDRESS = "nn1:8080";
+
+  /**
+   * Create and start a fresh manager and make the fake host "nn1" used by
+   * {@link #TEST_NN_ADDRESS} resolve to localhost.
+   */
+  @Before
+  public void setup() throws Exception {
+    conf = new Configuration();
+    connManager = new ConnectionManager(conf);
+    NetUtils.addStaticResolution("nn1", "localhost");
+    // NOTE(review): the returned address is discarded; presumably this call
+    // only exercises the static resolution registered above - confirm
+    NetUtils.createSocketAddrForHost("nn1", 8080);
+    connManager.start();
+  }
+
+  @After
+  public void shutdown() {
+    if (connManager != null) {
+      connManager.close();
+    }
+  }
+
+  /**
+   * Check which connections the manager removes when cleaning up pools with
+   * different numbers of total and active connections.
+   */
+  @Test
+  public void testCleanup() throws Exception {
+    Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
+
+    ConnectionPool pool1 = new ConnectionPool(
+        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10);
+    addConnectionsToPool(pool1, 9, 4);
+    poolMap.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS), pool1);
+
+    ConnectionPool pool2 = new ConnectionPool(
+        conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10);
+    addConnectionsToPool(pool2, 10, 10);
+    poolMap.put(new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS), pool2);
+
+    checkPoolConnections(TEST_USER1, 9, 4);
+    checkPoolConnections(TEST_USER2, 10, 10);
+
+    // Clean up first pool, one connection should be removed, and second pool
+    // should remain the same.
+    connManager.cleanup(pool1);
+    checkPoolConnections(TEST_USER1, 8, 4);
+    checkPoolConnections(TEST_USER2, 10, 10);
+
+    // Clean up the first pool again, it should have no effect since it reached
+    // the MIN_ACTIVE_RATIO.
+    connManager.cleanup(pool1);
+    checkPoolConnections(TEST_USER1, 8, 4);
+    checkPoolConnections(TEST_USER2, 10, 10);
+
+    // Make sure the number of connections doesn't go below minSize.
+    // The pool must be registered under its own user (TEST_USER3): under
+    // any other key the checks below would not find it.
+    ConnectionPool pool3 = new ConnectionPool(
+        conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10);
+    addConnectionsToPool(pool3, 10, 0);
+    poolMap.put(new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS), pool3);
+    // A single cleanup pass removed only one connection from pool1 above, so
+    // keep cleaning until the pool stops shrinking before checking the floor
+    cleanupUntilStable(pool3);
+    checkPoolConnections(TEST_USER3, 2, 0);
+    // With active connections added to pool, make sure it honors the
+    // MIN_ACTIVE_RATIO again
+    addConnectionsToPool(pool3, 10, 2);
+    cleanupUntilStable(pool3);
+    checkPoolConnections(TEST_USER3, 4, 2);
+  }
+
+  /**
+   * Check that a pool hands out its unused connections first and falls back
+   * to an active one once all of them are in use.
+   */
+  @Test
+  public void testGetConnection() throws Exception {
+    Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
+    final int totalConns = 10;
+    int activeConns = 5;
+
+    ConnectionPool pool = new ConnectionPool(
+        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10);
+    addConnectionsToPool(pool, totalConns, activeConns);
+    poolMap.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS), pool);
+
+    // All remaining connections should be usable
+    final int remainingSlots = totalConns - activeConns;
+    for (int i = 0; i < remainingSlots; i++) {
+      ConnectionContext cc = pool.getConnection();
+      assertTrue(cc.isUsable());
+      // Taking the client is what this test counts as making it active
+      cc.getClient();
+      activeConns++;
+    }
+
+    checkPoolConnections(TEST_USER1, totalConns, activeConns);
+
+    // Ask for more and this returns an active connection
+    ConnectionContext cc = pool.getConnection();
+    assertTrue(cc.isActive());
+  }
+
+  /**
+   * Add new connections to a pool and take the client of the first ones so
+   * they are counted as active.
+   * @param pool Pool to add the connections to.
+   * @param numTotalConn Number of connections to add.
+   * @param numActiveConn How many of the added connections get their client
+   *                      taken.
+   * @throws IOException If a connection cannot be created.
+   */
+  private void addConnectionsToPool(ConnectionPool pool, int numTotalConn,
+      int numActiveConn) throws IOException {
+    for (int i = 0; i < numTotalConn; i++) {
+      ConnectionContext cc = pool.newConnection();
+      pool.addConnection(cc);
+      if (i < numActiveConn) {
+        cc.getClient();
+      }
+    }
+  }
+
+  /**
+   * Run the manager's cleanup on a pool until a pass no longer removes any
+   * connection, so the assertions that follow do not depend on how many
+   * connections a single pass closes.
+   * @param pool Pool to clean up.
+   */
+  private void cleanupUntilStable(ConnectionPool pool) {
+    int before;
+    do {
+      before = pool.getNumConnections();
+      connManager.cleanup(pool);
+    } while (pool.getNumConnections() < before);
+  }
+
+  /**
+   * Check the total and active connection counts of every pool registered
+   * in the manager for a user. Fails if no pool is registered for the user,
+   * so a pool stored under the wrong key cannot pass unnoticed.
+   * @param ugi User owning the pools to check (compared by reference).
+   * @param numOfConns Expected number of connections.
+   * @param numOfActiveConns Expected number of active connections.
+   */
+  private void checkPoolConnections(UserGroupInformation ugi,
+      int numOfConns, int numOfActiveConns) {
+    boolean found = false;
+    for (Map.Entry<ConnectionPoolId, ConnectionPool> e :
+        connManager.getPools().entrySet()) {
+      if (e.getKey().getUgi() == ugi) {
+        found = true;
+        assertEquals(numOfConns, e.getValue().getNumConnections());
+        assertEquals(numOfActiveConns, e.getValue().getNumActiveConnections());
+      }
+    }
+    assertTrue("No connection pool registered for " + ugi, found);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/73be4292/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java
new file mode 100644
index 0000000..877fb02
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.service.Service.STATE;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Test the service that heartbeats the state of the namenodes to the State
+ * Store.
+ */
+public class TestNamenodeHeartbeat {
+
+  private static RouterDFSCluster cluster;
+  private static ActiveNamenodeResolver namenodeResolver;
+  private static List<NamenodeHeartbeatService> services;
+
+  @Rule
+  public TestName name = new TestName();
+
+  /**
+   * Start a cluster with two nameservices and one heartbeat service per
+   * namenode, all of them reporting to a shared {@link MockResolver}.
+   */
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+
+    cluster = new RouterDFSCluster(true, 2);
+
+    // Start NNs and DNs and wait until ready
+    cluster.startCluster();
+
+    // Mock locator that records the heartbeats
+    List<String> nss = cluster.getNameservices();
+    String ns = nss.get(0);
+    Configuration conf = cluster.generateNamenodeConfiguration(ns);
+    namenodeResolver = new MockResolver(conf);
+    namenodeResolver.setRouterId("testrouter");
+
+    // Create one heartbeat service per NN
+    services = new ArrayList<>();
+    for (NamenodeContext nn : cluster.getNamenodes()) {
+      String nsId = nn.getNameserviceId();
+      String nnId = nn.getNamenodeId();
+      NamenodeHeartbeatService service = new NamenodeHeartbeatService(
+          namenodeResolver, nsId, nnId);
+      service.init(conf);
+      service.start();
+      services.add(service);
+    }
+  }
+
+  /**
+   * Shut down the cluster and the heartbeat services. JUnit runs this even
+   * when {@link #globalSetUp()} failed part-way, so both fields may still be
+   * null; they are guarded to avoid hiding the original failure behind a
+   * NullPointerException.
+   */
+  @AfterClass
+  public static void tearDown() throws IOException {
+    try {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    } finally {
+      // Stop the services even if the cluster shutdown failed
+      if (services != null) {
+        for (NamenodeHeartbeatService service : services) {
+          service.stop();
+          service.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Walk a standalone heartbeat service through init, start and stop and
+   * check the service state after each step.
+   */
+  @Test
+  public void testNamenodeHeartbeatService() throws IOException {
+
+    // This cluster is never started; it only generates the configuration
+    RouterDFSCluster testCluster = new RouterDFSCluster(true, 1);
+    Configuration heartbeatConfig = testCluster.generateNamenodeConfiguration(
+        NAMESERVICES[0]);
+    NamenodeHeartbeatService server = new NamenodeHeartbeatService(
+        namenodeResolver, NAMESERVICES[0], NAMENODES[0]);
+    try {
+      server.init(heartbeatConfig);
+      assertEquals(STATE.INITED, server.getServiceState());
+      server.start();
+      assertEquals(STATE.STARTED, server.getServiceState());
+      server.stop();
+      assertEquals(STATE.STOPPED, server.getServiceState());
+    } finally {
+      // Release the service even when one of the assertions above fails
+      server.close();
+    }
+  }
+
+  /**
+   * Check that the resolver lists the active namenode first and the standby
+   * second for each nameservice, before and after failing over one of them.
+   */
+  @Test
+  public void testHearbeat() throws InterruptedException, IOException {
+
+    // Set NAMENODES[0] to active for all nameservices
+    if (cluster.isHighAvailability()) {
+      for (String ns : cluster.getNameservices()) {
+        cluster.switchToActive(ns, NAMENODES[0]);
+        cluster.switchToStandby(ns, NAMENODES[1]);
+      }
+    }
+
+    // Wait for heartbeats to record
+    Thread.sleep(5000);
+
+    // Verify the locator has matching NN entries for each NS
+    for (String ns : cluster.getNameservices()) {
+      List<? extends FederationNamenodeContext> nns =
+          namenodeResolver.getNamenodesForNameserviceId(ns);
+
+      // Active
+      FederationNamenodeContext active = nns.get(0);
+      assertEquals(NAMENODES[0], active.getNamenodeId());
+
+      // Standby
+      FederationNamenodeContext standby = nns.get(1);
+      assertEquals(NAMENODES[1], standby.getNamenodeId());
+    }
+
+    // Switch active NNs in 1/2 nameservices
+    List<String> nss = cluster.getNameservices();
+    String failoverNS = nss.get(0);
+    String normalNs = nss.get(1);
+
+    cluster.switchToStandby(failoverNS, NAMENODES[0]);
+    cluster.switchToActive(failoverNS, NAMENODES[1]);
+
+    // Wait for heartbeats to record
+    Thread.sleep(5000);
+
+    // Verify the locator has recorded the failover for the failover NS
+    List<? extends FederationNamenodeContext> failoverNSs =
+        namenodeResolver.getNamenodesForNameserviceId(failoverNS);
+    // Active
+    FederationNamenodeContext active = failoverNSs.get(0);
+    assertEquals(NAMENODES[1], active.getNamenodeId());
+
+    // Standby
+    FederationNamenodeContext standby = failoverNSs.get(1);
+    assertEquals(NAMENODES[0], standby.getNamenodeId());
+
+    // Verify the locator has the same records for the other ns
+    List<? extends FederationNamenodeContext> normalNss =
+        namenodeResolver.getNamenodesForNameserviceId(normalNs);
+    // Active
+    active = normalNss.get(0);
+    assertEquals(NAMENODES[0], active.getNamenodeId());
+    // Standby
+    standby = normalNss.get(1);
+    assertEquals(NAMENODES[1], standby.getNamenodeId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/73be4292/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java
new file mode 100644
index 0000000..e130b7b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.TestConfigurationFieldsBase;
+
+import java.util.HashSet;
+
+/**
+ * Unit test that compares the RBF configuration class
+ * <p></p>
+ * {@link RBFConfigKeys}
+ * <p></p>
+ * with hdfs-rbf-default.xml and reports properties missing on either side.
+ * <p></p>
+ * The comparison itself is implemented by
+ * {@link org.apache.hadoop.conf.TestConfigurationFieldsBase}; this class
+ * only provides its inputs.
+ */
+public class TestRBFConfigFields extends TestConfigurationFieldsBase {
+  @Override
+  public void initializeMemberVariables() {
+    // Inputs of the comparison: the keys class and the XML defaults file
+    configurationClasses = new Class[] {RBFConfigKeys.class};
+    xmlFilename = "hdfs-rbf-default.xml";
+
+    // Both "missing property" error flags are switched on
+    errorIfMissingXmlProps = true;
+    errorIfMissingConfigProps = true;
+
+    // All the skip lists start out empty
+    xmlPrefixToSkipCompare = new HashSet<>();
+    xmlPropsToSkipCompare = new HashSet<>();
+    configurationPropsToSkipCompare = new HashSet<>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/73be4292/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
new file mode 100644
index 0000000..39398f7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.service.Service.STATE;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the service lifecycle of the {@link Router}: it is initialized,
+ * started and stopped with different combinations of its services enabled
+ * and the service state is checked after each step.
+ */
+public class TestRouter {
+
+  /** Base configuration shared by all the tests; set once in create(). */
+  private static Configuration conf;
+
+  /**
+   * Build the base Router configuration: mock resolvers, all the server
+   * addresses on port 0 and the settings of a co-located namenode for
+   * nameservice "ns0".
+   */
+  @BeforeClass
+  public static void create() throws IOException {
+    // Basic configuration without the state store
+    conf = new Configuration();
+    // Cache time to live of 1; the key name ends in _MS, so this is
+    // presumably 1 millisecond
+    conf.setInt(RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, 1);
+    // Mock resolver classes
+    conf.setClass(RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+        MockResolver.class, ActiveNamenodeResolver.class);
+    conf.setClass(RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+        MockResolver.class, FileSubclusterResolver.class);
+
+    // Bind to any available port
+    conf.set(RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
+    conf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(RBFConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY, "0.0.0.0");
+    conf.set(RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(RBFConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(RBFConfigKeys.DFS_ROUTER_HTTP_BIND_HOST_KEY, "0.0.0.0");
+
+    // Simulate a co-located NN. Both addresses use port 0, like the Router
+    // addresses above.
+    conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns0");
+    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + "ns0");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + "ns0",
+            "127.0.0.1:" + 0);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + "ns0",
+            "127.0.0.1:" + 0);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + "ns0",
+            "0.0.0.0");
+  }
+
+  @AfterClass
+  public static void destroy() {
+  }
+
+  @Before
+  public void setup() throws IOException, URISyntaxException {
+  }
+
+  @After
+  public void cleanup() {
+  }
+
+  /**
+   * Take a new Router through init, start and stop with the given
+   * configuration and check the service state after each step.
+   * @param routerConfig Configuration to initialize the Router with.
+   */
+  private static void testRouterStartup(Configuration routerConfig)
+      throws InterruptedException, IOException {
+    Router router = new Router();
+    assertEquals(STATE.NOTINITED, router.getServiceState());
+    router.init(routerConfig);
+    assertEquals(STATE.INITED, router.getServiceState());
+    router.start();
+    assertEquals(STATE.STARTED, router.getServiceState());
+    router.stop();
+    assertEquals(STATE.STOPPED, router.getServiceState());
+    router.close();
+  }
+
+  /**
+   * Run the lifecycle check with each Router service enabled on its own and
+   * then with all of them together.
+   */
+  @Test
+  public void testRouterService() throws InterruptedException, IOException {
+
+    // Admin only
+    testRouterStartup(new RouterConfigBuilder(conf).admin().build());
+
+    // Http only
+    testRouterStartup(new RouterConfigBuilder(conf).http().build());
+
+    // Rpc only
+    testRouterStartup(new RouterConfigBuilder(conf).rpc().build());
+
+    // Metrics only
+    testRouterStartup(new RouterConfigBuilder(conf).metrics().build());
+
+    // Statestore only
+    testRouterStartup(new RouterConfigBuilder(conf).stateStore().build());
+
+    // Heartbeat only
+    testRouterStartup(new RouterConfigBuilder(conf).heartbeat().build());
+
+    // Run with all services
+    testRouterStartup(new RouterConfigBuilder(conf).all().build());
+  }
+
+  /**
+   * Check that the RPC server is started together with the Router and is
+   * stopped when the Router stops.
+   */
+  @Test
+  public void testRouterRestartRpcService() throws IOException {
+
+    // Start
+    Router router = new Router();
+    router.init(new RouterConfigBuilder(conf).rpc().build());
+    router.start();
+
+    // Verify RPC server is running
+    assertNotNull(router.getRpcServerAddress());
+    RouterRpcServer rpcServer = router.getRpcServer();
+    assertNotNull(rpcServer);
+    assertEquals(STATE.STARTED, rpcServer.getServiceState());
+
+    // Stop router and RPC server
+    router.stop();
+    assertEquals(STATE.STOPPED, rpcServer.getServiceState());
+
+    router.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/73be4292/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java
new file mode 100644
index 0000000..a8ffded
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the administrator interface of the {@link Router} implemented by
+ * {@link RouterAdminServer}.
+ */
+public class TestRouterAdmin {
+
+  private static StateStoreDFSCluster cluster;
+  private static RouterContext routerContext;
+  public static final String RPC_BEAN =
+      "Hadoop:service=Router,name=FederationRPC";
+  private static List<MountTable> mockMountTable;
+  private static StateStoreService stateStore;
+
+  /**
+   * Creates the test cluster, starts its Routers with State Store, admin and
+   * RPC enabled, and stores the Router context, the mock mount table and the
+   * State Store in the static fields used by the tests.
+   *
+   * @throws Exception If the cluster or the Routers cannot be set up.
+   */
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+    cluster = new StateStoreDFSCluster(false, 1);
+
+    // Router overrides: State Store + admin + RPC
+    final Configuration routerOverrides =
+        new RouterConfigBuilder().stateStore().admin().rpc().build();
+    cluster.addRouterOverrides(routerOverrides);
+    cluster.startRouters();
+
+    // Shared fixtures
+    routerContext = cluster.getRandomRouter();
+    mockMountTable = cluster.generateMockMountTable();
+    stateStore = routerContext.getRouter().getStateStore();
+  }
+
+  /**
+   * Stops the Router used by the tests and shuts the test cluster down, the
+   * same sequence TestRouterAdminCLI applies to its identically built cluster.
+   */
+  @AfterClass
+  public static void tearDown() {
+    cluster.stopRouter(routerContext);
+    cluster.shutdown();
+    cluster = null;
+  }
+
+  /**
+   * Synchronizes the mount table records of the State Store with the mock
+   * mount table before each test and fails when the synchronization reports
+   * false.
+   *
+   * @throws Exception If the records cannot be synchronized.
+   */
+  @Before
+  public void testSetup() throws Exception {
+    final boolean synced =
+        synchronizeRecords(stateStore, mockMountTable, MountTable.class);
+    assertTrue(synced);
+  }
+
+  /**
+   * Adds a new mount table entry through the admin client and checks that the
+   * mount table grows by one record.
+   *
+   * @throws IOException If the mount table cannot be read or updated.
+   */
+  @Test
+  public void testAddMountTable() throws IOException {
+    MountTable newEntry = MountTable.newInstance(
+        "/testpath", Collections.singletonMap("ns0", "/testdir"),
+        Time.now(), Time.now());
+
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTable = client.getMountTableManager();
+
+    // Existing mount table size (expected value goes first in assertEquals)
+    List<MountTable> records = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size(), records.size());
+
+    // Add
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(newEntry);
+    AddMountTableEntryResponse addResponse =
+        mountTable.addMountTableEntry(addRequest);
+    assertTrue(addResponse.getStatus());
+
+    // New mount table size
+    List<MountTable> records2 = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size() + 1, records2.size());
+  }
+
+  /**
+   * Adds the same mount table entry twice through the admin client and checks
+   * that the first request succeeds while the second one is rejected.
+   *
+   * @throws IOException If the mount table cannot be read or updated.
+   */
+  @Test
+  public void testAddDuplicateMountTable() throws IOException {
+    MountTable newEntry = MountTable.newInstance("/testpath",
+        Collections.singletonMap("ns0", "/testdir"), Time.now(), Time.now());
+
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTable = client.getMountTableManager();
+
+    // Existing mount table size (expected value goes first in assertEquals)
+    List<MountTable> entries1 = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size(), entries1.size());
+
+    // Add
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(newEntry);
+    AddMountTableEntryResponse addResponse =
+        mountTable.addMountTableEntry(addRequest);
+    assertTrue(addResponse.getStatus());
+
+    // New mount table size
+    List<MountTable> entries2 = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size() + 1, entries2.size());
+
+    // Add again, should fail
+    AddMountTableEntryResponse addResponse2 =
+        mountTable.addMountTableEntry(addRequest);
+    assertFalse(addResponse2.getStatus());
+  }
+
+  /**
+   * Adds a read-only mount table entry through the admin client, checks that
+   * the stored entry is flagged read-only and removes it again.
+   *
+   * @throws IOException If the mount table cannot be read or updated.
+   */
+  @Test
+  public void testAddReadOnlyMountTable() throws IOException {
+    MountTable newEntry = MountTable.newInstance(
+        "/readonly", Collections.singletonMap("ns0", "/testdir"),
+        Time.now(), Time.now());
+    newEntry.setReadOnly(true);
+
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTable = client.getMountTableManager();
+
+    // Existing mount table size (expected value goes first in assertEquals)
+    List<MountTable> records = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size(), records.size());
+
+    // Add
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(newEntry);
+    AddMountTableEntryResponse addResponse =
+        mountTable.addMountTableEntry(addRequest);
+    assertTrue(addResponse.getStatus());
+
+    // New mount table size
+    List<MountTable> records2 = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size() + 1, records2.size());
+
+    // Check that we have the read only entry
+    MountTable record = getMountTableEntry("/readonly");
+    assertEquals("/readonly", record.getSourcePath());
+    assertTrue(record.isReadOnly());
+
+    // Removing the new entry
+    RemoveMountTableEntryRequest removeRequest =
+        RemoveMountTableEntryRequest.newInstance("/readonly");
+    RemoveMountTableEntryResponse removeResponse =
+        mountTable.removeMountTableEntry(removeRequest);
+    assertTrue(removeResponse.getStatus());
+  }
+
+  /**
+   * Runs the destination order check for the HASH, LOCAL, RANDOM and HASH_ALL
+   * orders, in that sequence.
+   *
+   * @throws IOException If any of the mount table operations fails.
+   */
+  @Test
+  public void testAddOrderMountTable() throws IOException {
+    final DestinationOrder[] orders = {
+        DestinationOrder.HASH,
+        DestinationOrder.LOCAL,
+        DestinationOrder.RANDOM,
+        DestinationOrder.HASH_ALL,
+    };
+    for (DestinationOrder order : orders) {
+      testAddOrderMountTable(order);
+    }
+  }
+
+  /**
+   * Adds a mount table entry that uses the given destination order, checks
+   * that the stored entry reports the same order and removes the entry again.
+   *
+   * @param order Destination order to set on the new entry.
+   * @throws IOException If any of the mount table operations fails.
+   */
+  private void testAddOrderMountTable(final DestinationOrder order)
+      throws IOException {
+    final String mnt = "/" + order;
+    final MountTable orderedEntry = MountTable.newInstance(
+        mnt, Collections.singletonMap("ns0", "/testdir"),
+        Time.now(), Time.now());
+    orderedEntry.setDestOrder(order);
+
+    final MountTableManager mountTable =
+        routerContext.getAdminClient().getMountTableManager();
+
+    // Add
+    final AddMountTableEntryResponse added =
+        mountTable.addMountTableEntry(
+            AddMountTableEntryRequest.newInstance(orderedEntry));
+    assertTrue(added.getStatus());
+
+    // Check that the stored entry carries the requested order
+    final MountTable stored = getMountTableEntry(mnt);
+    assertEquals(mnt, stored.getSourcePath());
+    assertEquals(order, stored.getDestOrder());
+
+    // Removing the new entry
+    final RemoveMountTableEntryResponse removed =
+        mountTable.removeMountTableEntry(
+            RemoveMountTableEntryRequest.newInstance(mnt));
+    assertTrue(removed.getStatus());
+  }
+
+  /**
+   * Removes the "/" entry through the admin client and checks that the
+   * request succeeds and that the mount table shrinks by one record.
+   *
+   * @throws IOException If the mount table cannot be read or updated.
+   */
+  @Test
+  public void testRemoveMountTable() throws IOException {
+
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTable = client.getMountTableManager();
+
+    // Existing mount table size (expected value goes first in assertEquals)
+    List<MountTable> entries1 = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size(), entries1.size());
+
+    // Remove an entry and make sure the request itself reports success
+    RemoveMountTableEntryRequest removeRequest =
+        RemoveMountTableEntryRequest.newInstance("/");
+    RemoveMountTableEntryResponse removeResponse =
+        mountTable.removeMountTableEntry(removeRequest);
+    assertTrue(removeResponse.getStatus());
+
+    // New mount table size
+    List<MountTable> entries2 = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size() - 1, entries2.size());
+  }
+
+  /**
+   * Replaces the destination of the "/" entry (from ns0 to ns1) through the
+   * admin client and checks the destinations before and after the update.
+   *
+   * @throws IOException If the mount table cannot be read or updated.
+   */
+  @Test
+  public void testEditMountTable() throws IOException {
+    final MountTableManager mountTable =
+        routerContext.getAdminClient().getMountTableManager();
+
+    // Verify starting condition
+    final MountTable original = getMountTableEntry("/");
+    final List<RemoteLocation> expectedBefore =
+        Collections.singletonList(new RemoteLocation("ns0", "/"));
+    assertEquals(expectedBefore, original.getDestinations());
+
+    // Edit the entry for /
+    final MountTable replacement = MountTable.newInstance(
+        "/", Collections.singletonMap("ns1", "/"), Time.now(), Time.now());
+    mountTable.updateMountTableEntry(
+        UpdateMountTableEntryRequest.newInstance(replacement));
+
+    // Verify edited condition
+    final MountTable edited = getMountTableEntry("/");
+    final List<RemoteLocation> expectedAfter =
+        Collections.singletonList(new RemoteLocation("ns1", "/"));
+    assertEquals(expectedAfter, edited.getDestinations());
+  }
+
+  /**
+   * Lists the mount table through the admin client and checks that it has the
+   * same size as the mock mount table, that every returned entry has a single
+   * destination and non-null dates, and that every mock source path is found.
+   *
+   * @throws IOException If the mount table cannot be read.
+   */
+  @Test
+  public void testGetMountTable() throws IOException {
+
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTable = client.getMountTableManager();
+
+    // Verify size of table
+    List<MountTable> entries = getMountTableEntries(mountTable);
+    assertEquals(mockMountTable.size(), entries.size());
+
+    // Verify all entries are present
+    int matches = 0;
+    for (MountTable e : entries) {
+      // Per-entry checks run once per returned entry rather than once per
+      // (returned entry, mock entry) pair
+      assertEquals(1, e.getDestinations().size());
+      assertNotNull(e.getDateCreated());
+      assertNotNull(e.getDateModified());
+      for (MountTable entry : mockMountTable) {
+        if (entry.getSourcePath().equals(e.getSourcePath())) {
+          matches++;
+        }
+      }
+    }
+    assertEquals(mockMountTable.size(), matches);
+  }
+
+  /**
+   * Looks up the "/ns0" mount point and checks that an entry with that source
+   * path is returned.
+   *
+   * @throws IOException If the mount table cannot be read.
+   */
+  @Test
+  public void testGetSingleMountTableEntry() throws IOException {
+    MountTable entry = getMountTableEntry("/ns0");
+    assertNotNull(entry);
+    // Expected value goes first in assertEquals
+    assertEquals("/ns0", entry.getSourcePath());
+  }
+
+  /**
+   * Gets an existing mount table record in the state store.
+   *
+   * @param mount The mount point of the record to get.
+   * @return The first record returned for the mount point, null if none is
+   *         returned.
+   * @throws IOException If the state store could not be accessed.
+   */
+  private MountTable getMountTableEntry(final String mount) throws IOException {
+    GetMountTableEntriesRequest request =
+        GetMountTableEntriesRequest.newInstance(mount);
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTable = client.getMountTableManager();
+    // getMountTableEntries() refreshes the cache right before it queries, so
+    // no separate refresh is done here
+    List<MountTable> results = getMountTableEntries(mountTable, request);
+    if (results.isEmpty()) {
+      return null;
+    }
+    // First result is sorted to have the shortest mount string length
+    return results.get(0);
+  }
+
+  /**
+   * Gets the mount table entries returned for the "/" mount point.
+   *
+   * @param mountTable Mount table manager to query.
+   * @return Entries returned by the manager for "/".
+   * @throws IOException If the mount table cannot be read.
+   */
+  private List<MountTable> getMountTableEntries(MountTableManager mountTable)
+      throws IOException {
+    return getMountTableEntries(
+        mountTable, GetMountTableEntriesRequest.newInstance("/"));
+  }
+
+  /**
+   * Reloads the mount table cache of the State Store and then gets the
+   * entries matching the request.
+   *
+   * @param mountTable Mount table manager to query.
+   * @param request Request describing the entries to get.
+   * @return Entries contained in the response of the manager.
+   * @throws IOException If the mount table cannot be read.
+   */
+  private List<MountTable> getMountTableEntries(MountTableManager mountTable,
+      GetMountTableEntriesRequest request) throws IOException {
+    // Refresh the cache before querying
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    return mountTable.getMountTableEntries(request).getEntries();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/73be4292/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
new file mode 100644
index 0000000..828f19c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.tools.federation.RouterAdmin;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Tests Router admin commands.
+ */
+public class TestRouterAdminCLI {
+  private static StateStoreDFSCluster cluster;
+  private static RouterContext routerContext;
+  private static StateStoreService stateStore;
+
+  private static RouterAdmin admin;
+  private static RouterClient client;
+
+  private static final String TEST_USER = "test-user";
+
+  private final ByteArrayOutputStream out = new ByteArrayOutputStream();
+  private static final PrintStream OLD_OUT = System.out;
+
+  /**
+   * Creates the test cluster, starts its Routers with State Store, admin and
+   * RPC enabled, and builds the RouterAdmin tool and the admin client that
+   * the tests use against the admin server address of the Router.
+   *
+   * @throws Exception If the cluster or the Routers cannot be set up.
+   */
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+    cluster = new StateStoreDFSCluster(false, 1);
+
+    // Router overrides: State Store + admin + RPC
+    final Configuration overrides =
+        new RouterConfigBuilder().stateStore().admin().rpc().build();
+    cluster.addRouterOverrides(overrides);
+
+    // Start routers
+    cluster.startRouters();
+
+    routerContext = cluster.getRandomRouter();
+    final Router router = routerContext.getRouter();
+    stateStore = router.getStateStore();
+
+    // Configure the admin tool with the admin server address of the Router
+    final Configuration adminConf = new Configuration();
+    adminConf.setSocketAddr(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
+        router.getAdminServerAddress());
+    admin = new RouterAdmin(adminConf);
+    client = routerContext.getAdminClient();
+  }
+
+  /**
+   * Stops the Router used by the tests, shuts the test cluster down and
+   * clears the static cluster reference.
+   */
+  @AfterClass
+  public static void tearDownCluster() {
+    cluster.stopRouter(routerContext);
+    cluster.shutdown();
+    cluster = null;
+  }
+
+  /**
+   * Restores the standard output stream captured in OLD_OUT after each test,
+   * undoing any redirection to the in-memory buffer done by the test.
+   */
+  @After
+  public void tearDown() {
+    // set back system out
+    System.setOut(OLD_OUT);
+  }
+
+  /**
+   * Adds a mount point with the "-add" command and checks the stored entry,
+   * then runs "-add" again on the same mount point with a new destination and
+   * "-readonly" and checks that the entry has two destinations and is
+   * read-only.
+   *
+   * @throws Exception If a command or a mount table query fails.
+   */
+  @Test
+  public void testAddMountTable() throws Exception {
+    final String nsId = "ns0";
+    final String src = "/test-addmounttable";
+    final String firstDest = "/addmounttable";
+    assertEquals(0, ToolRunner.run(admin,
+        new String[] {"-add", src, nsId, firstDest}));
+
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    final GetMountTableEntriesRequest query =
+        GetMountTableEntriesRequest.newInstance(src);
+    MountTable entry = client.getMountTableManager()
+        .getMountTableEntries(query).getEntries().get(0);
+
+    final List<RemoteLocation> locations = entry.getDestinations();
+    assertEquals(1, locations.size());
+
+    assertEquals(src, entry.getSourcePath());
+    assertEquals(nsId, locations.get(0).getNameserviceId());
+    assertEquals(firstDest, locations.get(0).getDest());
+    assertFalse(entry.isReadOnly());
+
+    // test mount table update behavior
+    final String secondDest = firstDest + "-new";
+    assertEquals(0, ToolRunner.run(admin,
+        new String[] {"-add", src, nsId, secondDest, "-readonly"}));
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+
+    entry = client.getMountTableManager()
+        .getMountTableEntries(query).getEntries().get(0);
+    assertEquals(2, entry.getDestinations().size());
+    assertEquals(nsId, entry.getDestinations().get(1).getNameserviceId());
+    assertEquals(secondDest, entry.getDestinations().get(1).getDest());
+    assertTrue(entry.isReadOnly());
+  }
+
+  /**
+   * Runs the "-order" check for the HASH, LOCAL, RANDOM and HASH_ALL
+   * destination orders, in that sequence.
+   *
+   * @throws Exception If a command or a mount table query fails.
+   */
+  @Test
+  public void testAddOrderMountTable() throws Exception {
+    final DestinationOrder[] orders = {
+        DestinationOrder.HASH,
+        DestinationOrder.LOCAL,
+        DestinationOrder.RANDOM,
+        DestinationOrder.HASH_ALL,
+    };
+    for (DestinationOrder order : orders) {
+      testAddOrderMountTable(order);
+    }
+  }
+
+  /**
+   * Adds a mount point named after the given order with the "-add ... -order"
+   * command and checks that exactly one entry with two destinations and the
+   * given order is stored for it.
+   *
+   * @param order Destination order passed to the command.
+   * @throws Exception If the command or the mount table query fails.
+   */
+  private void testAddOrderMountTable(DestinationOrder order)
+      throws Exception {
+    final String mnt = "/" + order;
+    final String[] argv = {
+        "-add", mnt, "ns0,ns1", "/", "-order", order.toString()};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    // Check the state in the State Store
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    final GetMountTableEntriesResponse response =
+        client.getMountTableManager().getMountTableEntries(
+            GetMountTableEntriesRequest.newInstance(mnt));
+    final List<MountTable> entries = response.getEntries();
+    assertEquals(1, entries.size());
+    assertEquals(2, entries.get(0).getDestinations().size());
+    assertEquals(order, response.getEntries().get(0).getDestOrder());
+  }
+
+  /**
+   * Adds a mount point and checks that the "-ls" command prints it, both when
+   * the mount point is given and when no path is given; in the second case
+   * every entry returned for "/" must be printed as well.
+   *
+   * @throws Exception If a command or a mount table query fails.
+   */
+  @Test
+  public void testListMountTable() throws Exception {
+    final String src = "/test-lsmounttable";
+    assertEquals(0, ToolRunner.run(admin,
+        new String[] {"-add", src, "ns0", "/lsmounttable"}));
+
+    // re-set system out for testing
+    System.setOut(new PrintStream(out));
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    assertEquals(0, ToolRunner.run(admin, new String[] {"-ls", src}));
+    assertTrue(out.toString().contains(src));
+
+    out.reset();
+    final GetMountTableEntriesResponse rootResponse =
+        client.getMountTableManager().getMountTableEntries(
+            GetMountTableEntriesRequest.newInstance("/"));
+
+    // Test ls command without input path, it will list
+    // mount table under root path.
+    assertEquals(0, ToolRunner.run(admin, new String[] {"-ls"}));
+    assertTrue(out.toString().contains(src));
+    final String listing = out.toString();
+    // verify if all the mount table are listed
+    for (MountTable listed : rootResponse.getEntries()) {
+      assertTrue(listing.contains(listed.getSourcePath()));
+    }
+  }
+
+  /**
+   * Adds a mount point, removes it with the "-rm" command and checks that it
+   * is gone, then checks the message printed when "-rm" is run on a mount
+   * point that was never added.
+   *
+   * @throws Exception If a command or a mount table query fails.
+   */
+  @Test
+  public void testRemoveMountTable() throws Exception {
+    final String src = "/test-rmmounttable";
+    assertEquals(0, ToolRunner.run(admin,
+        new String[] {"-add", src, "ns0", "/rmmounttable"}));
+
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    final GetMountTableEntriesRequest query =
+        GetMountTableEntriesRequest.newInstance(src);
+    // ensure mount table added successfully
+    final MountTable added = client.getMountTableManager()
+        .getMountTableEntries(query).getEntries().get(0);
+    assertEquals(src, added.getSourcePath());
+
+    assertEquals(0, ToolRunner.run(admin, new String[] {"-rm", src}));
+
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    final List<MountTable> remaining = client.getMountTableManager()
+        .getMountTableEntries(query).getEntries();
+    assertEquals(0, remaining.size());
+
+    // remove an invalid mount table
+    final String invalidPath = "/invalid";
+    System.setOut(new PrintStream(out));
+    assertEquals(0, ToolRunner.run(admin, new String[] {"-rm", invalidPath}));
+    final String expectedMessage = "Cannot remove mount point " + invalidPath;
+    assertTrue(out.toString().contains(expectedMessage));
+  }
+
+  /**
+   * Adds a mount point without owner, group or mode options and checks that
+   * the stored entry is owned by the current user, uses the primary group of
+   * that user (or the user name when the user has no groups) and has mode
+   * 0755.
+   *
+   * @throws Exception If the command or the mount table query fails.
+   */
+  @Test
+  public void testMountTableDefaultACL() throws Exception {
+    assertEquals(0, ToolRunner.run(admin,
+        new String[] {"-add", "/testpath0", "ns0", "/testdir0"}));
+
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    final MountTable stored = client.getMountTableManager()
+        .getMountTableEntries(
+            GetMountTableEntriesRequest.newInstance("/testpath0"))
+        .getEntries().get(0);
+
+    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    final String expectedGroup;
+    if (ugi.getGroups().isEmpty()) {
+      expectedGroup = ugi.getShortUserName();
+    } else {
+      expectedGroup = ugi.getPrimaryGroupName();
+    }
+    assertEquals(ugi.getShortUserName(), stored.getOwnerName());
+    assertEquals(expectedGroup, stored.getGroupName());
+    assertEquals((short) 0755, stored.getMode().toShort());
+  }
+
+  /**
+   * Checks the admin commands against mount points whose mode grants the
+   * test user read only (0455), write only (0255) and read plus write (0755).
+   * The login user is switched to the test user for the checks and switched
+   * back afterwards, also when one of the checks fails.
+   *
+   * @throws Exception If a command or a mount table query fails.
+   */
+  @Test
+  public void testMountTablePermissions() throws Exception {
+    // re-set system out for testing
+    System.setOut(new PrintStream(out));
+    // use superuser to add new mount table with only read permission
+    String[] argv = new String[] {"-add", "/testpath2-1", "ns0", "/testdir2-1",
+        "-owner", TEST_USER, "-group", TEST_USER, "-mode", "0455"};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    String superUser = UserGroupInformation.
+        getCurrentUser().getShortUserName();
+    // use normal user as current user to test
+    UserGroupInformation remoteUser = UserGroupInformation
+        .createRemoteUser(TEST_USER);
+    UserGroupInformation.setLoginUser(remoteUser);
+
+    try {
+      // verify read permission by executing other commands
+      verifyExecutionResult("/testpath2-1", true, -1, -1);
+
+      // add new mount table with only write permission
+      argv = new String[] {"-add", "/testpath2-2", "ns0", "/testdir2-2",
+          "-owner", TEST_USER, "-group", TEST_USER, "-mode", "0255"};
+      assertEquals(0, ToolRunner.run(admin, argv));
+      verifyExecutionResult("/testpath2-2", false, 0, 0);
+
+      // set mount table entry with read and write permission
+      argv = new String[] {"-add", "/testpath2-3", "ns0", "/testdir2-3",
+          "-owner", TEST_USER, "-group", TEST_USER, "-mode", "0755"};
+      assertEquals(0, ToolRunner.run(admin, argv));
+      verifyExecutionResult("/testpath2-3", true, 0, 0);
+    } finally {
+      // set back login user; done in finally so that a failed assertion does
+      // not leave the test user as login user for the tests that follow
+      remoteUser = UserGroupInformation.createRemoteUser(superUser);
+      UserGroupInformation.setLoginUser(remoteUser);
+    }
+  }
+
+  /**
+   * Verify router admin commands execution result.
+   *
+   * @param mount
+   *          target mount table
+   * @param canRead
+   *          whether can list mount tables under specified mount
+   * @param addCommandCode
+   *          expected return code of add command executed for specified mount
+   * @param rmCommandCode
+   *          expected return code of rm command executed for specified mount
+   * @throws Exception
+   */
+  private void verifyExecutionResult(String mount, boolean canRead,
+      int addCommandCode, int rmCommandCode) throws Exception {
+    String[] argv = null;
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+
+    out.reset();
+    // execute ls command
+    argv = new String[] {"-ls", mount};
+    assertEquals(0, ToolRunner.run(admin, argv));
+    assertEquals(canRead, out.toString().contains(mount));
+
+    // execute add/update command
+    argv = new String[] {"-add", mount, "ns0", mount + "newdir"};
+    assertEquals(addCommandCode, ToolRunner.run(admin, argv));
+
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    // execute remove command
+    argv = new String[] {"-rm", mount};
+    assertEquals(rmCommandCode, ToolRunner.run(admin, argv));
+  }
+
+  @Test
+  public void testManageSafeMode() throws Exception {
+    // ensure the Router become RUNNING state
+    waitState(RouterServiceState.RUNNING);
+    assertFalse(routerContext.getRouter().getRpcServer().isInSafeMode());
+    assertEquals(0, ToolRunner.run(admin,
+            new String[] {"-safemode", "enter"}));
+    // verify state
+    assertEquals(RouterServiceState.SAFEMODE,
+        routerContext.getRouter().getRouterState());
+    assertTrue(routerContext.getRouter().getRpcServer().isInSafeMode());
+
+    System.setOut(new PrintStream(out));
+    assertEquals(0, ToolRunner.run(admin,
+        new String[] {"-safemode", "get"}));
+    assertTrue(out.toString().contains("true"));
+
+    assertEquals(0, ToolRunner.run(admin,
+        new String[] {"-safemode", "leave"}));
+    // verify state
+    assertEquals(RouterServiceState.RUNNING,
+        routerContext.getRouter().getRouterState());
+    assertFalse(routerContext.getRouter().getRpcServer().isInSafeMode());
+
+    out.reset();
+    assertEquals(0, ToolRunner.run(admin,
+        new String[] {"-safemode", "get"}));
+    assertTrue(out.toString().contains("false"));
+  }
+
+  @Test
+  public void testCreateInvalidEntry() throws Exception {
+    String[] argv = new String[] {
+        "-add", "test-createInvalidEntry", "ns0", "/createInvalidEntry"};
+    assertEquals(-1, ToolRunner.run(admin, argv));
+
+    argv = new String[] {
+        "-add", "/test-createInvalidEntry", "ns0", "createInvalidEntry"};
+    assertEquals(-1, ToolRunner.run(admin, argv));
+
+    argv = new String[] {
+        "-add", null, "ns0", "/createInvalidEntry"};
+    assertEquals(-1, ToolRunner.run(admin, argv));
+
+    argv = new String[] {
+        "-add", "/test-createInvalidEntry", "ns0", null};
+    assertEquals(-1, ToolRunner.run(admin, argv));
+
+    argv = new String[] {
+        "-add", "", "ns0", "/createInvalidEntry"};
+    assertEquals(-1, ToolRunner.run(admin, argv));
+
+    argv = new String[] {
+        "-add", "/test-createInvalidEntry", null, "/createInvalidEntry"};
+    assertEquals(-1, ToolRunner.run(admin, argv));
+
+    argv = new String[] {
+        "-add", "/test-createInvalidEntry", "", "/createInvalidEntry"};
+    assertEquals(-1, ToolRunner.run(admin, argv));
+  }
+
+  /**
+   * Wait for the Router transforming to expected state.
+   * @param expectedState Expected Router state.
+   * @throws Exception
+   */
+  private void waitState(final RouterServiceState expectedState)
+      throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return expectedState == routerContext.getRouter().getRouterState();
+      }
+    }, 1000, 30000);
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message