hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiten...@apache.org
Subject svn commit: r1205698 - in /hadoop/common/branches/branch-0.20-security-205: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/m...
Date Thu, 24 Nov 2011 01:52:45 GMT
Author: jitendra
Date: Thu Nov 24 01:52:43 2011
New Revision: 1205698

URL: http://svn.apache.org/viewvc?rev=1205698&view=rev
Log:
Merged r1205243 from branch-0.20-security for HDFS-2246.

Added:
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java
      - copied unchanged from r1205243, hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
      - copied unchanged from r1205243, hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
      - copied unchanged from r1205243, hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
Modified:
    hadoop/common/branches/branch-0.20-security-205/CHANGES.txt
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java
    hadoop/common/branches/branch-0.20-security-205/src/test/commit-tests
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/common/branches/branch-0.20-security-205/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/CHANGES.txt?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-205/CHANGES.txt Thu Nov 24 01:52:43 2011
@@ -46,6 +46,9 @@ Release 0.20.205.1 - unreleased
 
     HADOOP-7810. Add tools jar to the end of classpath. (John George via omalley)
 
+    HDFS-2246. Short-circuit local client reads directly to a Datanode's files.
+    (Andrew Purtell, Suresh, Jitendra)
+
   BUG FIXES
 
     HADOOP-7827. jsp pages missing DOCTYPE. (Dave Vronay via mattf)

Modified: hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Thu Nov 24 01:52:43 2011
@@ -93,6 +93,7 @@ public class DFSClient implements FSCons
   final int writePacketSize;
   private final FileSystem.Statistics stats;
   private int maxBlockAcquireFailures;
+  private boolean shortCircuitLocalReads;
 
   /**
    * We assume we're talking to another CDH server, which supports
@@ -144,6 +145,7 @@ public class DFSClient implements FSCons
         rpcNamenode, methodNameToPolicyMap);
   }
 
+  /** Create {@link ClientDatanodeProtocol} proxy with block/token */
   static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
       DatanodeID datanodeid, Configuration conf, 
       Block block, Token<BlockTokenIdentifier> token, int socketTimeout) throws IOException
{
@@ -160,6 +162,20 @@ public class DFSClient implements FSCons
         .getDefaultSocketFactory(conf), socketTimeout);
   }
         
