hadoop-hdfs-commits mailing list archives

From: rang...@apache.org
Subject: svn commit: r787764 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/
Date: Tue, 23 Jun 2009 17:54:25 GMT
Author: rangadi
Date: Tue Jun 23 17:54:24 2009
New Revision: 787764

URL: http://svn.apache.org/viewvc?rev=787764&view=rev
Log:
HDFS-195. Handle expired tokens when write pipeline is re-established.
(Kan Zhang via rangadi)

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=787764&r1=787763&r2=787764&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Jun 23 17:54:24 2009
@@ -25,3 +25,6 @@
 
     HADOOP-6096. Fix Eclipse project and classpath files following project
     split. (tomwhite)
+
+    HDFS-195. Handle expired tokens when write pipeline is re-established.
+    (Kan Zhang via rangadi)

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=787764&r1=787763&r2=787764&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Jun 23 17:54:24 2009
@@ -2610,12 +2610,11 @@
 
           // If the block recovery generated a new generation stamp, use that
           // from now on.  Also, setup new pipeline
-          //
-          if (newBlock != null) {
-            block = newBlock.getBlock();
-            accessToken = newBlock.getAccessToken();
-            nodes = newBlock.getLocations();
-          }
+          // newBlock should never be null and it should contain a newly 
+          // generated access token.
+          block = newBlock.getBlock();
+          accessToken = newBlock.getAccessToken();
+          nodes = newBlock.getLocations();
 
           this.hasError = false;
           lastException = null;
@@ -2687,6 +2686,7 @@
       //
       private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
           boolean recoveryFlag) {
+        short pipelineStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
         String firstBadLink = "";
         if (LOG.isDebugEnabled()) {
           for (int i = 0; i < nodes.length; i++) {
@@ -2725,10 +2725,17 @@
           out.flush();
 
           // receive ack for connect
+          pipelineStatus = blockReplyStream.readShort();
           firstBadLink = Text.readString(blockReplyStream);
-          if (firstBadLink.length() != 0) {
-            throw new IOException("Bad connect ack with firstBadLink "
-                + firstBadLink);
+          if (pipelineStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
+            if (pipelineStatus == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+              throw new InvalidAccessTokenException(
+                  "Got access token error for connect ack with firstBadLink as "
+                      + firstBadLink);
+            } else {
+              throw new IOException("Bad connect ack with firstBadLink as "
+                  + firstBadLink);
+            }
           }
 
           blockStream = out;
@@ -2799,6 +2806,7 @@
       void initAppend(LocatedBlock lastBlock, FileStatus stat,
           int bytesPerChecksum) throws IOException {
         block = lastBlock.getBlock();
+        accessToken = lastBlock.getAccessToken();
         long usedInLastBlock = stat.getLen() % blockSize;
         int freeInLastBlock = (int)(blockSize - usedInLastBlock);
 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=787764&r1=787763&r2=787764&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Tue Jun 23 17:54:24 2009
@@ -29,17 +29,17 @@
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
 
   /**
-   * 3: add keepLength parameter.
+   * 4: never return null and always return a newly generated access token
    */
-  public static final long versionID = 3L;
+  public static final long versionID = 4L;
 
   /** Start generation-stamp recovery for specified block
    * @param block the specified block
    * @param keepLength keep the block length
    * @param targets the list of possible locations of specified block
-   * @return the new blockid if recovery successful and the generation stamp
-   * got updated as part of the recovery, else returns null if the block id
-   * not have any data and the block was deleted.
+   * @return either a new generation stamp, or the original generation stamp. 
+   * Regardless of whether a new generation stamp is returned, a newly 
+   * generated access token is returned as part of the return value.
    * @throws IOException
    */
   LocatedBlock recoverBlock(Block block, boolean keepLength,
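
A short sketch of what the version-4 contract buys a caller: since recoverBlock never returns null and always carries a freshly generated access token, the null check removed from DFSClient above is no longer needed. The interfaces here are illustrative stubs for the real Block/LocatedBlock/ClientDatanodeProtocol types.

    import java.io.IOException;

    class RecoverBlockSketch {
      // Illustrative stubs; the real types live in org.apache.hadoop.hdfs.protocol.
      interface LocatedBlock { String getAccessToken(); }
      interface ClientDatanodeProtocol {
        LocatedBlock recoverBlock(String block, boolean keepLength, String[] targets)
            throws IOException;
      }

      static String recoverAndGetToken(ClientDatanodeProtocol primary, String block,
          String[] targets) throws IOException {
        LocatedBlock recovered = primary.recoverBlock(block, false, targets);
        // Version 4: the result is never null and always holds a new token,
        // so it can be consumed unconditionally.
        return recovered.getAccessToken();
      }
    }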

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=787764&r1=787763&r2=787764&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Tue Jun 23 17:54:24 2009
@@ -35,11 +35,12 @@
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 15:
-   *    Added a new status OP_STATUS_ERROR_ACCESS_TOKEN
-   *    Access token is now required on all DN operations
+   * Version 16:
+   *    Datanode now needs to send back a status code together 
+   *    with firstBadLink during pipeline setup for dfs write
+   *    (only for DFSClients, not for other datanodes).
    */
-  public static final int DATA_TRANSFER_VERSION = 15;
+  public static final int DATA_TRANSFER_VERSION = 16;
 
   // Processed at datanode stream-handler
   public static final byte OP_WRITE_BLOCK = (byte) 80;
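
As a sanity check of the new framing, the following self-contained sketch round-trips a version-16 connect ack in memory. writeUTF/readUTF stand in for Hadoop's Text.writeString/Text.readString (which use a vint-prefixed UTF-8 encoding), and the status value is an assumption.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class AckFramingSketch {
      static final short OP_STATUS_SUCCESS = 0; // assumed value

      public static void main(String[] args) throws IOException {
        // Datanode side: status short first (new in version 16), then firstBadLink.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream replyOut = new DataOutputStream(buf);
        replyOut.writeShort(OP_STATUS_SUCCESS);
        replyOut.writeUTF(""); // empty firstBadLink on success
        replyOut.flush();

        // Client side: read the fields back in the same order.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        short status = in.readShort();
        String firstBadLink = in.readUTF();
        System.out.println("status=" + status + ", firstBadLink='" + firstBadLink + "'");
      }
    }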

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=787764&r1=787763&r2=787764&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Jun 23 17:54:24 2009
@@ -1545,8 +1545,9 @@
 
   /** Recover a block */
   private LocatedBlock recoverBlock(Block block, boolean keepLength,
-      DatanodeID[] datanodeids, boolean closeFile) throws IOException {
+      DatanodeInfo[] targets, boolean closeFile) throws IOException {
 
+    DatanodeID[] datanodeids = (DatanodeID[])targets;
     // If the block is already being recovered, then skip recovering it.
     // This can happen if the namenode and client start recovering the same
     // file at the same time.
@@ -1600,7 +1601,7 @@
       if (!keepLength) {
         block.setNumBytes(minlength);
       }
-      return syncBlock(block, syncList, closeFile);
+      return syncBlock(block, syncList, targets, closeFile);
     } finally {
       synchronized (ongoingRecovery) {
         ongoingRecovery.remove(block);
@@ -1610,7 +1611,7 @@
 
   /** Block synchronization */
   private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,
-      boolean closeFile) throws IOException {
+      DatanodeInfo[] targets, boolean closeFile) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
           + "), syncList=" + syncList + ", closeFile=" + closeFile);
@@ -1621,7 +1622,13 @@
     if (syncList.isEmpty()) {
       namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
           DatanodeID.EMPTY_ARRAY);
-      return null;
+      //always return a new access token even if everything else stays the same
+      LocatedBlock b = new LocatedBlock(block, targets);
+      if (isAccessTokenEnabled) {
+        b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
+            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+      }
+      return b;
     }
 
     List<DatanodeID> successList = new ArrayList<DatanodeID>();
@@ -1649,7 +1656,14 @@
       for (int i = 0; i < nlist.length; i++) {
         info[i] = new DatanodeInfo(nlist[i]);
       }
-      return new LocatedBlock(newblock, info); // success
+      LocatedBlock b = new LocatedBlock(newblock, info); // success
+      // should have used client ID to generate access token, but since 
+      // owner ID is not checked, we simply pass null for now.
+      if (isAccessTokenEnabled) {
+        b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
+            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+      }
+      return b;
     }
 
     //failed
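
Both hunks above implement the same rule, sketched below: whatever syncBlock decides about the generation stamp, the returned LocatedBlock must carry a freshly minted WRITE token when tokens are enabled, because the client's old token may have expired while the pipeline was down. TokenHandler and LocatedBlock here are illustrative stubs, not the real AccessTokenHandler API.

    import java.util.EnumSet;

    class RecoveryTokenSketch {
      enum AccessMode { READ, WRITE }

      // Stub for AccessTokenHandler#generateToken(userID, blockID, modes).
      interface TokenHandler {
        String generateToken(String userId, long blockId, EnumSet<AccessMode> modes);
      }

      static class LocatedBlock {
        final long blockId;
        String accessToken;
        LocatedBlock(long blockId) { this.blockId = blockId; }
        void setAccessToken(String t) { this.accessToken = t; }
      }

      static LocatedBlock finishRecovery(long blockId, boolean isAccessTokenEnabled,
          TokenHandler handler) {
        LocatedBlock b = new LocatedBlock(blockId);
        if (isAccessTokenEnabled) {
          // Always mint a new token, even when the generation stamp is unchanged.
          // Owner ID is passed as null because it is not checked (see the
          // comment in the diff above).
          b.setAccessToken(handler.generateToken(null, blockId,
              EnumSet.of(AccessMode.WRITE)));
        }
        return b;
      }
    }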

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=787764&r1=787763&r2=787764&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Jun 23 17:54:24 2009
@@ -238,6 +238,7 @@
             .getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
       try {
         if (client.length() != 0) {
+          replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
           Text.writeString(replyOut, datanode.dnRegistration.getName());
           replyOut.flush();
         }
@@ -254,6 +255,7 @@
     BlockReceiver blockReceiver = null; // responsible for data handling
     String mirrorNode = null;           // the name:port of next target
     String firstBadLink = "";           // first datanode that failed in connection setup
+    short mirrorInStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
     try {
       // open a block receiver and check if the block does not exist
       blockReceiver = new BlockReceiver(block, in, 
@@ -294,8 +296,9 @@
 
           // read connect ack (only for clients, not for replication req)
           if (client.length() != 0) {
+            mirrorInStatus = mirrorIn.readShort();
             firstBadLink = Text.readString(mirrorIn);
-            if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
+            if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
               LOG.info("Datanode " + targets.length +
                        " got response for connect ack " +
                        " from downstream datanode with firstbadlink as " +
@@ -305,6 +308,7 @@
 
         } catch (IOException e) {
           if (client.length() != 0) {
+            replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
             Text.writeString(replyOut, mirrorNode);
             replyOut.flush();
           }
@@ -327,11 +331,12 @@
 
       // send connect ack back to source (only for clients)
       if (client.length() != 0) {
-        if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
+        if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
           LOG.info("Datanode " + targets.length +
                    " forwarding connect ack to upstream firstbadlink is " +
                    firstBadLink);
         }
+        replyOut.writeShort(mirrorInStatus);
         Text.writeString(replyOut, firstBadLink);
         replyOut.flush();
       }
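
The datanode-side changes above converge on one reply shape, sketched here: every client-facing connect ack now leads with a status short, so a token failure (OP_STATUS_ERROR_ACCESS_TOKEN plus this datanode's own name), a mirror failure (OP_STATUS_ERROR plus the mirror's name), and a forwarded downstream ack all use the same framing. The constant values are assumptions, and writeUTF stands in for Text.writeString.

    import java.io.DataOutputStream;
    import java.io.IOException;

    class DataXceiverAckSketch {
      static final short OP_STATUS_SUCCESS = 0;            // assumed values; the real
      static final short OP_STATUS_ERROR = 1;              // constants are declared in
      static final short OP_STATUS_ERROR_ACCESS_TOKEN = 5; // DataTransferProtocol

      // One helper covers all three reply sites in the diff above:
      //   token check failed:    reply(out, OP_STATUS_ERROR_ACCESS_TOKEN, ownName)
      //   mirror connect failed: reply(out, OP_STATUS_ERROR, mirrorNode)
      //   forwarding mirror ack: reply(out, mirrorInStatus, firstBadLink)
      static void reply(DataOutputStream replyOut, short status, String firstBadLink)
          throws IOException {
        replyOut.writeShort(status);     // new in DATA_TRANSFER_VERSION 16
        replyOut.writeUTF(firstBadLink); // name of the first failing datanode, or ""
        replyOut.flush();
      }
    }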

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=787764&r1=787763&r2=787764&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Tue Jun 23 17:54:24 2009
@@ -228,6 +228,7 @@
     
     // bad data chunk length
     sendOut.writeInt(-1-random.nextInt(oneMil));
+    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
     Text.writeString(recvOut, ""); // first bad node
     recvOut.writeLong(100);        // sequencenumber
     recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
@@ -257,6 +258,7 @@
     sendOut.writeInt(0);           // chunk length
     sendOut.writeInt(0);           // zero checksum
     //ok finally write a block with 0 len
+    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
     Text.writeString(recvOut, ""); // first bad node
     recvOut.writeLong(100);        // sequencenumber
     recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
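
For reference, the reply stream the test now expects from a single-datanode pipeline is: the new pipeline status short, the firstBadLink string, then the per-packet ack (sequence number plus one status per datanode). A compact sketch of building that expectation, with writeUTF standing in for Text.writeString:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class ExpectedReplySketch {
      static byte[] expectedReply(short pipelineStatus, long seqno, short packetStatus)
          throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream recvOut = new DataOutputStream(buf);
        recvOut.writeShort(pipelineStatus); // connect ack status (new in version 16)
        recvOut.writeUTF("");               // first bad node, empty on success
        recvOut.writeLong(seqno);           // packet sequence number
        recvOut.writeShort(packetStatus);   // per-datanode packet ack status
        recvOut.flush();
        return buf.toByteArray();
      }
    }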


