hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [15/21] hadoop git commit: HDFS-6440. Support more than 2 NameNodes. Contributed by Jesse Yates.
Date Wed, 24 Jun 2015 17:50:53 GMT
HDFS-6440. Support more than 2 NameNodes. Contributed by Jesse Yates.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/49dfad94
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/49dfad94
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/49dfad94

Branch: refs/heads/HDFS-7240
Commit: 49dfad942970459297f72632ed8dfd353e0c86de
Parents: 122cad6
Author: Aaron T. Myers <atm@apache.org>
Authored: Tue Jun 23 17:26:11 2015 -0700
Committer: Aaron T. Myers <atm@apache.org>
Committed: Tue Jun 23 17:26:11 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/ha/ZKFailoverController.java  |  61 ++-
 .../org/apache/hadoop/ha/MiniZKFCCluster.java   |  93 +++-
 .../hadoop/ha/TestZKFailoverController.java     |  32 ++
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../bkjournal/TestBookKeeperHACheckpoints.java  |   7 +-
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   4 +
 .../java/org/apache/hadoop/hdfs/HAUtil.java     |  36 +-
 .../token/block/BlockTokenSecretManager.java    |  40 +-
 .../server/blockmanagement/BlockManager.java    |  21 +-
 .../hdfs/server/namenode/CheckpointConf.java    |  14 +-
 .../hdfs/server/namenode/ImageServlet.java      |  88 +++-
 .../server/namenode/NameNodeHttpServer.java     |   7 +-
 .../hdfs/server/namenode/TransferFsImage.java   |  47 +-
 .../server/namenode/ha/BootstrapStandby.java    |  94 ++--
 .../hdfs/server/namenode/ha/EditLogTailer.java  | 162 +++++--
 .../server/namenode/ha/RemoteNameNodeInfo.java  | 133 ++++++
 .../server/namenode/ha/StandbyCheckpointer.java | 182 +++++---
 .../hdfs/tools/DFSZKFailoverController.java     |  13 +
 .../src/main/resources/hdfs-default.xml         |  20 +
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java  | 464 ++++++++++++-------
 .../apache/hadoop/hdfs/MiniDFSNNTopology.java   |  18 +-
 .../hadoop/hdfs/TestDFSUpgradeFromImage.java    |   6 +-
 .../apache/hadoop/hdfs/TestRollingUpgrade.java  | 109 +++--
 .../hadoop/hdfs/qjournal/MiniQJMHACluster.java  |  94 ++--
 .../security/token/block/TestBlockToken.java    |  10 +-
 .../hdfs/server/namenode/TestBackupNode.java    |   2 +-
 .../hdfs/server/namenode/TestCheckpoint.java    |   3 +-
 .../server/namenode/ha/HAStressTestHarness.java |  46 +-
 .../hdfs/server/namenode/ha/HATestUtil.java     |  60 ++-
 .../namenode/ha/TestBootstrapStandby.java       | 176 ++++---
 .../ha/TestBootstrapStandbyWithQJM.java         |  47 +-
 .../ha/TestDNFencingWithReplication.java        |   1 +
 .../server/namenode/ha/TestEditLogTailer.java   |  12 +-
 .../ha/TestFailoverWithBlockTokensEnabled.java  |  55 ++-
 .../server/namenode/ha/TestHAConfiguration.java |  49 +-
 .../namenode/ha/TestPipelinesFailover.java      | 110 +++--
 .../namenode/ha/TestRemoteNameNodeInfo.java     |  61 +++
 .../namenode/ha/TestSeveralNameNodes.java       | 179 +++++++
 .../namenode/ha/TestStandbyCheckpoints.java     | 106 +++--
 .../src/test/resources/hadoop-0.23-reserved.tgz | Bin 4558 -> 5590 bytes
 .../src/test/resources/hadoop-1-reserved.tgz    | Bin 2572 -> 3348 bytes
 .../src/test/resources/hadoop-2-reserved.tgz    | Bin 2838 -> 3465 bytes
 .../src/test/resources/hadoop-22-dfs-dir.tgz    | Bin 318180 -> 413239 bytes
 .../src/test/resources/hadoop1-bbw.tgz          | Bin 40234 -> 43294 bytes
 .../src/test/resources/log4j.properties         |   2 +-
 45 files changed, 1926 insertions(+), 740 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