+  /**
+   * Create a {@link ClientDatanodeProtocol} proxy that authenticates with the
+   * caller's kerberos ticket instead of a block token.
+   *
+   * @param datanodeid datanode whose IPC port the proxy connects to
+   * @param conf configuration used for the RPC proxy and its socket factory
+   * @param socketTimeout socket timeout passed to the RPC proxy
+   * @return the protocol proxy
+   * @throws IOException if the RPC proxy cannot be created
+   */
+  static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
+      DatanodeID datanodeid, Configuration conf, int socketTimeout)
+      throws IOException {
+    InetSocketAddress addr = NetUtils.createSocketAddr(
+      datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+    // Log at the same level that guards the statement.
+    if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
+      ClientDatanodeProtocol.LOG.debug("ClientDatanodeProtocol addr=" + addr);
+    }
+    return (ClientDatanodeProtocol) RPC.getProxy(ClientDatanodeProtocol.class,
+        ClientDatanodeProtocol.versionID, addr, conf,
+        NetUtils.getDefaultSocketFactory(conf), socketTimeout);
+  }
+  
   /**
    * Same as this(NameNode.getAddress(conf), conf);
    * @see #DFSClient(InetSocketAddress, Configuration)
@@ -206,7 +222,7 @@ public class DFSClient implements FSCons
     // dfs.write.packet.size is an internal config variable
     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
     this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
-    
+
     ugi = UserGroupInformation.getCurrentUser();
 
     String taskId = conf.get("mapred.task.id");
@@ -229,6 +245,13 @@ public class DFSClient implements FSCons
           "Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
           + "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
     }
+    // read directly from the block file if configured.
+    this.shortCircuitLocalReads = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Short circuit read is " + shortCircuitLocalReads);
+    }
   }
 
   static int getMaxBlockAcquireFailures(Configuration conf) {
@@ -325,6 +348,82 @@ public class DFSClient implements FSCons
   }
 
   /**
+   * Build a {@link BlockReader} that reads the block's files directly from the
+   * local disk (short-circuit read). It reads from {@code offsetIntoBlock} to
+   * the end of the block.
+   *
+   * @param conf client configuration
+   * @param src path of the file the block belongs to
+   * @param blk block to read
+   * @param accessToken block access token for {@code blk}
+   * @param chosenNode datanode holding the block on this machine
+   * @param socketTimeout timeout passed through to BlockReaderLocal
+   * @param offsetIntoBlock offset within the block where reading starts
+   * @return a reader over the remainder of the block
+   * @throws InvalidToken if the datanode rejected the access token
+   * @throws IOException on any other failure. A RemoteException wrapping
+   *         InvalidToken or AccessControlException is unwrapped to that type.
+   */
+  private static BlockReader getLocalBlockReader(Configuration conf,
+      String src, Block blk, Token<BlockTokenIdentifier> accessToken,
+      DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock)
+      throws InvalidToken, IOException {
+    // Everything from the requested offset up to the end of the block.
+    final long bytesToRead = blk.getNumBytes() - offsetIntoBlock;
+    try {
+      return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
+          chosenNode, socketTimeout, offsetIntoBlock, bytesToRead);
+    } catch (RemoteException remote) {
+      // Expose the server-side cause so callers can react to token and
+      // permission failures.
+      throw remote.unwrapRemoteException(InvalidToken.class,
+          AccessControlException.class);
+    }
+  }
+  
+  /** IP addresses already found to belong to this host (positive results only). */
+  private static final Set<String> localIpAddresses = Collections
+      .synchronizedSet(new HashSet<String>());
+  
+  /**
+   * Determine whether the given address refers to this host. That is true when
+   * it is the wildcard address, a loopback address, or an address bound to a
+   * local network interface. Positive results are cached in
+   * {@link #localIpAddresses}.
+   *
+   * @param targetAddr datanode address to check
+   * @return true if the address is local. Returns false otherwise, including
+   *         when the address is unresolved.
+   */
+  private static boolean isLocalAddress(InetSocketAddress targetAddr) {
+    InetAddress addr = targetAddr.getAddress();
+    if (addr == null) {
+      // getAddress() returns null for an unresolved address. Such an address
+      // cannot be matched against local interfaces, so treat it as remote
+      // instead of failing with a NullPointerException.
+      return false;
+    }
+    if (localIpAddresses.contains(addr.getHostAddress())) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Address " + targetAddr + " is local");
+      }
+      return true;
+    }
+
+    // Check if the address is any local or loop back
+    boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
+
+    // Check if the address is defined on any interface
+    if (!local) {
+      try {
+        local = NetworkInterface.getByInetAddress(addr) != null;
+      } catch (SocketException e) {
+        // Interface lookup failed: fall back to treating the address as remote.
+        local = false;
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Address " + targetAddr
+          + (local ? " is local" : " is not local"));
+    }
+    if (local) {
+      localIpAddresses.add(addr.getHostAddress());
+    }
+    return local;
+  }
+  
+  /**
+   * Decide whether a failure was caused by an invalid or expired block access
+   * token. In that case the caller should fetch a fresh token and retry.
+   *
+   * A retry is needed in two situations:
+   * 1) Both the NN and the DN restarted while this DFSClient held a cached
+   *    access token.
+   * 2) The NN failed to update its access key at the pre-set interval (by a
+   *    wide margin) and then restarted. The DN re-registers with the NN and
+   *    receives a new access key. It also deletes the old key from memory,
+   *    because the key is considered expired based on the estimated
+   *    expiration date.
+   *
+   * @param ex exception received from the datanode
+   * @param targetAddr address of the datanode that raised {@code ex}
+   * @return true when the token should be refetched
+   */
+  private static boolean tokenRefetchNeeded(IOException ex,
+      InetSocketAddress targetAddr) {
+    final boolean tokenRejected =
+        ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken;
+    if (!tokenRejected) {
+      return false;
+    }
+    LOG.info("Access token was invalid when connecting to " + targetAddr
+        + " : " + ex);
+    return true;
+  }
+  
+  /**
    * Cancel a delegation token
    * @param token the token to cancel
    * @throws InvalidToken
@@ -1312,16 +1411,16 @@ public class DFSClient implements FSCons
 
     private Socket dnSock; //for now just sending checksumOk.
     private DataInputStream in;
-    private DataChecksum checksum;
-    private long lastChunkOffset = -1;
-    private long lastChunkLen = -1;
+    protected DataChecksum checksum;
+    protected long lastChunkOffset = -1;
+    protected long lastChunkLen = -1;
     private long lastSeqNo = -1;
 
-    private long startOffset;
-    private long firstChunkOffset;
-    private int bytesPerChecksum;
-    private int checksumSize;
-    private boolean gotEOS = false;
+    protected long startOffset;
+    protected long firstChunkOffset;
+    protected int bytesPerChecksum;
+    protected int checksumSize;
+    protected boolean gotEOS = false;
     
     byte[] skipBuf = null;
     ByteBuffer checksumBytes = null;
@@ -1358,7 +1457,8 @@ public class DFSClient implements FSCons
       int nRead = super.read(buf, off, len);
       
       // if gotEOS was set in the previous read and checksum is enabled :
-      if (gotEOS && !eosBefore && nRead >= 0 && needChecksum())
{
+      if (dnSock != null && gotEOS && !eosBefore && nRead >= 0
+          && needChecksum()) {
         //checksum is verified and there are no errors.
         checksumOk(dnSock);
       }
@@ -1536,14 +1636,44 @@ public class DFSClient implements FSCons
       checksumSize = this.checksum.getChecksumSize();
     }
 
+    /**
+     * Package-private constructor. It only hands the file path and retry
+     * count to the superclass constructor. No checksum state is set up here.
+     *
+     * @param file path of the file the block belongs to
+     * @param numRetries number of retries passed to the superclass
+     */
+    BlockReader(Path file, int numRetries) {
+      super(file, numRetries);
+    }
+
+    /**
+     * Protected constructor for subclasses that supply their own checksum.
+     *
+     * @param file path of the file the block belongs to
+     * @param numRetries number of retries passed to the superclass
+     * @param checksum block checksum. It is passed to the superclass as
+     *        {@code null} when its checksum size is 0.
+     * @param verifyChecksum whether checksums should be verified
+     */
+    protected BlockReader(Path file, int numRetries, DataChecksum checksum,
+        boolean verifyChecksum) {
+      super(file,
+          numRetries,
+          verifyChecksum,
+          checksum.getChecksumSize() > 0 ? checksum : null,
+          checksum.getBytesPerChecksum(),
+          checksum.getChecksumSize());
+    }
+
+    /**
+     * Creates a new {@link BlockReader} for the given blockId with checksum
+     * verification enabled. It passes {@code true} as {@code verifyChecksum}
+     * to the 9-argument overload.
+     */
     public static BlockReader newBlockReader(Socket sock, String file, long blockId, Token<BlockTokenIdentifier> accessToken, 
         long genStamp, long startOffset, long len, int bufferSize) throws IOException {
       return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize,
           true);
     }
 
