From: omalley@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Date: Fri, 04 Mar 2011 03:39:21 -0000
Subject: svn commit: r1077089 - in /hadoop/common/branches/branch-0.20-security-patches/src: hdfs/org/apache/hadoop/hdfs/ hdfs/org/apache/hadoop/hdfs/protocol/ hdfs/org/apache/hadoop/hdfs/server/datanode/ test/org/apache/hadoop/hdfs/

Author: omalley
Date: Fri Mar 4 03:39:21 2011
New Revision: 1077089

URL: http://svn.apache.org/viewvc?rev=1077089&view=rev
Log:
commit 8a62eb768a727018aa78330da0bca3a3e989553b
Author: Jitendra Nath Pandey
Date:   Tue Dec 22 18:05:31 2009 -0800

    HDFS-195 from https://issues.apache.org/jira/secure/attachment/12428788/HDFS-195-0_20.1.patch

+++ b/YAHOO-CHANGES.txt
+    HDFS-195. Need to handle access token expiration when re-establishing the
+    pipeline for dfs write. (Jitendra Nath Pandey)
+
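The substance of the patch is easier to follow with the write-pipeline handshake spelled out: during pipeline setup the datanode now sends a status code ahead of firstBadLink, and the client uses that status to tell an expired access token apart from an ordinary bad link. The sketch below is a simplified, self-contained illustration of that framing, not the committed code; the class, the constant values, and the use of writeUTF/readUTF in place of Hadoop's Text.writeString/Text.readString are stand-ins.

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Hypothetical sketch of the connect-ack framing introduced by this patch.
    class ConnectAckSketch {
      // Stand-in status values; the real constants live in DataTransferProtocol.
      static final short OP_STATUS_SUCCESS = 0;
      static final short OP_STATUS_ERROR = 1;
      static final short OP_STATUS_ERROR_ACCESS_TOKEN = 5;

      // Datanode side: always write a status first, then firstBadLink (possibly empty).
      static void writeAck(DataOutputStream replyOut, short status, String firstBadLink)
          throws IOException {
        replyOut.writeShort(status);      // new in DATA_TRANSFER_VERSION 16
        replyOut.writeUTF(firstBadLink);  // committed code uses Text.writeString
        replyOut.flush();
      }

      // Client side: read the status before deciding how to fail.
      static void readAck(DataInputStream blockReplyStream) throws IOException {
        short pipelineStatus = blockReplyStream.readShort();
        String firstBadLink = blockReplyStream.readUTF(); // committed code: Text.readString
        if (pipelineStatus != OP_STATUS_SUCCESS) {
          if (pipelineStatus == OP_STATUS_ERROR_ACCESS_TOKEN) {
            // token expired or rejected: the caller can recover the block to get a new token
            throw new IOException("access token error, firstBadLink=" + firstBadLink);
          }
          throw new IOException("bad connect ack, firstBadLink=" + firstBadLink);
        }
      }
    }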
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Mar 4 03:39:21 2011
@@ -2687,12 +2687,11 @@ public class DFSClient implements FSCons
 
       // If the block recovery generated a new generation stamp, use that
       // from now on. Also, setup new pipeline
-      //
-      if (newBlock != null) {
-        block = newBlock.getBlock();
-        accessToken = newBlock.getAccessToken();
-        nodes = newBlock.getLocations();
-      }
+      // newBlock should never be null and it should contain a newly
+      // generated access token.
+      block = newBlock.getBlock();
+      accessToken = newBlock.getAccessToken();
+      nodes = newBlock.getLocations();
 
       this.hasError = false;
       lastException = null;
@@ -2787,6 +2786,7 @@ public class DFSClient implements FSCons
 
       //
       if (lastBlock != null) {
         block = lastBlock.getBlock();
+        accessToken = lastBlock.getAccessToken();
         long usedInLastBlock = stat.getLen() % blockSize;
         int freeInLastBlock = (int)(blockSize - usedInLastBlock);
@@ -2911,6 +2911,7 @@ public class DFSClient implements FSCons
     //
     private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
                     boolean recoveryFlag) {
+      short pipelineStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
       String firstBadLink = "";
       if (LOG.isDebugEnabled()) {
         for (int i = 0; i < nodes.length; i++) {
@@ -2958,9 +2959,17 @@ public class DFSClient implements FSCons
         out.flush();
 
         // receive ack for connect
+        pipelineStatus = blockReplyStream.readShort();
         firstBadLink = Text.readString(blockReplyStream);
-        if (firstBadLink.length() != 0) {
-          throw new IOException("Bad connect ack with firstBadLink " + firstBadLink);
+        if (pipelineStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
+          if (pipelineStatus == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+            throw new InvalidAccessTokenException(
+                "Got access token error for connect ack with firstBadLink as " +
+                firstBadLink);
+          } else {
+            throw new IOException("Bad connect ack with firstBadLink as " +
+                firstBadLink);
+          }
         }
 
         blockStream = out;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Fri Mar 4 03:39:21 2011
@@ -29,17 +29,17 @@ public interface ClientDatanodeProtocol
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
 
   /**
-   * 3: add keepLength parameter.
+   * 4: never return null and always return a newly generated access token
    */
-  public static final long versionID = 3L;
+  public static final long versionID = 4L;
 
   /** Start generation-stamp recovery for specified block
   * @param block the specified block
   * @param keepLength keep the block length
   * @param targets the list of possible locations of specified block
-  * @return the new blockid if recovery successful and the generation stamp
-  *         got updated as part of the recovery, else returns null if the block id
-  *         not have any data and the block was deleted.
+  * @return either a new generation stamp, or the original generation stamp.
+  *         Regardless of whether a new generation stamp is returned, a newly
+  *         generated access token is returned as part of the return value.
   * @throws IOException
   */
  LocatedBlock recoverBlock(Block block, boolean keepLength,

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Fri Mar 4 03:39:21 2011
@@ -31,11 +31,12 @@ public interface DataTransferProtocol {
    * when protocol changes. It is not very obvious.
    */
   /*
-   * Version 15:
-   *    Added a new status OP_STATUS_ERROR_ACCESS_TOKEN
-   *    Access token is now required on all DN operations
+   * Version 16:
+   *    Datanode now needs to send back a status code together
+   *    with firstBadLink during pipeline setup for dfs write
+   *    (only for DFSClients, not for other datanodes).
   */
-  public static final int DATA_TRANSFER_VERSION = 15;
+  public static final int DATA_TRANSFER_VERSION = 16;
 
   // Processed at datanode stream-handler
   public static final byte OP_WRITE_BLOCK = (byte) 80;
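The new @return contract on recoverBlock (interface versionID 4) is what lets a client survive an expired token: the recovered LocatedBlock always carries a freshly generated access token, even when the generation stamp is unchanged. Below is a minimal caller-side sketch of that contract, assuming the recoverBlock(Block, boolean, DatanodeInfo[]) signature implied by the javadoc above; the class and method names are invented for illustration and this is not DFSClient code.

    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    // Illustrative caller of the versionID 4 contract; not part of the patch.
    class RecoverBlockContractSketch {
      static LocatedBlock refreshPipelineBlock(ClientDatanodeProtocol primary,
          Block block, DatanodeInfo[] nodes) throws IOException {
        // keepLength=true: keep the block length; we only need the (possibly
        // unchanged) generation stamp plus a newly generated WRITE access token.
        LocatedBlock recovered = primary.recoverBlock(block, true, nodes);
        // Since versionID 4 this is never null and always carries a token, so the
        // caller can use it unconditionally, as the DFSClient hunk above now does.
        assert recovered != null && recovered.getAccessToken() != null;
        return recovered;
      }
    }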
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Mar 4 03:39:21 2011
@@ -1537,8 +1537,9 @@ public class DataNode extends Configured
 
   /** Recover a block */
   private LocatedBlock recoverBlock(Block block, boolean keepLength,
-      DatanodeID[] datanodeids, boolean closeFile) throws IOException {
+      DatanodeInfo[] targets, boolean closeFile) throws IOException {
 
+    DatanodeID[] datanodeids = (DatanodeID[])targets;
     // If the block is already being recovered, then skip recovering it.
     // This can happen if the namenode and client start recovering the same
     // file at the same time.
@@ -1592,7 +1593,7 @@ public class DataNode extends Configured
       if (!keepLength) {
         block.setNumBytes(minlength);
       }
-      return syncBlock(block, syncList, closeFile);
+      return syncBlock(block, syncList, targets, closeFile);
     } finally {
       synchronized (ongoingRecovery) {
         ongoingRecovery.remove(block);
@@ -1602,7 +1603,7 @@ public class DataNode extends Configured
 
   /** Block synchronization */
   private LocatedBlock syncBlock(Block block, List syncList,
-      boolean closeFile) throws IOException {
+      DatanodeInfo[] targets, boolean closeFile) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
           + "), syncList=" + syncList + ", closeFile=" + closeFile);
@@ -1613,7 +1614,13 @@ public class DataNode extends Configured
     if (syncList.isEmpty()) {
       namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
           DatanodeID.EMPTY_ARRAY);
-      return null;
+      //always return a new access token even if everything else stays the same
+      LocatedBlock b = new LocatedBlock(block, targets);
+      if (isAccessTokenEnabled) {
+        b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
+            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+      }
+      return b;
     }
 
     List successList = new ArrayList();
@@ -1641,7 +1648,14 @@ public class DataNode extends Configured
       for (int i = 0; i < nlist.length; i++) {
         info[i] = new DatanodeInfo(nlist[i]);
       }
-      return new LocatedBlock(newblock, info); // success
+      LocatedBlock b = new LocatedBlock(newblock, info); // success
+      // should have used client ID to generate access token, but since
+      // owner ID is not checked, we simply pass null for now.
+      if (isAccessTokenEnabled) {
+        b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
+            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+      }
+      return b;
     }
 
     //failed
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Mar 4 03:39:21 2011
@@ -268,6 +268,7 @@ class DataXceiver implements Runnable, F
           .getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
         try {
           if (client.length() != 0) {
+            replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
             Text.writeString(replyOut, datanode.dnRegistration.getName());
             replyOut.flush();
           }
@@ -284,6 +285,7 @@ class DataXceiver implements Runnable, F
     BlockReceiver blockReceiver = null; // responsible for data handling
     String mirrorNode = null;           // the name:port of next target
     String firstBadLink = "";           // first datanode that failed in connection setup
+    short mirrorInStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
     try {
       // open a block receiver and check if the block does not exist
       blockReceiver = new BlockReceiver(block, in,
@@ -337,8 +339,9 @@ class DataXceiver implements Runnable, F
 
           // read connect ack (only for clients, not for replication req)
           if (client.length() != 0) {
+            mirrorInStatus = mirrorIn.readShort();
             firstBadLink = Text.readString(mirrorIn);
-            if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
+            if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
              LOG.info("Datanode " + targets.length +
                       " got response for connect ack " +
                       " from downstream datanode with firstbadlink as " +
@@ -348,6 +351,7 @@ class DataXceiver implements Runnable, F
 
       } catch (IOException e) {
         if (client.length() != 0) {
+          replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
           Text.writeString(replyOut, mirrorNode);
           replyOut.flush();
         }
@@ -370,11 +374,12 @@ class DataXceiver implements Runnable, F
 
     // send connect ack back to source (only for clients)
     if (client.length() != 0) {
-      if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
+      if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
        LOG.info("Datanode " + targets.length +
                 " forwarding connect ack to upstream firstbadlink is " +
                 firstBadLink);
      }
+      replyOut.writeShort(mirrorInStatus);
      Text.writeString(replyOut, firstBadLink);
      replyOut.flush();
    }
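Because the DataXceiver changes are spread over several hunks, the following small sketch summarizes which status ends up in front of firstBadLink in the ack a datanode sends upstream; the method and its boolean parameters are invented for illustration and do not appear in the patch.

    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;

    // Simplified view of the status a datanode writes upstream during pipeline setup.
    class UpstreamAckSketch {
      static short upstreamAckStatus(boolean accessTokenRejected,
          boolean mirrorConnectFailed, short mirrorInStatus) {
        if (accessTokenRejected) {
          // written before firstBadLink so DFSClient can raise InvalidAccessTokenException
          return (short) DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN;
        }
        if (mirrorConnectFailed) {
          return (short) DataTransferProtocol.OP_STATUS_ERROR;
        }
        // no local error: relay whatever the downstream datanode reported
        // (defaults to OP_STATUS_SUCCESS when this datanode is last in the pipeline)
        return mirrorInStatus;
      }
    }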
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1077089&r1=1077088&r2=1077089&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Fri Mar 4 03:39:21 2011
@@ -225,6 +225,7 @@ public class TestDataTransferProtocol ex
 
     // bad data chunk length
     sendOut.writeInt(-1-random.nextInt(oneMil));
+    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
     Text.writeString(recvOut, ""); // first bad node
     recvOut.writeLong(100);        // sequencenumber
     recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
@@ -254,6 +255,7 @@ public class TestDataTransferProtocol ex
     sendOut.writeInt(0);           // chunk length
     sendOut.writeInt(0);           // zero checksum
     //ok finally write a block with 0 len
+    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
     Text.writeString(recvOut, ""); // first bad node
     recvOut.writeLong(100);        // sequencenumber
     recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);