hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1299412 [4/11] - in /hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/bin/ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/...
Date Sun, 11 Mar 2012 17:56:07 GMT
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Sun Mar 11 17:55:58 2012
@@ -17,20 +17,16 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -50,8 +46,11 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
 import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -61,23 +60,22 @@ import org.apache.hadoop.ipc.RemoteExcep
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
- * A thread per namenode to perform:
- * <ul>
- * <li> Pre-registration handshake with namenode</li>
- * <li> Registration with namenode</li>
- * <li> Send periodic heartbeats to the namenode</li>
- * <li> Handle commands received from the namenode</li>
- * </ul>
+ * One instance per block-pool/namespace on the DN, which handles the
+ * heartbeats to the active and standby NNs for that namespace.
+ * This class manages an instance of {@link BPServiceActor} for each NN,
+ * and delegates calls to both NNs. 
+ * It also maintains the state about which of the NNs is considered active.
  */
 @InterfaceAudience.Private
-class BPOfferService implements Runnable {
+class BPOfferService {
   static final Log LOG = DataNode.LOG;
 
-  final InetSocketAddress nnAddr;
-  
   /**
    * Information about the namespace that this service
    * is registering with. This is assigned after
@@ -92,43 +90,80 @@ class BPOfferService implements Runnable
    */
   DatanodeRegistration bpRegistration;
   
-  long lastBlockReport = 0;
-  long lastDeletedReport = 0;
-
-  boolean resetBlockReportTime = true;
-
-  Thread bpThread;
-  DatanodeProtocolClientSideTranslatorPB bpNamenode;
-  private long lastHeartbeat = 0;
-  private volatile boolean initialized = false;
-  private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList 
-  	= new LinkedList<ReceivedDeletedBlockInfo>();
-  private volatile int pendingReceivedRequests = 0;
-  private volatile boolean shouldServiceRun = true;
   UpgradeManagerDatanode upgradeManager = null;
   private final DataNode dn;
-  private final DNConf dnConf;
 
-  BPOfferService(InetSocketAddress nnAddr, DataNode dn) {
+  /**
+   * A reference to the BPServiceActor associated with the currently
+   * ACTIVE NN. In the case that all NameNodes are in STANDBY mode,
+   * this can be null. If non-null, this must always refer to a member
+   * of the {@link #bpServices} list.
+   */
+  private BPServiceActor bpServiceToActive = null;
+  
+  /**
+   * The list of all actors for namenodes in this nameservice, regardless
+   * of their active or standby states.
+   */
+  private List<BPServiceActor> bpServices =
+    new CopyOnWriteArrayList<BPServiceActor>();
+
+  /**
+   * Each time we receive a heartbeat from a NN claiming to be ACTIVE,
+   * we record that NN's most recent transaction ID here, so long as it
+   * is more recent than the previous value. This allows us to detect
+   * split-brain scenarios in which a prior NN is still asserting its
+   * ACTIVE state but with a too-low transaction ID. See HDFS-2627
+   * for details. 
+   */
+  private long lastActiveClaimTxId = -1;
+
+  BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
+    Preconditions.checkArgument(!nnAddrs.isEmpty(),
+        "Must pass at least one NN.");
     this.dn = dn;
-    this.nnAddr = nnAddr;
-    this.dnConf = dn.getDnConf();
+
+    for (InetSocketAddress addr : nnAddrs) {
+      this.bpServices.add(new BPServiceActor(addr, this));
+    }
+  }
+
+  void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException {
+    Set<InetSocketAddress> oldAddrs = Sets.newHashSet();
+    for (BPServiceActor actor : bpServices) {
+      oldAddrs.add(actor.getNNSocketAddress());
+    }
+    Set<InetSocketAddress> newAddrs = Sets.newHashSet(addrs);
+    
+    if (!Sets.symmetricDifference(oldAddrs, newAddrs).isEmpty()) {
+      // Keep things simple for now -- we can implement this at a later date.
+      throw new IOException(
+          "HA does not currently support adding a new standby to a running DN. " +
+          "Please do a rolling restart of DNs to reconfigure the list of NNs.");
+    }
   }
 
   /**
-   * returns true if BP thread has completed initialization of storage
-   * and has registered with the corresponding namenode
-   * @return true if initialized
+   * @return true if the service has registered with at least one NameNode.
    */
-  public boolean isInitialized() {
-    return initialized;
+  boolean isInitialized() {
+    return bpRegistration != null;
   }
   
-  public boolean isAlive() {
-    return shouldServiceRun && bpThread.isAlive();
+  /**
+   * @return true if there is at least one actor thread running which is
+   * talking to a NameNode.
+   */
+  boolean isAlive() {
+    for (BPServiceActor actor : bpServices) {
+      if (actor.isAlive()) {
+        return true;
+      }
+    }
+    return false;
   }
   
-  public String getBlockPoolId() {
+  String getBlockPoolId() {
     if (bpNSInfo != null) {
       return bpNSInfo.getBlockPoolID();
     } else {
@@ -138,10 +173,11 @@ class BPOfferService implements Runnable
     }
   }
   
-  public NamespaceInfo getNamespaceInfo() {
+  synchronized NamespaceInfo getNamespaceInfo() {
     return bpNSInfo;
   }
   
+  @Override
   public String toString() {
     if (bpNSInfo == null) {
       // If we haven't yet connected to our NN, we don't yet know our
@@ -153,521 +189,364 @@ class BPOfferService implements Runnable
         storageId = "unknown";
       }
       return "Block pool <registering> (storage id " + storageId +
-        ") connecting to " + nnAddr;
+        ")";
     } else {
       return "Block pool " + getBlockPoolId() +
         " (storage id " + dn.getStorageId() +
-        ") registered with " + nnAddr;
+        ")";
     }
   }
   
-  InetSocketAddress getNNSocketAddress() {
-    return nnAddr;
+  void reportBadBlocks(ExtendedBlock block) {
+    checkBlock(block);
+    for (BPServiceActor actor : bpServices) {
+      actor.reportBadBlocks(block);
+    }
   }
-
-  /**
-   * Used to inject a spy NN in the unit tests.
+  
+  /*
+   * Informing the name node could take a long long time! Should we wait
+   * till namenode is informed before responding with success to the
+   * client? For now we don't.
    */
-  @VisibleForTesting
-  void setNameNode(DatanodeProtocolClientSideTranslatorPB dnProtocol) {
-    bpNamenode = dnProtocol;
+  void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+    checkBlock(block);
+    checkDelHint(delHint);
+    ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
+        block.getLocalBlock(),
+        ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
+        delHint);
+
+    for (BPServiceActor actor : bpServices) {
+      actor.notifyNamenodeBlockImmediately(bInfo);
+    }
   }
 
-  /**
-   * Perform the first part of the handshake with the NameNode.
-   * This calls <code>versionRequest</code> to determine the NN's
-   * namespace and version info. It automatically retries until
-   * the NN responds or the DN is shutting down.
-   * 
-   * @return the NamespaceInfo
-   * @throws IncorrectVersionException if the remote NN does not match
-   * this DN's version
-   */
-  NamespaceInfo retrieveNamespaceInfo() throws IncorrectVersionException {
-    NamespaceInfo nsInfo = null;
-    while (shouldRun()) {
-      try {
-        nsInfo = bpNamenode.versionRequest();
-        LOG.debug(this + " received versionRequest response: " + nsInfo);
-        break;
-      } catch(SocketTimeoutException e) {  // namenode is busy
-        LOG.warn("Problem connecting to server: " + nnAddr);
-      } catch(IOException e ) {  // namenode is not available
-        LOG.warn("Problem connecting to server: " + nnAddr);
-      }
-      
-      // try again in a second
-      sleepAndLogInterrupts(5000, "requesting version info from NN");
+  private void checkBlock(ExtendedBlock block) {
+    Preconditions.checkArgument(block != null,
+        "block is null");
+    Preconditions.checkArgument(block.getBlockPoolId().equals(getBlockPoolId()),
+        "block belongs to BP %s instead of BP %s",
+        block.getBlockPoolId(), getBlockPoolId());
+  }
+  
+  private void checkDelHint(String delHint) {
+    Preconditions.checkArgument(delHint != null,
+        "delHint is null");
+  }
+
+  void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+    checkBlock(block);
+    ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
+       block.getLocalBlock(), BlockStatus.DELETED_BLOCK, null);
+    
+    for (BPServiceActor actor : bpServices) {
+      actor.notifyNamenodeDeletedBlock(bInfo);
     }
+  }
+  
+  void notifyNamenodeReceivingBlock(ExtendedBlock block) {
+    checkBlock(block);
+    ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
+       block.getLocalBlock(), BlockStatus.RECEIVING_BLOCK, null);
     
-    if (nsInfo != null) {
-      checkNNVersion(nsInfo);        
+    for (BPServiceActor actor : bpServices) {
+      actor.notifyNamenodeBlockImmediately(bInfo);
     }
-    return nsInfo;
   }
 
-  private void checkNNVersion(NamespaceInfo nsInfo)
-      throws IncorrectVersionException {
-    // build and layout versions should match
-    String nsBuildVer = nsInfo.getBuildVersion();
-    String stBuildVer = Storage.getBuildVersion();
-    if (!nsBuildVer.equals(stBuildVer)) {
-      LOG.warn("Data-node and name-node Build versions must be the same. " +
-        "Namenode build version: " + nsBuildVer + "Datanode " +
-        "build version: " + stBuildVer);
-      throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer);
+  //This must be called only by blockPoolManager
+  void start() {
+    for (BPServiceActor actor : bpServices) {
+      actor.start();
     }
-
-    if (HdfsConstants.LAYOUT_VERSION != nsInfo.getLayoutVersion()) {
-      LOG.warn("Data-node and name-node layout versions must be the same." +
-        " Expected: "+ HdfsConstants.LAYOUT_VERSION +
-        " actual "+ bpNSInfo.getLayoutVersion());
-      throw new IncorrectVersionException(
-          bpNSInfo.getLayoutVersion(), "namenode");
+  }
+  
+  //This must be called only by blockPoolManager.
+  void stop() {
+    for (BPServiceActor actor : bpServices) {
+      actor.stop();
+    }
+  }
+  
+  //This must be called only by blockPoolManager
+  void join() {
+    for (BPServiceActor actor : bpServices) {
+      actor.join();
     }
   }
 
-  private void connectToNNAndHandshake() throws IOException {
-    // get NN proxy
-    bpNamenode = dn.connectToNN(nnAddr);
-
-    // First phase of the handshake with NN - get the namespace
-    // info.
-    bpNSInfo = retrieveNamespaceInfo();
-    
-    // Now that we know the namespace ID, etc, we can pass this to the DN.
-    // The DN can now initialize its local storage if we are the
-    // first BP to handshake, etc.
-    dn.initBlockPool(this);
+  synchronized UpgradeManagerDatanode getUpgradeManager() {
+    if(upgradeManager == null)
+      upgradeManager = 
+        new UpgradeManagerDatanode(dn, getBlockPoolId());
     
-    // Second phase of the handshake with the NN.
-    register();
+    return upgradeManager;
   }
   
-  /**
-   * This methods  arranges for the data node to send the block report at 
-   * the next heartbeat.
-   */
-  void scheduleBlockReport(long delay) {
-    if (delay > 0) { // send BR after random delay
-      lastBlockReport = System.currentTimeMillis()
-      - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
-    } else { // send at next heartbeat
-      lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
-    }
-    resetBlockReportTime = true; // reset future BRs for randomness
+  void processDistributedUpgradeCommand(UpgradeCommand comm)
+  throws IOException {
+    UpgradeManagerDatanode upgradeManager = getUpgradeManager();
+    upgradeManager.processUpgradeCommand(comm);
   }
 
-  void reportBadBlocks(ExtendedBlock block) {
-    DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
-    LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) }; 
-    
-    try {
-      bpNamenode.reportBadBlocks(blocks);  
-    } catch (IOException e){
-      /* One common reason is that NameNode could be in safe mode.
-       * Should we keep on retrying in that case?
-       */
-      LOG.warn("Failed to report bad block " + block + " to namenode : "
-          + " Exception", e);
-    }
+  /**
+   * Start distributed upgrade if it should be initiated by the data-node.
+   */
+  synchronized void startDistributedUpgradeIfNeeded() throws IOException {
+    UpgradeManagerDatanode um = getUpgradeManager();
     
+    if(!um.getUpgradeState())
+      return;
+    um.setUpgradeState(false, um.getUpgradeVersion());
+    um.startUpgrade();
+    return;
   }
   
+  DataNode getDataNode() {
+    return dn;
+  }
+
   /**
-   * Report received blocks and delete hints to the Namenode
-   * @throws IOException
+   * Called by the BPServiceActors when they handshake to a NN.
+   * If this is the first NN connection, this sets the namespace info
+   * for this BPOfferService. If it's a connection to a new NN, it
+   * verifies that this namespace matches (eg to prevent a misconfiguration
+   * where a StandbyNode from a different cluster is specified)
    */
-  private void reportReceivedDeletedBlocks() throws IOException {
-
-    // check if there are newly received blocks
-    ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
-    int currentReceivedRequestsCounter;
-    synchronized (receivedAndDeletedBlockList) {
-      currentReceivedRequestsCounter = pendingReceivedRequests;
-      int numBlocks = receivedAndDeletedBlockList.size();
-      if (numBlocks > 0) {
-        //
-        // Send newly-received and deleted blockids to namenode
-        //
-        receivedAndDeletedBlockArray = receivedAndDeletedBlockList
-            .toArray(new ReceivedDeletedBlockInfo[numBlocks]);
-      }
-    }
-    if (receivedAndDeletedBlockArray != null) {
-      StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
-          bpRegistration.getStorageID(), receivedAndDeletedBlockArray) };
-      bpNamenode.blockReceivedAndDeleted(bpRegistration, getBlockPoolId(),
-          report);
-      synchronized (receivedAndDeletedBlockList) {
-        for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) {
-          receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]);
-        }
-        pendingReceivedRequests -= currentReceivedRequestsCounter;
-      }
+  synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
+    if (this.bpNSInfo == null) {
+      this.bpNSInfo = nsInfo;
+      
+      // Now that we know the namespace ID, etc, we can pass this to the DN.
+      // The DN can now initialize its local storage if we are the
+      // first BP to handshake, etc.
+      dn.initBlockPool(this);
+      return;
+    } else {
+      checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
+          "Blockpool ID");
+      checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
+          "Namespace ID");
+      checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
+          "Cluster ID");
     }
   }
 
-  /*
-   * Informing the name node could take a long long time! Should we wait
-   * till namenode is informed before responding with success to the
-   * client? For now we don't.
-   */
-  void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
-    if (block == null || delHint == null) {
-      throw new IllegalArgumentException(block == null ? "Block is null"
-          : "delHint is null");
+  /**
+   * After one of the BPServiceActors registers successfully with the
+   * NN, it calls this function to verify that the NN it connected to
+   * is consistent with other NNs serving the block-pool.
+   */
+  void registrationSucceeded(BPServiceActor bpServiceActor,
+      DatanodeRegistration reg) throws IOException {
+    if (bpRegistration != null) {
+      checkNSEquality(bpRegistration.storageInfo.getNamespaceID(),
+          reg.storageInfo.getNamespaceID(), "namespace ID");
+      checkNSEquality(bpRegistration.storageInfo.getClusterID(),
+          reg.storageInfo.getClusterID(), "cluster ID");
+    } else {
+      bpRegistration = reg;
     }
+    
+    dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
+  }
 
-    if (!block.getBlockPoolId().equals(getBlockPoolId())) {
-      LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
-          + getBlockPoolId());
-      return;
+  /**
+   * Verify equality of two namespace-related fields, throwing
+   * an exception if they are unequal.
+   */
+  private static void checkNSEquality(
+      Object ourID, Object theirID,
+      String idHelpText) throws IOException {
+    if (!ourID.equals(theirID)) {
+      throw new IOException(idHelpText + " mismatch: " +
+          "previously connected to " + idHelpText + " " + ourID + 
+          " but now connected to " + idHelpText + " " + theirID);
     }
+  }
 
-    synchronized (receivedAndDeletedBlockList) {
-      receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
-          .getLocalBlock(), delHint));
-      pendingReceivedRequests++;
-      receivedAndDeletedBlockList.notifyAll();
-    }
+  synchronized DatanodeRegistration createRegistration() {
+    Preconditions.checkState(bpNSInfo != null,
+        "getRegistration() can only be called after initial handshake");
+    return dn.createBPRegistration(bpNSInfo);
   }
 
