hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiten...@apache.org
Subject svn commit: r1205243 - in /hadoop/common/branches/branch-0.20-security: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/metri...
Date Wed, 23 Nov 2011 00:03:36 GMT
Author: jitendra
Date: Wed Nov 23 00:03:30 2011
New Revision: 1205243

URL: http://svn.apache.org/viewvc?rev=1205243&view=rev
Log:
HDFS-2246. Shortcut a local client reads to a Datanodes files directly. Contributed by Andrew Purtell, Suresh and Jitendra.

Added:
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java
    hadoop/common/branches/branch-0.20-security/src/test/commit-tests
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Wed Nov 23 00:03:30 2011
@@ -90,6 +90,9 @@ Release 0.20.206.0 - unreleased
 
     MAPREDUCE-3419. Don't mark exited TT threads as dead in MiniMRCluster (eli)
 
+    HDFS-2246. Shortcut a local client reads to a Datanodes files directly. 
+    (Andrew Purtell, Suresh, Jitendra)
+
 Release 0.20.205.1 - unreleased
 
   NEW FEATURES

Added: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1205243&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java Wed Nov 23 00:03:30 2011
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient.BlockReader;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+/**
+ * BlockReaderLocal enables local short circuited reads. If the DFS client is on
+ * the same machine as the datanode, then the client can read files directly
+ * from the local file system rather than going through the datanode for better
+ * performance. <br>
+ * {@link BlockReaderLocal} works as follows:
+ * <ul>
+ * <li>The client performing short circuit reads must be configured at the
+ * datanode.</li>
+ * <li>The client gets the path to the file where block is stored using
+ * {@link ClientDatanodeProtocol#getBlockLocalPathInfo(Block, Token)} RPC call</li>
+ * <li>Client uses kerberos authentication to connect to the datanode over RPC,
+ * if security is enabled.</li>
+ * </ul>
+ */
+class BlockReaderLocal extends BlockReader {
+  public static final Log LOG = LogFactory.getLog(DFSClient.class);
+
+  /**
+   * Client-side state for one local datanode: a lazily created RPC proxy and
+   * a bounded cache from blocks to their local path information.
+   */
+  private static class LocalDatanodeInfo {
+    private ClientDatanodeProtocol proxy = null;
+    private final Map<Block, BlockLocalPathInfo> cache;
+
+    LocalDatanodeInfo() {
+      final int maxEntries = 10000;
+      final float loadFactor = 0.75f;
+      final int initialCapacity = (int) Math.ceil(maxEntries / loadFactor) + 1;
+      // Access-ordered map that drops its eldest entry once it holds more
+      // than maxEntries mappings.
+      Map<Block, BlockLocalPathInfo> lru =
+          new LinkedHashMap<Block, BlockLocalPathInfo>(
+              initialCapacity, loadFactor, true) {
+            private static final long serialVersionUID = 1;
+
+            @Override
+            protected boolean removeEldestEntry(
+                Map.Entry<Block, BlockLocalPathInfo> eldest) {
+              return size() > maxEntries;
+            }
+          };
+      cache = Collections.synchronizedMap(lru);
+    }
+
+    /** Returns the datanode proxy, creating it on first use. */
+    private synchronized ClientDatanodeProtocol getDatanodeProxy(
+        DatanodeInfo node, Configuration conf, int socketTimeout)
+        throws IOException {
+      if (proxy != null) {
+        return proxy;
+      }
+      proxy = DFSClient.createClientDatanodeProtocolProxy(node, conf,
+          socketTimeout);
+      return proxy;
+    }
+
+    /** Stops and forgets the current proxy, if there is one. */
+    private synchronized void resetDatanodeProxy() {
+      if (proxy == null) {
+        return;
+      }
+      RPC.stopProxy(proxy);
+      proxy = null;
+    }
+
+    /** Looks up the cached path info for a block; null when not cached. */
+    private BlockLocalPathInfo getBlockLocalPathInfo(Block b) {
+      return cache.get(b);
+    }
+
+    /** Caches the path info for a block. */
+    private void setBlockLocalPathInfo(Block b, BlockLocalPathInfo info) {
+      cache.put(b, info);
+    }
+
+    /** Drops the cached path info for a block. */
+    private void removeBlockLocalPathInfo(Block b) {
+      cache.remove(b);
+    }
+  }
+  
+  // Multiple datanodes could be running on the local machine. Store proxies in
+  // a map keyed by the ipc port of the datanode.
+  private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
+
+  private FileInputStream dataIn; // reader for the data file
+  private FileInputStream checksumIn;   // reader for the checksum file
+  
+  /**
+   * Creates a reader that reads the given block directly from the local file
+   * system. This is the only way this object can be instantiated.
+   * <p>
+   * The block's local paths are taken from the per-datanode cache when
+   * present and otherwise fetched from the datanode over RPC. If the block or
+   * metadata file cannot be opened, the cache entry is removed so that a
+   * later call asks the datanode again.
+   *
+   * @param conf configuration; consulted for the skip-checksum setting
+   * @param file name of the HDFS file the block belongs to
+   * @param blk block to read
+   * @param token block access token passed to the datanode RPC
+   * @param node local datanode that stores the block
+   * @param socketTimeout timeout used when creating the datanode proxy
+   * @param startOffset offset within the block at which reading starts
+   * @param length number of bytes requested from startOffset
+   * @return a new reader for the block
+   * @throws IOException if no path information is available for the block or
+   *         the local files cannot be opened
+   */
+  static BlockReaderLocal newBlockReader(Configuration conf,
+    String file, Block blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
+    int socketTimeout, long startOffset, long length) throws IOException {
+
+    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
+    // check the cache first
+    BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
+    if (pathinfo == null) {
+      pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
+    }
+    if (pathinfo == null) {
+      // The RPC may return no path info. Fail with an IOException rather
+      // than hitting a NullPointerException below.
+      throw new IOException("BlockReaderLocal: no local path information"
+          + " available for " + blk);
+    }
+
+    // check to see if the file exists. It may so happen that the
+    // HDFS file has been deleted and this block-lookup is occurring
+    // on behalf of a new HDFS file. This time, the block file could
+    // be residing in a different portion of the fs.data.dir directory.
+    // In this case, we remove this entry from the cache. The next
+    // call to this method will re-populate the cache.
+    FileInputStream dataIn = null;
+    FileInputStream checksumIn = null;
+    BlockReaderLocal localBlockReader = null;
+    boolean skipChecksum = shortCircuitChecksum(conf);
+    try {
+      // get a local file system
+      File blkfile = new File(pathinfo.getBlockPath());
+      dataIn = new FileInputStream(blkfile);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
+            + blkfile.length() + " startOffset " + startOffset + " length "
+            + length + " short circuit checksum " + skipChecksum);
+      }
+
+      if (!skipChecksum) {
+        // get the metadata file
+        File metafile = new File(pathinfo.getMetaPath());
+        checksumIn = new FileInputStream(metafile);
+
+        // read and handle the common header here. For now just a version
+        BlockMetadataHeader header = BlockMetadataHeader
+            .readHeader(new DataInputStream(checksumIn));
+        short version = header.getVersion();
+        if (version != FSDataset.METADATA_VERSION) {
+          LOG.warn("Wrong version (" + version + ") for metadata file for "
+              + blk + " ignoring ...");
+        }
+        DataChecksum checksum = header.getChecksum();
+        localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length,
+            pathinfo, checksum, true, dataIn, checksumIn);
+      } else {
+        localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length,
+            pathinfo, dataIn);
+      }
+    } catch (IOException e) {
+      // remove from cache
+      localDatanodeInfo.removeBlockLocalPathInfo(blk);
+      DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk +
+          " from cache because local file " + pathinfo.getBlockPath() +
+          " could not be opened.");
+      throw e;
+    } finally {
+      if (localBlockReader == null) {
+        // No reader was built, so an exception is already propagating.
+        // Close both streams without letting a close() failure replace that
+        // exception or leave the second stream open.
+        closeQuietly(dataIn);
+        closeQuietly(checksumIn);
+      }
+    }
+    return localBlockReader;
+  }
+
+  /** Closes the stream if it is non-null; a close failure is only logged. */
+  private static void closeQuietly(FileInputStream in) {
+    if (in == null) {
+      return;
+    }
+    try {
+      in.close();
+    } catch (IOException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Ignoring exception while closing local stream", e);
+      }
+    }
+  }
+  
+  /**
+   * Returns the LocalDatanodeInfo registered for the given ipc port, creating
+   * and registering a new one the first time the port is seen.
+   */
+  private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
+    LocalDatanodeInfo existing = localDatanodeInfoMap.get(port);
+    if (existing != null) {
+      return existing;
+    }
+    LocalDatanodeInfo created = new LocalDatanodeInfo();
+    localDatanodeInfoMap.put(port, created);
+    return created;
+  }
+  
+  /**
+   * Asks the local datanode over RPC for the local paths of a block and, when
+   * the datanode returns them, stores them in that datanode's cache.
+   *
+   * @param blk block to look up
+   * @param node local datanode that stores the block
+   * @param conf configuration used when creating the datanode proxy
+   * @param timeout socket timeout used when creating the datanode proxy
+   * @param token block access token passed to the RPC
+   * @return the path info returned by the datanode; may be null
+   * @throws IOException if the RPC fails; the cached proxy is reset first
+   */
+  private static BlockLocalPathInfo getBlockPathInfo(Block blk,
+      DatanodeInfo node, Configuration conf, int timeout,
+      Token<BlockTokenIdentifier> token) throws IOException {
+    // Use the same accessor as newBlockReader() so both methods key the
+    // per-datanode map identically.
+    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
+    BlockLocalPathInfo pathinfo = null;
+    ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
+        conf, timeout);
+    try {
+      // make RPC to local datanode to find local pathnames of blocks
+      pathinfo = proxy.getBlockLocalPathInfo(blk, token);
+      if (pathinfo != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Cached location of block " + blk + " as " + pathinfo);
+        }
+        localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
+      }
+    } catch (IOException e) {
+      localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
+      throw e;
+    }
+    return pathinfo;
+  }
+  
+  /**
+   * Reads the client setting that says whether checksums are skipped for
+   * short circuit reads.
+   */
+  private static boolean shortCircuitChecksum(Configuration conf) {
+    final boolean skipByDefault =
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT;
+    return conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        skipByDefault);
+  }
+  
+  /**
+   * Builds a reader that does no checksum handling. The data stream is
+   * advanced to startOffset by skipping; conf, token, length and pathinfo are
+   * accepted but not used here.
+   *
+   * @throws IOException if the data stream cannot be advanced to startOffset
+   */
+  private BlockReaderLocal(Configuration conf, String hdfsfile, Block block,
+      Token<BlockTokenIdentifier> token, long startOffset, long length,
+      BlockLocalPathInfo pathinfo, FileInputStream dataIn) throws IOException {
+    super(
+        new Path("/blk_" + block.getBlockId() + ":of:" + hdfsfile) /*too non path-like?*/,
+        1);
+    this.startOffset = startOffset;
+    this.dataIn = dataIn;
+    // Keep skipping until the whole offset is consumed; a skip that makes no
+    // progress is treated as a failure.
+    for (long remaining = startOffset; remaining > 0; ) {
+      long advanced = dataIn.skip(remaining);
+      if (advanced == 0) {
+        throw new IOException("Couldn't initialize input stream");
+      }
+      remaining -= advanced;
+    }
+  }
+
+  /**
+   * Builds a reader that carries checksum state. It validates the requested
+   * range against the block length from pathinfo, then positions the data
+   * stream at the start of the chunk containing startOffset and advances the
+   * checksum stream by the matching number of checksum bytes. conf and token
+   * are accepted but not used here.
+   *
+   * @throws IOException if the offset/length do not fit within the block, or
+   *         the streams cannot be positioned
+   */
+  private BlockReaderLocal(Configuration conf, String hdfsfile, Block block,
+      Token<BlockTokenIdentifier> token, long startOffset, long length,
+      BlockLocalPathInfo pathinfo, DataChecksum checksum, boolean verifyChecksum,
+      FileInputStream dataIn, FileInputStream checksumIn) throws IOException {
+    super(
+        new Path("/blk_" + block.getBlockId() + ":of:" + hdfsfile) /*too non path-like?*/,
+        1,
+        checksum,
+        verifyChecksum);
+    this.startOffset = startOffset;
+    this.dataIn = dataIn;
+    this.checksumIn = checksumIn;
+    this.checksum = checksum;
+
+    long blockLength = pathinfo.getNumBytes();
+
+    /* If bytesPerChecksum is very large, then the metadata file
+     * is mostly corrupted. For now just truncate bytesPerchecksum to
+     * blockLength.
+     */
+    // NOTE(review): the replacement size is max(blockLength, 10MB), and only
+    // the local 'checksum' is replaced, not this.checksum - confirm intent.
+    bytesPerChecksum = checksum.getBytesPerChecksum();
+    if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > blockLength){
+      checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
+          Math.max((int) blockLength, 10 * 1024 * 1024));
+      bytesPerChecksum = checksum.getBytesPerChecksum();
+    }
+
+    checksumSize = checksum.getChecksumSize();
+
+    long endOffset = blockLength;
+    if (startOffset < 0 || startOffset > endOffset
+        || (length + startOffset) > endOffset) {
+      String msg = " Offset " + startOffset + " and length " + length
+      + " don't match block " + block + " ( blockLen " + endOffset + " )";
+      LOG.warn("BlockReaderLocal requested with incorrect offset: " + msg);
+      throw new IOException(msg);
+    }
+
+    // Reads start at the beginning of the chunk that contains startOffset.
+    firstChunkOffset = (startOffset - (startOffset % bytesPerChecksum));
+
+    if (length >= 0) {
+      // Make sure endOffset points to end of a checksumed chunk.
+      // NOTE(review): endOffset is not read again after this adjustment.
+      long tmpLen = startOffset + length;
+      if (tmpLen % bytesPerChecksum != 0) {
+        tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
+      }
+      if (tmpLen < endOffset) {
+        endOffset = tmpLen;
+      }
+    }
+
+    // seek to the right offsets
+    if (firstChunkOffset > 0) {
+      dataIn.getChannel().position(firstChunkOffset);
+
+      // Skip one checksum per chunk that precedes firstChunkOffset. skip()
+      // may advance less than asked, so loop and fail if it stops advancing.
+      long checksumSkip = (firstChunkOffset / bytesPerChecksum) * checksumSize;
+      while (checksumSkip > 0) {
+        long skipped = checksumIn.skip(checksumSkip);
+        if (skipped <= 0) {
+          throw new IOException("Could not skip " + checksumSkip
+              + " bytes in the meta file for " + block);
+        }
+        checksumSkip -= skipped;
+      }
+    }
+
+    lastChunkOffset = firstChunkOffset;
+    lastChunkLen = -1;
+  }
+
+  /**
+   * Reads up to len bytes into buf. When no checksum is set on this reader
+   * the bytes come straight from the data file; otherwise the superclass read
+   * is used.
+   */
+  @Override
+  public synchronized int read(byte[] buf, int off, int len) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("read off " + off + " len " + len);
+    }
+    return (checksum == null)
+        ? dataIn.read(buf, off, len)
+        : super.read(buf, off, len);
+  }
+
+  /**
+   * Skips n bytes. When no checksum is set on this reader the data file
+   * stream is skipped directly; otherwise the superclass skip is used.
+   */
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("skip " + n);
+    }
+    return (checksum == null) ? dataIn.skip(n) : super.skip(n);
+  }
+
+  /**
+   * Always fails: seeking is not supported by this reader.
+   *
+   * @throws IOException on every call
+   */
+  @Override
+  public synchronized void seek(long n) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("seek " + n);
+    }
+    final String unsupported = "Seek() is not supported in BlockReaderLocal";
+    throw new IOException(unsupported);
+  }
+
+  /**
+   * Reads the next chunk of data from the data file into buf and, when a
+   * checksum stream is open, the matching checksum into checksumBuf.
+   *
+   * @param pos position of the chunk relative to firstChunkOffset
+   * @param buf destination for the chunk data
+   * @param offset offset in buf at which the data is stored
+   * @param len requested length; only logged, the read itself asks for
+   *        bytesPerChecksum bytes
+   * @param checksumBuf destination for the checksum; must be exactly
+   *        checksumSize bytes long
+   * @return number of data bytes read, or -1 at end of stream
+   * @throws IOException if checksumBuf has the wrong size, pos does not
+   *         match the expected chunk offset, or the checksum cannot be read
+   */
+  @Override
+  protected synchronized int readChunk(long pos, byte[] buf, int offset,
+      int len, byte[] checksumBuf) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reading chunk from position " + pos + " at offset " +
+          offset + " with length " + len);
+    }
+
+    if (gotEOS) {
+      startOffset = -1;
+      return -1;
+    }
+
+    if (checksumBuf.length != checksumSize) {
+      throw new IOException("Cannot read checksum into buffer. "
+          + "The buffer must be exactly '" + checksumSize
+          + "' bytes long to hold the checksum bytes.");
+    }
+
+    if ((pos + firstChunkOffset) != lastChunkOffset) {
+      throw new IOException("Mismatch in pos : " + pos + " + "
+          + firstChunkOffset + " != " + lastChunkOffset);
+    }
+
+    int nRead = dataIn.read(buf, offset, bytesPerChecksum);
+    if (nRead < 0) {
+      // The data file is exhausted. Report end of stream without moving
+      // lastChunkOffset backwards or asking the meta file for a checksum
+      // that belongs to no chunk.
+      gotEOS = true;
+      return -1;
+    }
+    if (nRead < bytesPerChecksum) {
+      gotEOS = true;
+    }
+
+    lastChunkOffset += nRead;
+    lastChunkLen = nRead;
+
+    // The checksum is read only when a checksum stream is open
+    if (checksumIn != null) {
+      int nChecksumRead = checksumIn.read(checksumBuf);
+      if (nChecksumRead != checksumSize) {
+        throw new IOException("Could not read checksum at offset " +
+            checksumIn.getChannel().position() + " from the meta file.");
+      }
+    }
+
+    return nRead;
+  }
+
+  /**
+   * Closes the data and checksum streams and clears both fields. The checksum
+   * stream is closed even when closing the data stream fails, so neither file
+   * descriptor is left open.
+   *
+   * @throws IOException if closing either stream fails
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    try {
+      if (dataIn != null) {
+        dataIn.close();
+      }
+    } finally {
+      dataIn = null;
+      if (checksumIn != null) {
+        try {
+          checksumIn.close();
+        } finally {
+          checksumIn = null;
+        }
+      }
+    }
+  }
+}
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Wed Nov 23 00:03:30 2011
@@ -93,6 +93,7 @@ public class DFSClient implements FSCons
   final int writePacketSize;
   private final FileSystem.Statistics stats;
   private int maxBlockAcquireFailures;
+  private boolean shortCircuitLocalReads;
 
   /**
    * We assume we're talking to another CDH server, which supports
@@ -144,6 +145,7 @@ public class DFSClient implements FSCons
         rpcNamenode, methodNameToPolicyMap);
   }
 
+  /** Create {@link ClientDatanodeProtocol} proxy with block/token */
   static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
       DatanodeID datanodeid, Configuration conf, 
       Block block, Token<BlockTokenIdentifier> token, int socketTimeout) throws IOException {
@@ -160,6 +162,20 @@ public class DFSClient implements FSCons
         .getDefaultSocketFactory(conf), socketTimeout);
   }
         