-    /** Java Doc required */
-    public static BlockReader newBlockReader( Socket sock, String file, long blockId, 
+    /** 
+     * Creates a new {@link BlockReader} for the given blockId.
+     * @param sock Socket to read the block.
+     * @param file File to which this block belongs.
+     * @param blockId Block id.
+     * @param accessToken Block access token.
+     * @param genStamp Generation stamp of the block.
+     * @param startOffset Start offset for the data.
+     * @param len Length to be read.
+     * @param bufferSize Buffer size to use.
+     * @param verifyChecksum Checksum verification is required or not.
+     * @return BlockReader object.
+     * @throws IOException
+     */
+    public static BlockReader newBlockReader(Socket sock, String file, long blockId, 
                                        Token<BlockTokenIdentifier> accessToken,
                                        long genStamp,
                                        long startOffset, long len,
@@ -1887,6 +2017,14 @@ public class DFSClient implements FSCons
       return blockRange;
     }
 
+    /**
+     * Whether to attempt a short-circuit (direct local file) read from the
+     * datanode at {@code targetAddr}. This requires short-circuit reads to be
+     * enabled and the address to belong to this host.
+     */
+    private boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr)
+        throws IOException {
+      return shortCircuitLocalReads && isLocalAddress(targetAddr);
+    }
+    
     /**
      * Open a DataInputStream to a DataNode so that it can be read from.
      * We get block ID and the IDs of the destinations at startup, from the namenode.
@@ -1923,13 +2061,37 @@ public class DFSClient implements FSCons
         chosenNode = retval.info;
         InetSocketAddress targetAddr = retval.addr;
 
+        // try reading the block locally. if this fails, then go via
+        // the datanode
+        Block blk = targetBlock.getBlock();
+        Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
+        if (shouldTryShortCircuitRead(targetAddr)) {
+          try {
+            blockReader = getLocalBlockReader(conf, src, blk, accessToken,
+                chosenNode, DFSClient.this.socketTimeout, offsetIntoBlock);
+            return chosenNode;
+          } catch (AccessControlException ex) {
+            LOG.warn("Short circuit access failed ", ex);
+            //Disable short circuit reads
+            shortCircuitLocalReads = false;
+          } catch (IOException ex) {
+            if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
+              /* Get a new access token and retry. */
+              refetchToken--;
+              fetchBlockAt(target);
+              continue;
+            } else {
+              LOG.info("Failed to read block " + targetBlock.getBlock()
+                  + " on local machine" + StringUtils.stringifyException(ex));
+              LOG.info("Try reading via the datanode on " + targetAddr);
+            }
+          }
+        }
+
         try {
           s = socketFactory.createSocket();
           NetUtils.connect(s, targetAddr, socketTimeout);
           s.setSoTimeout(socketTimeout);
-          Block blk = targetBlock.getBlock();
-          Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
-          
           blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
               accessToken, 
               blk.getGenerationStamp(),
@@ -1937,20 +2099,7 @@ public class DFSClient implements FSCons
               buffersize, verifyChecksum, clientName);
           return chosenNode;
         } catch (IOException ex) {
-          if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
-            LOG.info("Will fetch a new access token and retry, " 
-                + "access token was invalid when connecting to " + targetAddr
-                + " : " + ex);
-            /*
-             * Get a new access token and retry. Retry is needed in 2 cases. 1)
-             * When both NN and DN re-started while DFSClient holding a cached
-             * access token. 2) In the case that NN fails to update its
-             * access key at pre-set interval (by a wide margin) and
-             * subsequently restarts. In this case, DN re-registers itself with
-             * NN and receives a new access key, but DN will delete the old
-             * access key from its memory since it's considered expired based on
-             * the estimated expiration date.
-             */
+          if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
             refetchToken--;
             fetchBlockAt(target);
           } else {
@@ -1965,8 +2114,7 @@ public class DFSClient implements FSCons
           if (s != null) {
             try {
               s.close();
-            } catch (IOException iex) {
-            }                        
+            } catch (IOException iex) { }                        
           }
           s = null;
         }
