hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077252 [1/2] - in /hadoop/common/branches/branch-0.20-security-patches/src: core/org/apache/hadoop/http/ core/org/apache/hadoop/ipc/ core/org/apache/hadoop/security/ core/org/apache/hadoop/security/token/delegation/ hdfs/org/apache/hadoop...
Date Fri, 04 Mar 2011 03:56:35 GMT
Author: omalley
Date: Fri Mar  4 03:56:33 2011
New Revision: 1077252

URL: http://svn.apache.org/viewvc?rev=1077252&view=rev
Log:
commit d4a276dbdcb8ca9485910d86480ad630330d1f02
Author: Devaraj Das <ddas@yahoo-inc.com>
Date:   Sat Feb 27 04:04:26 2010 -0800

    HDFS-992 from https://issues.apache.org/jira/secure/attachment/12437340/h992-BK-0.20-07.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    HDFS-992. Refactors block access token implementation to conform to the
    +    generic Token interface. (Kan Zhang via ddas)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockKey.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/token/
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/token/block/
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/token/block/SecurityTestUtil.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java
Removed:
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/AccessTokenHandler.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/BlockAccessKey.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/BlockAccessToken.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/ExportedAccessKeys.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/InvalidAccessTokenException.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/SecurityTestUtil.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestAccessToken.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/http/HttpServer.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcServer.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/delegation/DelegationKey.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestUserGroupInformation.java
    hadoop/common/branches/branch-0.20-security-patches/src/webapps/datanode/browseBlock.jsp
    hadoop/common/branches/branch-0.20-security-patches/src/webapps/datanode/tail.jsp

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/http/HttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/http/HttpServer.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/http/HttpServer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/http/HttpServer.java Fri Mar  4 03:56:33 2011
@@ -132,17 +132,12 @@ public class HttpServer implements Filte
     webAppContext.getServletContext().setAttribute(CONF_CONTEXT_ATTRIBUTE, conf);
     webServer.addHandler(webAppContext);
 
-<<<<<<< HEAD:src/core/org/apache/hadoop/http/HttpServer.java
-    addDefaultApps(contexts, appDir);
+    addDefaultApps(contexts, appDir, conf);
     
     defineFilter(webAppContext, "krb5Filter", 
         Krb5AndCertsSslSocketConnector.Krb5SslFilter.class.getName(), 
         null, null);
     
-=======
-    addDefaultApps(contexts, appDir, conf);
-
->>>>>>> yahoo-hadoop-0.20.1xx:src/core/org/apache/hadoop/http/HttpServer.java
     addGlobalFilter("safety", QuotingInputFilter.class.getName(), null);
     final FilterInitializer[] initializers = getFilterInitializers(conf); 
     if (initializers != null) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java Fri Mar  4 03:56:33 2011
@@ -848,7 +848,13 @@ public abstract class Server {
       if (authMethod == SaslRpcServer.AuthMethod.DIGEST) {
         TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authorizedId,
             secretManager);
-        return tokenId.getUser();
+        UserGroupInformation ugi = tokenId.getUser();
+        if (ugi == null) {
+          throw new AccessControlException(
+              "Can't retrieve username from tokenIdentifier.");
+        }
+        ugi.addTokenIdentifier(tokenId);
+        return ugi;
       } else {
         return UserGroupInformation.createRemoteUser(authorizedId);
       }
@@ -1444,7 +1450,7 @@ public abstract class Server {
   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
 
   /** Starts the service.  Must be called before any calls will be handled. */
-  public synchronized void start() throws IOException {
+  public synchronized void start() {
     responder.start();
     listener.start();
     handlers = new Handler[handlerCount];

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcServer.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcServer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcServer.java Fri Mar  4 03:56:33 2011
@@ -68,10 +68,10 @@ public class SaslRpcServer {
     return Base64.decodeBase64(identifier.getBytes());
   }
 
-  public static TokenIdentifier getIdentifier(String id,
-      SecretManager<TokenIdentifier> secretManager) throws InvalidToken {
+  public static <T extends TokenIdentifier> T getIdentifier(String id,
+      SecretManager<T> secretManager) throws InvalidToken {
     byte[] tokenId = decodeIdentifier(id);
-    TokenIdentifier tokenIdentifier = secretManager.createIdentifier();
+    T tokenIdentifier = secretManager.createIdentifier();
     try {
       tokenIdentifier.readFields(new DataInputStream(new ByteArrayInputStream(
           tokenId)));
@@ -201,11 +201,12 @@ public class SaslRpcServer {
           ac.setAuthorized(false);
         }
         if (ac.isAuthorized()) {
-          String username = getIdentifier(authzid, secretManager).getUser()
-              .getUserName().toString();
-          if (LOG.isDebugEnabled())
+          if (LOG.isDebugEnabled()) {
+            String username = getIdentifier(authzid, secretManager).getUser()
+            .getUserName().toString();
             LOG.debug("SASL server DIGEST-MD5 callback: setting "
                 + "canonicalized client ID: " + username);
+          }
           ac.setAuthorizedID(authzid);
         }
       }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java Fri Mar  4 03:56:33 2011
@@ -599,6 +599,28 @@ public class UserGroupInformation {
   }
 
   /**
+   * Add a TokenIdentifier to this UGI. The TokenIdentifier has typically been
+   * authenticated by the RPC layer as belonging to the user represented by this
+   * UGI.
+   * 
+   * @param tokenId
+   *          tokenIdentifier to be added
+   * @return true on successful add of new tokenIdentifier
+   */
+  public synchronized boolean addTokenIdentifier(TokenIdentifier tokenId) {
+    return subject.getPublicCredentials().add(tokenId);
+  }
+
+  /**
+   * Get the set of TokenIdentifiers belonging to this UGI
+   * 
+   * @return the set of TokenIdentifiers belonging to this UGI
+   */
+  public synchronized Set<TokenIdentifier> getTokenIdentifiers() {
+    return subject.getPublicCredentials(TokenIdentifier.class);
+  }
+  
+  /**
    * Add a token to this UGI
    * 
    * @param token Token to be added

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/delegation/DelegationKey.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/delegation/DelegationKey.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/delegation/DelegationKey.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/delegation/DelegationKey.java Fri Mar  4 03:56:33 2011
@@ -70,9 +70,13 @@ public class DelegationKey implements Wr
   public void write(DataOutput out) throws IOException {
     WritableUtils.writeVInt(out, keyId);
     WritableUtils.writeVLong(out, expiryDate);
-    byte[] keyBytes = key.getEncoded();
-    WritableUtils.writeVInt(out, keyBytes.length);
-    out.write(keyBytes);
+    if (key == null) {
+      WritableUtils.writeVInt(out, -1);
+    } else {
+      byte[] keyBytes = key.getEncoded();
+      WritableUtils.writeVInt(out, keyBytes.length);
+      out.write(keyBytes);
+    }
   }
 
   /**
@@ -81,8 +85,12 @@ public class DelegationKey implements Wr
     keyId = WritableUtils.readVInt(in);
     expiryDate = WritableUtils.readVLong(in);
     int len = WritableUtils.readVInt(in);
-    byte[] keyBytes = new byte[len];
-    in.readFully(keyBytes);
-    key = AbstractDelegationTokenSecretManager.createSecretKey(keyBytes);
+    if (len == -1) {
+      key = null;
+    } else {
+      byte[] keyBytes = new byte[len];
+      in.readFully(keyBytes);
+      key = AbstractDelegationTokenSecretManager.createSecretKey(keyBytes);
+    }
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Mar  4 03:56:33 2011
@@ -29,8 +29,8 @@ import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.hdfs.DistributedFileSystem.DiskStatus;
 import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -132,14 +132,19 @@ public class DFSClient implements FSCons
   }
 
   static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
-      DatanodeID datanodeid, Configuration conf) throws IOException {
+      DatanodeID datanodeid, Configuration conf, 
+      Block block, Token<BlockTokenIdentifier> token) throws IOException {
     InetSocketAddress addr = NetUtils.createSocketAddr(
       datanodeid.getHost() + ":" + datanodeid.getIpcPort());
     if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
       ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + addr);
     }
+    UserGroupInformation ticket = UserGroupInformation
+        .createRemoteUser(block.toString());
+    ticket.addToken(token);
     return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class,
-        ClientDatanodeProtocol.versionID, addr, conf);
+        ClientDatanodeProtocol.versionID, addr, ticket, conf, NetUtils
+        .getDefaultSocketFactory(conf));
   }
         
   /**
@@ -718,7 +723,7 @@ public class DFSClient implements FSCons
           out.write(DataTransferProtocol.OP_BLOCK_CHECKSUM);
           out.writeLong(block.getBlockId());
           out.writeLong(block.getGenerationStamp());
-          lb.getAccessToken().write(out);
+          lb.getBlockToken().write(out);
           out.flush();
          
           final short reply = in.readShort();
@@ -1382,7 +1387,7 @@ public class DFSClient implements FSCons
       checksumSize = this.checksum.getChecksumSize();
     }
 
-    public static BlockReader newBlockReader(Socket sock, String file, long blockId, BlockAccessToken accessToken, 
+    public static BlockReader newBlockReader(Socket sock, String file, long blockId, Token<BlockTokenIdentifier> accessToken, 
         long genStamp, long startOffset, long len, int bufferSize) throws IOException {
       return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize,
           true);
@@ -1390,7 +1395,7 @@ public class DFSClient implements FSCons
 
     /** Java Doc required */
     public static BlockReader newBlockReader( Socket sock, String file, long blockId, 
-                                       BlockAccessToken accessToken,
+                                       Token<BlockTokenIdentifier> accessToken,
                                        long genStamp,
                                        long startOffset, long len,
                                        int bufferSize, boolean verifyChecksum)
@@ -1401,7 +1406,7 @@ public class DFSClient implements FSCons
 
     public static BlockReader newBlockReader( Socket sock, String file,
                                        long blockId, 
-                                       BlockAccessToken accessToken,
+                                       Token<BlockTokenIdentifier> accessToken,
                                        long genStamp,
                                        long startOffset, long len,
                                        int bufferSize, boolean verifyChecksum,
@@ -1433,7 +1438,7 @@ public class DFSClient implements FSCons
       short status = in.readShort();
       if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
         if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
-          throw new InvalidAccessTokenException(
+          throw new InvalidBlockTokenException(
               "Got access token error for OP_READ_BLOCK, self="
                   + sock.getLocalSocketAddress() + ", remote="
                   + sock.getRemoteSocketAddress() + ", for file " + file
@@ -1723,7 +1728,7 @@ public class DFSClient implements FSCons
           NetUtils.connect(s, targetAddr, socketTimeout);
           s.setSoTimeout(socketTimeout);
           Block blk = targetBlock.getBlock();
-          BlockAccessToken accessToken = targetBlock.getAccessToken();
+          Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
           
           blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
               accessToken, 
@@ -1732,7 +1737,7 @@ public class DFSClient implements FSCons
               buffersize, verifyChecksum, clientName);
           return chosenNode;
         } catch (IOException ex) {
-          if (ex instanceof InvalidAccessTokenException && refetchToken > 0) {
+          if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
             LOG.info("Will fetch a new access token and retry, " 
                 + "access token was invalid when connecting to " + targetAddr
                 + " : " + ex);
@@ -1951,7 +1956,7 @@ public class DFSClient implements FSCons
           dn = socketFactory.createSocket();
           NetUtils.connect(dn, targetAddr, socketTimeout);
           dn.setSoTimeout(socketTimeout);
-          BlockAccessToken accessToken = block.getAccessToken();
+          Token<BlockTokenIdentifier> accessToken = block.getBlockToken();
               
           int len = (int) (end - start + 1);
               
@@ -1973,7 +1978,7 @@ public class DFSClient implements FSCons
                    e.getPos() + " from " + chosenNode.getName());
           reportChecksumFailure(src, block.getBlock(), chosenNode);
         } catch (IOException e) {
-          if (e instanceof InvalidAccessTokenException && refetchToken > 0) {
+          if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
             LOG.info("Will get a new access token and retry, "
                 + "access token was invalid when connecting to " + targetAddr
                 + " : " + e);
@@ -2217,7 +2222,7 @@ public class DFSClient implements FSCons
     private DataOutputStream blockStream;
     private DataInputStream blockReplyStream;
     private Block block;
-    private BlockAccessToken accessToken;
+    private Token<BlockTokenIdentifier> accessToken;
     final private long blockSize;
     private DataChecksum checksum;
     private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
@@ -2243,7 +2248,7 @@ public class DFSClient implements FSCons
     private volatile boolean appendChunk = false;   // appending to existing partial block
     private long initialFileSize = 0; // at time of file open
 
-    BlockAccessToken getAccessToken() {
+    Token<BlockTokenIdentifier> getAccessToken() {
       return accessToken;
     }
 
@@ -2685,7 +2690,7 @@ public class DFSClient implements FSCons
         try {
           // Pick the "least" datanode as the primary datanode to avoid deadlock.
           primaryNode = Collections.min(Arrays.asList(newnodes));
-          primary = createClientDatanodeProtocolProxy(primaryNode, conf);
+          primary = createClientDatanodeProtocolProxy(primaryNode, conf, block, accessToken);
           newBlock = primary.recoverBlock(block, isAppend, newnodes);
         } catch (IOException e) {
           recoveryErrorCount++;
@@ -2741,7 +2746,7 @@ public class DFSClient implements FSCons
         // newBlock should never be null and it should contain a newly
         // generated access token.
         block = newBlock.getBlock();
-        accessToken = newBlock.getAccessToken();
+        accessToken = newBlock.getBlockToken();
         nodes = newBlock.getLocations();
 
         this.hasError = false;
@@ -2837,7 +2842,7 @@ public class DFSClient implements FSCons
       //
       if (lastBlock != null) {
         block = lastBlock.getBlock();
-        accessToken = lastBlock.getAccessToken();
+        accessToken = lastBlock.getBlockToken();
         long usedInLastBlock = stat.getLen() % blockSize;
         int freeInLastBlock = (int)(blockSize - usedInLastBlock);
 
@@ -2927,7 +2932,7 @@ public class DFSClient implements FSCons
         long startTime = System.currentTimeMillis();
         lb = locateFollowingBlock(startTime);
         block = lb.getBlock();
-        accessToken = lb.getAccessToken();
+        accessToken = lb.getBlockToken();
         nodes = lb.getLocations();
   
         //
@@ -3014,7 +3019,7 @@ public class DFSClient implements FSCons
         firstBadLink = Text.readString(blockReplyStream);
         if (pipelineStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
           if (pipelineStatus == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
-            throw new InvalidAccessTokenException(
+            throw new InvalidBlockTokenException(
                 "Got access token error for connect ack with firstBadLink as "
                     + firstBadLink);
           } else {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Fri Mar  4 03:56:33 2011
@@ -21,10 +21,13 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.token.TokenInfo;
 
 /** An client-datanode protocol for block recovery
  */
+@TokenInfo(BlockTokenSelector.class)
 public interface ClientDatanodeProtocol extends VersionedProtocol {
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Fri Mar  4 03:56:33 2011
@@ -17,8 +17,9 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.security.token.Token;
 
 import java.io.*;
 
@@ -44,7 +45,7 @@ public class LocatedBlock implements Wri
   // else false. If block has few corrupt replicas, they are filtered and 
   // their locations are not part of this object
   private boolean corrupt;
-  private BlockAccessToken accessToken = new BlockAccessToken();
+  private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
 
   /**
    */
@@ -78,12 +79,12 @@ public class LocatedBlock implements Wri
     }
   }
 
-  public BlockAccessToken getAccessToken() {
-    return accessToken;
+  public Token<BlockTokenIdentifier> getBlockToken() {
+    return blockToken;
   }
 
-  public void setAccessToken(BlockAccessToken token) {
-    this.accessToken = token;
+  public void setBlockToken(Token<BlockTokenIdentifier> token) {
+    this.blockToken = token;
   }
 
   /**
@@ -122,7 +123,7 @@ public class LocatedBlock implements Wri
   // Writable
   ///////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
-    accessToken.write(out);
+    blockToken.write(out);
     out.writeBoolean(corrupt);
     out.writeLong(offset);
     b.write(out);
@@ -133,7 +134,7 @@ public class LocatedBlock implements Wri
   }
 
   public void readFields(DataInput in) throws IOException {
-    accessToken.readFields(in);
+    blockToken.readFields(in);
     this.corrupt = in.readBoolean();
     offset = in.readLong();
     this.b = new Block();

Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockKey.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockKey.java?rev=1077252&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockKey.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockKey.java Fri Mar  4 03:56:33 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+
+/**
+ * Key used for generating and verifying block tokens
+ */
+public class BlockKey extends DelegationKey {
+
+  public BlockKey() {
+    super();
+  }
+
+  public BlockKey(int keyId, long expiryDate, SecretKey key) {
+    super(keyId, expiryDate, key);
+  }
+}
\ No newline at end of file

Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java?rev=1077252&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java Fri Mar  4 03:56:33 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+public class BlockTokenIdentifier extends TokenIdentifier {
+  static final Text KIND_NAME = new Text("HDFS_BLOCK_TOKEN");
+
+  private long expiryDate;
+  private int keyId;
+  private String userId;
+  private long blockId;
+  private EnumSet<AccessMode> modes;
+
+  public BlockTokenIdentifier() {
+    this(null, 0, EnumSet.noneOf(AccessMode.class));
+  }
+
+  public BlockTokenIdentifier(String userId, long blockId,
+      EnumSet<AccessMode> modes) {
+    this.userId = userId;
+    this.blockId = blockId;
+    this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
+  }
+
+  @Override
+  public Text getKind() {
+    return KIND_NAME;
+  }
+
+  @Override
+  public UserGroupInformation getUser() {
+    if (userId == null || "".equals(userId)) {
+      return UserGroupInformation.createRemoteUser(Long.toString(blockId));
+    }
+    return UserGroupInformation.createRemoteUser(userId);
+  }
+
+  public long getExpiryDate() {
+    return expiryDate;
+  }
+
+  public void setExpiryDate(long expiryDate) {
+    this.expiryDate = expiryDate;
+  }
+
+  public int getKeyId() {
+    return this.keyId;
+  }
+
+  public void setKeyId(int keyId) {
+    this.keyId = keyId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public long getBlockId() {
+    return blockId;
+  }
+
+  public EnumSet<AccessMode> getAccessModes() {
+    return modes;
+  }
+
+  public String toString() {
+    return "block_token_identifier (expiryDate=" + this.getExpiryDate()
+        + ", keyId=" + this.getKeyId() + ", userId=" + this.getUserId()
+        + ", blockId=" + this.getBlockId() + ", access modes="
+        + this.getAccessModes() + ")";
+  }
+
+  static boolean isEqual(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /** {@inheritDoc} */
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof BlockTokenIdentifier) {
+      BlockTokenIdentifier that = (BlockTokenIdentifier) obj;
+      return this.expiryDate == that.expiryDate && this.keyId == that.keyId
+          && isEqual(this.userId, that.userId) && this.blockId == that.blockId
+          && isEqual(this.modes, that.modes);
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  public int hashCode() {
+    return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode()
+        ^ (userId == null ? 0 : userId.hashCode());
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    expiryDate = WritableUtils.readVLong(in);
+    keyId = WritableUtils.readVInt(in);
+    userId = WritableUtils.readString(in);
+    blockId = WritableUtils.readVLong(in);
+    modes = EnumSet.noneOf(AccessMode.class); // replace, don't extend, old modes
+    for (int n = WritableUtils.readVInt(in); n > 0; n--) {
+      modes.add(WritableUtils.readEnum(in, AccessMode.class));
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVLong(out, expiryDate);
+    WritableUtils.writeVInt(out, keyId);
+    WritableUtils.writeString(out, userId);
+    WritableUtils.writeVLong(out, blockId);
+    WritableUtils.writeVInt(out, modes.size());
+    for (AccessMode aMode : modes) {
+      WritableUtils.writeEnum(out, aMode);
+    }
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java?rev=1077252&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java Fri Mar  4 03:56:33 2011
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * BlockTokenSecretManager can be instantiated in 2 modes, master mode and slave
+ * mode. Master can generate new block keys and export block keys to slaves,
+ * while slaves can only import and use block keys received from master. Both
+ * master and slave can generate and verify block tokens. Typically, master mode
+ * is used by NN and slave mode is used by DN.
+ */
+public class BlockTokenSecretManager extends
+    SecretManager<BlockTokenIdentifier> {
+  public static final Log LOG = LogFactory
+      .getLog(BlockTokenSecretManager.class);
+  public static final Token<BlockTokenIdentifier> DUMMY_TOKEN = new Token<BlockTokenIdentifier>();
+
+  private final boolean isMaster;
+  /*
+   * keyUpdateInterval is the interval that NN updates its block keys. It should
+   * be set long enough so that all live DN's and Balancer should have sync'ed
+   * their block keys with NN at least once during each interval.
+   */
+  private final long keyUpdateInterval;
+  private volatile long tokenLifetime;
+  private int serialNo = new SecureRandom().nextInt();
+  private BlockKey currentKey;
+  private BlockKey nextKey;
+  private Map<Integer, BlockKey> allKeys;
+
+  public static enum AccessMode {
+    READ, WRITE, COPY, REPLACE
+  };
+
+  /**
+   * Constructor
+   * 
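+   * @param isMaster true for master mode (generates keys), false for slave mode
+   * @param keyUpdateInterval interval at which the master updates its block keys
+   * @param tokenLifetime lifetime added to the creation time of each block token
+   * @throws IOException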
+   */
+  public BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
+      long tokenLifetime) throws IOException {
+    this.isMaster = isMaster;
+    this.keyUpdateInterval = keyUpdateInterval;
+    this.tokenLifetime = tokenLifetime;
+    this.allKeys = new HashMap<Integer, BlockKey>();
+    generateKeys();
+  }
+
+  /** Initialize block keys */
+  private synchronized void generateKeys() {
+    if (!isMaster)
+      return;
+    /*
+     * Need to set estimated expiry dates for currentKey and nextKey so that if
+     * NN crashes, DN can still expire those keys. NN will stop using the newly
+     * generated currentKey after the first keyUpdateInterval, however it may
+     * still be used by DN and Balancer to generate new tokens before they get a
+     * chance to sync their keys with NN. Since we require keyUpdateInterval to be
+     * long enough so that all live DN's and Balancer will sync their keys with
+     * NN at least once during the period, the estimated expiry date for
+     * currentKey is set to now() + 2 * keyUpdateInterval + tokenLifetime.
+     * Similarly, the estimated expiry date for nextKey is one keyUpdateInterval
+     * more.
+     */
+    serialNo++;
+    currentKey = new BlockKey(serialNo, System.currentTimeMillis() + 2
+        * keyUpdateInterval + tokenLifetime, generateSecret());
+    serialNo++;
+    nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3
+        * keyUpdateInterval + tokenLifetime, generateSecret());
+    allKeys.put(currentKey.getKeyId(), currentKey);
+    allKeys.put(nextKey.getKeyId(), nextKey);
+  }
+
+  /** Export current and all known block keys; returns null in slave mode */
+  public synchronized ExportedBlockKeys exportKeys() {
+    if (!isMaster)
+      return null;
+    if (LOG.isDebugEnabled())
+      LOG.debug("Exporting block keys");
+    return new ExportedBlockKeys(true, keyUpdateInterval, tokenLifetime,
+        currentKey, allKeys.values().toArray(new BlockKey[0]));
+  }
+
+  private synchronized void removeExpiredKeys() {
+    long now = System.currentTimeMillis();
+    for (Iterator<Map.Entry<Integer, BlockKey>> it = allKeys.entrySet()
+        .iterator(); it.hasNext();) {
+      Map.Entry<Integer, BlockKey> e = it.next();
+      if (e.getValue().getExpiryDate() < now) {
+        it.remove();
+      }
+    }
+  }
+
+  /**
+   * Set block keys, only to be used in slave mode
+   */
+  public synchronized void setKeys(ExportedBlockKeys exportedKeys)
+      throws IOException {
+    if (isMaster || exportedKeys == null)
+      return;
+    LOG.info("Setting block keys");
+    removeExpiredKeys();
+    this.currentKey = exportedKeys.getCurrentKey();
+    BlockKey[] receivedKeys = exportedKeys.getAllKeys();
+    for (int i = 0; i < receivedKeys.length; i++) {
+      if (receivedKeys[i] == null)
+        continue;
+      this.allKeys.put(receivedKeys[i].getKeyId(), receivedKeys[i]);
+    }
+  }
+
+  /**
+   * Update block keys, only to be used in master mode
+   */
+  public synchronized void updateKeys() throws IOException {
+    if (!isMaster)
+      return;
+    LOG.info("Updating block keys");
+    removeExpiredKeys();
+    // set final expiry date of retiring currentKey
+    allKeys.put(currentKey.getKeyId(), new BlockKey(currentKey.getKeyId(),
+        System.currentTimeMillis() + keyUpdateInterval + tokenLifetime,
+        currentKey.getKey()));
+    // update the estimated expiry date of new currentKey
+    currentKey = new BlockKey(nextKey.getKeyId(), System.currentTimeMillis()
+        + 2 * keyUpdateInterval + tokenLifetime, nextKey.getKey());
+    allKeys.put(currentKey.getKeyId(), currentKey);
+    // generate a new nextKey
+    serialNo++;
+    nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3
+        * keyUpdateInterval + tokenLifetime, generateSecret());
+    allKeys.put(nextKey.getKeyId(), nextKey);
+  }
+
+  /** Generate a block token for the current user */
+  public Token<BlockTokenIdentifier> generateToken(Block block,
+      EnumSet<AccessMode> modes) throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    String userID = (ugi == null ? null : ugi.getShortUserName());
+    return generateToken(userID, block, modes);
+  }
+
+  /** Generate a block token for a specified user */
+  public Token<BlockTokenIdentifier> generateToken(String userId, Block block,
+      EnumSet<AccessMode> modes) throws IOException {
+    BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
+        .getBlockId(), modes);
+    return new Token<BlockTokenIdentifier>(id, this);
+  }
+
+  /**
+   * Check if access should be allowed. userID is not checked if null. This
+   * method doesn't check if token password is correct. It should be used only
+   * when token password has already been verified (e.g., in the RPC layer).
+   */
+  public void checkAccess(BlockTokenIdentifier id, String userId, Block block,
+      AccessMode mode) throws InvalidToken {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Checking access for user=" + userId + ", block=" + block
+          + ", access mode=" + mode + " using " + id.toString());
+    }
+    if (userId != null && !userId.equals(id.getUserId())) {
+      throw new InvalidToken("Block token with " + id.toString()
+          + " doesn't belong to user " + userId);
+    }
+    if (id.getBlockId() != block.getBlockId()) {
+      throw new InvalidToken("Block token with " + id.toString()
+          + " doesn't apply to block " + block);
+    }
+    if (isExpired(id.getExpiryDate())) {
+      throw new InvalidToken("Block token with " + id.toString()
+          + " is expired.");
+    }
+    if (!id.getAccessModes().contains(mode)) {
+      throw new InvalidToken("Block token with " + id.toString()
+          + " doesn't have " + mode + " permission");
+    }
+  }
+
+  /** Check if access should be allowed. userID is not checked if null */
+  public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
+      Block block, AccessMode mode) throws InvalidToken {
+    BlockTokenIdentifier id = new BlockTokenIdentifier();
+    try {
+      id.readFields(new DataInputStream(new ByteArrayInputStream(token
+          .getIdentifier())));
+    } catch (IOException e) {
+      throw new InvalidToken(
+          "Unable to de-serialize block token identifier for user=" + userId
+              + ", block=" + block + ", access mode=" + mode);
+    }
+    checkAccess(id, userId, block, mode);
+    if (!Arrays.equals(retrievePassword(id), token.getPassword())) {
+      throw new InvalidToken("Block token with " + id.toString()
+          + " doesn't have the correct token password");
+    }
+  }
+
+  private static boolean isExpired(long expiryDate) {
+    return System.currentTimeMillis() > expiryDate;
+  }
+
+  /**
+   * check if a token is expired. for unit test only. return true when token is
+   * expired, false otherwise
+   */
+  static boolean isTokenExpired(Token<BlockTokenIdentifier> token)
+      throws IOException {
+    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+    DataInputStream in = new DataInputStream(buf);
+    long expiryDate = WritableUtils.readVLong(in);
+    return isExpired(expiryDate);
+  }
+
+  /** set token lifetime. */
+  public void setTokenLifetime(long tokenLifetime) {
+    this.tokenLifetime = tokenLifetime;
+  }
+
+  /**
+   * Create an empty block token identifier
+   * 
+   * @return a newly created empty block token identifier
+   */
+  @Override
+  public BlockTokenIdentifier createIdentifier() {
+    return new BlockTokenIdentifier();
+  }
+
+  /**
+   * Create a new password/secret for the given block token identifier.
+   * 
+   * @param identifier
+   *          the block token identifier
+   * @return token password/secret
+   */
+  @Override
+  protected byte[] createPassword(BlockTokenIdentifier identifier) {
+    BlockKey key = null;
+    synchronized (this) {
+      key = currentKey;
+    }
+    if (key == null)
+      throw new IllegalStateException("currentKey hasn't been initialized.");
+    identifier.setExpiryDate(System.currentTimeMillis() + tokenLifetime);
+    identifier.setKeyId(key.getKeyId());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generating block token for " + identifier.toString());
+    }
+    return createPassword(identifier.getBytes(), key.getKey());
+  }
+
+  /**
+   * Look up the token password/secret for the given block token identifier.
+   * 
+   * @param identifier
+   *          the block token identifier to look up
+   * @return token password/secret as byte[]
+   * @throws InvalidToken
+   */
+  @Override
+  public byte[] retrievePassword(BlockTokenIdentifier identifier)
+      throws InvalidToken {
+    if (isExpired(identifier.getExpiryDate())) {
+      throw new InvalidToken("Block token with " + identifier.toString()
+          + " is expired.");
+    }
+    BlockKey key = null;
+    synchronized (this) {
+      key = allKeys.get(identifier.getKeyId());
+    }
+    if (key == null) {
+      throw new InvalidToken("Can't re-compute password for "
+          + identifier.toString() + ", since the required block key (keyID="
+          + identifier.getKeyId() + ") doesn't exist.");
+    }
+    return createPassword(identifier.getBytes(), key.getKey());
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java?rev=1077252&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java Fri Mar  4 03:56:33 2011
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.util.Collection;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+/**
+ * A block token selector for HDFS
+ */
+public class BlockTokenSelector implements TokenSelector<BlockTokenIdentifier> {
+
+  @SuppressWarnings("unchecked")
+  public Token<BlockTokenIdentifier> selectToken(Text service,
+      Collection<Token<? extends TokenIdentifier>> tokens) {
+    if (service == null) {
+      return null;
+    }
+    for (Token<? extends TokenIdentifier> token : tokens) {
+      if (BlockTokenIdentifier.KIND_NAME.equals(token.getKind())) {
+        return (Token<BlockTokenIdentifier>) token;
+      }
+    }
+    return null;
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java?rev=1077252&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java Fri Mar  4 03:56:33 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Object for passing block keys
+ */
+public class ExportedBlockKeys implements Writable {
+  public static final ExportedBlockKeys DUMMY_KEYS = new ExportedBlockKeys();
+  private boolean isBlockTokenEnabled;
+  private long keyUpdateInterval;
+  private long tokenLifetime;
+  private BlockKey currentKey;
+  private BlockKey[] allKeys;
+
+  public ExportedBlockKeys() {
+    this(false, 0, 0, new BlockKey(), new BlockKey[0]);
+  }
+
+  ExportedBlockKeys(boolean isBlockTokenEnabled, long keyUpdateInterval,
+      long tokenLifetime, BlockKey currentKey, BlockKey[] allKeys) {
+    this.isBlockTokenEnabled = isBlockTokenEnabled;
+    this.keyUpdateInterval = keyUpdateInterval;
+    this.tokenLifetime = tokenLifetime;
+    this.currentKey = currentKey == null ? new BlockKey() : currentKey;
+    this.allKeys = allKeys == null ? new BlockKey[0] : allKeys;
+  }
+
+  public boolean isBlockTokenEnabled() {
+    return isBlockTokenEnabled;
+  }
+
+  public long getKeyUpdateInterval() {
+    return keyUpdateInterval;
+  }
+
+  public long getTokenLifetime() {
+    return tokenLifetime;
+  }
+
+  public BlockKey getCurrentKey() {
+    return currentKey;
+  }
+
+  public BlockKey[] getAllKeys() {
+    return allKeys;
+  }
+  
+  // ///////////////////////////////////////////////
+  // Writable
+  // ///////////////////////////////////////////////
+  static { // register a ctor
+    WritableFactories.setFactory(ExportedBlockKeys.class,
+        new WritableFactory() {
+          public Writable newInstance() {
+            return new ExportedBlockKeys();
+          }
+        });
+  }
+
+  /** Writes the enabled flag, key update interval and token lifetime, then
+   *  the current key, then a count-prefixed list of all keys. */
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(isBlockTokenEnabled);
+    out.writeLong(keyUpdateInterval);
+    out.writeLong(tokenLifetime);
+    currentKey.write(out);
+    out.writeInt(allKeys.length);
+    for (int i = 0; i < allKeys.length; i++) {
+      allKeys[i].write(out);
+    }
+  }
+
+  /** Reads the fields in the order write() emits them; currentKey is filled
+   *  in place while allKeys is replaced by a newly allocated array. */
+  public void readFields(DataInput in) throws IOException {
+    isBlockTokenEnabled = in.readBoolean();
+    keyUpdateInterval = in.readLong();
+    tokenLifetime = in.readLong();
+    currentKey.readFields(in);
+    this.allKeys = new BlockKey[in.readInt()];
+    for (int i = 0; i < allKeys.length; i++) {
+      allKeys[i] = new BlockKey();
+      allKeys[i].readFields(in);
+    }
+  }
+
+}
\ No newline at end of file

Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java?rev=1077252&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java Fri Mar  4 03:56:33 2011
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.IOException;
+
+/**
+ * Block token verification failed.
+ */
+public class InvalidBlockTokenException extends IOException {
+  private static final long serialVersionUID = 168L;
+
+  public InvalidBlockTokenException() {
+    super();
+  }
+
+  public InvalidBlockTokenException(String msg) {
+    super(msg);
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Mar  4 03:56:33 2011
@@ -56,9 +56,9 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -76,6 +76,7 @@ import org.apache.hadoop.ipc.RemoteExcep
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
@@ -194,10 +195,10 @@ public class Balancer implements Tool {
   private NamenodeProtocol namenode;
   private ClientProtocol client;
   private FileSystem fs;
-  private boolean isAccessTokenEnabled;
+  private boolean isBlockTokenEnabled;
   private boolean shouldRun;
   private long keyUpdaterInterval;
-  private AccessTokenHandler accessTokenHandler;
+  private BlockTokenSecretManager blockTokenSecretManager;
   private Daemon keyupdaterthread = null; // AccessKeyUpdater thread
   private final static Random rnd = new Random();
   
@@ -368,11 +369,11 @@ public class Balancer implements Tool {
       out.writeLong(block.getBlock().getGenerationStamp());
       Text.writeString(out, source.getStorageID());
       proxySource.write(out);
-      BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
-      if (isAccessTokenEnabled) {
-        accessToken = accessTokenHandler.generateToken(null, block.getBlock()
-            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.REPLACE,
-            AccessTokenHandler.AccessMode.COPY));
+      Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
+      if (isBlockTokenEnabled) {
+        accessToken = blockTokenSecretManager.generateToken(null, block.getBlock(), 
+            EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
+                BlockTokenSecretManager.AccessMode.COPY));
       }
       accessToken.write(out);
       out.flush();
@@ -859,25 +860,25 @@ public class Balancer implements Tool {
     this.namenode = createNamenode(conf);
     this.client = DFSClient.createNamenode(conf);
     this.fs = FileSystem.get(conf);
-    ExportedAccessKeys keys = namenode.getAccessKeys();
-    this.isAccessTokenEnabled = keys.isAccessTokenEnabled();
-    if (isAccessTokenEnabled) {
-      long accessKeyUpdateInterval = keys.getKeyUpdateInterval();
-      long accessTokenLifetime = keys.getTokenLifetime();
-      LOG.info("Access token params received from NN: keyUpdateInterval="
-          + accessKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
-          + accessTokenLifetime / (60 * 1000) + " min(s)");
-      this.accessTokenHandler = new AccessTokenHandler(false,
-          accessKeyUpdateInterval, accessTokenLifetime);
-      this.accessTokenHandler.setKeys(keys);
+    ExportedBlockKeys keys = namenode.getBlockKeys();
+    this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
+    if (isBlockTokenEnabled) {
+      long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
+      long blockTokenLifetime = keys.getTokenLifetime();
+      LOG.info("Block token params received from NN: keyUpdateInterval="
+          + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
+          + blockTokenLifetime / (60 * 1000) + " min(s)");
+      this.blockTokenSecretManager = new BlockTokenSecretManager(false,
+          blockKeyUpdateInterval, blockTokenLifetime);
+      this.blockTokenSecretManager.setKeys(keys);
       /*
-       * Balancer should sync its access keys with NN more frequently than NN
-       * updates its access keys
+       * Balancer should sync its block keys with NN more frequently than NN
+       * updates its block keys
        */
-      this.keyUpdaterInterval = accessKeyUpdateInterval / 4;
-      LOG.info("Balancer will update its access keys every "
+      this.keyUpdaterInterval = blockKeyUpdateInterval / 4;
+      LOG.info("Balancer will update its block keys every "
           + keyUpdaterInterval / (60 * 1000) + " minute(s)");
-      this.keyupdaterthread = new Daemon(new AccessKeyUpdater());
+      this.keyupdaterthread = new Daemon(new BlockKeyUpdater());
       this.shouldRun = true;
       this.keyupdaterthread.start();
     }
@@ -886,12 +887,12 @@ public class Balancer implements Tool {
   /**
    * Periodically updates access keys.
    */
-  class AccessKeyUpdater implements Runnable {
+  class BlockKeyUpdater implements Runnable {
 
     public void run() {
       while (shouldRun) {
         try {
-          accessTokenHandler.setKeys(namenode.getAccessKeys());
+          blockTokenSecretManager.setKeys(namenode.getBlockKeys());
         } catch (Exception e) {
           LOG.error(StringUtils.stringifyException(e));
         }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Mar  4 03:56:33 2011
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.junit.Assert.assertTrue;
+
 import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -40,6 +42,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -62,6 +65,9 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
@@ -97,10 +103,9 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
@@ -198,9 +203,9 @@ public class DataNode extends Configured
   int socketWriteTimeout = 0;  
   boolean transferToAllowed = true;
   int writePacketSize = 0;
-  boolean isAccessTokenEnabled;
-  AccessTokenHandler accessTokenHandler;
-  boolean isAccessTokenInitialized = false;
+  boolean isBlockTokenEnabled;
+  BlockTokenSecretManager blockTokenSecretManager;
+  boolean isBlockTokenInitialized = false;
   
   public DataBlockScanner blockScanner = null;
   public Daemon blockScannerThread = null;
@@ -410,12 +415,16 @@ public class DataNode extends Configured
       ServiceAuthorizationManager.refresh(conf, new HDFSPolicyProvider());
     }
 
+    // BlockTokenSecretManager is created here, but it shouldn't be
+    // used until it is initialized in register().
+    this.blockTokenSecretManager = new BlockTokenSecretManager(false,
+        0, 0);
     //init ipc server
     InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
         conf.get("dfs.datanode.ipc.address"));
     ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(), 
-        conf.getInt("dfs.datanode.handler.count", 3), false, conf);
-    ipcServer.start();
+        conf.getInt("dfs.datanode.handler.count", 3), false, conf,
+        blockTokenSecretManager);
     dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
 
     LOG.info("dnRegistration = " + dnRegistration);
@@ -575,25 +584,24 @@ public class DataNode extends Configured
           + ". Expecting " + storage.getStorageID());
     }
     
-    if (!isAccessTokenInitialized) {
+    if (!isBlockTokenInitialized) {
       /* first time registering with NN */
-      ExportedAccessKeys keys = dnRegistration.exportedKeys;
-      this.isAccessTokenEnabled = keys.isAccessTokenEnabled();
-      if (isAccessTokenEnabled) {
-        long accessKeyUpdateInterval = keys.getKeyUpdateInterval();
-        long accessTokenLifetime = keys.getTokenLifetime();
-        LOG.info("Access token params received from NN: keyUpdateInterval="
-            + accessKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
-            + accessTokenLifetime / (60 * 1000) + " min(s)");
-        this.accessTokenHandler = new AccessTokenHandler(false,
-            accessKeyUpdateInterval, accessTokenLifetime);
-      }
-      isAccessTokenInitialized = true;
-    }
-
-    if (isAccessTokenEnabled) {
-      accessTokenHandler.setKeys(dnRegistration.exportedKeys);
-      dnRegistration.exportedKeys = ExportedAccessKeys.DUMMY_KEYS;
+      ExportedBlockKeys keys = dnRegistration.exportedKeys;
+      this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
+      if (isBlockTokenEnabled) {
+        long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
+        long blockTokenLifetime = keys.getTokenLifetime();
+        LOG.info("Block token params received from NN: keyUpdateInterval="
+            + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
+            + blockTokenLifetime / (60 * 1000) + " min(s)");
+        blockTokenSecretManager.setTokenLifetime(blockTokenLifetime);
+      }
+      isBlockTokenInitialized = true;
+    }
+
+    if (isBlockTokenEnabled) {
+      blockTokenSecretManager.setKeys(dnRegistration.exportedKeys);
+      dnRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
     }
 
     // random short delay - helps scatter the BR from all DNs
@@ -962,8 +970,8 @@ public class DataNode extends Configured
       break;
     case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
       LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
-      if (isAccessTokenEnabled) {
-        accessTokenHandler.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
+      if (isBlockTokenEnabled) {
+        blockTokenSecretManager.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
       }
       break;
     default:
@@ -1222,10 +1230,10 @@ public class DataNode extends Configured
         for (int i = 1; i < targets.length; i++) {
           targets[i].write(out);
         }
-        BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
-        if (isAccessTokenEnabled) {
-          accessToken = accessTokenHandler.generateToken(null, b.getBlockId(),
-              EnumSet.of(AccessTokenHandler.AccessMode.WRITE));
+        Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
+        if (isBlockTokenEnabled) {
+          accessToken = blockTokenSecretManager.generateToken(null, b,
+              EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
         }
         accessToken.write(out);
         // send data & checksum
@@ -1261,6 +1269,7 @@ public class DataNode extends Configured
 
     // start dataXceiveServer
     dataXceiverServer.start();
+    ipcServer.start();
         
     while (shouldRun) {
       try {
@@ -1631,9 +1640,9 @@ public class DataNode extends Configured
           DatanodeID.EMPTY_ARRAY);
       //always return a new access token even if everything else stays the same
       LocatedBlock b = new LocatedBlock(block, targets);
-      if (isAccessTokenEnabled) {
-        b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
-            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+      if (isBlockTokenEnabled) {
+        b.setBlockToken(blockTokenSecretManager.generateToken(null, b.getBlock(), 
+            EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
       }
       return b;
     }
@@ -1666,9 +1675,9 @@ public class DataNode extends Configured
       LocatedBlock b = new LocatedBlock(newblock, info); // success
       // should have used client ID to generate access token, but since 
       // owner ID is not checked, we simply pass null for now.
-      if (isAccessTokenEnabled) {
-        b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
-            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+      if (isBlockTokenEnabled) {
+        b.setBlockToken(blockTokenSecretManager.generateToken(null, b.getBlock(), 
+            EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
       }
       return b;
     }
@@ -1687,6 +1696,23 @@ public class DataNode extends Configured
   public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
       ) throws IOException {
     logRecoverBlock("Client", block, targets);
+    if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
+      Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
+          .getTokenIdentifiers();
+      if (tokenIds.size() != 1) {
+        throw new IOException("Can't continue with recoverBlock() "
+            + "authorization since " + tokenIds.size() + " BlockTokenIdentifier "
+            + "is found.");
+      }
+      for (TokenIdentifier tokenId : tokenIds) {
+        BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Got: " + id.toString());
+        }
+        blockTokenSecretManager.checkAccess(id, null, block,
+            BlockTokenSecretManager.AccessMode.WRITE);
+      }
+    }
     return recoverBlock(block, keepLength, targets, false);
   }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Mar  4 03:56:33 2011
@@ -32,14 +32,16 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
@@ -151,23 +153,26 @@ class DataXceiver implements Runnable, F
     long startOffset = in.readLong();
     long length = in.readLong();
     String clientName = Text.readString(in);
-    BlockAccessToken accessToken = new BlockAccessToken();
+    Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
     accessToken.readFields(in);
     OutputStream baseStream = NetUtils.getOutputStream(s, 
         datanode.socketWriteTimeout);
     DataOutputStream out = new DataOutputStream(
                  new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
     
-    if (datanode.isAccessTokenEnabled
-        && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
-            AccessTokenHandler.AccessMode.READ)) {
+    if (datanode.isBlockTokenEnabled) {
       try {
-        out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
-        out.flush();
-        throw new IOException("Access token verification failed, for client "
-            + remoteAddress + " for OP_READ_BLOCK for block " + block);
-      } finally {
-        IOUtils.closeStream(out);
+        datanode.blockTokenSecretManager.checkAccess(accessToken, null, block,
+            BlockTokenSecretManager.AccessMode.READ);
+      } catch (InvalidToken e) {
+        try {
+          out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+          out.flush();
+          throw new IOException("Access token verification failed, for client "
+              + remoteAddress + " for OP_READ_BLOCK for block " + block);
+        } finally {
+          IOUtils.closeStream(out);
+        }
       }
     }
     // send the block
@@ -258,24 +263,27 @@ class DataXceiver implements Runnable, F
       tmp.readFields(in);
       targets[i] = tmp;
     }
-    BlockAccessToken accessToken = new BlockAccessToken();
+    Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
     accessToken.readFields(in);
     DataOutputStream replyOut = null;   // stream to prev target
     replyOut = new DataOutputStream(
                    NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
-    if (datanode.isAccessTokenEnabled
-        && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
-            .getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
+    if (datanode.isBlockTokenEnabled) {
       try {
-        if (client.length() != 0) {
-          replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
-          Text.writeString(replyOut, datanode.dnRegistration.getName());
-          replyOut.flush();
+        datanode.blockTokenSecretManager.checkAccess(accessToken, null, block, 
+            BlockTokenSecretManager.AccessMode.WRITE);
+      } catch (InvalidToken e) {
+        try {
+          if (client.length() != 0) {
+            replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+            Text.writeString(replyOut, datanode.dnRegistration.getName());
+            replyOut.flush();
+          }
+          throw new IOException("Access token verification failed, for client "
+              + remoteAddress + " for OP_WRITE_BLOCK for block " + block);
+        } finally {
+          IOUtils.closeStream(replyOut);
         }
-        throw new IOException("Access token verification failed, for client "
-            + remoteAddress + " for OP_WRITE_BLOCK for block " + block);
-      } finally {
-        IOUtils.closeStream(replyOut);
       }
     }
 
@@ -423,21 +431,24 @@ class DataXceiver implements Runnable, F
    */
   void getBlockChecksum(DataInputStream in) throws IOException {
     final Block block = new Block(in.readLong(), 0 , in.readLong());
-    BlockAccessToken accessToken = new BlockAccessToken();
+    Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
     accessToken.readFields(in);
     DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
         datanode.socketWriteTimeout));
-    if (datanode.isAccessTokenEnabled
-        && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
-            .getBlockId(), AccessTokenHandler.AccessMode.READ)) {
+    if (datanode.isBlockTokenEnabled) {
       try {
-        out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
-        out.flush();
-        throw new IOException(
-            "Access token verification failed, for client " + remoteAddress
-                + " for OP_BLOCK_CHECKSUM for block " + block);
-      } finally {
-        IOUtils.closeStream(out);
+        datanode.blockTokenSecretManager.checkAccess(accessToken, null, block, 
+            BlockTokenSecretManager.AccessMode.READ);
+      } catch (InvalidToken e) {
+        try {
+          out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+          out.flush();
+          throw new IOException(
+              "Access token verification failed, for client " + remoteAddress
+                  + " for OP_BLOCK_CHECKSUM for block " + block);
+        } finally {
+          IOUtils.closeStream(out);
+        }
       }
     }
 
@@ -484,17 +495,20 @@ class DataXceiver implements Runnable, F
     // Read in the header
     long blockId = in.readLong(); // read block id
     Block block = new Block(blockId, 0, in.readLong());
-    BlockAccessToken accessToken = new BlockAccessToken();
+    Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
     accessToken.readFields(in);
-    if (datanode.isAccessTokenEnabled
-        && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
-            AccessTokenHandler.AccessMode.COPY)) {
-      LOG.warn("Invalid access token in request from "
-          + remoteAddress + " for OP_COPY_BLOCK for block " + block);
-      sendResponse(s,
-          (short) DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
-          datanode.socketWriteTimeout);
-      return;
+    if (datanode.isBlockTokenEnabled) {
+      try {
+        datanode.blockTokenSecretManager.checkAccess(accessToken, null, block,
+            BlockTokenSecretManager.AccessMode.COPY);
+      } catch (InvalidToken e) {
+        LOG.warn("Invalid access token in request from "
+            + remoteAddress + " for OP_COPY_BLOCK for block " + block);
+        sendResponse(s,
+            (short) DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
+            datanode.socketWriteTimeout);
+        return;
+      }
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
@@ -562,16 +576,19 @@ class DataXceiver implements Runnable, F
     String sourceID = Text.readString(in); // read del hint
     DatanodeInfo proxySource = new DatanodeInfo(); // read proxy source
     proxySource.readFields(in);
-    BlockAccessToken accessToken = new BlockAccessToken();
+    Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
     accessToken.readFields(in);
-    if (datanode.isAccessTokenEnabled
-        && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
-            AccessTokenHandler.AccessMode.REPLACE)) {
-      LOG.warn("Invalid access token in request from "
-          + remoteAddress + " for OP_REPLACE_BLOCK for block " + block);
-      sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
-          datanode.socketWriteTimeout);
-      return;
+    if (datanode.isBlockTokenEnabled) {
+      try {
+        datanode.blockTokenSecretManager.checkAccess(accessToken, null, block,
+            BlockTokenSecretManager.AccessMode.REPLACE);
+      } catch (InvalidToken e) {
+        LOG.warn("Invalid access token in request from "
+            + remoteAddress + " for OP_REPLACE_BLOCK for block " + block);
+        sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
+            datanode.socketWriteTimeout);
+        return;
+      }
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Mar  4 03:56:33 2011
@@ -20,8 +20,11 @@ package org.apache.hadoop.hdfs.server.na
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
@@ -30,8 +33,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.token.Token;
@@ -139,7 +140,7 @@ public class FSNamesystem implements FSC
   private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
   private int totalLoad = 0;
   boolean isAccessTokenEnabled;
-  AccessTokenHandler accessTokenHandler;
+  BlockTokenSecretManager accessTokenHandler;
   private long accessKeyUpdateInterval;
   private long accessTokenLifetime;
   
@@ -344,7 +345,7 @@ public class FSNamesystem implements FSC
                             conf.getInt("dfs.replication.pending.timeout.sec", 
                                         -1) * 1000L);
     if (isAccessTokenEnabled) {
-      accessTokenHandler = new AccessTokenHandler(true,
+      accessTokenHandler = new BlockTokenSecretManager(true,
           accessKeyUpdateInterval, accessTokenLifetime);
     }
     this.hbthread = new Daemon(new HeartbeatMonitor());
@@ -463,12 +464,12 @@ public class FSNamesystem implements FSC
     this.accessTimePrecision = conf.getLong("dfs.access.time.precision", 0);
     this.supportAppends = conf.getBoolean("dfs.support.append", false);
     this.isAccessTokenEnabled = conf.getBoolean(
-        AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false);
+        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, false);
     if (isAccessTokenEnabled) {
       this.accessKeyUpdateInterval = conf.getLong(
-          AccessTokenHandler.STRING_ACCESS_KEY_UPDATE_INTERVAL, 600) * 60 * 1000L; // 10 hrs
+          DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY, 600) * 60 * 1000L; // 10 hrs
       this.accessTokenLifetime = conf.getLong(
-          AccessTokenHandler.STRING_ACCESS_TOKEN_LIFETIME, 600) * 60 * 1000L; // 10 hrs
+          DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY, 600) * 60 * 1000L; // 10 hrs
     }
     LOG.info("isAccessTokenEnabled=" + isAccessTokenEnabled
         + " accessKeyUpdateInterval=" + accessKeyUpdateInterval / (60 * 1000)
@@ -714,9 +715,9 @@ public class FSNamesystem implements FSC
    * 
    * @return current access keys
    */
-  ExportedAccessKeys getAccessKeys() {
+  ExportedBlockKeys getBlockKeys() {
     return isAccessTokenEnabled ? accessTokenHandler.exportKeys()
-        : ExportedAccessKeys.DUMMY_KEYS;
+        : ExportedBlockKeys.DUMMY_KEYS;
   }
 
   /**
@@ -913,8 +914,8 @@ public class FSNamesystem implements FSC
       LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
           blockCorrupt);
       if (isAccessTokenEnabled) {
-        b.setAccessToken(accessTokenHandler.generateToken(b.getBlock()
-            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.READ)));
+        b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(), 
+            EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
       }
       results.add(b); 
       curPos += blocks[curBlk].getNumBytes();
@@ -1265,8 +1266,8 @@ public class FSNamesystem implements FSC
           lb = new LocatedBlock(last, targets, 
                                 fileLength-storedBlock.getNumBytes());
           if (isAccessTokenEnabled) {
-            lb.setAccessToken(accessTokenHandler.generateToken(lb.getBlock()
-                .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+            lb.setBlockToken(accessTokenHandler.generateToken(lb.getBlock(), 
+                EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
           }
 
           // Remove block from replication queue.
@@ -1379,8 +1380,8 @@ public class FSNamesystem implements FSC
     // Create next block
     LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);
     if (isAccessTokenEnabled) {
-      b.setAccessToken(accessTokenHandler.generateToken(b.getBlock()
-          .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+      b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(), 
+          EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
     }
     return b;
   }
@@ -2139,7 +2140,7 @@ public class FSNamesystem implements FSC
                                       nodeReg.getInfoPort(),
                                       nodeReg.getIpcPort());
     nodeReg.updateRegInfo(dnReg);
-    nodeReg.exportedKeys = getAccessKeys();
+    nodeReg.exportedKeys = getBlockKeys();
       
     NameNode.stateChangeLog.info(
                                  "BLOCK* NameSystem.registerDatanode: "



Mime
View raw message