hadoop-hdfs-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1211735 - in /hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/java/org/apache/hadoop/hdfs/server/prot...
Date: Thu, 08 Dec 2011 02:00:21 GMT
Author: todd
Date: Thu Dec  8 02:00:20 2011
New Revision: 1211735

URL: http://svn.apache.org/viewvc?rev=1211735&view=rev
Log:
HDFS-2627. Determine DN's view of which NN is active based on heartbeat responses. Contributed
by Todd Lipcon.

Added:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/NNHAStatusHeartbeatWritable.java
Modified:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/HeartbeatResponseWritable.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1211735&r1=1211734&r2=1211735&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt Thu Dec  8 02:00:20 2011
@@ -45,3 +45,5 @@ HDFS-2626. BPOfferService.verifyAndSetNa
 HDFS-2624. ConfiguredFailoverProxyProvider doesn't correctly stop ProtocolTranslators (todd)
 
 HDFS-2625. TestDfsOverAvroRpc failing after introduction of HeartbeatResponse type (todd)
+
+HDFS-2627. Determine DN's view of which NN is active based on heartbeat responses (todd)

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1211735&r1=1211734&r2=1211735&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Thu Dec  8 02:00:20 2011
@@ -37,14 +37,15 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.ipc.RPC;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 /**
@@ -75,10 +76,31 @@ class BPOfferService {
   UpgradeManagerDatanode upgradeManager = null;
   private final DataNode dn;
 
-  private BPServiceActor bpServiceToActive;
+  /**
+   * A reference to the BPServiceActor associated with the currently
+   * ACTIVE NN. In the case that all NameNodes are in STANDBY mode,
+   * this can be null. If non-null, this must always refer to a member
+   * of the {@link #bpServices} list.
+   */
+  private BPServiceActor bpServiceToActive = null;
+  
+  /**
+   * The list of all actors for namenodes in this nameservice, regardless
+   * of their active or standby states.
+   */
   private List<BPServiceActor> bpServices =
     new CopyOnWriteArrayList<BPServiceActor>();
 
+  /**
+   * Each time we receive a heartbeat from a NN claiming to be ACTIVE,
+   * we record that NN's most recent transaction ID here, so long as it
+   * is more recent than the previous value. This allows us to detect
+   * split-brain scenarios in which a prior NN is still asserting its
+   * ACTIVE state but with a too-low transaction ID. See HDFS-2627
+   * for details. 
+   */
+  private long lastActiveClaimTxId = -1;
+
   BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
     Preconditions.checkArgument(!nnAddrs.isEmpty(),
         "Must pass at least one NN.");
@@ -87,10 +109,6 @@ class BPOfferService {
     for (InetSocketAddress addr : nnAddrs) {
       this.bpServices.add(new BPServiceActor(addr, this));
     }
-    // TODO(HA): currently we just make the first one the initial
-    // active. In reality it should start in an unknown state and then
-    // as we figure out which is active, designate one as such.
-    this.bpServiceToActive = this.bpServices.get(0);
   }
 
   void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException {
@@ -109,19 +127,23 @@ class BPOfferService {
   }
 
   /**
-   * returns true if BP thread has completed initialization of storage
-   * and has registered with the corresponding namenode
-   * @return true if initialized
+   * @return true if the service has registered with at least one NameNode.
    */
   boolean isInitialized() {
-    // TODO(HA) is this right?
-    return bpServiceToActive != null && bpServiceToActive.isInitialized();
+    return bpRegistration != null;
   }
   
+  /**
+   * @return true if there is at least one actor thread running which is
+   * talking to a NameNode.
+   */
   boolean isAlive() {
-    // TODO: should || all the bp actors probably?
-    return bpServiceToActive != null &&
-      bpServiceToActive.isAlive();
+    for (BPServiceActor actor : bpServices) {
+      if (actor.isAlive()) {
+        return true;
+      }
+    }
+    return false;
   }
   
   String getBlockPoolId() {
@@ -322,7 +344,7 @@ class BPOfferService {
    * Called when an actor shuts down. If this is the last actor
    * to shut down, shuts down the whole blockpool in the DN.
    */
-  void shutdownActor(BPServiceActor actor) {
+  synchronized void shutdownActor(BPServiceActor actor) {
     if (bpServiceToActive == actor) {
       bpServiceToActive = null;
     }
@@ -339,7 +361,7 @@ class BPOfferService {
   }
 
   @Deprecated
-  InetSocketAddress getNNSocketAddress() {
+  synchronized InetSocketAddress getNNSocketAddress() {
     // TODO(HA) this doesn't make sense anymore
     return bpServiceToActive.getNNSocketAddress();
   }
@@ -383,8 +405,61 @@ class BPOfferService {
    * @return a proxy to the active NN
    */
   @Deprecated
-  DatanodeProtocol getActiveNN() {
-    return bpServiceToActive.bpNamenode;
+  synchronized DatanodeProtocol getActiveNN() {
+    if (bpServiceToActive != null) {
+      return bpServiceToActive.bpNamenode;
+    } else {
+      return null;
+    }
+  }
+  
+  /**
+   * Update the BPOS's view of which NN is active, based on a heartbeat
+   * response from one of the actors.
+   * 
+   * @param actor the actor which received the heartbeat
+   * @param nnHaState the HA-related heartbeat contents
+   */
+  synchronized void updateActorStatesFromHeartbeat(
+      BPServiceActor actor,
+      NNHAStatusHeartbeat nnHaState) {
+    final long txid = nnHaState.getTxId();
+    
+    final boolean nnClaimsActive =
+      nnHaState.getState() == NNHAStatusHeartbeat.State.ACTIVE;
+    final boolean bposThinksActive = bpServiceToActive == actor;
+    final boolean isMoreRecentClaim = txid > lastActiveClaimTxId; 
+    
+    if (nnClaimsActive && !bposThinksActive) {
+      LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
+          "txid=" + txid);
+      if (!isMoreRecentClaim) {
+        // Split-brain scenario - an NN is trying to claim active
+        // state when a different NN has already claimed it with a higher
+        // txid.
+        LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
+            txid + " but there was already a more recent claim at txid=" +
+            lastActiveClaimTxId);
+        return;
+      } else {
+        if (bpServiceToActive == null) {
+          LOG.info("Acknowledging ACTIVE Namenode " + actor);
+        } else {
+          LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
+              bpServiceToActive + " at higher txid=" + txid);
+        }
+        bpServiceToActive = actor;
+      }
+    } else if (!nnClaimsActive && bposThinksActive) {
+      LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
+          "txid=" + nnHaState.getTxId());
+      bpServiceToActive = null;
+    }
+    
+    if (bpServiceToActive == actor) {
+      assert txid >= lastActiveClaimTxId;
+      lastActiveClaimTxId = txid;
+    }
   }
 
   /**
@@ -415,7 +490,17 @@ class BPOfferService {
     }
   }
 
-  boolean processCommandFromActor(DatanodeCommand cmd,
+  /**
+   * Run an immediate heartbeat from all actors. Used by tests.
+   */
+  @VisibleForTesting
+  void triggerHeartbeatForTests() throws IOException {
+    for (BPServiceActor actor : bpServices) {
+      actor.triggerHeartbeatForTests();
+    }
+  }
+
+  synchronized boolean processCommandFromActor(DatanodeCommand cmd,
       BPServiceActor actor) throws IOException {
     assert bpServices.contains(actor);
     if (actor == bpServiceToActive) {
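
The arbitration implemented in updateActorStatesFromHeartbeat() above can be restated in isolation. The following is a minimal, self-contained sketch of the same txid rule; it is not part of this commit, and the class and field names are illustrative. An ACTIVE claim is honored only when its transaction ID is more recent than the highest accepted claim so far, which is what defuses the split-brain case where a deposed NN still asserts ACTIVE with a stale txid.

    import java.util.Objects;

    /** Illustrative sketch (not part of this commit) of the txid-based arbitration. */
    class ActiveClaimTracker<T> {
      private T active = null;               // analogous to bpServiceToActive
      private long lastActiveClaimTxId = -1; // analogous to the new field above

      /** Handle one heartbeat from actor, which reports whether it claims ACTIVE and at what txid. */
      synchronized void onHeartbeat(T actor, boolean claimsActive, long txid) {
        boolean consideredActive = Objects.equals(active, actor);
        if (claimsActive && !consideredActive) {
          if (txid > lastActiveClaimTxId) {
            active = actor;                  // newer claim: accept the takeover
          }                                  // stale claim (possible split-brain): ignore it
        } else if (!claimsActive && consideredActive) {
          active = null;                     // the current active relinquished its state
        }
        if (Objects.equals(active, actor)) {
          lastActiveClaimTxId = txid;        // remember the newest accepted claim
        }
      }

      synchronized T getActive() {
        return active;
      }

      public static void main(String[] args) {
        ActiveClaimTracker<String> t = new ActiveClaimTracker<String>();
        t.onHeartbeat("nn1", true, 1);       // nn1 claims ACTIVE at txid 1: accepted
        t.onHeartbeat("nn2", true, 2);       // nn2 claims ACTIVE at txid 2: takes over
        t.onHeartbeat("nn1", true, 1);       // stale claim from nn1: ignored
        System.out.println(t.getActive());   // prints "nn2"
      }
    }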

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1211735&r1=1211734&r2=1211735&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Thu Dec  8 02:00:20 2011
@@ -284,6 +284,14 @@ class BPServiceActor implements Runnable
       lastBlockReport = 0;
       blockReport();
   }
+  
+  @VisibleForTesting
+  void triggerHeartbeatForTests() throws IOException {
+    synchronized (receivedAndDeletedBlockList) {
+      lastHeartbeat = 0;
+      receivedAndDeletedBlockList.notifyAll();
+    }
+  }
 
   /**
    * Report the list blocks to the Namenode
@@ -420,8 +428,18 @@ class BPServiceActor implements Runnable
           lastHeartbeat = startTime;
           if (!dn.areHeartbeatsDisabledForTests()) {
             HeartbeatResponse resp = sendHeartBeat();
+            assert resp != null;
             dn.getMetrics().addHeartbeat(now() - startTime);
 
+            // If the state of this NN has changed (eg STANDBY->ACTIVE)
+            // then let the BPOfferService update itself.
+            //
+            // Important that this happens before processCommand below,
+            // since the first heartbeat to a new active might have commands
+            // that we should actually process.
+            bpos.updateActorStatesFromHeartbeat(
+                this, resp.getNameNodeHaState());
+
             long startProcessCommands = now();
             if (!processCommand(resp.getCommands()))
               continue;

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1211735&r1=1211734&r2=1211735&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Dec  8 02:00:20 2011
@@ -150,11 +150,16 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReportMessage;
 import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.CommitBlockSynchronizationMessage;
 import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.DataNodeMessage;
+import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
 import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
+import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -308,6 +313,12 @@ public class FSNamesystem implements Nam
    * Used when this NN is in standby state to read from the shared edit log.
    */
   private EditLogTailer editLogTailer = null;
+
+  /**
+   * Reference to the NN's HAContext object. This is only set once
+   * {@link #startCommonServices(Configuration, HAContext)} is called. 
+   */
+  private HAContext haContext;
   
   PendingDataNodeMessages getPendingDataNodeMessages() {
     return pendingDatanodeMessages;
@@ -434,11 +445,13 @@ public class FSNamesystem implements Nam
   
   /** 
    * Start services common to both active and standby states
+   * @param haContext 
    * @throws IOException
    */
-  void startCommonServices(Configuration conf) throws IOException {
+  void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
     this.registerMBean(); // register the MBean for the FSNamesystemState
     writeLock();
+    this.haContext = haContext;
     try {
       nnResourceChecker = new NameNodeResourceChecker(conf);
       checkAvailableResources();
@@ -2706,12 +2719,28 @@ public class FSNamesystem implements Nam
           cmds = new DatanodeCommand[] {cmd};
         }
       }
-      return new HeartbeatResponse(cmds);
+      
+      return new HeartbeatResponse(cmds, createHaStatusHeartbeat());
     } finally {
       readUnlock();
     }
   }
 
+  private NNHAStatusHeartbeat createHaStatusHeartbeat() {
+    HAState state = haContext.getState();
+    NNHAStatusHeartbeat.State hbState;
+    if (state instanceof ActiveState) {
+      hbState = NNHAStatusHeartbeat.State.ACTIVE;
+    } else if (state instanceof StandbyState) {
+      hbState = NNHAStatusHeartbeat.State.STANDBY;      
+    } else {
+      throw new AssertionError("Invalid state: " + state.getClass());
+    }
+    return new NNHAStatusHeartbeat(hbState,
+        Math.max(getFSImage().getLastAppliedTxId(),
+                 getFSImage().getEditLog().getLastWrittenTxId()));
+  }
+
   /**
    * Returns whether or not there were available resources at the last check of
    * resources.
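
One way to read the Math.max in createHaStatusHeartbeat() above: on an ACTIVE NN the edit log is being written, so getLastWrittenTxId() is the counter that advances, while on a STANDBY the edit-log tailer applies transactions, so getLastAppliedTxId() advances; taking the maximum advertises whichever is fresher without branching on the HA state. A trivial standalone restatement (helper name and parameters are invented for illustration, not part of this commit):

    /** Illustrative helper: pick the txid a NameNode should advertise in its heartbeat response. */
    static long txidToAdvertise(long lastAppliedTxId, long lastWrittenTxId) {
      // ACTIVE: lastWrittenTxId leads; STANDBY: lastAppliedTxId leads. The max covers both.
      return Math.max(lastAppliedTxId, lastWrittenTxId);
    }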

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1211735&r1=1211734&r2=1211735&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Thu Dec  8 02:00:20 2011
@@ -426,7 +426,7 @@ public class NameNode {
 
   /** Start the services common to active and standby states */
   private void startCommonServices(Configuration conf) throws IOException {
-    namesystem.startCommonServices(conf);
+    namesystem.startCommonServices(conf, haContext);
     startHttpServer(conf);
     rpcServer.start();
     plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java?rev=1211735&r1=1211734&r2=1211735&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java Thu Dec  8 02:00:20 2011
@@ -35,17 +35,26 @@ public class HeartbeatResponse implement
   /** Commands returned from the namenode to the datanode */
   private DatanodeCommand[] commands;
   
+  /** Information about the current HA-related state of the NN */
+  private NNHAStatusHeartbeat haStatus;
+  
   public HeartbeatResponse() {
     // Empty constructor required for Writable
   }
   
-  public HeartbeatResponse(DatanodeCommand[] cmds) {
+  public HeartbeatResponse(DatanodeCommand[] cmds,
+      NNHAStatusHeartbeat haStatus) {
     commands = cmds;
+    this.haStatus = haStatus;
   }
   
   public DatanodeCommand[] getCommands() {
     return commands;
   }
+  
+  public NNHAStatusHeartbeat getNameNodeHaState() {
+    return haStatus;
+  }
 
   ///////////////////////////////////////////
   // Writable
@@ -58,6 +67,7 @@ public class HeartbeatResponse implement
       ObjectWritable.writeObject(out, commands[i], commands[i].getClass(),
                                  null, true);
     }
+    haStatus.write(out);
   }
 
   @Override
@@ -69,5 +79,7 @@ public class HeartbeatResponse implement
       commands[i] = (DatanodeCommand) ObjectWritable.readObject(in,
           objectWritable, null);
     }
+    haStatus = new NNHAStatusHeartbeat();
+    haStatus.readFields(in);
   }
 }

Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java?rev=1211735&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java Thu Dec  8 02:00:20 2011
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class NNHAStatusHeartbeat implements Writable {
+
+  private State state;
+  private long txid = HdfsConstants.INVALID_TXID;
+  
+  public NNHAStatusHeartbeat() {
+  }
+  
+  public NNHAStatusHeartbeat(State state, long txid) {
+    this.state = state;
+    this.txid = txid;
+  }
+
+  public State getState() {
+    return state;
+  }
+  
+  public long getTxId() {
+    return txid;
+  }
+  
+  ///////////////////////////////////////////
+  // Writable
+  ///////////////////////////////////////////
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeEnum(out, state);
+    out.writeLong(txid);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    state = WritableUtils.readEnum(in, State.class);
+    txid = in.readLong();
+  }
+
+  @InterfaceAudience.Private
+  public enum State {
+    ACTIVE,
+    STANDBY;
+  }
+}
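
Since NNHAStatusHeartbeat implements Writable, a byte-stream round trip exercises both write() and readFields(). A small sketch of such a check (it assumes the class above is on the classpath; the wrapper class name is invented and is not part of this commit):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;

    public class NNHAStatusHeartbeatRoundTrip {
      public static void main(String[] args) throws IOException {
        // Serialize an ACTIVE-at-txid-42 status ...
        NNHAStatusHeartbeat original =
            new NNHAStatusHeartbeat(NNHAStatusHeartbeat.State.ACTIVE, 42L);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(baos));

        // ... and read it back through the empty constructor plus readFields().
        NNHAStatusHeartbeat copy = new NNHAStatusHeartbeat();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));

        System.out.println(copy.getState() + " @ txid " + copy.getTxId()); // ACTIVE @ txid 42
      }
    }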

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/HeartbeatResponseWritable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/HeartbeatResponseWritable.java?rev=1211735&r1=1211734&r2=1211735&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/HeartbeatResponseWritable.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/HeartbeatResponseWritable.java Thu Dec  8 02:00:20 2011
@@ -31,6 +31,7 @@ import org.apache.hadoop.io.Writable;
 @InterfaceStability.Evolving
 public class HeartbeatResponseWritable implements Writable {
   private DatanodeCommandWritable[] commands;
+  private NNHAStatusHeartbeatWritable haStatus;
   
   public HeartbeatResponseWritable() {
     // Empty constructor for Writable
@@ -41,7 +42,8 @@ public class HeartbeatResponseWritable i
   }
   
   public HeartbeatResponse convert() {
-    return new HeartbeatResponse(DatanodeCommandWritable.convert(commands));
+    return new HeartbeatResponse(DatanodeCommandWritable.convert(commands),
+        NNHAStatusHeartbeatWritable.convert(haStatus));
   }
   
   ///////////////////////////////////////////
@@ -55,6 +57,7 @@ public class HeartbeatResponseWritable i
       ObjectWritable.writeObject(out, commands[i], commands[i].getClass(),
                                  null, true);
     }
+    haStatus.write(out);
   }
 
   @Override
@@ -66,6 +69,8 @@ public class HeartbeatResponseWritable i
       commands[i] = (DatanodeCommandWritable) ObjectWritable.readObject(in,
           objectWritable, null);
     }
+    haStatus = new NNHAStatusHeartbeatWritable();
+    haStatus.readFields(in);
   }
 
   public static HeartbeatResponseWritable convert(
@@ -73,4 +78,4 @@ public class HeartbeatResponseWritable i
     return new HeartbeatResponseWritable(DatanodeCommandWritable.convert(resp
         .getCommands()));
   }
-}
\ No newline at end of file
+}

Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/NNHAStatusHeartbeatWritable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/NNHAStatusHeartbeatWritable.java?rev=1211735&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/NNHAStatusHeartbeatWritable.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/NNHAStatusHeartbeatWritable.java Thu Dec  8 02:00:20 2011
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocolR23Compatible;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+/**
+ * Response to {@link DatanodeProtocol#sendHeartbeat}
+ */
+public class NNHAStatusHeartbeatWritable implements Writable {
+
+  private State state;
+  private long txid = HdfsConstants.INVALID_TXID;
+  
+  public NNHAStatusHeartbeatWritable() {
+  }
+  
+  public NNHAStatusHeartbeatWritable(State state, long txid) {
+    this.state = state;
+    this.txid = txid;
+  }
+
+  public State getState() {
+    return state;
+  }
+  
+  public long getTxId() {
+    return txid;
+  }
+  
+  ///////////////////////////////////////////
+  // Writable
+  ///////////////////////////////////////////
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeEnum(out, state);
+    out.writeLong(txid);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    state = WritableUtils.readEnum(in, State.class);
+    txid = in.readLong();
+  }
+
+  public static NNHAStatusHeartbeat convert(
+      NNHAStatusHeartbeatWritable haStatus) {
+    return new NNHAStatusHeartbeat(haStatus.getState(), haStatus.getTxId());
+  }
+}
\ No newline at end of file
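
The R23 compatibility layer hands the HA status to the rest of the DataNode code via the static convert() above, just as HeartbeatResponseWritable.convert() does earlier in this commit. A minimal usage sketch (it assumes both classes from this commit are on the classpath; the wrapper class name is invented):

    import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
    import org.apache.hadoop.hdfs.server.protocolR23Compatible.NNHAStatusHeartbeatWritable;

    public class NNHAStatusConvertExample {
      public static void main(String[] args) {
        // Wire-compatible form, as it would arrive in an R23 heartbeat response ...
        NNHAStatusHeartbeatWritable wire =
            new NNHAStatusHeartbeatWritable(NNHAStatusHeartbeat.State.STANDBY, 7L);

        // ... converted into the internal type consumed by BPOfferService.
        NNHAStatusHeartbeat internal = NNHAStatusHeartbeatWritable.convert(wire);
        System.out.println(internal.getState() + " @ txid " + internal.getTxId()); // STANDBY @ txid 7
      }
    }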

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java?rev=1211735&r1=1211734&r2=1211735&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java Thu Dec  8 02:00:20 2011
@@ -21,6 +21,7 @@ import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -32,9 +33,12 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -43,6 +47,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
@@ -63,13 +69,15 @@ public class TestBPOfferService {
 
   private DatanodeProtocol mockNN1;
   private DatanodeProtocol mockNN2;
+  private NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2];
+  private int heartbeatCounts[] = new int[2];
   private DataNode mockDn;
   private FSDatasetInterface mockFSDataset;
   
   @Before
   public void setupMocks() throws Exception {
-    mockNN1 = setupNNMock();
-    mockNN2 = setupNNMock();
+    mockNN1 = setupNNMock(0);
+    mockNN2 = setupNNMock(1);
 
     // Set up a mock DN with the bare-bones configuration
     // objects, etc.
@@ -92,14 +100,17 @@ public class TestBPOfferService {
   /**
    * Set up a mock NN with the bare minimum for a DN to register to it.
    */
-  private DatanodeProtocol setupNNMock() throws Exception {
+  private DatanodeProtocol setupNNMock(int nnIdx) throws Exception {
     DatanodeProtocol mock = Mockito.mock(DatanodeProtocol.class);
     Mockito.doReturn(
         new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID,
             0, HdfsConstants.LAYOUT_VERSION))
       .when(mock).versionRequest();
     
-    Mockito.doReturn(new HeartbeatResponse(null))
+    Mockito.doReturn(new DatanodeRegistration("fake-node"))
+      .when(mock).registerDatanode(Mockito.any(DatanodeRegistration.class));
+    
+    Mockito.doAnswer(new HeartbeatAnswer(nnIdx))
       .when(mock).sendHeartbeat(
           Mockito.any(DatanodeRegistration.class),
           Mockito.anyLong(),
@@ -109,11 +120,32 @@ public class TestBPOfferService {
           Mockito.anyInt(),
           Mockito.anyInt(),
           Mockito.anyInt());
-
+    mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(State.STANDBY, 0);
     return mock;
   }
   
   /**
+   * Mock answer for heartbeats which returns an empty set of commands
+   * and the HA status for the chosen NN from the
+   * {@link TestBPOfferService#mockHaStatuses} array.
+   */
+  private class HeartbeatAnswer implements Answer<HeartbeatResponse> {
+    private final int nnIdx;
+
+    public HeartbeatAnswer(int nnIdx) {
+      this.nnIdx = nnIdx;
+    }
+
+    @Override
+    public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
+      heartbeatCounts[nnIdx]++;
+      return new HeartbeatResponse(new DatanodeCommand[0],
+          mockHaStatuses[nnIdx]);
+    }
+  }
+
+
+  /**
    * Test that the BPOS can register to talk to two different NNs,
    * sends block reports to both, etc.
    */
@@ -204,6 +236,53 @@ public class TestBPOfferService {
       bpos.stop();
     }
   }
+  
+  /**
+   * Test that the DataNode determines the active NameNode correctly
+   * based on the HA-related information in heartbeat responses.
+   * See HDFS-2627.
+   */
+  @Test
+  public void testPickActiveNameNode() throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForInitialization(bpos);
+      
+      // Should start with neither NN as active.
+      assertNull(bpos.getActiveNN());
+
+      // Have NN1 claim active at txid 1
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(State.ACTIVE, 1);
+      waitForHeartbeats(bpos);
+      assertSame(mockNN1, bpos.getActiveNN());
+
+      // NN2 claims active at a higher txid
+      mockHaStatuses[1] = new NNHAStatusHeartbeat(State.ACTIVE, 2);
+      waitForHeartbeats(bpos);
+      assertSame(mockNN2, bpos.getActiveNN());
+      
+      // Even after another heartbeat from the first NN, it should
+      // think NN2 is active, since it claimed a higher txid
+      waitForHeartbeats(bpos);
+      assertSame(mockNN2, bpos.getActiveNN());
+      
+      // Even if NN2 goes to standby, DN shouldn't reset to talking to NN1,
+      // because NN1's txid is lower than the last active txid. Instead,
+      // it should consider neither active.
+      mockHaStatuses[1] = new NNHAStatusHeartbeat(State.STANDBY, 2);
+      waitForHeartbeats(bpos);
+      assertNull(bpos.getActiveNN());
+      
+      // Now if NN1 goes back to a higher txid, it should be considered active
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(State.ACTIVE, 3);
+      waitForHeartbeats(bpos);
+      assertSame(mockNN1, bpos.getActiveNN());
+
+    } finally {
+      bpos.stop();
+    }
+  }
 
   private void waitForOneToFail(final BPOfferService bpos)
       throws Exception {
@@ -269,6 +348,30 @@ public class TestBPOfferService {
     }, 500, 10000);
   }
   
+  private void waitForHeartbeats(BPOfferService bpos)
+    throws Exception {
+    final int countAtStart[];
+    synchronized (heartbeatCounts) {
+      countAtStart = Arrays.copyOf(
+          heartbeatCounts, heartbeatCounts.length);
+    }
+    bpos.triggerHeartbeatForTests();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        synchronized (heartbeatCounts) {
+          for (int i = 0; i < countAtStart.length; i++) {
+            if (heartbeatCounts[i] <= countAtStart[i]) {
+              return false;
+            }
+          }
+          return true;
+        }
+      }
+    }, 200, 10000);
+  }
+
+  
   private ReceivedDeletedBlockInfo[] waitForBlockReceived(
       ExtendedBlock fakeBlock,
       DatanodeProtocol mockNN) throws Exception {
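
waitForHeartbeats() above snapshots the per-NN heartbeat counters, triggers an immediate heartbeat, and then relies on GenericTestUtils.waitFor to poll until every counter has advanced, which guarantees the updated mockHaStatuses value has actually been delivered before the assertions run. Roughly, the polling helper behaves like the following simplified sketch (illustration only, not the real utility):

    // Simplified, hypothetical stand-in for GenericTestUtils.waitFor(check, 200, 10000):
    static void waitFor(com.google.common.base.Supplier<Boolean> check,
                        int checkEveryMillis, int waitForMillis) throws Exception {
      long deadline = System.currentTimeMillis() + waitForMillis;
      while (!check.get()) {
        if (System.currentTimeMillis() > deadline) {
          throw new java.util.concurrent.TimeoutException("Timed out waiting for condition");
        }
        Thread.sleep(checkEveryMillis);
      }
    }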