+  /**
+   * Create {@link ClientDatanodeProtocol} proxy using kerberos ticket.
+   *
+   * @param datanodeid datanode whose host and ipc port are connected to
+   * @param conf configuration passed to the RPC layer and used to pick the
+   *        default socket factory
+   * @param socketTimeout timeout handed to the RPC proxy
+   * @return a proxy for the datanode's ClientDatanodeProtocol
+   * @throws IOException if the proxy cannot be created
+   */
+  static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
+      DatanodeID datanodeid, Configuration conf, int socketTimeout)
+      throws IOException {
+    InetSocketAddress addr = NetUtils.createSocketAddr(
+      datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+    // Log at the same level the guard checks for.
+    if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
+      ClientDatanodeProtocol.LOG.debug("ClientDatanodeProtocol addr=" + addr);
+    }
+    return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class,
+        ClientDatanodeProtocol.versionID, addr, conf, NetUtils
+        .getDefaultSocketFactory(conf), socketTimeout);
+  }
+  
   /**
    * Same as this(NameNode.getAddress(conf), conf);
    * @see #DFSClient(InetSocketAddress, Configuration)
@@ -206,7 +222,7 @@ public class DFSClient implements FSCons
     // dfs.write.packet.size is an internal config variable
     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
     this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
-    
+
     ugi = UserGroupInformation.getCurrentUser();
 
     String taskId = conf.get("mapred.task.id");
@@ -229,6 +245,13 @@ public class DFSClient implements FSCons
           "Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
           + "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
     }
+    // read directly from the block file if configured.
+    this.shortCircuitLocalReads = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Short circuit read is " + shortCircuitLocalReads);
+    }
   }
 
   static int getMaxBlockAcquireFailures(Configuration conf) {
@@ -325,6 +348,82 @@ public class DFSClient implements FSCons
   }
 
   /**
+   * Get {@link BlockReader} for short circuited local reads.
+   */
+  private static BlockReader getLocalBlockReader(Configuration conf,
+      String src, Block blk, Token<BlockTokenIdentifier> accessToken,
+      DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock)
+      throws InvalidToken, IOException {
+    // The reader covers everything from the offset to the end of the block.
+    final long bytesToRead = blk.getNumBytes() - offsetIntoBlock;
+    try {
+      return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
+          chosenNode, socketTimeout, offsetIntoBlock, bytesToRead);
+    } catch (RemoteException remote) {
+      // Rethrow token and access-control failures as their own types.
+      throw remote.unwrapRemoteException(InvalidToken.class,
+          AccessControlException.class);
+    }
+  }
+  
+  private static Set<String> localIpAddresses = Collections
+      .synchronizedSet(new HashSet<String>());
+  
+  /**
+   * Tells whether the target address belongs to this machine. An address is
+   * local when it is a wildcard or loopback address, or when it is bound to
+   * one of the network interfaces. Addresses found to be local are remembered
+   * in localIpAddresses so later calls return early.
+   *
+   * @param targetAddr address to test
+   * @return true if the address is local; false otherwise, including when the
+   *         address is unresolved or the interface lookup fails
+   */
+  private static boolean isLocalAddress(InetSocketAddress targetAddr) {
+    InetAddress addr = targetAddr.getAddress();
+    if (addr == null) {
+      // Unresolved socket address: there is no IP to compare against.
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Address " + targetAddr + " is unresolved, not local");
+      }
+      return false;
+    }
+    if (localIpAddresses.contains(addr.getHostAddress())) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Address " + targetAddr + " is local");
+      }
+      return true;
+    }
+
+    // Check if the address is any local or loop back
+    boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
+
+    // Check if the address is defined on any interface
+    if (!local) {
+      try {
+        local = NetworkInterface.getByInetAddress(addr) != null;
+      } catch (SocketException e) {
+        local = false;
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      // Report the actual outcome rather than always claiming "is local".
+      LOG.trace("Address " + targetAddr
+          + (local ? " is local" : " is not local"));
+    }
+    if (local) {
+      localIpAddresses.add(addr.getHostAddress());
+    }
+    return local;
+  }
+  
+  /**
+   * Decides whether a failure means the block access token must be fetched
+   * again before retrying.
+   * <p>
+   * A retry with a new token is needed in 2 cases. 1) When both NN and DN
+   * re-started while DFSClient holding a cached access token. 2) In the case
+   * that NN fails to update its access key at pre-set interval (by a wide
+   * margin) and subsequently restarts. In this case, DN re-registers itself
+   * with NN and receives a new access key, but DN will delete the old access
+   * key from its memory since it's considered expired based on the estimated
+   * expiration date.
+   *
+   * @param ex Exception received
+   * @param targetAddr Target datanode address from where exception was received
+   * @return true if block access token has expired or invalid and it should be
+   *         refetched
+   */
+  private static boolean tokenRefetchNeeded(IOException ex,
+      InetSocketAddress targetAddr) {
+    final boolean tokenRejected =
+        ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken;
+    if (!tokenRejected) {
+      return false;
+    }
+    LOG.info("Access token was invalid when connecting to " + targetAddr
+        + " : " + ex);
+    return true;
+  }
+  
+  /**
    * Cancel a delegation token
    * @param token the token to cancel
    * @throws InvalidToken
@@ -1312,16 +1411,16 @@ public class DFSClient implements FSCons
 
     private Socket dnSock; //for now just sending checksumOk.
     private DataInputStream in;
-    private DataChecksum checksum;
-    private long lastChunkOffset = -1;
-    private long lastChunkLen = -1;
+    protected DataChecksum checksum;
+    protected long lastChunkOffset = -1;
+    protected long lastChunkLen = -1;
     private long lastSeqNo = -1;
 
-    private long startOffset;
-    private long firstChunkOffset;
-    private int bytesPerChecksum;
-    private int checksumSize;
-    private boolean gotEOS = false;
+    protected long startOffset;
+    protected long firstChunkOffset;
+    protected int bytesPerChecksum;
+    protected int checksumSize;
+    protected boolean gotEOS = false;
     
     byte[] skipBuf = null;
     ByteBuffer checksumBytes = null;
@@ -1358,7 +1457,8 @@ public class DFSClient implements FSCons
       int nRead = super.read(buf, off, len);
       
       // if gotEOS was set in the previous read and checksum is enabled :
-      if (gotEOS && !eosBefore && nRead >= 0 && needChecksum()) {
+      if (dnSock != null && gotEOS && !eosBefore && nRead >= 0
+          && needChecksum()) {
         //checksum is verified and there are no errors.
         checksumOk(dnSock);
       }
@@ -1536,14 +1636,44 @@ public class DFSClient implements FSCons
       checksumSize = this.checksum.getChecksumSize();
     }
 
+    /**
+     * Package-private constructor that only forwards the file and the retry
+     * count to the superclass constructor.
+     *
+     * @param file path passed to the superclass
+     * @param numRetries retry count passed to the superclass
+     */
+    BlockReader(Path file, int numRetries) {
+      super(file, numRetries);
+    }
+
+    /**
+     * Constructor for subclasses that supply their own checksum. The checksum
+     * object is handed to the superclass only when its checksum size is
+     * positive; otherwise null is passed in its place.
+     *
+     * @param file path passed to the superclass
+     * @param numRetries retry count passed to the superclass
+     * @param checksum source of the bytes-per-checksum and checksum size
+     * @param verifyChecksum flag passed to the superclass
+     */
+    protected BlockReader(Path file, int numRetries, DataChecksum checksum,
+        boolean verifyChecksum) {
+      super(file,
+          numRetries,
+          verifyChecksum,
+          checksum.getChecksumSize() > 0? checksum : null,
+              checksum.getBytesPerChecksum(),
+              checksum.getChecksumSize());
+    }
+
     public static BlockReader newBlockReader(Socket sock, String file, long blockId, Token<BlockTokenIdentifier> accessToken, 
         long genStamp, long startOffset, long len, int bufferSize) throws IOException {
       return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize,
           true);
     }
 
