hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r1077089 - in /hadoop/common/branches/branch-0.20-security-patches/src: hdfs/org/apache/hadoop/hdfs/ hdfs/org/apache/hadoop/hdfs/protocol/ hdfs/org/apache/hadoop/hdfs/server/datanode/ test/org/apache/hadoop/hdfs/
Date: Fri, 04 Mar 2011 03:39:21 GMT
Author: omalley
Date: Fri Mar  4 03:39:21 2011
New Revision: 1077089

URL: http://svn.apache.org/viewvc?rev=1077089&view=rev
Log:
commit 8a62eb768a727018aa78330da0bca3a3e989553b
Author: Jitendra Nath Pandey <jitendra@yahoo-inc.com>
Date:   Tue Dec 22 18:05:31 2009 -0800

    HDFS-195 from https://issues.apache.org/jira/secure/attachment/12428788/HDFS-195-0_20.1.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    HDFS-195. Need to handle access token expiration when re-establishing the
    +    pipeline for dfs write. (Jitendra Nath Pandey)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Mar  4 03:39:21 2011
@@ -2687,12 +2687,11 @@ public class DFSClient implements FSCons
 
         // If the block recovery generated a new generation stamp, use that
         // from now on.  Also, setup new pipeline
-        //
-        if (newBlock != null) {
-          block = newBlock.getBlock();
-          accessToken = newBlock.getAccessToken();
-          nodes = newBlock.getLocations();
-        }
+        // newBlock should never be null and it should contain a newly
+        // generated access token.
+        block = newBlock.getBlock();
+        accessToken = newBlock.getAccessToken();
+        nodes = newBlock.getLocations();
 
         this.hasError = false;
         lastException = null;
@@ -2787,6 +2786,7 @@ public class DFSClient implements FSCons
       //
       if (lastBlock != null) {
         block = lastBlock.getBlock();
+        accessToken = lastBlock.getAccessToken();
         long usedInLastBlock = stat.getLen() % blockSize;
         int freeInLastBlock = (int)(blockSize - usedInLastBlock);
 
@@ -2911,6 +2911,7 @@ public class DFSClient implements FSCons
     //
     private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
                     boolean recoveryFlag) {
+      short pipelineStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
       String firstBadLink = "";
       if (LOG.isDebugEnabled()) {
         for (int i = 0; i < nodes.length; i++) {
@@ -2958,9 +2959,17 @@ public class DFSClient implements FSCons
         out.flush();
 
         // receive ack for connect
+        pipelineStatus = blockReplyStream.readShort();
         firstBadLink = Text.readString(blockReplyStream);
-        if (firstBadLink.length() != 0) {
-          throw new IOException("Bad connect ack with firstBadLink " + firstBadLink);
+        if (pipelineStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
+          if (pipelineStatus == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+            throw new InvalidAccessTokenException(
+                "Got access token error for connect ack with firstBadLink as "
+                    + firstBadLink);
+          } else {
+            throw new IOException("Bad connect ack with firstBadLink as "
+                + firstBadLink);
+          }
         }
 
         blockStream = out;

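For illustration, a standalone sketch (not code from this patch) of the connect-ack parsing the hunk above adds to createBlockOutputStream: the client reads a status short before firstBadLink and surfaces an access-token failure as its own exception type, so the write path can refresh the token instead of blacklisting a datanode. Plain java.io stands in for Hadoop's Text serialization, the numeric status values are assumed, and the exception class is stubbed locally:

    import java.io.DataInputStream;
    import java.io.IOException;

    public class ConnectAckClientSketch {
      // Status codes; the numeric values here are assumed for illustration.
      static final short OP_STATUS_SUCCESS = 0;
      static final short OP_STATUS_ERROR_ACCESS_TOKEN = 5;

      // Local stand-in for the InvalidAccessTokenException used above.
      static class InvalidAccessTokenException extends IOException {
        InvalidAccessTokenException(String msg) { super(msg); }
      }

      // Reads a version-16 connect ack: status short, then the name of the
      // first bad datanode ("" if the whole pipeline connected cleanly).
      // readUTF() stands in for Text.readString().
      static void readConnectAck(DataInputStream blockReplyStream)
          throws IOException {
        short pipelineStatus = blockReplyStream.readShort();
        String firstBadLink = blockReplyStream.readUTF();
        if (pipelineStatus != OP_STATUS_SUCCESS) {
          if (pipelineStatus == OP_STATUS_ERROR_ACCESS_TOKEN) {
            // Distinguishable failure: the caller can fetch a fresh token
            // via recoverBlock and retry, rather than marking a node bad.
            throw new InvalidAccessTokenException(
                "Got access token error for connect ack with firstBadLink as "
                + firstBadLink);
          }
          throw new IOException("Bad connect ack with firstBadLink as "
              + firstBadLink);
        }
      }
    }
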
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Fri Mar  4 03:39:21 2011
@@ -29,17 +29,17 @@ public interface ClientDatanodeProtocol 
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
 
   /**
-   * 3: add keepLength parameter.
+   * 4: never return null and always return a newly generated access token
    */
-  public static final long versionID = 3L;
+  public static final long versionID = 4L;
 
   /** Start generation-stamp recovery for specified block
    * @param block the specified block
    * @param keepLength keep the block length
    * @param targets the list of possible locations of specified block
-   * @return the new blockid if recovery successful and the generation stamp
-   * got updated as part of the recovery, else returns null if the block id
-   * not have any data and the block was deleted.
+   * @return either a new generation stamp, or the original generation stamp. 
+   * Regardless of whether a new generation stamp is returned, a newly 
+   * generated access token is returned as part of the return value.
    * @throws IOException
    */
   LocatedBlock recoverBlock(Block block, boolean keepLength,

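From the caller's side, the version-4 contract means a client recovering from a token error no longer needs a null check before reusing the result. A minimal sketch of that usage, assuming only the interface above and the LocatedBlock accessors visible in the DFSClient hunk (the helper name is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    class TokenRefreshSketch {
      // Hypothetical helper: re-arm a write pipeline after a token error.
      static LocatedBlock refreshToken(ClientDatanodeProtocol primary,
          Block block, DatanodeInfo[] targets) throws IOException {
        // keepLength=true: recover in place, without changing block length.
        LocatedBlock recovered = primary.recoverBlock(block, true, targets);
        // As of version 4 this is never null and always carries a newly
        // generated access token, so getBlock(), getAccessToken() and
        // getLocations() can be used directly to rebuild the pipeline.
        return recovered;
      }
    }
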
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Fri Mar  4 03:39:21 2011
@@ -31,11 +31,12 @@ public interface DataTransferProtocol {
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 15:
-   *    Added a new status OP_STATUS_ERROR_ACCESS_TOKEN
-   *    Access token is now required on all DN operations
+   * Version 16:
+   *    Datanode now needs to send back a status code together 
+   *    with firstBadLink during pipeline setup for dfs write
+   *    (only for DFSClients, not for other datanodes).
    */
-  public static final int DATA_TRANSFER_VERSION = 15;
+  public static final int DATA_TRANSFER_VERSION = 16;
 
   // Processed at datanode stream-handler
   public static final byte OP_WRITE_BLOCK = (byte) 80;

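The datanode's half of the version-16 exchange, as a sketch (numeric status values assumed; writeUTF stands in for Text.writeString): every connect ack sent to a client now leads with a status short, which makes a token rejection machine-readable instead of something inferred from firstBadLink alone:

    import java.io.DataOutputStream;
    import java.io.IOException;

    public class ConnectAckServerSketch {
      // Values assumed for illustration; DataTransferProtocol holds the
      // authoritative constants.
      static final short OP_STATUS_SUCCESS = 0;
      static final short OP_STATUS_ERROR = 1;
      static final short OP_STATUS_ERROR_ACCESS_TOKEN = 5;

      // Writes a version-16 connect ack: status first, then the first bad
      // datanode's name ("" when the pipeline is healthy).
      static void writeConnectAck(DataOutputStream replyOut, short status,
          String firstBadLink) throws IOException {
        replyOut.writeShort(status);
        replyOut.writeUTF(firstBadLink);
        replyOut.flush();
      }

      // Token rejection, as in the DataXceiver hunk below: the datanode
      // reports its own name so the client knows where the token failed.
      static void rejectToken(DataOutputStream replyOut, String myName)
          throws IOException {
        writeConnectAck(replyOut, OP_STATUS_ERROR_ACCESS_TOKEN, myName);
      }
    }
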
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Mar  4 03:39:21 2011
@@ -1537,8 +1537,9 @@ public class DataNode extends Configured
 
   /** Recover a block */
   private LocatedBlock recoverBlock(Block block, boolean keepLength,
-      DatanodeID[] datanodeids, boolean closeFile) throws IOException {
+      DatanodeInfo[] targets, boolean closeFile) throws IOException {
 
+    DatanodeID[] datanodeids = (DatanodeID[])targets;
     // If the block is already being recovered, then skip recovering it.
     // This can happen if the namenode and client start recovering the same
     // file at the same time.
@@ -1592,7 +1593,7 @@ public class DataNode extends Configured
       if (!keepLength) {
         block.setNumBytes(minlength);
       }
-      return syncBlock(block, syncList, closeFile);
+      return syncBlock(block, syncList, targets, closeFile);
     } finally {
       synchronized (ongoingRecovery) {
         ongoingRecovery.remove(block);
@@ -1602,7 +1603,7 @@ public class DataNode extends Configured
 
   /** Block synchronization */
   private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,
-      boolean closeFile) throws IOException {
+      DatanodeInfo[] targets, boolean closeFile) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
           + "), syncList=" + syncList + ", closeFile=" + closeFile);
@@ -1613,7 +1614,13 @@ public class DataNode extends Configured
     if (syncList.isEmpty()) {
       namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
           DatanodeID.EMPTY_ARRAY);
-      return null;
+      //always return a new access token even if everything else stays the same
+      LocatedBlock b = new LocatedBlock(block, targets);
+      if (isAccessTokenEnabled) {
+        b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
+            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+      }
+      return b;
     }
 
     List<DatanodeID> successList = new ArrayList<DatanodeID>();
@@ -1641,7 +1648,14 @@ public class DataNode extends Configured
       for (int i = 0; i < nlist.length; i++) {
         info[i] = new DatanodeInfo(nlist[i]);
       }
-      return new LocatedBlock(newblock, info); // success
+      LocatedBlock b = new LocatedBlock(newblock, info); // success
+      // should have used client ID to generate access token, but since 
+      // owner ID is not checked, we simply pass null for now.
+      if (isAccessTokenEnabled) {
+        b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
+            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+      }
+      return b;
     }
 
     //failed

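Both return paths in syncBlock stamp the token the same way; for clarity, that shared step as a fragment (not standalone: it assumes DataNode's isAccessTokenEnabled and accessTokenHandler fields and uses exactly the generateToken call shown above):

    // Fragment; assumes the DataNode context and java.util.EnumSet.
    private LocatedBlock stampWriteToken(LocatedBlock b) throws IOException {
      if (isAccessTokenEnabled) {
        // Owner ID is not checked in this branch, so null is passed for
        // now, matching the in-line comment in syncBlock above.
        b.setAccessToken(accessTokenHandler.generateToken(null,
            b.getBlock().getBlockId(),
            EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
      }
      return b;
    }
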
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Mar  4 03:39:21 2011
@@ -268,6 +268,7 @@ class DataXceiver implements Runnable, F
             .getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
       try {
         if (client.length() != 0) {
+          replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
           Text.writeString(replyOut, datanode.dnRegistration.getName());
           replyOut.flush();
         }
@@ -284,6 +285,7 @@ class DataXceiver implements Runnable, F
     BlockReceiver blockReceiver = null; // responsible for data handling
     String mirrorNode = null;           // the name:port of next target
     String firstBadLink = "";           // first datanode that failed in connection setup
+    short mirrorInStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
     try {
       // open a block receiver and check if the block does not exist
       blockReceiver = new BlockReceiver(block, in, 
@@ -337,8 +339,9 @@ class DataXceiver implements Runnable, F
 
           // read connect ack (only for clients, not for replication req)
           if (client.length() != 0) {
+            mirrorInStatus = mirrorIn.readShort();
             firstBadLink = Text.readString(mirrorIn);
-            if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
+            if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
               LOG.info("Datanode " + targets.length +
                        " got response for connect ack " +
                        " from downstream datanode with firstbadlink as " +
@@ -348,6 +351,7 @@ class DataXceiver implements Runnable, F
 
         } catch (IOException e) {
           if (client.length() != 0) {
+            replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
             Text.writeString(replyOut, mirrorNode);
             replyOut.flush();
           }
@@ -370,11 +374,12 @@ class DataXceiver implements Runnable, F
 
       // send connect ack back to source (only for clients)
       if (client.length() != 0) {
-        if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
+        if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
           LOG.info("Datanode " + targets.length +
                    " forwarding connect ack to upstream firstbadlink is " +
                    firstBadLink);
         }
+        replyOut.writeShort(mirrorInStatus);
         Text.writeString(replyOut, firstBadLink);
         replyOut.flush();
       }

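Taken together, the DataXceiver hunks make each intermediate datanode a relay for the new status field. A simplified sketch of that relay (plain java.io in place of Text; the constant value assumed), following the same shape as the patched code, including reporting the failed mirror before propagating the error:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class AckRelaySketch {
      static final short OP_STATUS_ERROR = 1; // value assumed

      // Reads the downstream mirror's connect ack and forwards it upstream,
      // so the client sees one version-16 ack for the whole pipeline.
      static void relayAck(DataInputStream mirrorIn, DataOutputStream replyOut,
          String mirrorNode) throws IOException {
        short mirrorInStatus;
        String firstBadLink;
        try {
          mirrorInStatus = mirrorIn.readShort();
          firstBadLink = mirrorIn.readUTF();
        } catch (IOException e) {
          // Mirror never answered: report a generic error plus the mirror's
          // name upstream, then propagate, as the patched catch block does.
          replyOut.writeShort(OP_STATUS_ERROR);
          replyOut.writeUTF(mirrorNode);
          replyOut.flush();
          throw e;
        }
        replyOut.writeShort(mirrorInStatus);
        replyOut.writeUTF(firstBadLink);
        replyOut.flush();
      }
    }
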
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Fri Mar  4 03:39:21 2011
@@ -225,6 +225,7 @@ public class TestDataTransferProtocol ex
     
     // bad data chunk length
     sendOut.writeInt(-1-random.nextInt(oneMil));
+    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
     Text.writeString(recvOut, ""); // first bad node
     recvOut.writeLong(100);        // sequencenumber
     recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
@@ -254,6 +255,7 @@ public class TestDataTransferProtocol ex
     sendOut.writeInt(0);           // chunk length
     sendOut.writeInt(0);           // zero checksum
     //ok finally write a block with 0 len
+    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
     Text.writeString(recvOut, ""); // first bad node
     recvOut.writeLong(100);        // sequencenumber
     recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);