index b1f5920..30ec8f2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executors;
@@ -141,6 +142,7 @@ public abstract class ZKFailoverController {
       throws AccessControlException, IOException;
   protected abstract InetSocketAddress getRpcAddressToBindTo();
   protected abstract PolicyProvider getPolicyProvider();
+  protected abstract List<HAServiceTarget> getAllOtherNodes();
 
   /**
    * Return the name of a znode inside the configured parent znode in which
@@ -616,9 +618,11 @@ public abstract class ZKFailoverController {
    * Coordinate a graceful failover. This proceeds in several phases:
    * 1) Pre-flight checks: ensure that the local node is healthy, and
    * thus a candidate for failover.
-   * 2) Determine the current active node. If it is the local node, no
+   * 2a) Determine the current active node. If it is the local node, no
    * need to failover - return success.
-   * 3) Ask that node to yield from the election for a number of seconds.
+   * 2b) Get the other nodes
+   * 3a) Ask the other nodes to yield from election for a number of seconds
+   * 3b) Ask the active node to yield from the election for a number of seconds.
    * 4) Allow the normal election path to run in other threads. Wait until
    * we either become unhealthy or we see an election attempt recorded by
    * the normal code path.
@@ -648,12 +652,27 @@ public abstract class ZKFailoverController {
           "No need to failover. Returning success.");
       return;
     }
-    
-    // Phase 3: ask the old active to yield from the election.
-    LOG.info("Asking " + oldActive + " to cede its active state for " +
-        timeout + "ms");
-    ZKFCProtocol oldZkfc = oldActive.getZKFCProxy(conf, timeout);
-    oldZkfc.cedeActive(timeout);
+
+    // Phase 2b: get the other nodes
+    List<HAServiceTarget> otherNodes = getAllOtherNodes();
+    List<ZKFCProtocol> otherZkfcs = new ArrayList<ZKFCProtocol>(otherNodes.size());
+
+    // Phase 3: ask the other nodes to yield from the election.
+    HAServiceTarget activeNode = null;
+    for (HAServiceTarget remote : otherNodes) {
+      // same location, same node - may not always be == equality
+      if (remote.getAddress().equals(oldActive.getAddress())) {
+        activeNode = remote;
+        continue;
+      }
+      otherZkfcs.add(cedeRemoteActive(remote, timeout));
+    }
+
+    assert
+      activeNode != null : "Active node does not match any known remote node";
+
+    // Phase 3b: ask the old active to yield
+    otherZkfcs.add(cedeRemoteActive(activeNode, timeout));
 
     // Phase 4: wait for the normal election to make the local node
     // active.
@@ -676,8 +695,10 @@ public abstract class ZKFailoverController {
     // Phase 5. At this point, we made some attempt to become active. So we
     // can tell the old active to rejoin if it wants. This allows a quick
     // fail-back if we immediately crash.
-    oldZkfc.cedeActive(-1);
-    
+    for (ZKFCProtocol zkfc : otherZkfcs) {
+      zkfc.cedeActive(-1);
+    }
+
     if (attempt.succeeded) {
       LOG.info("Successfully became active. " + attempt.status);
     } else {
@@ -688,6 +709,23 @@ public abstract class ZKFailoverController {
   }
 
   /**
+   * Ask the remote zkfc to cede its active status and wait for the specified
+   * timeout before attempting to claim leader status.
+   * @param remote node to ask
+   * @param timeout amount of time to cede
+   * @return the {@link ZKFCProtocol} used to talk to the node
+   * @throws IOException
+   */
+  private ZKFCProtocol cedeRemoteActive(HAServiceTarget remote, int timeout)
+    throws IOException {
+    LOG.info("Asking " + remote + " to cede its active state for "
+               + timeout + "ms");
+    ZKFCProtocol oldZkfc = remote.getZKFCProxy(conf, timeout);
+    oldZkfc.cedeActive(timeout);
+    return oldZkfc;
+  }
+
+  /**
    * Ensure that the local node is in a healthy state, and thus
    * eligible for graceful failover.
    * @throws ServiceFailedException if the node is unhealthy
@@ -777,7 +815,8 @@ public abstract class ZKFailoverController {
           break;
           
         default:
-          throw new IllegalArgumentException("Unhandled state:" + lastHealthState);
+          throw new IllegalArgumentException("Unhandled state:"
+                                               + lastHealthState);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java
index 5aee611..b496bf9 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,7 +51,7 @@ public class MiniZKFCCluster {
   private final TestContext ctx;
   private final ZooKeeperServer zks;
 
-  private DummyHAService svcs[];
+  private List<DummyHAService> svcs;
   private DummyZKFCThread thrs[];
   private Configuration conf;
   
@@ -63,38 +65,67 @@ public class MiniZKFCCluster {
     conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
     conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
     conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
-    svcs = new DummyHAService[2];
-    svcs[0] = new DummyHAService(HAServiceState.INITIALIZING,
-        new InetSocketAddress("svc1", 1234));
-    svcs[0].setSharedResource(sharedResource);
-    svcs[1] = new DummyHAService(HAServiceState.INITIALIZING,
-        new InetSocketAddress("svc2", 1234));
-    svcs[1].setSharedResource(sharedResource);
-    
+    svcs = new ArrayList<DummyHAService>(2);
+    // remove any existing instances we are keeping track of
+    DummyHAService.instances.clear();
+
+    for (int i = 0; i < 2; i++) {
+      addSvcs(svcs, i);
+    }
+
     this.ctx = new TestContext();
     this.zks = zks;
   }
-  
+
+  private void addSvcs(List<DummyHAService> svcs, int i) {
+    svcs.add(new DummyHAService(HAServiceState.INITIALIZING, new InetSocketAddress("svc" + (i + 1),
+        1234)));
+    svcs.get(i).setSharedResource(sharedResource);
+  }
+
   /**
    * Set up two services and their failover controllers. svc1 is started
    * first, so that it enters ACTIVE state, and then svc2 is started,
    * which enters STANDBY
    */
   public void start() throws Exception {
+    start(2);
+  }
+
+  /**
+   * Set up the specified number of services and their failover controllers. svc1 is
+   * started first, so that it enters ACTIVE state, and then svc2...svcN are started, which enter
+   * STANDBY.
+   * <p>
+   * Adds any extra svc needed beyond the first two before starting the rest of the cluster.
+   * @param count number of zkfcs to start
+   */
+  public void start(int count) throws Exception {
+    // setup the expected number of zkfcs, if we need to add more. This seemed the least invasive
+    // way to add the services - otherwise it's a large test rewrite or changing a lot of assumptions
+    if (count > 2) {
+      for (int i = 2; i < count; i++) {
+        addSvcs(svcs, i);
+      }
+    }
+
     // Format the base dir, should succeed
-    thrs = new DummyZKFCThread[2];
-    thrs[0] = new DummyZKFCThread(ctx, svcs[0]);
+    thrs = new DummyZKFCThread[count];
+    thrs[0] = new DummyZKFCThread(ctx, svcs.get(0));
     assertEquals(0, thrs[0].zkfc.run(new String[]{"-formatZK"}));
     ctx.addThread(thrs[0]);
     thrs[0].start();
     
     LOG.info("Waiting for svc0 to enter active state");
     waitForHAState(0, HAServiceState.ACTIVE);
-    
-    LOG.info("Adding svc1");
-    thrs[1] = new DummyZKFCThread(ctx, svcs[1]);
-    thrs[1].start();
-    waitForHAState(1, HAServiceState.STANDBY);
+
+    // add the remaining zkfc
+    for (int i = 1; i < count; i++) {
+      LOG.info("Adding svc" + i);
+      thrs[i] = new DummyZKFCThread(ctx, svcs.get(i));
+      thrs[i].start();
+      waitForHAState(i, HAServiceState.STANDBY);
+    }
   }
   
   /**
@@ -122,7 +153,7 @@ public class MiniZKFCCluster {
   }
   
   public DummyHAService getService(int i) {
-    return svcs[i];
+    return svcs.get(i);
   }
 
   public ActiveStandbyElector getElector(int i) {
@@ -134,23 +165,23 @@ public class MiniZKFCCluster {
   }
   
   public void setHealthy(int idx, boolean healthy) {
-    svcs[idx].isHealthy = healthy;
+    svcs.get(idx).isHealthy = healthy;
   }
 
   public void setFailToBecomeActive(int idx, boolean doFail) {
-    svcs[idx].failToBecomeActive = doFail;
+    svcs.get(idx).failToBecomeActive = doFail;
   }
 
   public void setFailToBecomeStandby(int idx, boolean doFail) {
-    svcs[idx].failToBecomeStandby = doFail;
+    svcs.get(idx).failToBecomeStandby = doFail;
   }
   
   public void setFailToFence(int idx, boolean doFail) {
-    svcs[idx].failToFence = doFail;
+    svcs.get(idx).failToFence = doFail;
   }
   
   public void setUnreachable(int idx, boolean unreachable) {
-    svcs[idx].actUnreachable = unreachable;
+    svcs.get(idx).actUnreachable = unreachable;
   }
 
   /**
@@ -204,7 +235,7 @@ public class MiniZKFCCluster {
     byte[] data = zks.getZKDatabase().getData(
         DummyZKFC.LOCK_ZNODE, stat, null);
     
-    assertArrayEquals(Ints.toByteArray(svcs[idx].index), data);
+    assertArrayEquals(Ints.toByteArray(svcs.get(idx).index), data);
     long session = stat.getEphemeralOwner();
     LOG.info("Expiring svc " + idx + "'s zookeeper session " + session);
     zks.closeSession(session);
@@ -218,7 +249,7 @@ public class MiniZKFCCluster {
    */
   public void waitForActiveLockHolder(Integer idx)
       throws Exception {
-    DummyHAService svc = idx == null ? null : svcs[idx];
+    DummyHAService svc = idx == null ? null : svcs.get(idx);
     ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks,
         DummyZKFC.SCOPED_PARENT_ZNODE,
         (idx == null) ? null : Ints.toByteArray(svc.index));
@@ -320,5 +351,17 @@ public class MiniZKFCCluster {
     protected PolicyProvider getPolicyProvider() {
       return null;
     }
+
+    @Override
+    protected List<HAServiceTarget> getAllOtherNodes() {
+      List<HAServiceTarget> services = new ArrayList<HAServiceTarget>(
+          DummyHAService.instances.size());
+      for (DummyHAService service : DummyHAService.instances) {
+        if (service != this.localTarget) {
+          services.add(service);
+        }
+      }
+      return services;
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
index d8271c5..b8d9ce4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
@@ -605,6 +605,38 @@ public class TestZKFailoverController extends ClientBaseWithFixes {
     }
   }
 
+  @Test(timeout = 25000)
+  public void testGracefulFailoverMultipleZKfcs() throws Exception {
+    try {
+      cluster.start(3);
+
+      cluster.waitForActiveLockHolder(0);
+
+      // failover to first
+      cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover();
+      cluster.waitForActiveLockHolder(1);
+
+      // failover to second
+      cluster.getService(2).getZKFCProxy(conf, 5000).gracefulFailover();
+      cluster.waitForActiveLockHolder(2);
+
+      // failover back to original
+      cluster.getService(0).getZKFCProxy(conf, 5000).gracefulFailover();
+      cluster.waitForActiveLockHolder(0);
+
+      Thread.sleep(10000); // allow to quiesce
+
+      assertEquals(0, cluster.getService(0).fenceCount);
+      assertEquals(0, cluster.getService(1).fenceCount);
+      assertEquals(0, cluster.getService(2).fenceCount);
+      assertEquals(2, cluster.getService(0).activeTransitionCount);
+      assertEquals(1, cluster.getService(1).activeTransitionCount);
+      assertEquals(1, cluster.getService(2).activeTransitionCount);
+    } finally {
+      cluster.stop();
+    }
+  }
+
   private int runFC(DummyHAService target, String ... args) throws Exception {
     DummyZKFC zkfc = new DummyZKFC(conf, target);
     return zkfc.run(args);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 981ca55..09a6891 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -38,6 +38,8 @@ Trunk (Unreleased)
 
     HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
 
+    HDFS-6440. Support more than 2 NameNodes. (Jesse Yates via atm)
+
   IMPROVEMENTS
 
     HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java
index b74cd7f..ed53512 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java
@@ -32,6 +32,10 @@ import org.junit.BeforeClass;
  * using a bookkeeper journal manager as the shared directory
  */
 public class TestBookKeeperHACheckpoints extends TestStandbyCheckpoints {
+  //overwrite the nn count
+ static{
+   TestStandbyCheckpoints.NUM_NNS = 2;
+ }
   private static BKJMUtil bkutil = null;
   static int numBookies = 3;
   static int journalCount = 0;
@@ -57,8 +61,7 @@ public class TestBookKeeperHACheckpoints extends TestStandbyCheckpoints {
       .build();
     cluster.waitActive();
 
-    nn0 = cluster.getNameNode(0);
-    nn1 = cluster.getNameNode(1);
+    setNNs();
     fs = HATestUtil.configureFailoverFs(cluster, conf);
 
     cluster.transitionToActive(0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 30540a9..ebd668f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -132,6 +132,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50090";
   public static final String  DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_KEY = "dfs.namenode.secondary.https-address";
   public static final String  DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:50091";
+  public static final String  DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_KEY = "dfs.namenode.checkpoint.check.quiet-multiplier";
+  public static final double  DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_DEFAULT = 1.5;
   public static final String  DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY = "dfs.namenode.checkpoint.check.period";
   public static final long    DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT = 60;
   public static final String  DFS_NAMENODE_CHECKPOINT_PERIOD_KEY = "dfs.namenode.checkpoint.period";
@@ -544,6 +546,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_HA_LOGROLL_PERIOD_DEFAULT = 2 * 60; // 2m
   public static final String DFS_HA_TAILEDITS_PERIOD_KEY = "dfs.ha.tail-edits.period";
   public static final int DFS_HA_TAILEDITS_PERIOD_DEFAULT = 60; // 1m
+  public static final String DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY = "dfs.ha.tail-edits.namenode-retries";
+  public static final int DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT = 3;
   public static final String DFS_HA_LOGROLL_RPC_TIMEOUT_KEY = "dfs.ha.log-roll.rpc.timeout";
   public static final int DFS_HA_LOGROLL_RPC_TIMEOUT_DEFAULT = 20000; // 20s
   public static final String DFS_HA_FENCE_METHODS_KEY = "dfs.ha.fencing.methods";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
index c967c69..686a0b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
@@ -143,7 +143,7 @@ public class HAUtil {
    * @param conf the configuration of this node
    * @return the NN ID of the other node in this nameservice
    */
-  public static String getNameNodeIdOfOtherNode(Configuration conf, String nsId) {
+  public static List<String> getNameNodeIdOfOtherNodes(Configuration conf, String nsId) {
     Preconditions.checkArgument(nsId != null,
         "Could not determine namespace id. Please ensure that this " +
         "machine is one of the machines listed as a NN RPC address, " +
@@ -157,20 +157,20 @@ public class HAUtil {
         DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX,
             nsId),
         nsId);
-    Preconditions.checkArgument(nnIds.size() == 2,
-        "Expected exactly 2 NameNodes in namespace '%s'. " +
-        "Instead, got only %s (NN ids were '%s'",
-        nsId, nnIds.size(), Joiner.on("','").join(nnIds));
+    Preconditions.checkArgument(nnIds.size() >= 2,
+        "Expected at least 2 NameNodes in namespace '%s'. " +
+          "Instead, got only %s (NN ids were '%s')",
+          nsId, nnIds.size(), Joiner.on("','").join(nnIds));
     Preconditions.checkState(myNNId != null && !myNNId.isEmpty(),
         "Could not determine own NN ID in namespace '%s'. Please " +
         "ensure that this node is one of the machines listed as an " +
         "NN RPC address, or configure " + DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY,
         nsId);
 
-    ArrayList<String> nnSet = Lists.newArrayList(nnIds);
-    nnSet.remove(myNNId);
-    assert nnSet.size() == 1;
-    return nnSet.get(0);
+    ArrayList<String> namenodes = Lists.newArrayList(nnIds);
+    namenodes.remove(myNNId);
+    assert namenodes.size() >= 1;
+    return namenodes;
   }
 
   /**
@@ -180,16 +180,20 @@ public class HAUtil {
    * @param myConf the configuration of this node
    * @return the configuration of the other node in an HA setup
    */
-  public static Configuration getConfForOtherNode(
+  public static List<Configuration> getConfForOtherNodes(
       Configuration myConf) {
     
     String nsId = DFSUtil.getNamenodeNameServiceId(myConf);
-    String otherNn = getNameNodeIdOfOtherNode(myConf, nsId);
-    
-    // Look up the address of the active NN.
-    Configuration confForOtherNode = new Configuration(myConf);
-    NameNode.initializeGenericKeys(confForOtherNode, nsId, otherNn);
-    return confForOtherNode;
+    List<String> otherNn = getNameNodeIdOfOtherNodes(myConf, nsId);
+
+    // Look up the address of the other NNs
+    List<Configuration> confs = new ArrayList<Configuration>(otherNn.size());
+    for (String nn : otherNn) {
+      Configuration confForOtherNode = new Configuration(myConf);
+      NameNode.initializeGenericKeys(confForOtherNode, nsId, nn);
+      confs.add(confForOtherNode);
+    }
+    return confs;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index b103c1a..53da44c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -52,17 +52,11 @@ import com.google.common.base.Preconditions;
 @InterfaceAudience.Private
 public class BlockTokenSecretManager extends
     SecretManager<BlockTokenIdentifier> {
-  public static final Log LOG = LogFactory
-      .getLog(BlockTokenSecretManager.class);
-  
-  // We use these in an HA setup to ensure that the pair of NNs produce block
-  // token serial numbers that are in different ranges.
-  private static final int LOW_MASK  = ~(1 << 31);
-  
+  public static final Log LOG = LogFactory.getLog(BlockTokenSecretManager.class);
+
   public static final Token<BlockTokenIdentifier> DUMMY_TOKEN = new Token<BlockTokenIdentifier>();
 
   private final boolean isMaster;
-  private int nnIndex;
   
   /**
    * keyUpdateInterval is the interval that NN updates its block keys. It should
@@ -77,21 +71,22 @@ public class BlockTokenSecretManager extends
   private final Map<Integer, BlockKey> allKeys;
   private String blockPoolId;
   private final String encryptionAlgorithm;
-  
+
+  private final int intRange;
+  private final int nnRangeStart;
+
   private final SecureRandom nonceGenerator = new SecureRandom();
 
-  ;
-  
   /**
    * Constructor for slaves.
-   * 
+   *
    * @param keyUpdateInterval how often a new key will be generated
    * @param tokenLifetime how long an individual token is valid
    */
   public BlockTokenSecretManager(long keyUpdateInterval,
       long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
     this(false, keyUpdateInterval, tokenLifetime, blockPoolId,
-        encryptionAlgorithm);
+        encryptionAlgorithm, 0, 1);
   }
   
   /**
@@ -99,23 +94,25 @@ public class BlockTokenSecretManager extends
    * 
    * @param keyUpdateInterval how often a new key will be generated
    * @param tokenLifetime how long an individual token is valid
-   * @param nnIndex namenode index
+   * @param nnIndex namenode index of the namenode for which we are creating the manager
    * @param blockPoolId block pool ID
    * @param encryptionAlgorithm encryption algorithm to use
+   * @param numNNs number of namenodes possible
    */
   public BlockTokenSecretManager(long keyUpdateInterval,
-      long tokenLifetime, int nnIndex, String blockPoolId,
+      long tokenLifetime, int nnIndex, int numNNs,  String blockPoolId,
       String encryptionAlgorithm) {
-    this(true, keyUpdateInterval, tokenLifetime, blockPoolId,
-        encryptionAlgorithm);
-    Preconditions.checkArgument(nnIndex == 0 || nnIndex == 1);
-    this.nnIndex = nnIndex;
+    this(true, keyUpdateInterval, tokenLifetime, blockPoolId, encryptionAlgorithm, nnIndex, numNNs);
+    Preconditions.checkArgument(nnIndex >= 0);
+    Preconditions.checkArgument(numNNs > 0);
     setSerialNo(new SecureRandom().nextInt());
     generateKeys();
   }
   
   private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
-      long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
+      long tokenLifetime, String blockPoolId, String encryptionAlgorithm, int nnIndex, int numNNs) {
+    this.intRange = Integer.MAX_VALUE / numNNs;
+    this.nnRangeStart = intRange * nnIndex;
     this.isMaster = isMaster;
     this.keyUpdateInterval = keyUpdateInterval;
     this.tokenLifetime = tokenLifetime;
@@ -127,7 +124,8 @@ public class BlockTokenSecretManager extends
   
   @VisibleForTesting
   public synchronized void setSerialNo(int serialNo) {
-    this.serialNo = (serialNo & LOW_MASK) | (nnIndex << 31);
+    // take the serial number's remainder within the per-NN range, then offset it by this NN's range start
+    this.serialNo = (serialNo % intRange) + (nnRangeStart);
   }
   
   public void setBlockPoolId(String blockPoolId) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 824801f..7d3a678 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import java.io.IOException;
@@ -42,6 +43,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -399,14 +401,21 @@ public class BlockManager {
     boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId);
 
     if (isHaEnabled) {
-      String thisNnId = HAUtil.getNameNodeId(conf, nsId);
-      String otherNnId = HAUtil.getNameNodeIdOfOtherNode(conf, nsId);
-      return new BlockTokenSecretManager(updateMin*60*1000L,
-          lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1, null,
-          encryptionAlgorithm);
+      // figure out which index we are of the nns
+      Collection<String> nnIds = DFSUtilClient.getNameNodeIds(conf, nsId);
+      String nnId = HAUtil.getNameNodeId(conf, nsId);
+      int nnIndex = 0;
+      for (String id : nnIds) {
+        if (id.equals(nnId)) {
+          break;
+        }
+        nnIndex++;
+      }
+      return new BlockTokenSecretManager(updateMin * 60 * 1000L,
+          lifetimeMin * 60 * 1000L, nnIndex, nnIds.size(), null, encryptionAlgorithm);
     } else {
       return new BlockTokenSecretManager(updateMin*60*1000L,
-          lifetimeMin*60*1000L, 0, null, encryptionAlgorithm);
+          lifetimeMin*60*1000L, 0, 1, null, encryptionAlgorithm);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java
index b1636bc..c30730b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java
@@ -44,7 +44,13 @@ public class CheckpointConf {
 
   /** The output dir for legacy OIV image */
   private final String legacyOivImageDir;
-  
+
+  /**
+  * multiplier on the checkpoint period to allow other nodes to do the checkpointing, when not the
+  * 'primary' checkpoint node
+  */
+  private double quietMultiplier;
+
   public CheckpointConf(Configuration conf) {
     checkpointCheckPeriod = conf.getLong(
         DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY,
@@ -57,6 +63,8 @@ public class CheckpointConf {
     maxRetriesOnMergeError = conf.getInt(DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY,
                                   DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_DEFAULT);
     legacyOivImageDir = conf.get(DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY);
+    quietMultiplier = conf.getDouble(DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_KEY,
+      DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_DEFAULT);
     warnForDeprecatedConfigs(conf);
   }
   
@@ -91,4 +99,8 @@ public class CheckpointConf {
   public String getLegacyOivImageDir() {
     return legacyOivImageDir;
   }
+
+  public double getQuietPeriod() {
+    return this.checkpointPeriod * this.quietMultiplier;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
index c565eb5..9dc20b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
@@ -30,6 +30,7 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.commons.logging.Log;
@@ -81,6 +82,9 @@ public class ImageServlet extends HttpServlet {
   private static final String LATEST_FSIMAGE_VALUE = "latest";
   private static final String IMAGE_FILE_TYPE = "imageFile";
 
+  private SortedSet<ImageUploadRequest> currentlyDownloadingCheckpoints = Collections
+      .<ImageUploadRequest> synchronizedSortedSet(new TreeSet<ImageUploadRequest>());
+
   @Override
   public void doGet(final HttpServletRequest request,
       final HttpServletResponse response) throws ServletException, IOException {
@@ -253,10 +257,12 @@ public class ImageServlet extends HttpServlet {
     }
 
     if (HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf))) {
-      Configuration otherNnConf = HAUtil.getConfForOtherNode(conf);
-      validRequestors.add(SecurityUtil.getServerPrincipal(otherNnConf
-          .get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY),
-          NameNode.getAddress(otherNnConf).getHostName()));
+      List<Configuration> otherNnConfs = HAUtil.getConfForOtherNodes(conf);
+      for (Configuration otherNnConf : otherNnConfs) {
+        validRequestors.add(SecurityUtil.getServerPrincipal(otherNnConf
+                .get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY),
+            NameNode.getAddress(otherNnConf).getHostName()));
+      }
     }
 
     for (String v : validRequestors) {
@@ -420,7 +426,6 @@ public class ImageServlet extends HttpServlet {
   /**
    * Set the required parameters for uploading image
    * 
-   * @param httpMethod instance of method to set the parameters
    * @param storage colon separated storageInfo string
    * @param txid txid of the image
    * @param imageFileSize size of the imagefile to be uploaded
@@ -459,12 +464,37 @@ public class ImageServlet extends HttpServlet {
 
             @Override
             public Void run() throws Exception {
+              // if it's not the active NN, then we need to notify the caller it was the wrong
+              // target (regardless of the fact that we got the image)
+              HAServiceProtocol.HAServiceState state = NameNodeHttpServer
+                  .getNameNodeStateFromContext(getServletContext());
+              if (state != HAServiceProtocol.HAServiceState.ACTIVE) {
+                // we need a different response type here so the client can differentiate this
+                // from the failure to upload due to (1) security, or (2) other checkpoints already
+                // present
+                response.sendError(HttpServletResponse.SC_EXPECTATION_FAILED,
+                    "Nameode "+request.getLocalAddr()+" is currently not in a state which can "
+                        + "accept uploads of new fsimages. State: "+state);
+                return null;
+              }
 
               final long txid = parsedParams.getTxId();
+              String remoteAddr = request.getRemoteAddr();
+              ImageUploadRequest imageRequest = new ImageUploadRequest(txid, remoteAddr);
 
               final NameNodeFile nnf = parsedParams.getNameNodeFile();
 
-              if (!nnImage.addToCheckpointing(txid)) {
+              // if the node is attempting to upload an older transaction, we ignore it
+              SortedSet<ImageUploadRequest> larger = currentlyDownloadingCheckpoints.tailSet(imageRequest);
+              if (larger.size() > 0) {
+                response.sendError(HttpServletResponse.SC_CONFLICT,
+                    "Another checkpointer is already in the process of uploading a" +
+                        " checkpoint made up to transaction ID " + larger.last());
+                return null;
+              }
+
+              //make sure no one else has started uploading one
+              if (!currentlyDownloadingCheckpoints.add(imageRequest)) {
                 response.sendError(HttpServletResponse.SC_CONFLICT,
                     "Either current namenode is checkpointing or another"
                         + " checkpointer is already in the process of "
@@ -499,6 +529,10 @@ public class ImageServlet extends HttpServlet {
                   // remove some old ones.
                   nnImage.purgeOldStorage(nnf);
                 } finally {
+                  // remove the request once we've processed it, or it threw an error, so we
+                  // aren't using it either
+                  currentlyDownloadingCheckpoints.remove(imageRequest);
+
                   stream.close();
                 }
               } finally {
@@ -555,4 +589,46 @@ public class ImageServlet extends HttpServlet {
       return nnf;
     }
   }
+
+  private static class ImageUploadRequest implements Comparable<ImageUploadRequest> {
+
+    private final long txId;
+    private final String address;
+
+    public ImageUploadRequest(long txid, String remoteAddr) {
+      this.txId = txid;
+      this.address = remoteAddr;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      ImageUploadRequest that = (ImageUploadRequest) o;
+
+      if (txId != that.txId) return false;
+      if (!address.equals(that.address)) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = (int) (txId ^ (txId >>> 32));
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+
+    @Override public int compareTo(ImageUploadRequest other) {
+      return Long.compare(txId, other.txId);
+    }
+
+    @Override public String toString() {
+      return "ImageRequest{" +
+          "txId=" + txId +
+          ", address='" + address + '\'' +
+          '}';
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
index 09b6b80..6bd9868 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
@@ -27,6 +27,7 @@ import javax.servlet.ServletContext;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -272,4 +273,8 @@ public class NameNodeHttpServer {
       ServletContext context) {
     return (StartupProgress)context.getAttribute(STARTUP_PROGRESS_ATTRIBUTE_KEY);
   }
-}
+
+  public static HAServiceProtocol.HAServiceState getNameNodeStateFromContext(ServletContext context) {
+    return getNameNodeFromContext(context).getServiceState();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
index 9783cca..afecf99 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
@@ -70,7 +70,33 @@ import org.mortbay.jetty.EofException;
  */
 @InterfaceAudience.Private
 public class TransferFsImage {
-  
+
+  public enum TransferResult{
+    SUCCESS(HttpServletResponse.SC_OK, false),
+    AUTHENTICATION_FAILURE(HttpServletResponse.SC_FORBIDDEN, true),
+    NOT_ACTIVE_NAMENODE_FAILURE(HttpServletResponse.SC_EXPECTATION_FAILED, false),
+    OLD_TRANSACTION_ID_FAILURE(HttpServletResponse.SC_CONFLICT, false),
+    UNEXPECTED_FAILURE(-1, true);
+
+    private final int response;
+    private final boolean shouldReThrowException;
+
+    private TransferResult(int response, boolean rethrow) {
+      this.response = response;
+      this.shouldReThrowException = rethrow;
+    }
+
+    public static TransferResult getResultForCode(int code){
+      TransferResult ret = UNEXPECTED_FAILURE;
+      for(TransferResult result:TransferResult.values()){
+        if(result.response == code){
+          return result;
+        }
+      }
+      return ret;
+    }
+  }
+
   public final static String CONTENT_LENGTH = "Content-Length";
   public final static String FILE_LENGTH = "File-Length";
   public final static String MD5_HEADER = "X-MD5-Digest";
@@ -198,9 +224,9 @@ public class TransferFsImage {
    * @param txid the transaction ID of the image to be uploaded
    * @throws IOException if there is an I/O error
    */
-  public static void uploadImageFromStorage(URL fsName, Configuration conf,
+  public static TransferResult uploadImageFromStorage(URL fsName, Configuration conf,
       NNStorage storage, NameNodeFile nnf, long txid) throws IOException {
-    uploadImageFromStorage(fsName, conf, storage, nnf, txid, null);
+    return uploadImageFromStorage(fsName, conf, storage, nnf, txid, null);
   }
 
   /**
@@ -215,7 +241,7 @@ public class TransferFsImage {
    * @param canceler optional canceler to check for abort of upload
    * @throws IOException if there is an I/O error or cancellation
    */
-  public static void uploadImageFromStorage(URL fsName, Configuration conf,
+  public static TransferResult uploadImageFromStorage(URL fsName, Configuration conf,
       NNStorage storage, NameNodeFile nnf, long txid, Canceler canceler)
       throws IOException {
     URL url = new URL(fsName, ImageServlet.PATH_SPEC);
@@ -223,21 +249,18 @@ public class TransferFsImage {
     try {
       uploadImage(url, conf, storage, nnf, txid, canceler);
     } catch (HttpPutFailedException e) {
-      if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) {
-        // this is OK - this means that a previous attempt to upload
-        // this checkpoint succeeded even though we thought it failed.
-        LOG.info("Image upload with txid " + txid + 
-            " conflicted with a previous image upload to the " +
-            "same NameNode. Continuing...", e);
-        return;
-      } else {
+      // translate the error code to a result, which is a bit more obvious in usage
+      TransferResult result = TransferResult.getResultForCode(e.getResponseCode());
+      if (result.shouldReThrowException) {
         throw e;
       }
+      return result;
     }
     double xferSec = Math.max(
         ((float) (Time.monotonicNow() - startTime)) / 1000.0, 0.001);
     LOG.info("Uploaded image with txid " + txid + " to namenode at " + fsName
         + " in " + xferSec + " seconds");
+    return TransferResult.SUCCESS;
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
index 88d9a6a..c22d7f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
@@ -23,8 +23,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIP
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.net.URL;
 import java.security.PrivilegedAction;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -77,10 +77,8 @@ public class BootstrapStandby implements Tool, Configurable {
   private static final Log LOG = LogFactory.getLog(BootstrapStandby.class);
   private String nsId;
   private String nnId;
-  private String otherNNId;
+  private List<RemoteNameNodeInfo> remoteNNs;
 
-  private URL otherHttpAddr;
-  private InetSocketAddress otherIpcAddr;
   private Collection<URI> dirsToFormat;
   private List<URI> editUrisToFormat;
   private List<URI> sharedEditsUris;
@@ -139,8 +137,8 @@ public class BootstrapStandby implements Tool, Configurable {
     System.err.println("Usage: " + this.getClass().getSimpleName() +
         " [-force] [-nonInteractive] [-skipSharedEditsCheck]");
   }
-  
-  private NamenodeProtocol createNNProtocolProxy()
+
+  private NamenodeProtocol createNNProtocolProxy(InetSocketAddress otherIpcAddr)
       throws IOException {
     return NameNodeProxies.createNonHAProxy(getConf(),
         otherIpcAddr, NamenodeProtocol.class,
@@ -149,18 +147,36 @@ public class BootstrapStandby implements Tool, Configurable {
   }
   
   private int doRun() throws IOException {
-    NamenodeProtocol proxy = createNNProtocolProxy();
-    NamespaceInfo nsInfo;
-    boolean isUpgradeFinalized;
-    try {
-      nsInfo = proxy.versionRequest();
-      isUpgradeFinalized = proxy.isUpgradeFinalized();
-    } catch (IOException ioe) {
-      LOG.fatal("Unable to fetch namespace information from active NN at " +
-          otherIpcAddr + ": " + ioe.getMessage());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Full exception trace", ioe);
+    // find the active NN
+    NamenodeProtocol proxy = null;
+    NamespaceInfo nsInfo = null;
+    boolean isUpgradeFinalized = false;
+    RemoteNameNodeInfo proxyInfo = null;
+    for (int i = 0; i < remoteNNs.size(); i++) {
+      proxyInfo = remoteNNs.get(i);
+      InetSocketAddress otherIpcAddress = proxyInfo.getIpcAddress();
+      proxy = createNNProtocolProxy(otherIpcAddress);
+      try {
+        // Get the namespace from any active NN. If you just formatted the primary NN and are
+        // bootstrapping the other NNs from that layout, it will only contact the single NN.
+        // However, if the cluster is already running and you are adding a NN later (e.g.
+        // replacing a failed NN), then this will bootstrap from any node in the cluster.
+        nsInfo = proxy.versionRequest();
+        isUpgradeFinalized = proxy.isUpgradeFinalized();
+        break;
+      } catch (IOException ioe) {
+        LOG.warn("Unable to fetch namespace information from remote NN at " + otherIpcAddress
+            + ": " + ioe.getMessage());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Full exception trace", ioe);
+        }
       }
+    }
+
+    if (nsInfo == null) {
+      LOG.fatal(
+          "Unable to fetch namespace information from any remote NN. Possible NameNodes: "
+              + remoteNNs);
       return ERR_CODE_FAILED_CONNECT;
     }
 
@@ -175,9 +191,9 @@ public class BootstrapStandby implements Tool, Configurable {
         "=====================================================\n" +
         "About to bootstrap Standby ID " + nnId + " from:\n" +
         "           Nameservice ID: " + nsId + "\n" +
-        "        Other Namenode ID: " + otherNNId + "\n" +
-        "  Other NN's HTTP address: " + otherHttpAddr + "\n" +
-        "  Other NN's IPC  address: " + otherIpcAddr + "\n" +
+        "        Other Namenode ID: " + proxyInfo.getNameNodeID() + "\n" +
+        "  Other NN's HTTP address: " + proxyInfo.getHttpAddress() + "\n" +
+        "  Other NN's IPC  address: " + proxyInfo.getIpcAddress() + "\n" +
         "             Namespace ID: " + nsInfo.getNamespaceID() + "\n" +
         "            Block pool ID: " + nsInfo.getBlockPoolID() + "\n" +
         "               Cluster ID: " + nsInfo.getClusterID() + "\n" +
@@ -201,7 +217,7 @@ public class BootstrapStandby implements Tool, Configurable {
     }
 
     // download the fsimage from active namenode
-    int download = downloadImage(storage, proxy);
+    int download = downloadImage(storage, proxy, proxyInfo);
     if (download != 0) {
       return download;
     }
@@ -292,7 +308,7 @@ public class BootstrapStandby implements Tool, Configurable {
     }
   }
 
-  private int downloadImage(NNStorage storage, NamenodeProtocol proxy)
+  private int downloadImage(NNStorage storage, NamenodeProtocol proxy, RemoteNameNodeInfo proxyInfo)
       throws IOException {
     // Load the newly formatted image, using all of the directories
     // (including shared edits)
@@ -316,7 +332,7 @@ public class BootstrapStandby implements Tool, Configurable {
 
       // Download that checkpoint into our storage directories.
       MD5Hash hash = TransferFsImage.downloadImageToStorage(
-          otherHttpAddr, imageTxId, storage, true);
+        proxyInfo.getHttpAddress(), imageTxId, storage, true);
       image.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE, imageTxId,
           hash);
     } catch (IOException ioe) {
@@ -385,18 +401,26 @@ public class BootstrapStandby implements Tool, Configurable {
       throw new HadoopIllegalArgumentException(
         "Shared edits storage is not enabled for this namenode.");
     }
-    
-    Configuration otherNode = HAUtil.getConfForOtherNode(conf);
-    otherNNId = HAUtil.getNameNodeId(otherNode, nsId);
-    otherIpcAddr = NameNode.getServiceAddress(otherNode, true);
-    Preconditions.checkArgument(otherIpcAddr.getPort() != 0 &&
-        !otherIpcAddr.getAddress().isAnyLocalAddress(),
-        "Could not determine valid IPC address for other NameNode (%s)" +
-        ", got: %s", otherNNId, otherIpcAddr);
-
-    final String scheme = DFSUtil.getHttpClientScheme(conf);
-    otherHttpAddr = DFSUtil.getInfoServerWithDefaultHost(
-        otherIpcAddr.getHostName(), otherNode, scheme).toURL();
+
+
+    remoteNNs = RemoteNameNodeInfo.getRemoteNameNodes(conf, nsId);
+    // validate the configured NNs
+    List<RemoteNameNodeInfo> remove = new ArrayList<RemoteNameNodeInfo>(remoteNNs.size());
+    for (RemoteNameNodeInfo info : remoteNNs) {
+      InetSocketAddress address = info.getIpcAddress();
+      LOG.info("Found nn: " + info.getNameNodeID() + ", ipc: " + info.getIpcAddress());
+      if (address.getPort() == 0 || address.getAddress().isAnyLocalAddress()) {
+        LOG.error("Could not determine valid IPC address for other NameNode ("
+            + info.getNameNodeID() + ") , got: " + address);
+        remove.add(info);
+      }
+    }
+
+    // remove any invalid nns
+    remoteNNs.removeAll(remove);
+
+    // make sure we have at least one left to read
+    Preconditions.checkArgument(!remoteNNs.isEmpty(), "Could not find any valid namenodes!");
 
     dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
     editUrisToFormat = FSNamesystem.getNamespaceEditsDirs(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
index 38aa358..cfca77c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
@@ -23,7 +23,13 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
 
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -67,10 +73,10 @@ public class EditLogTailer {
   
   private final Configuration conf;
   private final FSNamesystem namesystem;
+  private final Iterator<RemoteNameNodeInfo> nnLookup;
   private FSEditLog editLog;
 
-  private InetSocketAddress activeAddr;
-  private NamenodeProtocol cachedActiveProxy = null;
+  private RemoteNameNodeInfo currentNN;
 
   /**
    * The last transaction ID at which an edit log roll was initiated.
@@ -100,7 +106,17 @@ public class EditLogTailer {
    * available to be read from.
    */
   private final long sleepTimeMs;
-  
+
+  private final int nnCount;
+  private NamenodeProtocol cachedActiveProxy = null;
+  // count of the number of NNs we have attempted in the current lookup loop
+  private int nnLoopCount = 0;
+
+  /**
+   * maximum number of retries we should give each of the remote namenodes before giving up
+   */
+  private int maxRetries;
+
   public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
     this.tailerThread = new EditLogTailerThread();
     this.conf = conf;
@@ -111,12 +127,24 @@ public class EditLogTailer {
 
     logRollPeriodMs = conf.getInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
         DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_DEFAULT) * 1000;
+    List<RemoteNameNodeInfo> nns = Collections.emptyList();
     if (logRollPeriodMs >= 0) {
-      this.activeAddr = getActiveNodeAddress();
-      Preconditions.checkArgument(activeAddr.getPort() > 0,
-          "Active NameNode must have an IPC port configured. " +
-          "Got address '%s'", activeAddr);
-      LOG.info("Will roll logs on active node at " + activeAddr + " every " +
+      try {
+        nns = RemoteNameNodeInfo.getRemoteNameNodes(conf);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Remote NameNodes not correctly configured!", e);
+      }
+
+      for (RemoteNameNodeInfo info : nns) {
+        // overwrite the socket address, if we need to
+        InetSocketAddress ipc = NameNode.getServiceAddress(info.getConfiguration(), true);
+        // sanity check the ipc address
+        Preconditions.checkArgument(ipc.getPort() > 0,
+            "Active NameNode must have an IPC port configured. " + "Got address '%s'", ipc);
+        info.setIpcAddress(ipc);
+      }
+
+      LOG.info("Will roll logs on active node every " +
           (logRollPeriodMs / 1000) + " seconds.");
     } else {
       LOG.info("Not going to trigger log rolls on active node because " +
@@ -125,29 +153,24 @@ public class EditLogTailer {
     
     sleepTimeMs = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
         DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT) * 1000;
-    
+
+    maxRetries = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY,
+      DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT);
+    if (maxRetries <= 0) {
+      LOG.error("Specified a non-positive number of retries for the number of retries for the " +
+          "namenode connection when manipulating the edit log (" +
+          DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY + "), setting to default: " +
+          DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT);
+      maxRetries = DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT;
+    }
+
+    nnCount = nns.size();
+    // setup the iterator to endlessly loop the nns
+    this.nnLookup = Iterators.cycle(nns);
+
     LOG.debug("logRollPeriodMs=" + logRollPeriodMs +
         " sleepTime=" + sleepTimeMs);
   }
-  
-  private InetSocketAddress getActiveNodeAddress() {
-    Configuration activeConf = HAUtil.getConfForOtherNode(conf);
-    return NameNode.getServiceAddress(activeConf, true);
-  }
-  
-  private NamenodeProtocol getActiveNodeProxy() throws IOException {
-    if (cachedActiveProxy == null) {
-      int rpcTimeout = conf.getInt(
-          DFSConfigKeys.DFS_HA_LOGROLL_RPC_TIMEOUT_KEY,
-          DFSConfigKeys.DFS_HA_LOGROLL_RPC_TIMEOUT_DEFAULT);
-      NamenodeProtocolPB proxy = RPC.waitForProxy(NamenodeProtocolPB.class,
-          RPC.getProtocolVersion(NamenodeProtocolPB.class), activeAddr, conf,
-          rpcTimeout, Long.MAX_VALUE);
-      cachedActiveProxy = new NamenodeProtocolTranslatorPB(proxy);
-    }
-    assert cachedActiveProxy != null;
-    return cachedActiveProxy;
-  }
 
   public void start() {
     tailerThread.start();
@@ -270,9 +293,15 @@ public class EditLogTailer {
    * Trigger the active node to roll its logs.
    */
   private void triggerActiveLogRoll() {
-    LOG.info("Triggering log roll on remote NameNode " + activeAddr);
+    LOG.info("Triggering log roll on remote NameNode");
     try {
-      getActiveNodeProxy().rollEditLog();
+      new MultipleNameNodeProxy<Void>() {
+        @Override
+        protected Void doWork() throws IOException {
+          cachedActiveProxy.rollEditLog();
+          return null;
+        }
+      }.call();
       lastRollTriggerTxId = lastLoadedTxnId;
     } catch (IOException ioe) {
       if (ioe instanceof RemoteException) {
@@ -362,5 +391,76 @@ public class EditLogTailer {
       }
     }
   }
+  /**
+   * Manage the 'active namenode proxy'. This cannot just be a single proxy since we could
+   * failover across a number of NameNodes, rather than just between an active and a standby.
+   * <p>
+   * We - lazily - get a proxy to one of the configured namenodes and attempt to make the request
+   * against it. If it doesn't succeed, either because the proxy failed to be created or the request
+   * failed, we try the next NN in the list. We try this up to the configured maximum number of
+   * retries before throwing up our hands. A working proxy is retained across attempts since we
+   * expect the active NameNode to switch rarely.
+   * <p>
+   * This mechanism is <b>very bad</b> for cases where we care about being <i>fast</i>; it just
+   * blindly goes and tries namenodes.
+   */
+  private abstract class MultipleNameNodeProxy<T> implements Callable<T> {
+
+    /**
+     * Do the actual work to the remote namenode via the {@link #cachedActiveProxy}.
+     * @return the result of the work, if there is one
+     * @throws IOException if the actions done to the proxy throw an exception.
+     */
+    protected abstract T doWork() throws IOException;
+
+    public T call() throws IOException {
+      while ((cachedActiveProxy = getActiveNodeProxy()) != null) {
+        try {
+          T ret = doWork();
+          // reset the loop count on success
+          nnLoopCount = 0;
+          return ret;
+        } catch (RemoteException e) {
+          Throwable cause = e.unwrapRemoteException(StandbyException.class);
+          // if it's not a standby exception, then we need to re-throw it; something bad has happened
+          if (cause == e) {
+            throw e;
+          } else {
+            // it is a standby exception, so we try the other NN
+            LOG.warn("Failed to reach remote node: " + currentNN
+                + ", retrying with remaining remote NNs");
+            cachedActiveProxy = null;
+            // this NN isn't responding to requests, try the next one
+            nnLoopCount++;
+          }
+        }
+      }
+      throw new IOException("Cannot find any valid remote NN to service request!");
+    }
 
-}
+    private NamenodeProtocol getActiveNodeProxy() throws IOException {
+      if (cachedActiveProxy == null) {
+        while (true) {
+          // if we have reached the max loop count, quit by returning null
+          if ((nnLoopCount / nnCount) >= maxRetries) {
+            return null;
+          }
+
+          currentNN = nnLookup.next();
+          try {
+            NamenodeProtocolPB proxy = RPC.waitForProxy(NamenodeProtocolPB.class,
+                RPC.getProtocolVersion(NamenodeProtocolPB.class), currentNN.getIpcAddress(), conf);
+            cachedActiveProxy = new NamenodeProtocolTranslatorPB(proxy);
+            break;
+          } catch (IOException e) {
+            LOG.info("Failed to reach " + currentNN, e);
+            // couldn't even reach this NN, try the next one
+            nnLoopCount++;
+          }
+        }
+      }
+      assert cachedActiveProxy != null;
+      return cachedActiveProxy;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RemoteNameNodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RemoteNameNodeInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RemoteNameNodeInfo.java
new file mode 100644
index 0000000..9a51190
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RemoteNameNodeInfo.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Objects;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Information about a single remote NameNode
+ */
+public class RemoteNameNodeInfo {
+
+  public static List<RemoteNameNodeInfo> getRemoteNameNodes(Configuration conf) throws IOException {
+    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+    return getRemoteNameNodes(conf, nsId);
+  }
+
+  public static List<RemoteNameNodeInfo> getRemoteNameNodes(Configuration conf, String nsId)
+      throws IOException {
+    // there is only a single NN configured (and no federation) so we don't have any more NNs
+    if (nsId == null) {
+      return Collections.emptyList();
+    }
+    List<Configuration> otherNodes = HAUtil.getConfForOtherNodes(conf);
+    List<RemoteNameNodeInfo> nns = new ArrayList<RemoteNameNodeInfo>();
+
+    for (Configuration otherNode : otherNodes) {
+      String otherNNId = HAUtil.getNameNodeId(otherNode, nsId);
+      // don't do any validation here as in some cases, it can be overwritten later
+      InetSocketAddress otherIpcAddr = NameNode.getServiceAddress(otherNode, true);
+
+
+      final String scheme = DFSUtil.getHttpClientScheme(conf);
+      URL otherHttpAddr = DFSUtil.getInfoServerWithDefaultHost(otherIpcAddr.getHostName(),
+          otherNode, scheme).toURL();
+
+      nns.add(new RemoteNameNodeInfo(otherNode, otherNNId, otherIpcAddr, otherHttpAddr));
+    }
+    return nns;
+  }
+
+  private final Configuration conf;
+  private final String nnId;
+  private InetSocketAddress ipcAddress;
+  private final URL httpAddress;
+
+  private RemoteNameNodeInfo(Configuration conf, String nnId, InetSocketAddress ipcAddress,
+      URL httpAddress) {
+    this.conf = conf;
+    this.nnId = nnId;
+    this.ipcAddress = ipcAddress;
+    this.httpAddress = httpAddress;
+  }
+
+  public InetSocketAddress getIpcAddress() {
+    return this.ipcAddress;
+  }
+
+  public String getNameNodeID() {
+    return this.nnId;
+  }
+
+  public URL getHttpAddress() {
+    return this.httpAddress;
+  }
+
+  public Configuration getConfiguration() {
+    return this.conf;
+  }
+
+  public void setIpcAddress(InetSocketAddress ipc) {
+    this.ipcAddress = ipc;
+  }
+
+  @Override
+  public String toString() {
+    return "RemoteNameNodeInfo [nnId=" + nnId + ", ipcAddress=" + ipcAddress
+        + ", httpAddress=" + httpAddress + "]";
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    RemoteNameNodeInfo that = (RemoteNameNodeInfo) o;
+
+    if (!nnId.equals(that.nnId)) return false;
+    if (!ipcAddress.equals(that.ipcAddress)) return false;
+    // convert to the standard strings since URL.equals does address resolution, which is a
+    // blocking call and a FindBugs issue.
+    String httpString = httpAddress.toString();
+    String thatHttpString  = that.httpAddress.toString();
+    return httpString.equals(thatHttpString);
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = nnId.hashCode();
+    result = 31 * result + ipcAddress.hashCode();
+    // toString rather than hashCode b/c URL.hashCode is a blocking call.
+    result = 31 * result + httpAddress.toString().hashCode();
+    return result;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
index 1e40368..f5ecbec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
@@ -23,12 +23,10 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URL;
 import java.security.PrivilegedAction;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +43,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
 import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -68,12 +67,13 @@ public class StandbyCheckpointer {
   private long lastCheckpointTime;
   private final CheckpointerThread thread;
   private final ThreadFactory uploadThreadFactory;
-  private URL activeNNAddress;
+  private List<URL> activeNNAddresses;
   private URL myNNAddress;
 
   private final Object cancelLock = new Object();
   private Canceler canceler;
-  
+  private boolean isPrimaryCheckPointer = true;
+
   // Keep track of how many checkpoints were canceled.
   // This is for use in tests.
   private static int canceledCount = 0;
@@ -100,14 +100,21 @@ public class StandbyCheckpointer {
     myNNAddress = getHttpAddress(conf);
 
     // Look up the active node's address
-    Configuration confForActive = HAUtil.getConfForOtherNode(conf);
-    activeNNAddress = getHttpAddress(confForActive);
-    
+    List<Configuration> confForActive = HAUtil.getConfForOtherNodes(conf);
+    activeNNAddresses = new ArrayList<URL>(confForActive.size());
+    for (Configuration activeConf : confForActive) {
+      URL activeNNAddress = getHttpAddress(activeConf);
+
+      // sanity check each possible active NN
+      Preconditions.checkArgument(checkAddress(activeNNAddress),
+          "Bad address for active NN: %s", activeNNAddress);
+
+      activeNNAddresses.add(activeNNAddress);
+    }
+
     // Sanity-check.
-    Preconditions.checkArgument(checkAddress(activeNNAddress),
-        "Bad address for active NN: %s", activeNNAddress);
-    Preconditions.checkArgument(checkAddress(myNNAddress),
-        "Bad address for standby NN: %s", myNNAddress);
+    Preconditions.checkArgument(checkAddress(myNNAddress), "Bad address for standby NN: %s",
+        myNNAddress);
   }
   
   private URL getHttpAddress(Configuration conf) throws IOException {
@@ -127,7 +134,7 @@ public class StandbyCheckpointer {
 
   public void start() {
     LOG.info("Starting standby checkpoint thread...\n" +
-        "Checkpointing active NN at " + activeNNAddress + "\n" +
+        "Checkpointing active NN to possible NNs: " + activeNNAddresses + "\n" +
         "Serving checkpoints at " + myNNAddress);
     thread.start();
   }
@@ -148,11 +155,10 @@ public class StandbyCheckpointer {
     thread.interrupt();
   }
 
-  private void doCheckpoint() throws InterruptedException, IOException {
+  private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, IOException {
     assert canceler != null;
     final long txid;
     final NameNodeFile imageType;
-    
     // Acquire cpLock to make sure no one is modifying the name system.
     // It does not need the full namesystem write lock, since the only thing
     // that modifies namesystem on standby node is edit log replaying.
@@ -161,9 +167,9 @@ public class StandbyCheckpointer {
       assert namesystem.getEditLog().isOpenForRead() :
         "Standby Checkpointer should only attempt a checkpoint when " +
         "NN is in standby mode, but the edit logs are in an unexpected state";
-      
+
       FSImage img = namesystem.getFSImage();
-      
+
       long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId();
       long thisCheckpointTxId = img.getLastAppliedOrWrittenTxId();
       assert thisCheckpointTxId >= prevCheckpointTxId;
@@ -185,7 +191,7 @@ public class StandbyCheckpointer {
       img.saveNamespace(namesystem, imageType, canceler);
       txid = img.getStorage().getMostRecentCheckpointTxId();
       assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
-        thisCheckpointTxId + " but instead saved at txid=" + txid;
+          thisCheckpointTxId + " but instead saved at txid=" + txid;
 
       // Save the legacy OIV image, if the output dir is defined.
       String outputDir = checkpointConf.getLegacyOivImageDir();
@@ -195,31 +201,85 @@ public class StandbyCheckpointer {
     } finally {
       namesystem.cpUnlock();
     }
-    
+
+    //early exit if we shouldn't actually send the checkpoint to the ANN
+    if(!sendCheckpoint){
+      return;
+    }
+
     // Upload the saved checkpoint back to the active
-    // Do this in a separate thread to avoid blocking transition to active
+    // Do this in a separate thread to avoid blocking transition to active, but don't allow more
+    // than the expected number of tasks to run or queue up
     // See HDFS-4816
-    ExecutorService executor =
-        Executors.newSingleThreadExecutor(uploadThreadFactory);
-    Future<Void> upload = executor.submit(new Callable<Void>() {
-      @Override
-      public Void call() throws IOException {
-        TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
-            namesystem.getFSImage().getStorage(), imageType, txid, canceler);
-        return null;
+    ExecutorService executor = new ThreadPoolExecutor(0, activeNNAddresses.size(), 100,
+        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
+        uploadThreadFactory);
+    // for right now, just match the upload to the nn address by convention. There is no need to
+    // directly tie them together by adding a pair class.
+    List<Future<TransferFsImage.TransferResult>> uploads =
+        new ArrayList<Future<TransferFsImage.TransferResult>>();
+    for (final URL activeNNAddress : activeNNAddresses) {
+      Future<TransferFsImage.TransferResult> upload =
+          executor.submit(new Callable<TransferFsImage.TransferResult>() {
+            @Override
+            public TransferFsImage.TransferResult call() throws IOException {
+              return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
+                  .getFSImage().getStorage(), imageType, txid, canceler);
+            }
+          });
+      uploads.add(upload);
+    }
+    InterruptedException ie = null;
+    IOException ioe= null;
+    int i = 0;
+    boolean success = false;
+    for (; i < uploads.size(); i++) {
+      Future<TransferFsImage.TransferResult> upload = uploads.get(i);
+      try {
+        // TODO should there be some smarts here about retrying nodes that are not the active NN?
+        if (upload.get() == TransferFsImage.TransferResult.SUCCESS) {
+          success = true;
+          //avoid getting the rest of the results - we don't care since we had a successful upload
+          break;
+        }
+
+      } catch (ExecutionException e) {
+        ioe = new IOException("Exception during image upload: " + e.getMessage(),
+            e.getCause());
+        break;
+      } catch (InterruptedException e) {
+        ie = e;
+        break;
+      }
+    }
+
+    // we are primary if we successfully updated the ANN
+    this.isPrimaryCheckPointer = success;
+
+    // cleaner than copying code for multiple catch statements and better than catching all
+    // exceptions, so we just handle the ones we expect.
+    if (ie != null || ioe != null) {
+
+      // cancel the rest of the tasks, and close the pool
+      for (; i < uploads.size(); i++) {
+        Future<TransferFsImage.TransferResult> upload = uploads.get(i);
+        // The background thread may be blocked waiting in the throttler, so
+        // interrupt it.
+        upload.cancel(true);
+      }
+
+      // shutdown so we interrupt anything running and don't start anything new
+      executor.shutdownNow();
+      // this is a good bit longer than the thread timeout, just to make sure all the threads
+      // that are not doing any work also stop
+      executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+
+      // re-throw the exception we got, since one of these two must be non-null
+      if (ie != null) {
+        throw ie;
+      } else if (ioe != null) {
+        throw ioe;
       }
-    });
-    executor.shutdown();
-    try {
-      upload.get();
-    } catch (InterruptedException e) {
-      // The background thread may be blocked waiting in the throttler, so
-      // interrupt it.
-      upload.cancel(true);
-      throw e;
-    } catch (ExecutionException e) {
-      throw new IOException("Exception during image upload: " + e.getMessage(),
-          e.getCause());
     }
   }
   
@@ -322,8 +382,10 @@ public class StandbyCheckpointer {
           final long now = monotonicNow();
           final long uncheckpointed = countUncheckpointedTxns();
           final long secsSinceLast = (now - lastCheckpointTime) / 1000;
-          
+
+          // if we need a rollback checkpoint, always attempt to checkpoint
           boolean needCheckpoint = needRollbackCheckpoint;
+
           if (needCheckpoint) {
             LOG.info("Triggering a rollback fsimage for rolling upgrade.");
           } else if (uncheckpointed >= checkpointConf.getTxnCount()) {
@@ -338,19 +400,23 @@ public class StandbyCheckpointer {
                 "exceeds the configured interval " + checkpointConf.getPeriod());
             needCheckpoint = true;
           }
-          
-          synchronized (cancelLock) {
-            if (now < preventCheckpointsUntil) {
-              LOG.info("But skipping this checkpoint since we are about to failover!");
-              canceledCount++;
-              continue;
-            }
-            assert canceler == null;
-            canceler = new Canceler();
-          }
-          
+
           if (needCheckpoint) {
-            doCheckpoint();
+            synchronized (cancelLock) {
+              if (now < preventCheckpointsUntil) {
+                LOG.info("But skipping this checkpoint since we are about to failover!");
+                canceledCount++;
+                continue;
+              }
+              assert canceler == null;
+              canceler = new Canceler();
+            }
+
+            // on all nodes, we build the checkpoint. However, we only ship the checkpoint if we have
+            // a rollback request, are the checkpointer, or are outside the quiet period.
+            boolean sendRequest = isPrimaryCheckPointer || secsSinceLast >= checkpointConf.getQuietPeriod();
+            doCheckpoint(sendRequest);
+
             // reset needRollbackCheckpoint to false only when we finish a ckpt
             // for rollback image
             if (needRollbackCheckpoint
@@ -379,7 +445,7 @@ public class StandbyCheckpointer {
   }
 
   @VisibleForTesting
-  URL getActiveNNAddress() {
-    return activeNNAddress;
+  List<URL> getActiveNNAddresses() {
+    return activeNNAddresses;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java
index f125a27..24e5bef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
@@ -261,4 +263,15 @@ public class DFSZKFailoverController extends ZKFailoverController {
     return isThreadDumpCaptured;
   }
 
+  @Override
+  public List<HAServiceTarget> getAllOtherNodes() {
+    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+    List<String> otherNn = HAUtil.getNameNodeIdOfOtherNodes(conf, nsId);
+
+    List<HAServiceTarget> targets = new ArrayList<HAServiceTarget>(otherNn.size());
+    for (String nnId : otherNn) {
+      targets.add(new NNHAServiceTarget(conf, nsId, nnId));
+    }
+    return targets;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dfad94/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index aaa1c2f..76161a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -914,6 +914,18 @@
 </property>
 
 <property>
+  <name>dfs.namenode.checkpoint.check.quiet-multiplier</name>
+  <value>1.5</value>
+  <description>
+    Used to calculate the amount of time between retries when in the 'quiet' period
+    for creating checkpoints (active namenode already has an up-to-date image from another
+    checkpointer), so we wait a multiplier of the dfs.namenode.checkpoint.check.period before
+    retrying the checkpoint because another node likely is already managing the checkpoints,
+    allowing us to save bandwidth to transfer checkpoints that don't need to be used.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.num.checkpoints.retained</name>
   <value>2</value>
   <description>The number of image checkpoint files (fsimage_*) that will be retained by
@@ -1288,6 +1300,14 @@
 </property>
 
 <property>
+  <name>dfs.ha.tail-edits.namenode-retries</name>
+  <value>3</value>
+  <description>
+    Number of retries to use when contacting the namenode when tailing the log.
+  </description>
+</property>
+
+<property>
   <name>dfs.ha.automatic-failover.enabled</name>
   <value>false</value>
   <description>


Mime
View raw message