-    /** Java Doc required */
-    public static BlockReader newBlockReader( Socket sock, String file, long blockId, 
+    /** 
+     * Creates a new {@link BlockReader} for the given blockId.
+     * @param sock Socket to read the block.
+     * @param file File to which this block belongs.
+     * @param blockId Block id.
+     * @param accessToken Block access token.
+     * @param genStamp Generation stamp of the block.
+     * @param startOffset Start offset for the data.
+     * @param len Length to be read.
+     * @param bufferSize Buffer size to use.
+     * @param verifyChecksum Checksum verification is required or not.
+     * @return BlockReader object.
+     * @throws IOException
+     */
+    public static BlockReader newBlockReader(Socket sock, String file, long blockId, 
                                        Token<BlockTokenIdentifier> accessToken,
                                        long genStamp,
                                        long startOffset, long len,
@@ -1887,6 +2017,14 @@ public class DFSClient implements FSCons
       return blockRange;
     }
 
+    /**
+     * A short circuit read is attempted only when the feature is enabled on
+     * this client and the target datanode address is local.
+     */
+    private boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr)
+        throws IOException {
+      return shortCircuitLocalReads && isLocalAddress(targetAddr);
+    }
+    
     /**
      * Open a DataInputStream to a DataNode so that it can be read from.
      * We get block ID and the IDs of the destinations at startup, from the namenode.
@@ -1923,13 +2061,37 @@ public class DFSClient implements FSCons
         chosenNode = retval.info;
         InetSocketAddress targetAddr = retval.addr;
 
+        // try reading the block locally. if this fails, then go via
+        // the datanode
+        Block blk = targetBlock.getBlock();
+        Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
+        if (shouldTryShortCircuitRead(targetAddr)) {
+          try {
+            blockReader = getLocalBlockReader(conf, src, blk, accessToken,
+                chosenNode, DFSClient.this.socketTimeout, offsetIntoBlock);
+            return chosenNode;
+          } catch (AccessControlException ex) {
+            LOG.warn("Short circuit access failed ", ex);
+            //Disable short circuit reads
+            shortCircuitLocalReads = false;
+          } catch (IOException ex) {
+            if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
+              /* Get a new access token and retry. */
+              refetchToken--;
+              fetchBlockAt(target);
+              continue;
+            } else {
+              LOG.info("Failed to read block " + targetBlock.getBlock()
+                  + " on local machine" + StringUtils.stringifyException(ex));
+              LOG.info("Try reading via the datanode on " + targetAddr);
+            }
+          }
+        }
+
         try {
           s = socketFactory.createSocket();
           NetUtils.connect(s, targetAddr, socketTimeout);
           s.setSoTimeout(socketTimeout);
-          Block blk = targetBlock.getBlock();
-          Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
-          
           blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
               accessToken, 
               blk.getGenerationStamp(),
@@ -1937,20 +2099,7 @@ public class DFSClient implements FSCons
               buffersize, verifyChecksum, clientName);
           return chosenNode;
         } catch (IOException ex) {
-          if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
-            LOG.info("Will fetch a new access token and retry, " 
-                + "access token was invalid when connecting to " + targetAddr
-                + " : " + ex);
-            /*
-             * Get a new access token and retry. Retry is needed in 2 cases. 1)
-             * When both NN and DN re-started while DFSClient holding a cached
-             * access token. 2) In the case that NN fails to update its
-             * access key at pre-set interval (by a wide margin) and
-             * subsequently restarts. In this case, DN re-registers itself with
-             * NN and receives a new access key, but DN will delete the old
-             * access key from its memory since it's considered expired based on
-             * the estimated expiration date.
-             */
+          if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
             refetchToken--;
             fetchBlockAt(target);
           } else {
@@ -1965,8 +2114,7 @@ public class DFSClient implements FSCons
           if (s != null) {
             try {
               s.close();
-            } catch (IOException iex) {
-            }                        
+            } catch (IOException iex) { }                        
           }
           s = null;
         }