-  void notifyNamenodeDeletedBlock(ExtendedBlock block) {
-    if (block == null) {
-      throw new IllegalArgumentException("Block is null");
+  /**
+   * Called when an actor shuts down. If this is the last actor
+   * to shut down, shuts down the whole blockpool in the DN.
+   */
+  synchronized void shutdownActor(BPServiceActor actor) {
+    if (bpServiceToActive == actor) {
+      bpServiceToActive = null;
     }
 
-    if (!block.getBlockPoolId().equals(getBlockPoolId())) {
-      LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
-          + getBlockPoolId());
-      return;
-    }
+    bpServices.remove(actor);
 
-    synchronized (receivedAndDeletedBlockList) {
-      receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
-          .getLocalBlock(), ReceivedDeletedBlockInfo.TODELETE_HINT));
+    if (bpServices.isEmpty()) {
+      dn.shutdownBlockPool(this);
+      
+      if(upgradeManager != null)
+        upgradeManager.shutdownUpgrade();
     }
   }
   
 
   /**
-   * Report the list blocks to the Namenode
-   * @throws IOException
+   * Called by the DN to report an error to the NNs.
    */
-  DatanodeCommand blockReport() throws IOException {
-    // send block report if timer has expired.
-    DatanodeCommand cmd = null;
-    long startTime = now();
-    if (startTime - lastBlockReport > dnConf.blockReportInterval) {
-
-      // Create block report
-      long brCreateStartTime = now();
-      BlockListAsLongs bReport = dn.data.getBlockReport(getBlockPoolId());
-
-      // Send block report
-      long brSendStartTime = now();
-      StorageBlockReport[] report = { new StorageBlockReport(
-          bpRegistration.getStorageID(), bReport.getBlockListAsLongs()) };
-      cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), report);
-
-      // Log the block report processing stats from Datanode perspective
-      long brSendCost = now() - brSendStartTime;
-      long brCreateCost = brSendStartTime - brCreateStartTime;
-      dn.metrics.addBlockReport(brSendCost);
-      LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
-          + " blocks took " + brCreateCost + " msec to generate and "
-          + brSendCost + " msecs for RPC and NN processing");
-
-      // If we have sent the first block report, then wait a random
-      // time before we start the periodic block reports.
-      if (resetBlockReportTime) {
-        lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
-        resetBlockReportTime = false;
-      } else {
-        /* say the last block report was at 8:20:14. The current report
-         * should have started around 9:20:14 (default 1 hour interval).
-         * If current time is :
-         *   1) normal like 9:20:18, next report should be at 10:20:14
-         *   2) unexpected like 11:35:43, next report should be at 12:20:14
-         */
-        lastBlockReport += (now() - lastBlockReport) /
-        dnConf.blockReportInterval * dnConf.blockReportInterval;
-      }
-      LOG.info("sent block report, processed command:" + cmd);
-    }
-    return cmd;
-  }
-  
-  
-  DatanodeCommand [] sendHeartBeat() throws IOException {
-    // reports number of failed volumes
-    StorageReport[] report = { new StorageReport(bpRegistration.getStorageID(),
-        false, dn.data.getCapacity(), dn.data.getDfsUsed(),
-        dn.data.getRemaining(), dn.data.getBlockPoolUsed(getBlockPoolId())) };
-    return bpNamenode.sendHeartbeat(bpRegistration, report,
-        dn.xmitsInProgress.get(),
-        dn.getXceiverCount(), dn.data.getNumFailedVolumes());
-  }
-  
-  //This must be called only by blockPoolManager
-  void start() {
-    if ((bpThread != null) && (bpThread.isAlive())) {
-      //Thread is started already
-      return;
+  void trySendErrorReport(int errCode, String errMsg) {
+    for (BPServiceActor actor : bpServices) {
+      actor.trySendErrorReport(errCode, errMsg);
     }
-    bpThread = new Thread(this, formatThreadName());
-    bpThread.setDaemon(true); // needed for JUnit testing
-    bpThread.start();
-  }
-  
-  private String formatThreadName() {
-    Collection<URI> dataDirs = DataNode.getStorageDirs(dn.getConf());
-    return "DataNode: [" +
-      StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "] " +
-      " heartbeating to " + nnAddr;
   }
 
-  //This must be called only by blockPoolManager.
-  void stop() {
-    shouldServiceRun = false;
-    if (bpThread != null) {
-        bpThread.interrupt();
+  /**
+   * Ask each of the actors to schedule a block report after
+   * the specified delay.
+   */
+  void scheduleBlockReport(long delay) {
+    for (BPServiceActor actor : bpServices) {
+      actor.scheduleBlockReport(delay);
     }
   }
-  
-  //This must be called only by blockPoolManager
-  void join() {
-    try {
-      if (bpThread != null) {
-        bpThread.join();
-      }
-    } catch (InterruptedException ie) { }
-  }
-  
-  //Cleanup method to be called by current thread before exiting.
-  private synchronized void cleanUp() {
-    
-    if(upgradeManager != null)
-      upgradeManager.shutdownUpgrade();
-    shouldServiceRun = false;
-    IOUtils.cleanup(LOG, bpNamenode);
-    dn.shutdownBlockPool(this);
-  }
 
   /**
-   * Main loop for each BP thread. Run until shutdown,
-   * forever calling remote NameNode functions.
-   */
-  private void offerService() throws Exception {
-    LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
-        + dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
-        + dnConf.blockReportInterval + "msec" + " Initial delay: "
-        + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
-        + dnConf.heartBeatInterval);
-
-    //
-    // Now loop for a long time....
-    //
-    while (shouldRun()) {
+   * Ask each of the actors to report a bad block hosted on another DN.
+   */
+  void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block) {
+    for (BPServiceActor actor : bpServices) {
       try {
-        long startTime = now();
-
-        //
-        // Every so often, send heartbeat or block-report
-        //
-        if (startTime - lastHeartbeat > dnConf.heartBeatInterval) {
-          //
-          // All heartbeat messages include following info:
-          // -- Datanode name
-          // -- data transfer port
-          // -- Total capacity
-          // -- Bytes remaining
-          //
-          lastHeartbeat = startTime;
-          if (!dn.areHeartbeatsDisabledForTests()) {
-            DatanodeCommand[] cmds = sendHeartBeat();
-            dn.metrics.addHeartbeat(now() - startTime);
-
-            long startProcessCommands = now();
-            if (!processCommand(cmds))
-              continue;
-            long endProcessCommands = now();
-            if (endProcessCommands - startProcessCommands > 2000) {
-              LOG.info("Took " + (endProcessCommands - startProcessCommands) +
-                  "ms to process " + cmds.length + " commands from NN");
-            }
-          }
-        }
-        if (pendingReceivedRequests > 0
-            || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
-          reportReceivedDeletedBlocks();
-          lastDeletedReport = startTime;
-        }
-
-        DatanodeCommand cmd = blockReport();
-        processCommand(cmd);
-
-        // Now safe to start scanning the block pool
-        if (dn.blockScanner != null) {
-          dn.blockScanner.addBlockPool(this.getBlockPoolId());
-        }
-
-        //
-        // There is no work to do;  sleep until hearbeat timer elapses, 
-        // or work arrives, and then iterate again.
-        //
-        long waitTime = dnConf.heartBeatInterval - 
-        (System.currentTimeMillis() - lastHeartbeat);
-        synchronized(receivedAndDeletedBlockList) {
-          if (waitTime > 0 && pendingReceivedRequests == 0) {
-            try {
-            	receivedAndDeletedBlockList.wait(waitTime);
-            } catch (InterruptedException ie) {
-              LOG.warn("BPOfferService for " + this + " interrupted");
-            }
-          }
-        } // synchronized
-      } catch(RemoteException re) {
-        String reClass = re.getClassName();
-        if (UnregisteredNodeException.class.getName().equals(reClass) ||
-            DisallowedDatanodeException.class.getName().equals(reClass) ||
-            IncorrectVersionException.class.getName().equals(reClass)) {
-          LOG.warn(this + " is shutting down", re);
-          shouldServiceRun = false;
-          return;
-        }
-        LOG.warn("RemoteException in offerService", re);
-        try {
-          long sleepTime = Math.min(1000, dnConf.heartBeatInterval);
-          Thread.sleep(sleepTime);
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
+        actor.reportRemoteBadBlock(dnInfo, block);
       } catch (IOException e) {
-        LOG.warn("IOException in offerService", e);
+        LOG.warn("Couldn't report bad block " + block + " to " + actor,
+            e);
       }
-    } // while (shouldRun())
-  } // offerService
+    }
+  }
+
+  /**
+   * @return a proxy to the active NN, or null if the BPOS has not
+   * acknowledged any NN as active yet.
+   */
+  synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() {
+    if (bpServiceToActive != null) {
+      return bpServiceToActive.bpNamenode;
+    } else {
+      return null;
+    }
+  }
 
+  @VisibleForTesting
+  synchronized List<BPServiceActor> getBPServiceActors() {
+    return Lists.newArrayList(bpServices);
+  }
+  
   /**
-   * Register one bp with the corresponding NameNode
-   * <p>
-   * The bpDatanode needs to register with the namenode on startup in order
-   * 1) to report which storage it is serving now and 
-   * 2) to receive a registrationID
-   *  
-   * issued by the namenode to recognize registered datanodes.
+   * Update the BPOS's view of which NN is active, based on a heartbeat
+   * response from one of the actors.
    * 
-   * @see FSNamesystem#registerDatanode(DatanodeRegistration)
-   * @throws IOException
+   * @param actor the actor which received the heartbeat
+   * @param nnHaState the HA-related heartbeat contents
    */
-  void register() throws IOException {
-    Preconditions.checkState(bpNSInfo != null,
-        "register() should be called after handshake()");
+  synchronized void updateActorStatesFromHeartbeat(
+      BPServiceActor actor,
+      NNHAStatusHeartbeat nnHaState) {
+    final long txid = nnHaState.getTxId();
     
-    // The handshake() phase loaded the block pool storage
-    // off disk - so update the bpRegistration object from that info
-    bpRegistration = dn.createBPRegistration(bpNSInfo);
-
-    LOG.info(this + " beginning handshake with NN");
-
-    while (shouldRun()) {
-      try {
-        // Use returned registration from namenode with updated machine name.
-        bpRegistration = bpNamenode.registerDatanode(bpRegistration,
-            new DatanodeStorage[0]);
-        break;
-      } catch(SocketTimeoutException e) {  // namenode is busy
-        LOG.info("Problem connecting to server: " + nnAddr);
-        sleepAndLogInterrupts(1000, "connecting to server");
+    final boolean nnClaimsActive =
+      nnHaState.getState() == NNHAStatusHeartbeat.State.ACTIVE;
+    final boolean bposThinksActive = bpServiceToActive == actor;
+    final boolean isMoreRecentClaim = txid > lastActiveClaimTxId; 
+    
+    if (nnClaimsActive && !bposThinksActive) {
+      LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
+          "txid=" + txid);
+      if (!isMoreRecentClaim) {
+        // Split-brain scenario - an NN is trying to claim active
+        // state when a different NN has already claimed it with a higher
+        // txid.
+        LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
+            txid + " but there was already a more recent claim at txid=" +
+            lastActiveClaimTxId);
+        return;
+      } else {
+        if (bpServiceToActive == null) {
+          LOG.info("Acknowledging ACTIVE Namenode " + actor);
+        } else {
+          LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
+              bpServiceToActive + " at higher txid=" + txid);
+        }
+        bpServiceToActive = actor;
       }
+    } else if (!nnClaimsActive && bposThinksActive) {
+      LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
+          "txid=" + nnHaState.getTxId());
+      bpServiceToActive = null;
     }
     
-    LOG.info("Block pool " + this + " successfully registered with NN");
-    dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
-
-    // random short delay - helps scatter the BR from all DNs
-    scheduleBlockReport(dnConf.initialBlockReportDelay);
-  }
-
-
-  private void sleepAndLogInterrupts(int millis,
-      String stateString) {
-    try {
-      Thread.sleep(millis);
-    } catch (InterruptedException ie) {
-      LOG.info("BPOfferService " + this +
-          " interrupted while " + stateString);
+    if (bpServiceToActive == actor) {
+      assert txid >= lastActiveClaimTxId;
+      lastActiveClaimTxId = txid;
     }
   }
 
   /**
-   * No matter what kind of exception we get, keep retrying to offerService().
-   * That's the loop that connects to the NameNode and provides basic DataNode
-   * functionality.
-   *
-   * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
-   * happen either at shutdown or due to refreshNamenodes.
+   * @return true if the given NN address is one of the NNs for this
+   * block pool
    */
-  @Override
-  public void run() {
-    LOG.info(this + " starting to offer service");
-
-    try {
-      // init stuff
-      try {
-        // setup storage
-        connectToNNAndHandshake();
-      } catch (IOException ioe) {
-        // Initial handshake, storage recovery or registration failed
-        // End BPOfferService thread
-        LOG.fatal("Initialization failed for block pool " + this, ioe);
-        return;
+  boolean containsNN(InetSocketAddress addr) {
+    for (BPServiceActor actor : bpServices) {
+      if (actor.getNNSocketAddress().equals(addr)) {
+        return true;
       }
+    }
+    return false;
+  }
+  
+  @VisibleForTesting
+  int countNameNodes() {
+    return bpServices.size();
+  }
 
-      initialized = true; // bp is initialized;
-      
-      while (shouldRun()) {
-        try {
-          startDistributedUpgradeIfNeeded();
-          offerService();
-        } catch (Exception ex) {
-          LOG.error("Exception in BPOfferService for " + this, ex);
-          sleepAndLogInterrupts(5000, "offering service");
-        }
-      }
-    } catch (Throwable ex) {
-      LOG.warn("Unexpected exception in block pool " + this, ex);
-    } finally {
-      LOG.warn("Ending block pool service for: " + this);
-      cleanUp();
+  /**
+   * Run an immediate block report on this thread. Used by tests.
+   */
+  @VisibleForTesting
+  void triggerBlockReportForTests() throws IOException {
+    for (BPServiceActor actor : bpServices) {
+      actor.triggerBlockReportForTests();
     }
   }
 
-  private boolean shouldRun() {
-    return shouldServiceRun && dn.shouldRun();
+  /**
+   * Run an immediate deletion report on this thread. Used by tests.
+   */
+  @VisibleForTesting
+  void triggerDeletionReportForTests() throws IOException {
+    for (BPServiceActor actor : bpServices) {
+      actor.triggerDeletionReportForTests();
+    }
   }
 
   /**
-   * Process an array of datanode commands
-   * 
-   * @param cmds an array of datanode commands
-   * @return true if further processing may be required or false otherwise. 
+   * Run an immediate heartbeat from all actors. Used by tests.
    */
-  private boolean processCommand(DatanodeCommand[] cmds) {
-    if (cmds != null) {
-      for (DatanodeCommand cmd : cmds) {
-        try {
-          if (processCommand(cmd) == false) {
-            return false;
-          }
-        } catch (IOException ioe) {
-          LOG.warn("Error processing datanode Command", ioe);
-        }
-      }
+  @VisibleForTesting
+  void triggerHeartbeatForTests() throws IOException {
+    for (BPServiceActor actor : bpServices) {
+      actor.triggerHeartbeatForTests();
+    }
+  }
+
+  synchronized boolean processCommandFromActor(DatanodeCommand cmd,
+      BPServiceActor actor) throws IOException {
+    assert bpServices.contains(actor);
+    if (actor == bpServiceToActive) {
+      return processCommandFromActive(cmd, actor);
+    } else {
+      return processCommandFromStandby(cmd, actor);
     }
-    return true;
   }
 
   /**
@@ -676,7 +555,8 @@ class BPOfferService implements Runnable
    * @return true if further processing may be required or false otherwise. 
    * @throws IOException
    */
-  private boolean processCommand(DatanodeCommand cmd) throws IOException {
+  private boolean processCommandFromActive(DatanodeCommand cmd,
+      BPServiceActor actor) throws IOException {
     if (cmd == null)
       return true;
     final BlockCommand bcmd = 
@@ -707,19 +587,13 @@ class BPOfferService implements Runnable
       dn.metrics.incrBlocksRemoved(toDelete.length);
       break;
     case DatanodeProtocol.DNA_SHUTDOWN:
-      // shut down the data node
-      shouldServiceRun = false;
-      return false;
+      // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command
+      // See HDFS-2987.
+      throw new UnsupportedOperationException("Received unimplemented DNA_SHUTDOWN");
     case DatanodeProtocol.DNA_REGISTER:
       // namenode requested a registration - at start or if NN lost contact
       LOG.info("DatanodeCommand action: DNA_REGISTER");
-      if (shouldRun()) {
-        // re-retrieve namespace info to make sure that, if the NN
-        // was restarted, we still match its version (HDFS-2120)
-        retrieveNamespaceInfo();
-        // and re-register
-        register();
-      }
+      actor.reRegister();
       break;
     case DatanodeProtocol.DNA_FINALIZE:
       String bp = ((FinalizeCommand) cmd).getBlockPoolId(); 
@@ -739,7 +613,8 @@ class BPOfferService implements Runnable
     case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
       LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
       if (dn.isBlockTokenEnabled) {
-        dn.blockPoolTokenSecretManager.setKeys(getBlockPoolId(), 
+        dn.blockPoolTokenSecretManager.setKeys(
+            getBlockPoolId(), 
             ((KeyUpdateCommand) cmd).getExportedKeys());
       }
       break;
@@ -758,41 +633,40 @@ class BPOfferService implements Runnable
     }
     return true;
   }
-  
-  private void processDistributedUpgradeCommand(UpgradeCommand comm)
-  throws IOException {
-    UpgradeManagerDatanode upgradeManager = getUpgradeManager();
-    upgradeManager.processUpgradeCommand(comm);
-  }
-
-  synchronized UpgradeManagerDatanode getUpgradeManager() {
-    if(upgradeManager == null)
-      upgradeManager = 
-        new UpgradeManagerDatanode(dn, getBlockPoolId());
-    
-    return upgradeManager;
-  }
-  
-  /**
-   * Start distributed upgrade if it should be initiated by the data-node.
-   */
-  private void startDistributedUpgradeIfNeeded() throws IOException {
-    UpgradeManagerDatanode um = getUpgradeManager();
-    
-    if(!um.getUpgradeState())
-      return;
-    um.setUpgradeState(false, um.getUpgradeVersion());
-    um.startUpgrade();
-    return;
-  }
-
-  @VisibleForTesting
-  DatanodeProtocolClientSideTranslatorPB getBpNamenode() {
-    return bpNamenode;
+
+  /**
+   * Process a command received from an NN that this BPOS does not
+   * currently consider active. Only DNA_REGISTER is acted upon (by
+   * asking the actor to re-register); all other commands, known or
+   * unknown, are logged and ignored.
+   *
+   * @param cmd the command to process; a null command is ignored
+   * @param actor the actor which received the command
+   * @return true in all cases
+   * @throws IOException if re-registering the actor fails
+   */
+  private boolean processCommandFromStandby(DatanodeCommand cmd,
+      BPServiceActor actor) throws IOException {
+    if (cmd == null)
+      return true;
+    switch(cmd.getAction()) {
+    case DatanodeProtocol.DNA_REGISTER:
+      // namenode requested a registration - at start or if NN lost contact
+      LOG.info("DatanodeCommand action: DNA_REGISTER");
+      actor.reRegister();
+      return true;
+    case DatanodeProtocol.DNA_TRANSFER:
+    case DatanodeProtocol.DNA_INVALIDATE:
+    case DatanodeProtocol.DNA_SHUTDOWN:
+    case DatanodeProtocol.DNA_RECOVERBLOCK:
+    case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
+    case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
+      LOG.warn("Got a command from standby NN - ignoring command: " + cmd.getAction());
+      return true;
+    default:
+      LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
+    }
+    return true;
   }
 
-  @VisibleForTesting
-  void setBpNamenode(DatanodeProtocolClientSideTranslatorPB bpNamenode) {
-    this.bpNamenode = bpNamenode;
-  }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Sun Mar 11 17:55:58 2012
@@ -153,6 +153,7 @@ class BlockReceiver implements Closeable
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
           replicaInfo = datanode.data.createRbw(block);
+          datanode.notifyNamenodeReceivingBlock(block);
           break;
         case PIPELINE_SETUP_STREAMING_RECOVERY:
           replicaInfo = datanode.data.recoverRbw(
@@ -166,6 +167,7 @@ class BlockReceiver implements Closeable
                 block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
+          datanode.notifyNamenodeReceivingBlock(block);
           break;
         case PIPELINE_SETUP_APPEND_RECOVERY:
           replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
@@ -174,6 +176,7 @@ class BlockReceiver implements Closeable
                 block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
+          datanode.notifyNamenodeReceivingBlock(block);
           break;
         case TRANSFER_RBW:
         case TRANSFER_FINALIZED:
@@ -320,7 +323,6 @@ class BlockReceiver implements Closeable
   private void verifyChunks( byte[] dataBuf, int dataOff, int len, 
                              byte[] checksumBuf, int checksumOff ) 
                              throws IOException {
-    DatanodeProtocol nn = datanode.getBPNamenode(block.getBlockPoolId());
     while (len > 0) {
       int chunkLen = Math.min(len, bytesPerChecksum);
       
@@ -331,9 +333,7 @@ class BlockReceiver implements Closeable
           try {
             LOG.info("report corrupt block " + block + " from datanode " +
                       srcDataNode + " to namenode");
-            LocatedBlock lb = new LocatedBlock(block, 
-                                            new DatanodeInfo[] {srcDataNode});
-            nn.reportBadBlocks(new LocatedBlock[] {lb});
+            datanode.reportRemoteBadBlock(srcDataNode, block);
           } catch (IOException e) {
             LOG.warn("Failed to report bad block " + block + 
                       " from datanode " + srcDataNode + " to namenode");

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sun Mar 11 17:55:58 2012
@@ -45,7 +45,6 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTPS_ENABLE_KEY;
 
 import java.io.BufferedOutputStream;
@@ -86,6 +85,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -164,6 +164,8 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import com.google.protobuf.BlockingService;
 
 
@@ -231,143 +233,6 @@ public class DataNode extends Configured
     return NetUtils.createSocketAddr(target);
   }
   
-  /**
-   * Manages he BPOfferService objects for the data node.
-   * Creation, removal, starting, stopping, shutdown on BPOfferService
-   * objects must be done via APIs in this class.
-   */
-  @InterfaceAudience.Private
-  class BlockPoolManager {
-    private final Map<String, BPOfferService> bpMapping;
-    private final Map<InetSocketAddress, BPOfferService> nameNodeThreads;
- 
-    //This lock is used only to ensure exclusion of refreshNamenodes
-    private final Object refreshNamenodesLock = new Object();
-    
-    BlockPoolManager(Configuration conf)
-        throws IOException {
-      bpMapping = new HashMap<String, BPOfferService>();
-      nameNodeThreads = new HashMap<InetSocketAddress, BPOfferService>();
-  
-      List<InetSocketAddress> isas = DFSUtil.getNNServiceRpcAddresses(conf);
-      for(InetSocketAddress isa : isas) {
-        BPOfferService bpos = new BPOfferService(isa, DataNode.this);
-        nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
-      }
-    }
-    
-    synchronized void addBlockPool(BPOfferService t) {
-      if (nameNodeThreads.get(t.getNNSocketAddress()) == null) {
-        throw new IllegalArgumentException(
-            "Unknown BPOfferService thread for namenode address:"
-                + t.getNNSocketAddress());
-      }
-      if (t.getBlockPoolId() == null) {
-        throw new IllegalArgumentException("Null blockpool id");
-      }
-      bpMapping.put(t.getBlockPoolId(), t);
-    }
-    
-    /**
-     * Returns the array of BPOfferService objects. 
-     * Caution: The BPOfferService returned could be shutdown any time.
-     */
-    synchronized BPOfferService[] getAllNamenodeThreads() {
-      BPOfferService[] bposArray = new BPOfferService[nameNodeThreads.values()
-          .size()];
-      return nameNodeThreads.values().toArray(bposArray);
-    }
-    
-    synchronized BPOfferService get(InetSocketAddress addr) {
-      return nameNodeThreads.get(addr);
-    }
-    
-    synchronized BPOfferService get(String bpid) {
-      return bpMapping.get(bpid);
-    }
-    
-    synchronized void remove(BPOfferService t) {
-      nameNodeThreads.remove(t.getNNSocketAddress());
-      bpMapping.remove(t.getBlockPoolId());
-    }
-    
-    void shutDownAll() throws InterruptedException {
-      BPOfferService[] bposArray = this.getAllNamenodeThreads();
-      
-      for (BPOfferService bpos : bposArray) {
-        bpos.stop(); //interrupts the threads
-      }
-      //now join
-      for (BPOfferService bpos : bposArray) {
-        bpos.join();
-      }
-    }
-    
-    synchronized void startAll() throws IOException {
-      try {
-        UserGroupInformation.getLoginUser().doAs(
-            new PrivilegedExceptionAction<Object>() {
-              public Object run() throws Exception {
-                for (BPOfferService bpos : nameNodeThreads.values()) {
-                  bpos.start();
-                }
-                return null;
-              }
-            });
-      } catch (InterruptedException ex) {
-        IOException ioe = new IOException();
-        ioe.initCause(ex.getCause());
-        throw ioe;
-      }
-    }
-    
-    void joinAll() throws InterruptedException {
-      for (BPOfferService bpos: this.getAllNamenodeThreads()) {
-        bpos.join();
-      }
-    }
-    
-    void refreshNamenodes(Configuration conf)
-        throws IOException, InterruptedException {
-      LOG.info("Refresh request received for nameservices: "
-          + conf.get(DFS_FEDERATION_NAMESERVICES));
-      List<InetSocketAddress> newAddresses = 
-        DFSUtil.getNNServiceRpcAddresses(conf);
-      List<BPOfferService> toShutdown = new ArrayList<BPOfferService>();
-      List<InetSocketAddress> toStart = new ArrayList<InetSocketAddress>();
-      synchronized (refreshNamenodesLock) {
-        synchronized (this) {
-          for (InetSocketAddress nnaddr : nameNodeThreads.keySet()) {
-            if (!(newAddresses.contains(nnaddr))) {
-              toShutdown.add(nameNodeThreads.get(nnaddr));
-            }
-          }
-          for (InetSocketAddress nnaddr : newAddresses) {
-            if (!(nameNodeThreads.containsKey(nnaddr))) {
-              toStart.add(nnaddr);
-            }
-          }
-
-          for (InetSocketAddress nnaddr : toStart) {
-            BPOfferService bpos = new BPOfferService(nnaddr, DataNode.this);
-            nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
-          }
-        }
-
-        for (BPOfferService bpos : toShutdown) {
-          bpos.stop();
-          bpos.join();
-        }
-        
-        // stoping the BPOSes causes them to call remove() on their own when they
-        // clean up.
-        
-        // Now start the threads that are not already running.
-        startAll();
-      }
-    }
-  }
-  
   volatile boolean shouldRun = true;
   private BlockPoolManager blockPoolManager;
   public volatile FSDatasetInterface<? extends FSVolumeInterface> data = null;
@@ -655,7 +520,18 @@ public class DataNode extends Configured
     if(bpos != null) {
       bpos.notifyNamenodeReceivedBlock(block, delHint); 
     } else {
-      LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+      LOG.error("Cannot find BPOfferService for reporting block received for bpid="
+          + block.getBlockPoolId());
+    }
+  }
+  
+  // calls specific to BP
+  protected void notifyNamenodeReceivingBlock(ExtendedBlock block) {
+    BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
+    if(bpos != null) {
+      bpos.notifyNamenodeReceivingBlock(block); 
+    } else {
+      LOG.error("Cannot find BPOfferService for reporting block receiving for bpid="
           + block.getBlockPoolId());
     }
   }
@@ -666,18 +542,66 @@ public class DataNode extends Configured
     if (bpos != null) {
       bpos.notifyNamenodeDeletedBlock(block);
     } else {
-      LOG.warn("Cannot find BPOfferService for reporting block deleted for bpid="
+      LOG.error("Cannot find BPOfferService for reporting block deleted for bpid="
           + block.getBlockPoolId());
     }
   }
   
+  /**
+   * Report a bad block which is hosted on the local DN.
+   */
   public void reportBadBlocks(ExtendedBlock block) throws IOException{
+    BPOfferService bpos = getBPOSForBlock(block);
+    bpos.reportBadBlocks(block);
+  }
+
+  /**
+   * Report a bad block on another DN (eg if we received a corrupt replica
+   * from a remote host).
+   * @param srcDataNode the DN hosting the bad block
+   * @param block the block itself
+   */
+  public void reportRemoteBadBlock(DatanodeInfo srcDataNode, ExtendedBlock block)
+      throws IOException {
+    BPOfferService bpos = getBPOSForBlock(block);
+    bpos.reportRemoteBadBlock(srcDataNode, block);
+  }
+  
+  /**
+   * Try to send an error report to the NNs associated with the given block pool.
+   * @param bpid the block pool ID
+   * @param errCode error code to send
+   * @param errMsg textual message to send
+   * @throws IllegalArgumentException if no BPOfferService exists for bpid
+   */
+  void trySendErrorReport(String bpid, int errCode, String errMsg) {
+    BPOfferService bpos = blockPoolManager.get(bpid);
+    if (bpos == null) {
+      throw new IllegalArgumentException("Bad block pool: " + bpid);
+    }
+    bpos.trySendErrorReport(errCode, errMsg);
+  }
+
+
+  
+  /**
+   * Return the BPOfferService instance corresponding to the given block.
+   * @param block
+   * @return the BPOS
+   * @throws IOException if no such BPOS can be found
+   */
+  private BPOfferService getBPOSForBlock(ExtendedBlock block)
+      throws IOException {
+    Preconditions.checkNotNull(block);
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
-    if(bpos == null || bpos.bpNamenode == null) {
-      throw new IOException("cannot locate OfferService thread for bp="+block.getBlockPoolId());
+    if (bpos == null) {
+      throw new IOException("cannot locate OfferService thread for bp="+
+          block.getBlockPoolId());
     }
-    bpos.reportBadBlocks(block);
+    return bpos;
   }
+
+
   
   // used only for testing
   void setHeartbeatsDisabledForTests(
@@ -730,7 +654,8 @@ public class DataNode extends Configured
 
     metrics = DataNodeMetrics.create(conf, getMachineName());
 
-    blockPoolManager = new BlockPoolManager(conf);
+    blockPoolManager = new BlockPoolManager(this);
+    blockPoolManager.refreshNamenodes(conf);
   }
   
   /**
@@ -963,11 +888,15 @@ public class DataNode extends Configured
   
   /**
    * get BP registration by machine and port name (host:port)
-   * @param mName
+   * @param mName - the name that the NN used
    * @return BP registration 
    * @throws IOException 
    */
   DatanodeRegistration getDNRegistrationByMachineName(String mName) {
+    // TODO: all the BPs should have the same name as each other, they all come
+    // from getName() here! and the use cases only are in tests where they just
+    // call with getName(). So we could probably just make this method return
+    // the first BPOS's registration. See HDFS-2609.
     BPOfferService [] bposArray = blockPoolManager.getAllNamenodeThreads();
     for (BPOfferService bpos : bposArray) {
       if(bpos.bpRegistration.getName().equals(mName))
@@ -1013,20 +942,6 @@ public class DataNode extends Configured
       throw new IOException(ie.getMessage());
     }
   }
-
-  /**
-   * get the name node address based on the block pool id
-   * @param bpid block pool ID
-   * @return namenode address corresponding to the bpid
-   */
-  public InetSocketAddress getNameNodeAddr(String bpid) {
-    BPOfferService bp = blockPoolManager.get(bpid);
-    if (bp != null) {
-      return bp.getNNSocketAddress();
-    }
-    LOG.warn("No name node address found for block pool ID " + bpid);
-    return null;
-  }
   
   public InetSocketAddress getSelfAddr() {
     return selfAddr;
@@ -1253,12 +1168,7 @@ public class DataNode extends Configured
 
     //inform NameNodes
     for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) {
-      DatanodeProtocolClientSideTranslatorPB nn = bpos.bpNamenode;
-      try {
-        nn.errorReport(bpos.bpRegistration, dpError, errMsgr);
-      } catch(IOException e) {
-        LOG.warn("Error reporting disk failure to NameNode", e);
-      }
+      bpos.trySendErrorReport(dpError, errMsgr);
     }
     
     if(hasEnoughResources) {
@@ -1275,6 +1185,10 @@ public class DataNode extends Configured
   public int getXceiverCount() {
     return threadGroup == null ? 0 : threadGroup.activeCount();
   }
+  
+  int getXmitsInProgress() {
+    return xmitsInProgress.get();
+  }
     
   UpgradeManagerDatanode getUpgradeManagerDatanode(String bpid) {
     BPOfferService bpos = blockPoolManager.get(bpid);
@@ -1287,15 +1201,15 @@ public class DataNode extends Configured
   private void transferBlock( ExtendedBlock block, 
                               DatanodeInfo xferTargets[] 
                               ) throws IOException {
-    DatanodeProtocolClientSideTranslatorPB nn = getBPNamenode(block
-        .getBlockPoolId());
+    BPOfferService bpos = getBPOSForBlock(block);
     DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
     
     if (!data.isValidBlock(block)) {
       // block does not exist or is under-construction
       String errStr = "Can't send invalid block " + block;
       LOG.info(errStr);
-      nn.errorReport(bpReg, DatanodeProtocol.INVALID_BLOCK, errStr);
+      
+      bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errStr);
       return;
     }
 
@@ -1303,9 +1217,7 @@ public class DataNode extends Configured
     long onDiskLength = data.getLength(block);
     if (block.getNumBytes() > onDiskLength) {
       // Shorter on-disk len indicates corruption so report NN the corrupt block
-      nn.reportBadBlocks(new LocatedBlock[]{
-          new LocatedBlock(block, new DatanodeInfo[] {
-              new DatanodeInfo(bpReg)})});
+      bpos.reportBadBlocks(block);
       LOG.warn("Can't replicate block " + block
           + " because on-disk length " + onDiskLength 
           + " is shorter than NameNode recorded length " + block.getNumBytes());
@@ -1863,6 +1775,13 @@ public class DataNode extends Configured
                                           long newLength) throws IOException {
     ReplicaInfo r = data.updateReplicaUnderRecovery(oldBlock,
         recoveryId, newLength);
+    // Notify the namenode of the updated block info. This is important
+    // for HA, since otherwise the standby node may lose track of the
+    // block locations until the next block report.
+    ExtendedBlock newBlock = new ExtendedBlock(oldBlock);
+    newBlock.setGenerationStamp(recoveryId);
+    newBlock.setNumBytes(newLength);
+    notifyNamenodeReceivedBlock(newBlock, "");
     return new ExtendedBlock(oldBlock.getBlockPoolId(), r);
   }
 
@@ -1937,23 +1856,29 @@ public class DataNode extends Configured
    * @return Namenode corresponding to the bpid
    * @throws IOException
    */
-  public DatanodeProtocolClientSideTranslatorPB getBPNamenode(String bpid)
+  public DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(String bpid)
       throws IOException {
     BPOfferService bpos = blockPoolManager.get(bpid);
     if (bpos == null) {
       throw new IOException("No block pool offer service for bpid=" + bpid);
-    } else if (bpos.bpNamenode == null) {
-      throw new IOException("cannot find a namenode proxy for bpid=" + bpid);
     }
-    return bpos.bpNamenode;
+    
+    DatanodeProtocolClientSideTranslatorPB activeNN = bpos.getActiveNN();
+    if (activeNN == null) {
+      throw new IOException(
+          "Block pool " + bpid + " has not recognized an active NN");
+    }
+    return activeNN;
   }
 
   /** Block synchronization */
   void syncBlock(RecoveringBlock rBlock,
                          List<BlockRecord> syncList) throws IOException {
     ExtendedBlock block = rBlock.getBlock();
-    DatanodeProtocolClientSideTranslatorPB nn = getBPNamenode(block
-        .getBlockPoolId());
+    // getActiveNamenodeForBP() never returns null: it throws an IOException
+    // when this DN has not recognized an active NN for the block pool.
+    DatanodeProtocolClientSideTranslatorPB nn =
+      getActiveNamenodeForBP(block.getBlockPoolId());
     
     long recoveryId = rBlock.getNewGenerationStamp();
     if (LOG.isDebugEnabled()) {
@@ -2174,14 +2102,19 @@ public class DataNode extends Configured
 
   /**
    * Returned information is a JSON representation of a map with 
-   * name node host name as the key and block pool Id as the value
+   * name node host name as the key and block pool Id as the value.
+   * Note that, if there are multiple NNs in an HA nameservice, a block pool
+   * appears once per distinct NN host name, since the map is keyed by host.
    */
   @Override // DataNodeMXBean
   public String getNamenodeAddresses() {
     final Map<String, String> info = new HashMap<String, String>();
     for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
-      if (bpos != null && bpos.bpThread != null) {
-        info.put(bpos.getNNSocketAddress().getHostName(), bpos.getBlockPoolId());
+      if (bpos != null) {
+        for (BPServiceActor actor : bpos.getBPServiceActors()) {
+          info.put(actor.getNNSocketAddress().getHostName(),
+              bpos.getBlockPoolId());
+        }
       }
     }
     return JSON.toString(info);
@@ -2203,13 +2136,7 @@ public class DataNode extends Configured
   }
   
   public void refreshNamenodes(Configuration conf) throws IOException {
-    try {
-      blockPoolManager.refreshNamenodes(conf);
-    } catch (InterruptedException ex) {
-      IOException eio = new IOException();
-      eio.initCause(ex);
-      throw eio;
-    }
+    blockPoolManager.refreshNamenodes(conf);
   }
 
   @Override //ClientDatanodeProtocol
@@ -2236,11 +2163,18 @@ public class DataNode extends Configured
 
   /**
    * @param addr rpc address of the namenode
-   * @return true - if BPOfferService corresponding to the namenode is alive
+   * @return true if the datanode is connected to a NameNode at the
+   * given address
    */
-  public boolean isBPServiceAlive(InetSocketAddress addr) {
-    BPOfferService bp = blockPoolManager.get(addr);
-    return bp != null ? bp.isAlive() : false;
+  public boolean isConnectedToNN(InetSocketAddress addr) {
+    for (BPOfferService bpos : getAllBpOs()) {
+      for (BPServiceActor bpsa : bpos.getBPServiceActors()) {
+        if (addr.equals(bpsa.getNNSocketAddress())) {
+          return bpsa.isAlive();
+        }
+      }
+    }
+    return false;
   }
   
   /**

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java Sun Mar 11 17:55:58 2012
@@ -107,6 +107,26 @@ class FSDatasetAsyncDiskService {
     
   }
   
+  /**
+   * Count the tasks that have been submitted to the executors but have not
+   * yet completed, i.e. those still queued or currently running. The
+   * ThreadPoolExecutor counters used here are approximate, so the result
+   * is an approximation as well.
+   *
+   * @return approximate number of outstanding tasks
+   */
+  synchronized long countPendingDeletions() {
+    long count = 0;
+    for (ThreadPoolExecutor exec : executors.values()) {
+      // Read the completed count first: it never decreases, so a task that
+      // is submitted and finishes between the two reads cannot make this
+      // executor's contribution negative.
+      long completed = exec.getCompletedTaskCount();
+      count += exec.getTaskCount() - completed;
+    }
+    return count;
+  }
+  
   /**
    * Execute the task sometime in the future, using ThreadPools.
    */

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java Sun Mar 11 17:55:58 2012
@@ -92,7 +92,7 @@ class UpgradeManagerDatanode extends Upg
           "UpgradeManagerDatanode.currentUpgrades is not null.";
         assert upgradeDaemon == null : 
           "UpgradeManagerDatanode.upgradeDaemon is not null.";
-        DatanodeProtocol nn = dataNode.getBPNamenode(bpid);
+        DatanodeProtocol nn = dataNode.getActiveNamenodeForBP(bpid);
         nn.processUpgradeCommand(broadcastCommand);
         return true;
       }

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java Sun Mar 11 17:55:58 2012
@@ -45,7 +45,7 @@ public abstract class UpgradeObjectDatan
   }
   
   protected DatanodeProtocol getNamenode() throws IOException {
-    return dataNode.getBPNamenode(bpid);
+    return dataNode.getActiveNamenodeForBP(bpid);
   }
 
   void setDatanode(DataNode dataNode, String bpid) {
@@ -92,14 +92,7 @@ public abstract class UpgradeObjectDatan
             + " Name-node version = " + nsInfo.getLayoutVersion() + ".";
     DataNode.LOG.fatal( errorMsg );
     String bpid = nsInfo.getBlockPoolID();
-    DatanodeProtocol nn = dataNode.getBPNamenode(bpid);
-    try {
-      nn.errorReport(dataNode.getDNRegistrationForBP(bpid),
-                                    DatanodeProtocol.NOTIFY, errorMsg);
-    } catch(SocketTimeoutException e) {  // namenode is busy
-      DataNode.LOG.info("Problem connecting to server: " 
-                        + dataNode.getNameNodeAddr(nsInfo.getBlockPoolID()));
-    }
+    dataNode.trySendErrorReport(bpid, DatanodeProtocol.NOTIFY, errorMsg);
     throw new IOException(errorMsg);
   }
 

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Sun Mar 11 17:55:58 2012
@@ -219,7 +219,7 @@ public class BackupImage extends FSImage
       int logVersion = storage.getLayoutVersion();
       backupInputStream.setBytes(data, logVersion);
 
-      int numLoaded = logLoader.loadEditRecords(logVersion, backupInputStream, 
+      long numLoaded = logLoader.loadEditRecords(logVersion, backupInputStream, 
                                                 true, lastAppliedTxId + 1);
       if (numLoaded != numTxns) {
         throw new IOException("Batch of txns starting at txnid " +
@@ -312,7 +312,7 @@ public class BackupImage extends FSImage
             + " txns from in-progress stream " + stream);
         
         FSEditLogLoader loader = new FSEditLogLoader(namesystem);
-        int numLoaded = loader.loadFSEdits(stream, lastAppliedTxId + 1);
+        long numLoaded = loader.loadFSEdits(stream, lastAppliedTxId + 1);
         lastAppliedTxId += numLoaded;
         assert numLoaded == remainingTxns :
           "expected to load " + remainingTxns + " but loaded " +
@@ -347,7 +347,7 @@ public class BackupImage extends FSImage
   synchronized void namenodeStartedLogSegment(long txid)
       throws IOException {
     LOG.info("NameNode started a new log segment at txid " + txid);
-    if (editLog.isOpen()) {
+    if (editLog.isSegmentOpen()) {
       if (editLog.getLastWrittenTxId() == txid - 1) {
         // We are in sync with the NN, so end and finalize the current segment
         editLog.endCurrentLogSegment(false);

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Sun Mar 11 17:55:58 2012
@@ -58,7 +58,7 @@ class BackupJournalManager implements Jo
   }
 
   @Override
-  public long getNumberOfTransactions(long fromTxnId) 
+  public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
       throws IOException, CorruptionException {
     // This JournalManager is never used for input. Therefore it cannot
     // return any transactions
@@ -66,7 +66,8 @@ class BackupJournalManager implements Jo
   }
   
   @Override
-  public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
+  public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
+      throws IOException {
     // This JournalManager is never used for input. Therefore it cannot
     // return any transactions
     throw new IOException("Unsupported operation");

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Sun Mar 11 17:55:58 2012
@@ -26,13 +26,13 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -41,7 +41,8 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -69,7 +70,7 @@ public class BackupNode extends NameNode
   private static final String BN_SERVICE_RPC_ADDRESS_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY;
 
   /** Name-node proxy */
-  NamenodeProtocolTranslatorPB namenode;
+  NamenodeProtocol namenode;
   /** Name-node RPC address */
   String nnRpcAddress;
   /** Name-node HTTP address */
@@ -89,13 +90,13 @@ public class BackupNode extends NameNode
   // Common NameNode methods implementation for backup node.
   /////////////////////////////////////////////////////
   @Override // NameNode
-  protected InetSocketAddress getRpcServerAddress(Configuration conf) throws IOException {
+  protected InetSocketAddress getRpcServerAddress(Configuration conf) {
     String addr = conf.get(BN_ADDRESS_NAME_KEY, BN_ADDRESS_DEFAULT);
     return NetUtils.createSocketAddr(addr);
   }
   
   @Override
-  protected InetSocketAddress getServiceRpcServerAddress(Configuration conf) throws IOException {
+  protected InetSocketAddress getServiceRpcServerAddress(Configuration conf) {
     String addr = conf.get(BN_SERVICE_RPC_ADDRESS_KEY);
     if (addr == null || addr.isEmpty()) {
       return null;
@@ -143,6 +144,7 @@ public class BackupNode extends NameNode
                  CommonConfigurationKeys.FS_TRASH_INTERVAL_DEFAULT);
     NamespaceInfo nsInfo = handshake(conf);
     super.initialize(conf);
+
     if (false == namesystem.isInSafeMode()) {
       namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
     }
@@ -189,7 +191,7 @@ public class BackupNode extends NameNode
     }
     // Stop the RPC client
     if (namenode != null) {
-      IOUtils.cleanup(LOG, namenode);
+      RPC.stopProxy(namenode);
     }
     namenode = null;
     // Stop the checkpoint manager
@@ -197,6 +199,11 @@ public class BackupNode extends NameNode
       checkpointManager.interrupt();
       checkpointManager = null;
     }
+
+    // Abort current log segment - otherwise the NN shutdown code
+    // will close it gracefully, which is incorrect.
+    getFSImage().getEditLog().abortCurrentLogSegment();
+
     // Stop name-node threads
     super.stop();
   }
@@ -221,58 +228,31 @@ public class BackupNode extends NameNode
           this.clientRpcServer);
       nnRpcAddress = nn.nnRpcAddress;
     }
-  
-    /////////////////////////////////////////////////////
-    // NamenodeProtocol implementation for backup node.
-    /////////////////////////////////////////////////////
-    @Override // NamenodeProtocol
-    public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
-    throws IOException {
-      throw new UnsupportedActionException("getBlocks");
-    }
-  
-    // Only active name-node can register other nodes.
-    @Override // NamenodeProtocol
-    public NamenodeRegistration register(NamenodeRegistration registration
-    ) throws IOException {
-      throw new UnsupportedActionException("register");
-    }
-  
-    @Override // NamenodeProtocol
-    public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
-    throws IOException {
-      throw new UnsupportedActionException("startCheckpoint");
-    }
-  
-    @Override // NamenodeProtocol
-    public void endCheckpoint(NamenodeRegistration registration,
-                              CheckpointSignature sig) throws IOException {
-      throw new UnsupportedActionException("endCheckpoint");
-    }  
-  
+
     /////////////////////////////////////////////////////
     // BackupNodeProtocol implementation for backup node.
     /////////////////////////////////////////////////////
-  
+    @Override
+    public void startLogSegment(NamenodeRegistration registration, long txid)
+        throws IOException {
+      namesystem.checkOperation(OperationCategory.JOURNAL);
+      verifyRequest(registration);
+      
+      getBNImage().namenodeStartedLogSegment(txid);
+    }
+    
     @Override
     public void journal(NamenodeRegistration nnReg,
         long firstTxId, int numTxns,
         byte[] records) throws IOException {
+      namesystem.checkOperation(OperationCategory.JOURNAL);
       verifyRequest(nnReg);
       if(!nnRpcAddress.equals(nnReg.getAddress()))
         throw new IOException("Journal request from unexpected name-node: "
-            + nnReg.getAddress() + " expecting " + clientRpcAddress);
+            + nnReg.getAddress() + " expecting " + nnRpcAddress);
       getBNImage().journal(firstTxId, numTxns, records);
     }
-  
-    @Override
-    public void startLogSegment(NamenodeRegistration registration, long txid)
-        throws IOException {
-      verifyRequest(registration);
-    
-      getBNImage().namenodeStartedLogSegment(txid);
-    }
-    
+
     private BackupImage getBNImage() {
       return (BackupImage)nn.getFSImage();
     }
@@ -295,8 +275,9 @@ public class BackupNode extends NameNode
   private NamespaceInfo handshake(Configuration conf) throws IOException {
     // connect to name node
     InetSocketAddress nnAddress = NameNode.getServiceAddress(conf, true);
-    this.namenode = new NamenodeProtocolTranslatorPB(nnAddress, conf,
-        UserGroupInformation.getCurrentUser());
+    this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddress,
+        NamenodeProtocol.class, UserGroupInformation.getCurrentUser(),
+        true).getProxy();
     this.nnRpcAddress = NetUtils.getHostPortString(nnAddress);
     this.nnHttpAddress = NetUtils.getHostPortString(super.getHttpServerAddress(conf));
     // get version and id info from the name-node
@@ -409,6 +390,28 @@ public class BackupNode extends NameNode
   }
   
   @Override
+  protected NameNodeHAContext createHAContext() {
+    return new BNHAContext();
+  }
+  
+  private class BNHAContext extends NameNodeHAContext {
+    @Override // NameNode
+    public void checkOperation(OperationCategory op)
+        throws StandbyException {
+      if (op == OperationCategory.UNCHECKED ||
+          op == OperationCategory.CHECKPOINT) {
+        return;
+      }
+      if (OperationCategory.JOURNAL != op &&
+          !(OperationCategory.READ == op && allowStaleStandbyReads)) {
+        String msg = "Operation category " + op
+            + " is not supported at the BackupNode";
+        throw new StandbyException(msg);
+      }
+    }
+  }
+  
+  @Override
   protected String getNameServiceId(Configuration conf) {
     return DFSUtil.getBackupNameServiceId(conf);
   }

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Sun Mar 11 17:55:58 2012
@@ -29,7 +29,6 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -58,17 +57,16 @@ class Checkpointer extends Daemon {
 
   private BackupNode backupNode;
   volatile boolean shouldRun;
-  private long checkpointPeriod;    // in seconds
-  // Transactions count to trigger the checkpoint
-  private long checkpointTxnCount; 
 
   private String infoBindAddress;
 
+  private CheckpointConf checkpointConf;
+
   private BackupImage getFSImage() {
     return (BackupImage)backupNode.getFSImage();
   }
 
-  private NamenodeProtocol getNamenode(){
+  private NamenodeProtocol getRemoteNamenodeProxy(){
     return backupNode.namenode;
   }
 
@@ -89,26 +87,24 @@ class Checkpointer extends Daemon {
   /**
    * Initialize checkpoint.
    */
-  @SuppressWarnings("deprecation")
   private void initialize(Configuration conf) throws IOException {
     // Create connection to the namenode.
     shouldRun = true;
 
     // Initialize other scheduling parameters from the configuration
-    checkpointPeriod = conf.getLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 
-                                    DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT);
-    checkpointTxnCount = conf.getLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 
-                                  DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);
-    SecondaryNameNode.warnForDeprecatedConfigs(conf);
+    checkpointConf = new CheckpointConf(conf);
 
     // Pull out exact http address for posting url to avoid ip aliasing issues
     String fullInfoAddr = conf.get(DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY, 
                                    DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT);
     infoBindAddress = fullInfoAddr.substring(0, fullInfoAddr.indexOf(":"));
 
-    LOG.info("Checkpoint Period : " + checkpointPeriod + " secs " +
-             "(" + checkpointPeriod/60 + " min)");
-    LOG.info("Transactions count is  : " + checkpointTxnCount + ", to trigger checkpoint");
+    LOG.info("Checkpoint Period : " +
+             checkpointConf.getPeriod() + " secs " +
+             "(" + checkpointConf.getPeriod()/60 + " min)");
+    LOG.info("Transactions count is  : " +
+             checkpointConf.getTxnCount() +
+             ", to trigger checkpoint");
   }
 
   /**
@@ -125,8 +121,8 @@ class Checkpointer extends Daemon {
   public void run() {
     // Check the size of the edit log once every 5 minutes.
     long periodMSec = 5 * 60;   // 5 minutes
-    if(checkpointPeriod < periodMSec) {
-      periodMSec = checkpointPeriod;
+    if(checkpointConf.getPeriod() < periodMSec) {
+      periodMSec = checkpointConf.getPeriod();
     }
     periodMSec *= 1000;
 
@@ -142,7 +138,7 @@ class Checkpointer extends Daemon {
           shouldCheckpoint = true;
         } else {
           long txns = countUncheckpointedTxns();
-          if(txns >= checkpointTxnCount)
+          if(txns >= checkpointConf.getTxnCount())
             shouldCheckpoint = true;
         }
         if(shouldCheckpoint) {
@@ -165,7 +161,7 @@ class Checkpointer extends Daemon {
   }
 
   private long countUncheckpointedTxns() throws IOException {
-    long curTxId = getNamenode().getTransactionID();
+    long curTxId = getRemoteNamenodeProxy().getTransactionID();
     long uncheckpointedTxns = curTxId -
       getFSImage().getStorage().getMostRecentCheckpointTxId();
     assert uncheckpointedTxns >= 0;
@@ -183,7 +179,7 @@ class Checkpointer extends Daemon {
     bnImage.freezeNamespaceAtNextRoll();
     
     NamenodeCommand cmd = 
-      getNamenode().startCheckpoint(backupNode.getRegistration());
+      getRemoteNamenodeProxy().startCheckpoint(backupNode.getRegistration());
     CheckpointCommand cpCmd = null;
     switch(cmd.getAction()) {
       case NamenodeProtocol.ACT_SHUTDOWN:
@@ -207,7 +203,7 @@ class Checkpointer extends Daemon {
     long lastApplied = bnImage.getLastAppliedTxId();
     LOG.debug("Doing checkpoint. Last applied: " + lastApplied);
     RemoteEditLogManifest manifest =
-      getNamenode().getEditLogManifest(bnImage.getLastAppliedTxId() + 1);
+      getRemoteNamenodeProxy().getEditLogManifest(bnImage.getLastAppliedTxId() + 1);
 
     if (!manifest.getLogs().isEmpty()) {
       RemoteEditLog firstRemoteLog = manifest.getLogs().get(0);
@@ -243,11 +239,16 @@ class Checkpointer extends Daemon {
     
     long txid = bnImage.getLastAppliedTxId();
     
-    backupNode.namesystem.dir.setReady();
-    backupNode.namesystem.setBlockTotal();
-    
-    bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid);
-    bnStorage.writeAll();
+    backupNode.namesystem.writeLock();
+    try {
+      backupNode.namesystem.dir.setReady();
+      backupNode.namesystem.setBlockTotal();
+      
+      bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid);
+      bnStorage.writeAll();
+    } finally {
+      backupNode.namesystem.writeUnlock();
+    }
 
     if(cpCmd.needToReturnImage()) {
       TransferFsImage.uploadImageFromStorage(
@@ -255,7 +256,7 @@ class Checkpointer extends Daemon {
           bnStorage, txid);
     }
 
-    getNamenode().endCheckpoint(backupNode.getRegistration(), sig);
+    getRemoteNamenodeProxy().endCheckpoint(backupNode.getRegistration(), sig);
 
     if (backupNode.getRole() == NamenodeRole.BACKUP) {
       bnImage.convergeJournalSpool();
@@ -286,7 +287,7 @@ class Checkpointer extends Daemon {
           log.getStartTxId(), log.getEndTxId());
       if (log.getStartTxId() > dstImage.getLastAppliedTxId()) {
         editsStreams.add(new EditLogFileInputStream(f, log.getStartTxId(), 
-                                                    log.getEndTxId()));
+                                                    log.getEndTxId(), true));
        }
     }
     LOG.info("Checkpointer about to load edits from " +

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java Sun Mar 11 17:55:58 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.util.StringUtils;
 import org.codehaus.jackson.JsonNode;
@@ -66,9 +67,10 @@ class ClusterJspHelper {
   ClusterStatus generateClusterHealthReport() {
     ClusterStatus cs = new ClusterStatus();
     Configuration conf = new Configuration();
-    List<InetSocketAddress> isas = null;
+    List<ConfiguredNNAddress> nns = null;
     try {
-      isas = DFSUtil.getNNServiceRpcAddresses(conf);
+      nns = DFSUtil.flattenAddressMap(
+          DFSUtil.getNNServiceRpcAddresses(conf));
     } catch (Exception e) {
       // Could not build cluster status
       cs.setError(e);
@@ -76,7 +78,8 @@ class ClusterJspHelper {
     }
     
     // Process each namenode and add it to ClusterStatus
-    for (InetSocketAddress isa : isas) {
+    for (ConfiguredNNAddress cnn : nns) {
+      InetSocketAddress isa = cnn.getAddress();
       NamenodeMXBeanHelper nnHelper = null;
       try {
         nnHelper = new NamenodeMXBeanHelper(isa, conf);
@@ -102,9 +105,10 @@ class ClusterJspHelper {
   DecommissionStatus generateDecommissioningReport() {
     String clusterid = "";
     Configuration conf = new Configuration();
-    List<InetSocketAddress> isas = null;
+    List<ConfiguredNNAddress> cnns = null;
     try {
-      isas = DFSUtil.getNNServiceRpcAddresses(conf);
+      cnns = DFSUtil.flattenAddressMap(
+          DFSUtil.getNNServiceRpcAddresses(conf));
     } catch (Exception e) {
       // catch any exception encountered other than connecting to namenodes
       DecommissionStatus dInfo = new DecommissionStatus(clusterid, e);
@@ -122,7 +126,8 @@ class ClusterJspHelper {
       new HashMap<String, Exception>();
     
     List<String> unreportedNamenode = new ArrayList<String>();
-    for (InetSocketAddress isa : isas) {
+    for (ConfiguredNNAddress cnn : cnns) {
+      InetSocketAddress isa = cnn.getAddress();
       NamenodeMXBeanHelper nnHelper = null;
       try {
         nnHelper = new NamenodeMXBeanHelper(isa, conf);



Mime
View raw message