@@ -2154,21 +2302,31 @@ public class DFSClient implements FSCons
         DatanodeInfo chosenNode = retval.info;
         InetSocketAddress targetAddr = retval.addr;
         BlockReader reader = null;
-            
+
+        int len = (int) (end - start + 1);
         try {
-          dn = socketFactory.createSocket();
-          NetUtils.connect(dn, targetAddr, socketTimeout);
-          dn.setSoTimeout(socketTimeout);
           Token<BlockTokenIdentifier> accessToken = block.getBlockToken();
-              
-          int len = (int) (end - start + 1);
-              
-          reader = BlockReader.newBlockReader(dn, src, 
-                                              block.getBlock().getBlockId(),
-                                              accessToken,
-                                              block.getBlock().getGenerationStamp(),
-                                              start, len, buffersize, 
-                                              verifyChecksum, clientName);
+          // first try reading the block locally.
+          if (shouldTryShortCircuitRead(targetAddr)) {
+            try {
+              reader = getLocalBlockReader(conf, src, block.getBlock(),
+                  accessToken, chosenNode, DFSClient.this.socketTimeout, start);
+            } catch (AccessControlException ex) {
+              LOG.warn("Short circuit access failed ", ex);
+              //Disable short circuit reads
+              shortCircuitLocalReads = false;
+              continue;
+            }
+          } else {
+            // go to the datanode
+            dn = socketFactory.createSocket();
+            NetUtils.connect(dn, targetAddr, socketTimeout);
+            dn.setSoTimeout(socketTimeout);
+            reader = BlockReader.newBlockReader(dn, src, 
+                block.getBlock().getBlockId(), accessToken,
+                block.getBlock().getGenerationStamp(), start, len, buffersize, 
+                verifyChecksum, clientName);
+          }
           int nread = reader.readAll(buf, offset, len);
           if (nread != len) {
             throw new IOException("truncated return from reader.read(): " +
@@ -2181,10 +2339,7 @@ public class DFSClient implements FSCons
                    e.getPos() + " from " + chosenNode.getName());
           reportChecksumFailure(src, block.getBlock(), chosenNode);
         } catch (IOException e) {
-          if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
-            LOG.info("Will get a new access token and retry, "
-                + "access token was invalid when connecting to " + targetAddr
-                + " : " + e);
+          if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
             refetchToken--;
             fetchBlockAt(block.getStartOffset());
             continue;
@@ -3314,7 +3469,8 @@ public class DFSClient implements FSCons
 
       } catch (IOException ie) {
 
-        LOG.info("Exception in createBlockOutputStream " + ie);
+        LOG.info("Exception in createBlockOutputStream " + nodes[0].getName() +
+            " " + ie);
 
         // find the datanode that matches
         if (firstBadLink.length() != 0) {

Modified: hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java Thu Nov 24 01:52:43 2011
@@ -197,6 +197,10 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
   public static final String  DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";
   public static final int     DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT = 100;
+  public static final String  DFS_CLIENT_READ_SHORTCIRCUIT_KEY = "dfs.client.read.shortcircuit";
+  public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
+  public static final String  DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
+  public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
 
   //Keys with no defaults
   public static final String  DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
@@ -219,4 +223,5 @@ public class DFSConfigKeys extends Commo
   
   public static final String  DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY = "dfs.web.authentication.kerberos.principal";
   public static final String  DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY = "dfs.web.authentication.kerberos.keytab";
+  public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
 }

Modified: hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Thu Nov 24 01:52:43 2011
@@ -21,12 +21,18 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenInfo;
 
 /** An client-datanode protocol for block recovery
  */
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
 @TokenInfo(BlockTokenSelector.class)
 public interface ClientDatanodeProtocol extends VersionedProtocol {
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
@@ -55,4 +61,29 @@ public interface ClientDatanodeProtocol 
    * @throws IOException if the block does not exist
    */
   Block getBlockInfo(Block block) throws IOException;
+
+  /**
+   * Retrieves the path names of the block file and metadata file stored on the
+   * local file system.
+   * 
+   * In order for this method to work, all of the following must be satisfied:
+   * <ul>
+   * <li>
+   * The client user must be configured at the datanode (via
+   * {@code dfs.block.local-path-access.user}) to be able to use this
+   * method.</li>
+   * <li>
+   * When security is enabled, kerberos authentication must be used to connect
+   * to the datanode.</li>
+   * </ul>
+   * 
+   * @param block
+   *          the specified block on the local datanode
+   * @param token 
+   *          the block access token.
+   * @return the BlockLocalPathInfo of a block; may be {@code null}
+   * @throws IOException
+   *           on error, including an AccessControlException when the caller
+   *           is not authorized
+   */
+  BlockLocalPathInfo getBlockLocalPathInfo(Block block,
+      Token<BlockTokenIdentifier> token) throws IOException;
 }

Modified: hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Thu Nov 24 01:52:43 2011
@@ -26,6 +26,8 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
 
 /**
@@ -33,7 +35,9 @@ import org.apache.hadoop.util.DataChecks
  * This is not related to the Block related functionality in Namenode.
  * The biggest part of data block metadata is CRC for the block.
  */
-class BlockMetadataHeader {
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockMetadataHeader {
 
   static final short METADATA_VERSION = FSDataset.METADATA_VERSION;
   
@@ -49,12 +53,14 @@ class BlockMetadataHeader {
     this.checksum = checksum;
     this.version = version;
   }
-    
-  short getVersion() {
+  
+  /** Get the metadata version recorded in this header */
+  public short getVersion() {
     return version;
   }
 
-  DataChecksum getChecksum() {
+  /** Get the {@link DataChecksum} recorded in this header */
+  public DataChecksum getChecksum() {
     return checksum;
   }
 
@@ -65,7 +71,7 @@ class BlockMetadataHeader {
    * @return Metadata Header
    * @throws IOException
    */
-  static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
+  public static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
+    // The leading short is presumably the header version (cf. getVersion()
+    // returning short); the overload parses the rest. TODO confirm.
     return readHeader(in.readShort(), in);
   }
   

Modified: hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Nov 24 01:52:43 2011
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
@@ -62,6 +64,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -71,6 +74,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -110,8 +114,10 @@ import org.apache.hadoop.metrics2.lib.De
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -225,6 +231,7 @@ public class DataNode extends Configured
   boolean isBlockTokenEnabled;
   BlockTokenSecretManager blockTokenSecretManager;
   boolean isBlockTokenInitialized = false;
+  final String userWithLocalPathAccess;
 
   /**
    * Testing hook that allows tests to delay the sending of blockReceived RPCs
@@ -277,6 +284,8 @@ public class DataNode extends Configured
 
     datanodeObject = this;
     supportAppends = conf.getBoolean("dfs.support.append", false);
+    this.userWithLocalPathAccess = conf
+        .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
     try {
       startDataNode(conf, dataDirs, resources);
     } catch (IOException ie) {
@@ -1722,6 +1731,89 @@ public class DataNode extends Configured
     throw new IOException("Unknown protocol to " + getClass().getSimpleName()
         + ": " + protocol);
   }
+  
+  /**
+   * Ensure the current caller authenticated with kerberos. This is a no-op
+   * when security is disabled.
+   *
+   * @param msg name of the operation being authorized, used in the error
+   *        message
+   * @throws AccessControlException if security is enabled and the caller's
+   *         authentication method is not KERBEROS
+   */
+  private void checkKerberosAuthMethod(String msg) throws IOException {
+    // Without security there is no authentication method to enforce.
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() != 
+        AuthenticationMethod.KERBEROS) {
+      throw new AccessControlException("Error in "+msg+". Only "
+          + "kerberos based authentication is allowed.");
+    }
+  }
+  
+  /**
+   * Authorize a getBlockLocalPathInfo() call. The caller must pass the
+   * kerberos authentication-method check. It must also be the short user name
+   * configured by {@link DFSConfigKeys#DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY}.
+   */
+  private void checkBlockLocalPathAccess() throws IOException {
+    checkKerberosAuthMethod("getBlockLocalPathInfo()");
+    final String caller =
+        UserGroupInformation.getCurrentUser().getShortUserName();
+    if (caller.equals(this.userWithLocalPathAccess)) {
+      return;
+    }
+    throw new AccessControlException(
+        "Can't continue with getBlockLocalPathInfo() "
+            + "authorization. The user " + caller
+            + " is not allowed to call getBlockLocalPathInfo");
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * The caller must be authorized for local path access and must present a
+   * READ block token. The token is checked only when block tokens and
+   * security are both enabled. Every call that passes these checks
+   * increments the blocksGetLocalPathInfo metric, even if the result is
+   * {@code null}.
+   */
+  @Override
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block block,
+      Token<BlockTokenIdentifier> token) throws IOException {
+    checkBlockLocalPathAccess();
+    checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ);
+    BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
+    // TRACE is a finer level than DEBUG, so one trace-level guard suffices.
+    if (LOG.isTraceEnabled()) {
+      if (info != null) {
+        LOG.trace("getBlockLocalPathInfo successful block=" + block
+            + " blockfile " + info.getBlockPath() + " metafile "
+            + info.getMetaPath());
+      } else {
+        LOG.trace("getBlockLocalPathInfo for block=" + block
+            + " returning null");
+      }
+    }
+    myMetrics.incrBlocksGetLocalPathInfo();
+    return info;
+  }
+  
+  /**
+   * Verify that the client-supplied block token grants {@code accessMode} on
+   * {@code block}. This is enforced only when block tokens and security are
+   * both enabled.
+   *
+   * @throws IOException if the token identifier cannot be decoded. Presumably
+   *         also thrown when blockTokenSecretManager.checkAccess rejects the
+   *         token.
+   */
+  private void checkBlockToken(Block block, Token<BlockTokenIdentifier> token,
+      AccessMode accessMode) throws IOException {
+    if (!isBlockTokenEnabled || !UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    BlockTokenIdentifier id = new BlockTokenIdentifier();
+    // Decode the identifier bytes carried inside the token.
+    DataInputStream in = new DataInputStream(
+        new ByteArrayInputStream(token.getIdentifier()));
+    id.readFields(in);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got: " + id.toString());
+    }
+    blockTokenSecretManager.checkAccess(id, null, block, accessMode);
+  }
+
+  /**
+   * Check the block access token attached to the current RPC caller's UGI
+   * for the given access mode. This is enforced only when block tokens and
+   * security are both enabled. Exactly one token identifier must be present.
+   *
+   * @throws IOException if the caller does not carry exactly one token
+   *         identifier. Presumably also thrown when
+   *         blockTokenSecretManager.checkAccess rejects the token.
+   */
+  private void checkBlockToken(Block block,
+      BlockTokenSecretManager.AccessMode accessMode) throws IOException {
+    if (!isBlockTokenEnabled || !UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
+        .getTokenIdentifiers();
+    final int found = tokenIds.size();
+    if (found != 1) {
+      throw new IOException("Can't continue with "
+          + "authorization since " + found
+          + " BlockTokenIdentifier " + "is found.");
+    }
+    BlockTokenIdentifier id = (BlockTokenIdentifier) tokenIds.iterator().next();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got: " + id.toString());
+    }
+    blockTokenSecretManager.checkAccess(id, null, block, accessMode);
+  }
 
   /** A convenient class used in lease recovery */
   private static class BlockRecord { 
@@ -1923,28 +2015,13 @@ public class DataNode extends Configured
   public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
       ) throws IOException {
     logRecoverBlock("Client", block, targets);
-    if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
-      Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
-          .getTokenIdentifiers();
-      if (tokenIds.size() != 1) {
-        throw new IOException("Can't continue with recoverBlock() "
-            + "authorization since " + tokenIds.size() + " BlockTokenIdentifier "
-            + "is found.");
-      }
-      for (TokenIdentifier tokenId : tokenIds) {
-        BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Got: " + id.toString());
-        }
-        blockTokenSecretManager.checkAccess(id, null, block,
-            BlockTokenSecretManager.AccessMode.WRITE);
-      }
-    }
+    checkBlockToken(block, BlockTokenSecretManager.AccessMode.WRITE); // token from current UGI
     return recoverBlock(block, keepLength, targets, false);
   }
 
   /** {@inheritDoc} */
   public Block getBlockInfo(Block block) throws IOException {
+    checkBlockToken(block, BlockTokenSecretManager.AccessMode.READ); // token from current UGI
     Block stored = data.getStoredBlock(block.getBlockId());
     return stored;
   }

Modified: hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Nov 24 01:52:43 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.DU;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
@@ -1047,6 +1048,16 @@ public class FSDataset implements FSCons
     return f;
   }
   
+  @Override //FSDatasetInterface
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block block)
+      throws IOException {
+    // The meta file location is derived from the resolved block data file.
+    File blockFile = getBlockFile(block);
+    File metaFile = getMetaFile(blockFile, block);
+    return new BlockLocalPathInfo(block, blockFile.getAbsolutePath(),
+        metaFile.getAbsolutePath());
+  }
+  
   public synchronized InputStream getBlockInputStream(Block b) throws IOException {
     return new FileInputStream(getBlockFile(b));
   }