@@ -2154,21 +2302,31 @@ public class DFSClient implements FSCons
         DatanodeInfo chosenNode = retval.info;
         InetSocketAddress targetAddr = retval.addr;
         BlockReader reader = null;
-            
+
+        int len = (int) (end - start + 1);
         try {
-          dn = socketFactory.createSocket();
-          NetUtils.connect(dn, targetAddr, socketTimeout);
-          dn.setSoTimeout(socketTimeout);
           Token<BlockTokenIdentifier> accessToken = block.getBlockToken();
-              
-          int len = (int) (end - start + 1);
-              
-          reader = BlockReader.newBlockReader(dn, src, 
-                                              block.getBlock().getBlockId(),
-                                              accessToken,
-                                              block.getBlock().getGenerationStamp(),
-                                              start, len, buffersize, 
-                                              verifyChecksum, clientName);
+          // first try reading the block locally.
+          if (shouldTryShortCircuitRead(targetAddr)) {
+            try {
+              reader = getLocalBlockReader(conf, src, block.getBlock(),
+                  accessToken, chosenNode, DFSClient.this.socketTimeout, start);
+            } catch (AccessControlException ex) {
+              LOG.warn("Short circuit access failed ", ex);
+              //Disable short circuit reads
+              shortCircuitLocalReads = false;
+              continue;
+            }
+          } else {
+            // go to the datanode
+            dn = socketFactory.createSocket();
+            NetUtils.connect(dn, targetAddr, socketTimeout);
+            dn.setSoTimeout(socketTimeout);
+            reader = BlockReader.newBlockReader(dn, src, 
+                block.getBlock().getBlockId(), accessToken,
+                block.getBlock().getGenerationStamp(), start, len, buffersize, 
+                verifyChecksum, clientName);
+          }
           int nread = reader.readAll(buf, offset, len);
           if (nread != len) {
             throw new IOException("truncated return from reader.read(): " +
@@ -2181,10 +2339,7 @@ public class DFSClient implements FSCons
                    e.getPos() + " from " + chosenNode.getName());
           reportChecksumFailure(src, block.getBlock(), chosenNode);
         } catch (IOException e) {
-          if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
-            LOG.info("Will get a new access token and retry, "
-                + "access token was invalid when connecting to " + targetAddr
-                + " : " + e);
+          if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
             refetchToken--;
             fetchBlockAt(block.getStartOffset());
             continue;
@@ -3314,7 +3469,8 @@ public class DFSClient implements FSCons
 
       } catch (IOException ie) {
 
-        LOG.info("Exception in createBlockOutputStream " + ie);
+        LOG.info("Exception in createBlockOutputStream " + nodes[0].getName() +
+            " " + ie);
 
         // find the datanode that matches
         if (firstBadLink.length() != 0) {

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Nov 23 00:03:30 2011
@@ -197,6 +197,10 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
   public static final String  DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";
   public static final int     DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT = 100;
+  public static final String  DFS_CLIENT_READ_SHORTCIRCUIT_KEY = "dfs.client.read.shortcircuit";
+  public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
+  public static final String  DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
+  public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
 
   //Keys with no defaults
   public static final String  DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
@@ -219,4 +223,5 @@ public class DFSConfigKeys extends Commo
   
   public static final String  DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY = "dfs.web.authentication.kerberos.principal";
   public static final String  DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY = "dfs.web.authentication.kerberos.keytab";
+  public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
 }

Added: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java?rev=1205243&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java Wed Nov 23 00:03:30 2011
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+
+/**
+ * A block and the full path information to the block data file and
+ * the metadata file stored on the local file system.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockLocalPathInfo implements Writable {
+  static final WritableFactory FACTORY = new WritableFactory() {
+    public Writable newInstance() { return new BlockLocalPathInfo(); }
+  };
+  static {                                      // register a ctor
+    WritableFactories.setFactory(BlockLocalPathInfo.class, FACTORY);
+  }
+
+  private Block block;
+  private String localBlockPath = "";  // local file storing the data
+  private String localMetaPath = "";   // local file storing the checksum
+
+  public BlockLocalPathInfo() {}
+
+  /**
+   * Constructs BlockLocalPathInfo.
+   * @param b The block corresponding to this block local path info.
+   * @param file Block data file.
+   * @param metafile Metadata file for the block.
+   */
+  public BlockLocalPathInfo(Block b, String file, String metafile) {
+    block = b;
+    localBlockPath = file;
+    localMetaPath = metafile;
+  }
+
+  /**
+   * Get the Block data file.
+   * @return Block data file.
+   */
+  public String getBlockPath() {return localBlockPath;}
+  
+  /**
+   * Get the Block metadata file.
+   * @return Block metadata file.
+   */
+  public String getMetaPath() {return localMetaPath;}
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    block.write(out);
+    Text.writeString(out, localBlockPath);
+    Text.writeString(out, localMetaPath);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    block = new Block();
+    block.readFields(in);
+    localBlockPath = Text.readString(in);
+    localMetaPath = Text.readString(in);
+  }
+  
+  /**
+   * Get number of bytes in the block.
+   * @return Number of bytes in the block.
+   */
+  public long getNumBytes() {
+    return block.getNumBytes();
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Wed Nov 23 00:03:30 2011
@@ -21,12 +21,18 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenInfo;
 
 /** An client-datanode protocol for block recovery
  */
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
 @TokenInfo(BlockTokenSelector.class)
 public interface ClientDatanodeProtocol extends VersionedProtocol {
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
@@ -55,4 +61,29 @@ public interface ClientDatanodeProtocol 
    * @throws IOException if the block does not exist
    */
   Block getBlockInfo(Block block) throws IOException;
+
+  /**
+   * Retrieves the path names of the block file and metadata file stored on the
+   * local file system.
+   * 
+   * In order for this method to work, both of the following must be satisfied:
+   * <ul>
+   * <li>
+   * The client user must be configured at the datanode to be able to use this
+   * method.</li>
+   * <li>
+   * When security is enabled, kerberos authentication must be used to connect
+   * to the datanode.</li>
+   * </ul>
+   * 
+   * @param block
+   *          the specified block on the local datanode
+   * @param token 
+   *          the block access token.
+   * @return the BlockLocalPathInfo of a block
+   * @throws IOException
+   *           on error
+   */
+  BlockLocalPathInfo getBlockLocalPathInfo(Block block,
+      Token<BlockTokenIdentifier> token) throws IOException;           
 }

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Wed Nov 23 00:03:30 2011
@@ -26,6 +26,8 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
 
 /**
@@ -33,7 +35,9 @@ import org.apache.hadoop.util.DataChecks
  * This is not related to the Block related functionality in Namenode.
  * The biggest part of data block metadata is CRC for the block.
  */
-class BlockMetadataHeader {
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockMetadataHeader {
 
   static final short METADATA_VERSION = FSDataset.METADATA_VERSION;
   
@@ -49,12 +53,14 @@ class BlockMetadataHeader {
     this.checksum = checksum;
     this.version = version;
   }
-    
-  short getVersion() {
+  
+  /** Get the version */
+  public short getVersion() {
     return version;
   }
 
-  DataChecksum getChecksum() {
+  /** Get the checksum */
+  public DataChecksum getChecksum() {
     return checksum;
   }
 
@@ -65,7 +71,7 @@ class BlockMetadataHeader {
    * @return Metadata Header
    * @throws IOException
    */
-  static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
+  public static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
     return readHeader(in.readShort(), in);
   }
   

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Nov 23 00:03:30 2011
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
@@ -62,6 +64,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -71,6 +74,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -110,8 +114,10 @@ import org.apache.hadoop.metrics2.lib.De
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -225,6 +231,7 @@ public class DataNode extends Configured
   boolean isBlockTokenEnabled;
   BlockTokenSecretManager blockTokenSecretManager;
   boolean isBlockTokenInitialized = false;
+  final String userWithLocalPathAccess;
 
   /**
    * Testing hook that allows tests to delay the sending of blockReceived RPCs
@@ -286,6 +293,8 @@ public class DataNode extends Configured
 
     datanodeObject = this;
     supportAppends = conf.getBoolean("dfs.support.append", false);
+    this.userWithLocalPathAccess = conf
+        .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
     try {
       startDataNode(conf, dataDirs, resources);
     } catch (IOException ie) {
@@ -1751,6 +1760,89 @@ public class DataNode extends Configured
     throw new IOException("Unknown protocol to " + getClass().getSimpleName()
         + ": " + protocol);
   }
+  
+  /** Ensure the authentication method is kerberos */
+  private void checkKerberosAuthMethod(String msg) throws IOException {
+    // When security is disabled there is no authentication method to verify
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() != 
+        AuthenticationMethod.KERBEROS) {
+      throw new AccessControlException("Error in "+msg+". Only "
+          + "kerberos based authentication is allowed.");
+    }
+  }
+  
+  private void checkBlockLocalPathAccess() throws IOException {
+    checkKerberosAuthMethod("getBlockLocalPathInfo()");
+    String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+    if (!currentUser.equals(this.userWithLocalPathAccess)) {
+      throw new AccessControlException(
+          "Can't continue with getBlockLocalPathInfo() "
+              + "authorization. The user " + currentUser
+              + " is not allowed to call getBlockLocalPathInfo");
+    }
+  }
+
+  @Override
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block block,
+      Token<BlockTokenIdentifier> token) throws IOException {
+    checkBlockLocalPathAccess();
+    checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ);
+    BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
+    if (LOG.isDebugEnabled()) {
+      if (info != null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("getBlockLocalPathInfo successful block=" + block
+              + " blockfile " + info.getBlockPath() + " metafile "
+              + info.getMetaPath());
+        }
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("getBlockLocalPathInfo for block=" + block
+              + " returning null");
+        }
+      }
+    }
+    myMetrics.incrBlocksGetLocalPathInfo();
+    return info;
+  }
+  
+  private void checkBlockToken(Block block, Token<BlockTokenIdentifier> token,
+      AccessMode accessMode) throws IOException {
+    if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
+      BlockTokenIdentifier id = new BlockTokenIdentifier();
+      ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+      DataInputStream in = new DataInputStream(buf);
+      id.readFields(in);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Got: " + id.toString());
+      }
+      blockTokenSecretManager.checkAccess(id, null, block, accessMode);
+    }
+  }
+
+  /** Check block access token for the given access mode */
+  private void checkBlockToken(Block block,
+      BlockTokenSecretManager.AccessMode accessMode) throws IOException {
+    if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
+      Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
+          .getTokenIdentifiers();
+      if (tokenIds.size() != 1) {
+        throw new IOException("Can't continue with "
+            + "authorization since " + tokenIds.size()
+            + " BlockTokenIdentifier " + "is found.");
+      }
+      for (TokenIdentifier tokenId : tokenIds) {
+        BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Got: " + id.toString());
+        }
+        blockTokenSecretManager.checkAccess(id, null, block, accessMode);
+      }
+    }
+  }
 
   /** A convenient class used in lease recovery */
   private static class BlockRecord { 
@@ -1952,28 +2044,13 @@ public class DataNode extends Configured
   public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
       ) throws IOException {
     logRecoverBlock("Client", block, targets);
-    if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
-      Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
-          .getTokenIdentifiers();
-      if (tokenIds.size() != 1) {
-        throw new IOException("Can't continue with recoverBlock() "
-            + "authorization since " + tokenIds.size() + " BlockTokenIdentifier "
-            + "is found.");
-      }
-      for (TokenIdentifier tokenId : tokenIds) {
-        BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Got: " + id.toString());
-        }
-        blockTokenSecretManager.checkAccess(id, null, block,
-            BlockTokenSecretManager.AccessMode.WRITE);
-      }
-    }
+    checkBlockToken(block, BlockTokenSecretManager.AccessMode.WRITE);
     return recoverBlock(block, keepLength, targets, false);
   }
 
   /** {@inheritDoc} */
   public Block getBlockInfo(Block block) throws IOException {
+    checkBlockToken(block, BlockTokenSecretManager.AccessMode.READ);
     Block stored = data.getStoredBlock(block.getBlockId());
     return stored;
   }

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Nov 23 00:03:30 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.DU;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
@@ -1072,6 +1073,16 @@ public class FSDataset implements FSCons
     return f;
   }
   
+  @Override //FSDatasetInterface
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block block)
+      throws IOException {
+    File datafile = getBlockFile(block);
+    File metafile = getMetaFile(datafile, block);
+    BlockLocalPathInfo info = new BlockLocalPathInfo(block,
+        datafile.getAbsolutePath(), metafile.getAbsolutePath());
+    return info;
+  }
+  
   public synchronized InputStream getBlockInputStream(Block b) throws IOException {
     return new FileInputStream(getBlockFile(b));
   }

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Wed Nov 23 00:03:30 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.da
 
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,6 +31,7 @@ import java.io.OutputStream;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
@@ -333,4 +335,8 @@ public interface FSDatasetInterface exte
 
   public BlockRecoveryInfo startBlockRecovery(long blockId) throws IOException;
 
+  /**
+   * Get {@link BlockLocalPathInfo} for the given block.
+   **/
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block b) throws IOException;
 }

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java Wed Nov 23 00:03:30 2011
@@ -49,6 +49,8 @@ public class DataNodeInstrumentation imp
       registry.newCounter("blocks_verified", "", 0);
   final MetricMutableCounterInt blockVerificationFailures =
       registry.newCounter("block_verification_failures", "", 0);
+  final MetricMutableCounterInt blocksGetLocalPathInfo = 
+      registry.newCounter("blocks_get_local_pathinfo", "", 0);
 
   final MetricMutableCounterInt readsFromLocalClient =
       registry.newCounter("reads_from_local_client", "", 0);