Modified: hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Thu Nov 24 01:52:43 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.da
 
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,6 +31,7 @@ import java.io.OutputStream;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
@@ -309,4 +311,8 @@ public interface FSDatasetInterface exte
   public boolean hasEnoughResource();
 
   public BlockRecoveryInfo startBlockRecovery(long blockId) throws IOException;
+  /**
+   * Get {@link BlockLocalPathInfo} (block file and meta file paths) for b.
+   * @throws IOException if this dataset cannot provide the local paths */
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block b) throws IOException;
 }

Modified: hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java Thu Nov 24 01:52:43 2011
@@ -49,6 +49,8 @@ public class DataNodeInstrumentation imp
       registry.newCounter("blocks_verified", "", 0);
   final MetricMutableCounterInt blockVerificationFailures =
       registry.newCounter("block_verification_failures", "", 0);
+  final MetricMutableCounterInt blocksGetLocalPathInfo =
+      registry.newCounter("blocks_get_local_pathinfo", "", 0); // getBlockLocalPathInfo calls
 
   final MetricMutableCounterInt readsFromLocalClient =
       registry.newCounter("reads_from_local_client", "", 0);
@@ -131,6 +133,11 @@ public class DataNodeInstrumentation imp
   }
 
   //@Override
+  public void incrBlocksGetLocalPathInfo() { // +1 to blocks_get_local_pathinfo
+    this.blocksGetLocalPathInfo.incr();
+  }
+
+  //@Override
   public void addReadBlockOp(long latency) {
     readBlockOp.add(latency);
   }

Modified: hadoop/common/branches/branch-0.20-security-205/src/test/commit-tests
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/test/commit-tests?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/test/commit-tests (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/test/commit-tests Thu Nov 24 01:52:43 2011
@@ -110,6 +110,7 @@
 **/TestFileAppend.java
 **/TestFileCorruption.java
 **/TestFileLimit.java
+**/TestShortCircuitLocalRead.java
 **/TestFileStatus.java
 **/TestFSInputChecker.java
 **/TestFSOutputSummer.java

Modified: hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1205698&r1=1205697&r2=1205698&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Thu Nov 24 01:52:43 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -31,6 +32,7 @@ import javax.management.StandardMBean;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;
@@ -695,4 +697,9 @@ public class SimulatedFSDataset  impleme
     Block stored = getStoredBlock(blockId);
     return new BlockRecoveryInfo(stored, false);
   }
+
+  @Override // FSDatasetInterface; unsupported here, always throws
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block blk) throws IOException {
+    throw new IOException("getBlockLocalPathInfo not supported.");
+  }
 }



Mime
View raw message