@@ -131,6 +133,11 @@ public class DataNodeInstrumentation imp
   }
 
   //@Override
+  public void incrBlocksGetLocalPathInfo() {
+    blocksGetLocalPathInfo.incr();
+  }
+
+  //@Override
   public void addReadBlockOp(long latency) {
     readBlockOp.add(latency);
   }

Modified: hadoop/common/branches/branch-0.20-security/src/test/commit-tests
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/commit-tests?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/commit-tests (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/commit-tests Wed Nov 23 00:03:30 2011
@@ -110,6 +110,7 @@
 **/TestFileAppend.java
 **/TestFileCorruption.java
 **/TestFileLimit.java
+**/TestShortCircuitLocalRead.java
 **/TestFileStatus.java
 **/TestFSInputChecker.java
 **/TestFSOutputSummer.java

Added: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1205243&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Wed Nov 23 00:03:30 2011
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Test for short circuit read functionality using {@link BlockReaderLocal}.
+ * When a block being read by a client is on the local datanode, instead of
+ * using {@link DataTransferProtocol} and connecting to the datanode, the short
+ * circuit read allows reading the block directly from the files on the local
+ * file system.
+ */
+public class TestShortCircuitLocalRead {
+  static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/";
+
+  static final long seed = 0xDEADBEEFL;
+  static final int blockSize = 5120;
+  boolean simulatedStorage = false;
+  
+  // creates a file but does not close it
+  static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+      throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+                                            (short)repl, (long)blockSize);
+    return stm;
+  }
+
+  static private void checkData(byte[] actual, int from, byte[] expected,
+      String message) {
+    checkData(actual, from, expected, actual.length, message);
+  }
+  
+  static private void checkData(byte[] actual, int from, byte[] expected,
+      int len, String message) {
+    for (int idx = 0; idx < len; idx++) {
+      if (expected[from + idx] != actual[idx]) {
+        Assert.fail(message + " byte " + (from + idx) + " differs. expected "
+            + expected[from + idx] + " actual " + actual[idx]);
+      }
+    }
+  }
+
+  static void checkFileContent(FileSystem fs, Path name, byte[] expected,
+      int readOffset) throws IOException {
+    FSDataInputStream stm = fs.open(name);
+    byte[] actual = new byte[expected.length-readOffset];
+    stm.readFully(readOffset, actual);
+    checkData(actual, readOffset, expected, "Read 2");
+    stm.close();
+    // Now read using a different API.
+    actual = new byte[expected.length-readOffset];
+    stm = fs.open(name);
+    long skipped = stm.skip(readOffset);
+    Assert.assertEquals(skipped, readOffset);
+    //Read a small number of bytes first.
+    int nread = stm.read(actual, 0, 3);
+    nread += stm.read(actual, nread, 2);
+    //Read across chunk boundary
+    nread += stm.read(actual, nread, 517);
+    checkData(actual, readOffset, expected, nread, "A few bytes");
+    //Now read rest of it
+    while (nread < actual.length) {
+      int nbytes = stm.read(actual, nread, actual.length - nread);
+      if (nbytes < 0) {
+        throw new EOFException("End of file reached before reading fully.");
+      }
+      nread += nbytes;
+    }
+    checkData(actual, readOffset, expected, "Read 3");
+    stm.close();
+  }
+
+  /**
+   * Test that file data can be read by reading the block file
+   * directly from the local store.
+   * Starts a single-datanode {@link MiniDFSCluster} with short circuit reads
+   * enabled for the current user, writes {@code size} random bytes to a file
+   * and verifies them with {@link #checkFileContent}.
+   *
+   * @param ignoreChecksum value stored under
+   *        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY
+   * @param size number of bytes written to the test file
+   * @param readOffset offset from which the file is read back and verified
+   * @throws IOException if the cluster or any file operation fails
+   */
+  public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
+      int readOffset) throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        ignoreChecksum);
+    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    try {
+      // Acquire the file system inside the try so that the cluster is shut
+      // down even when this call, or closing the file system, fails.
+      FileSystem fs = cluster.getFileSystem();
+      try {
+        // check that / exists
+        Path path = new Path("/");
+        assertTrue("/ should be a directory",
+                   fs.getFileStatus(path).isDir());
+
+        byte[] fileData = AppendTestUtil.randomBytes(seed, size);
+        // create a new file in home directory, write the data and close it
+        Path file1 = new Path("filelocal.dat");
+        FSDataOutputStream stm = createFile(fs, file1, 1);
+        try {
+          stm.write(fileData);
+        } finally {
+          stm.close();
+        }
+        checkFileContent(fs, file1, fileData, readOffset);
+      } finally {
+        fs.close();
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Short circuit read of a file of three blocks plus 100 bytes, from
+   * offset 0, with the skip-checksum setting turned on.
+   */
+  @Test
+  public void testFileLocalReadNoChecksum() throws IOException {
+    final int size = 3 * blockSize + 100;
+    doTestShortCircuitRead(true, size, 0);
+  }
+
+  /**
+   * Short circuit read of a file of three blocks plus 100 bytes, from
+   * offset 0, with the skip-checksum setting turned off.
+   */
+  @Test
+  public void testFileLocalReadChecksum() throws IOException {
+    final int size = 3 * blockSize + 100;
+    doTestShortCircuitRead(false, size, 0);
+  }
+  
+  /**
+   * Short circuit read of a 13 byte file, from offsets 0 and 5, first with
+   * the skip-checksum setting off and then with it on.
+   */
+  @Test
+  public void testSmallFileLocalRead() throws IOException {
+    final boolean[] skipChecksumSettings = { false, true };
+    final int[] offsets = { 0, 5 };
+    for (boolean skipChecksum : skipChecksumSettings) {
+      for (int offset : offsets) {
+        doTestShortCircuitRead(skipChecksum, 13, offset);
+      }
+    }
+  }
+  
+  /**
+   * Short circuit read of a file of three blocks plus 100 bytes starting at
+   * offset 777, with the skip-checksum setting off and then on.
+   */
+  @Test
+  public void testReadFromAnOffset() throws IOException {
+    final int size = 3 * blockSize + 100;
+    final int offset = 777;
+    doTestShortCircuitRead(false, size, offset);
+    doTestShortCircuitRead(true, size, offset);
+  }
+
+  /**
+   * Short circuit read of a file of ten blocks plus 100 bytes starting at
+   * offset 777, with the skip-checksum setting off and then on.
+   */
+  @Test
+  public void testLongFile() throws IOException {
+    final int size = 10 * blockSize + 100;
+    final int offset = 777;
+    doTestShortCircuitRead(false, size, offset);
+    doTestShortCircuitRead(true, size, offset);
+  }
+  
+  /**
+   * Verifies the access check on
+   * {@link ClientDatanodeProtocol#getBlockLocalPathInfo}: with
+   * DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY set to "alloweduser", a call made
+   * through a proxy created as that user must return the datanode's own block
+   * path, while a call made as "notalloweduser" must fail with an
+   * {@link IOException} saying the user is not allowed.
+   * Every proxy, the file system and the cluster are released even when an
+   * assertion or an RPC fails.
+   */
+  @Test
+  public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
+    final Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, "alloweduser");
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    try {
+      cluster.waitActive();
+      final DataNode dn = cluster.getDataNodes().get(0);
+      FileSystem fs = cluster.getFileSystem();
+      try {
+        DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23);
+        UserGroupInformation aUgi = UserGroupInformation
+            .createRemoteUser("alloweduser");
+        LocatedBlocks lb = cluster.getNameNode().getBlockLocations("/tmp/x", 0,
+            16);
+        // Create a new block object, because the block inside LocatedBlock at
+        // namenode is of type BlockInfo.
+        Block blk = new Block(lb.get(0).getBlock());
+        Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
+        final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
+
+        // This should succeed
+        ClientDatanodeProtocol proxy = createProxyAs(aUgi, dnInfo, conf);
+        try {
+          BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
+          Assert.assertEquals(dn.data.getBlockLocalPathInfo(blk).getBlockPath(),
+              blpi.getBlockPath());
+        } finally {
+          RPC.stopProxy(proxy);
+        }
+
+        // Now try with a not allowed user.
+        UserGroupInformation bUgi = UserGroupInformation
+            .createRemoteUser("notalloweduser");
+        proxy = createProxyAs(bUgi, dnInfo, conf);
+        try {
+          proxy.getBlockLocalPathInfo(blk, token);
+          Assert.fail("The call should have failed as " + bUgi.getShortUserName()
+              + " is not allowed to call getBlockLocalPathInfo");
+        } catch (IOException ex) {
+          // Guard against a null message so a wrong exception shows up as an
+          // assertion failure instead of a NullPointerException.
+          String msg = ex.getMessage();
+          Assert.assertTrue("Unexpected exception message: " + msg,
+              msg != null
+                  && msg.contains("not allowed to call getBlockLocalPathInfo"));
+        } finally {
+          RPC.stopProxy(proxy);
+        }
+      } finally {
+        fs.close();
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Creates a {@link ClientDatanodeProtocol} proxy to {@code dnInfo} while
+   * running as {@code ugi}, using a 60000 timeout argument.
+   */
+  private static ClientDatanodeProtocol createProxyAs(UserGroupInformation ugi,
+      final DatanodeInfo dnInfo, final Configuration conf)
+      throws IOException, InterruptedException {
+    return ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+      @Override
+      public ClientDatanodeProtocol run() throws Exception {
+        return DFSClient.createClientDatanodeProtocolProxy(
+            dnInfo, conf, 60000);
+      }
+    });
+  }
+  
+  /**
+   * Test to run benchmarks between shortcircuit read vs regular read with
+   * specified number of threads simultaneously reading.
+   * <br>
+   * Run this using the following command:
+   * <pre>
+   * bin/hadoop --config confdir \
+   *   org.apache.hadoop.hdfs.TestShortCircuitLocalRead \
+   *   &lt;shortcircuit on?&gt; &lt;checksum on?&gt; &lt;number of threads&gt;
+   * </pre>
+   * Each thread reads and verifies the benchmark file 20 times; the elapsed
+   * wall-clock time in milliseconds is printed at the end.
+   *
+   * @param args shortcircuit flag, checksum flag and thread count
+   * @throws Exception if setting up or cleaning up the benchmark file fails
+   */
+  public static void main(String[] args) throws Exception {
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.INFO);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.INFO);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.INFO);
+
+    if (args.length != 3) {
+      System.out.println("Usage: test shortcircuit checksum threadCount");
+      System.exit(1);
+    }
+    boolean shortcircuit = Boolean.parseBoolean(args[0]);
+    boolean checksum = Boolean.parseBoolean(args[1]);
+    final int threadCount = Integer.parseInt(args[2]);
+
+    // Setup create a file
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit);
+    // The argument says whether checksums are ON while the key says whether
+    // to SKIP them, so the value has to be negated.
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        !checksum);
+
+    // Use a much larger file than the unit tests for the benchmark.
+    int fileSize = 1000 * blockSize + 100; // File with 1000 blocks
+    final byte [] dataToWrite = AppendTestUtil.randomBytes(seed, fileSize);
+
+    // create a new file in home directory, write the data and close it
+    final Path file1 = new Path("filelocal.dat");
+    final FileSystem fs = FileSystem.get(conf);
+    FSDataOutputStream stm = createFile(fs, file1, 1);
+    try {
+      stm.write(dataToWrite);
+    } finally {
+      stm.close();
+    }
+
+    try {
+      long start = System.currentTimeMillis();
+      final int iteration = 20;
+      // One slot per thread: each thread writes only its own element and the
+      // main thread reads them after join(), so no extra synchronization is
+      // needed.
+      final boolean[] failed = new boolean[threadCount];
+      Thread[] threads = new Thread[threadCount];
+      for (int i = 0; i < threadCount; i++) {
+        final int threadIndex = i;
+        threads[i] = new Thread() {
+          @Override
+          public void run() {
+            for (int iter = 0; iter < iteration; iter++) {
+              try {
+                checkFileContent(fs, file1, dataToWrite, 0);
+              } catch (IOException e) {
+                failed[threadIndex] = true;
+                e.printStackTrace();
+              }
+            }
+          }
+        };
+      }
+      for (int i = 0; i < threadCount; i++) {
+        threads[i].start();
+      }
+      for (int i = 0; i < threadCount; i++) {
+        threads[i].join();
+      }
+      long end = System.currentTimeMillis();
+      System.out.println("Iteration " + iteration + " took " + (end - start));
+      for (int i = 0; i < threadCount; i++) {
+        if (failed[i]) {
+          // A timing that includes failed reads is not a valid measurement.
+          System.err.println("WARNING: one or more reads failed; "
+              + "the reported time is not a valid benchmark result");
+          break;
+        }
+      }
+    } finally {
+      // Remove the benchmark file even if the run was interrupted.
+      fs.delete(file1, false);
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Wed Nov 23 00:03:30 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -31,6 +32,7 @@ import javax.management.StandardMBean;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;
@@ -710,4 +712,9 @@ public class SimulatedFSDataset  impleme
     Block stored = getStoredBlock(blockId);
     return new BlockRecoveryInfo(stored, false);
   }
+
+  @Override
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block blk) throws IOException {
+    throw new IOException("getBlockLocalPathInfo not supported.");
+  }
 }



Mime
View raw message