hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1456048 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/test/java/org/apache/hadoop/hdfs/
Date Wed, 13 Mar 2013 16:54:11 GMT
Author: suresh
Date: Wed Mar 13 16:54:10 2013
New Revision: 1456048

URL: http://svn.apache.org/r1456048
Log:
HDFS-4595. Merging 1456047 from trunk.

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1456048&r1=1456047&r2=1456048&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Mar 13
16:54:10 2013
@@ -74,6 +74,9 @@ Release 2.0.5-beta - UNRELEASED
     HDFS-4484. libwebhdfs compilation broken with gcc 4.6.2. (Colin Patrick
     McCabe via atm)
 
+    HDFS-4595. When short circuit read fails, DFSClient does not fall back
+    to regular reads. (suresh)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1456048&r1=1456047&r2=1456048&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
Wed Mar 13 16:54:10 2013
@@ -23,6 +23,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.Socket;
 import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -31,6 +32,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -86,11 +89,21 @@ class BlockReaderLocal implements BlockR
     }
 
     private synchronized ClientDatanodeProtocol getDatanodeProxy(
-        DatanodeInfo node, Configuration conf, int socketTimeout,
-        boolean connectToDnViaHostname) throws IOException {
+        UserGroupInformation ugi, final DatanodeInfo node,
+        final Configuration conf, final int socketTimeout,
+        final boolean connectToDnViaHostname) throws IOException {
       if (proxy == null) {
-        proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf,
-            socketTimeout, connectToDnViaHostname);
+        try {
+          proxy = ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+            @Override
+            public ClientDatanodeProtocol run() throws Exception {
+              return DFSUtil.createClientDatanodeProtocolProxy(node, conf,
+                  socketTimeout, connectToDnViaHostname);
+            }
+          });
+        } catch (InterruptedException e) {
+          LOG.warn("encountered exception ", e);
+        }
       }
       return proxy;
     }
@@ -154,17 +167,18 @@ class BlockReaderLocal implements BlockR
   /**
    * The only way this object can be instantiated.
    */
-  static BlockReaderLocal newBlockReader(Configuration conf, String file,
-      ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
-      int socketTimeout, long startOffset, long length,
-      boolean connectToDnViaHostname) throws IOException {
+  static BlockReaderLocal newBlockReader(UserGroupInformation ugi,
+      Configuration conf, String file, ExtendedBlock blk,
+      Token<BlockTokenIdentifier> token, DatanodeInfo node, int socketTimeout,
+      long startOffset, long length, boolean connectToDnViaHostname)
+      throws IOException {
 
     LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
         .getIpcPort());
     // check the cache first
     BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
     if (pathinfo == null) {
-      pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token,
+      pathinfo = getBlockPathInfo(ugi, blk, node, conf, socketTimeout, token,
           connectToDnViaHostname);
     }
 
@@ -241,13 +255,13 @@ class BlockReaderLocal implements BlockR
     return ldInfo;
   }
   
-  private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk,
-      DatanodeInfo node, Configuration conf, int timeout,
+  private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi,
+      ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout,
       Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
-          throws IOException {
+      throws IOException {
     LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
     BlockLocalPathInfo pathinfo = null;
-    ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
+    ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node,
         conf, timeout, connectToDnViaHostname);
     try {
       // make RPC to local datanode to find local pathnames of blocks

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1456048&r1=1456047&r2=1456048&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
Wed Mar 13 16:54:10 2013
@@ -414,6 +414,7 @@ public class DFSClient implements java.i
           "null URI");
       NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo =
         NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);
+      
       this.dtService = proxyInfo.getDelegationTokenService();
       this.namenode = proxyInfo.getProxy();
     }
@@ -782,12 +783,13 @@ public class DFSClient implements java.i
   /**
    * Get {@link BlockReader} for short circuited local reads.
    */
-  static BlockReader getLocalBlockReader(Configuration conf,
-      String src, ExtendedBlock blk, Token<BlockTokenIdentifier> accessToken,
-      DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock,
-      boolean connectToDnViaHostname) throws InvalidToken, IOException {
+  static BlockReader getLocalBlockReader(UserGroupInformation ugi,
+      Configuration conf, String src, ExtendedBlock blk,
+      Token<BlockTokenIdentifier> accessToken, DatanodeInfo chosenNode,
+      int socketTimeout, long offsetIntoBlock, boolean connectToDnViaHostname)
+      throws InvalidToken, IOException {
     try {
-      return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
+      return BlockReaderLocal.newBlockReader(ugi, conf, src, blk, accessToken,
           chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
               - offsetIntoBlock, connectToDnViaHostname);
     } catch (RemoteException re) {
@@ -1638,7 +1640,7 @@ public class DFSClient implements java.i
    * @param socketFactory to create sockets to connect to DNs
    * @param socketTimeout timeout to use when connecting and waiting for a response
    * @param encryptionKey the key needed to communicate with DNs in this cluster
-   * @param connectToDnViaHostname {@see #connectToDnViaHostname()}
+   * @param connectToDnViaHostname {@link #connectToDnViaHostname()}
    * @return The checksum 
    */
   static MD5MD5CRC32FileChecksum getFileChecksum(String src,
@@ -2250,6 +2252,12 @@ public class DFSClient implements java.i
   }
 
   void disableShortCircuit() {
+    LOG.info("Short circuit is disabled");
     shortCircuitLocalReads = false;
   }
+  
+  @VisibleForTesting
+  boolean getShortCircuitLocalReads() {
+    return shortCircuitLocalReads;
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1456048&r1=1456047&r2=1456048&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
Wed Mar 13 16:54:10 2013
@@ -460,6 +460,10 @@ public class DFSInputStream extends FSIn
                              " for " + blk);
         }
         return chosenNode;
+      } catch (AccessControlException ex) {
+        DFSClient.LOG.warn("Short circuit access failed " + ex);
+        dfsClient.disableShortCircuit();
+        continue;
       } catch (IOException ex) {
         if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
@@ -806,7 +810,7 @@ public class DFSInputStream extends FSIn
         // we want to remember what we have tried
         addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
       } catch (AccessControlException ex) {
-        DFSClient.LOG.warn("Short circuit access failed ", ex);
+        DFSClient.LOG.warn("Short circuit access failed " + ex);
         dfsClient.disableShortCircuit();
         continue;
       } catch (IOException e) {
@@ -885,9 +889,9 @@ public class DFSInputStream extends FSIn
     // Can't local read a block under construction, see HDFS-2757
     if (dfsClient.shouldTryShortCircuitRead(dnAddr) &&
         !blockUnderConstruction()) {
-      return DFSClient.getLocalBlockReader(dfsClient.conf, src, block,
-          blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset,
-          dfsClient.connectToDnViaHostname());
+      return DFSClient.getLocalBlockReader(dfsClient.ugi, dfsClient.conf,
+          src, block, blockToken, chosenNode, dfsClient.hdfsTimeout,
+          startOffset, dfsClient.connectToDnViaHostname());
     }
     
     IOException err = null;
@@ -1027,8 +1031,8 @@ public class DFSInputStream extends FSIn
    * only report if the total number of replica is 1. We do not
    * report otherwise since this maybe due to the client is a handicapped client
    * (who can not read).
-   * @param corruptedBlockMap, map of corrupted blocks
-   * @param dataNodeCount, number of data nodes who contains the block replicas
+   * @param corruptedBlockMap map of corrupted blocks
+   * @param dataNodeCount number of data nodes who contains the block replicas
    */
   private void reportCheckSumFailure(
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1456048&r1=1456047&r2=1456048&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
Wed Mar 13 16:54:10 2013
@@ -67,6 +67,8 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 
+import com.google.common.annotations.VisibleForTesting;
+
 
 /****************************************************************
  * Implementation of the abstract FileSystem for the DFS system.
@@ -564,9 +566,8 @@ public class DistributedFileSystem exten
     return "DFS[" + dfs + "]";
   }
 
-  /** @deprecated DFSClient should not be accessed directly. */
   @InterfaceAudience.Private
-  @Deprecated
+  @VisibleForTesting
   public DFSClient getClient() {
     return dfs;
   }        

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1456048&r1=1456047&r2=1456048&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
Wed Mar 13 16:54:10 2013
@@ -18,9 +18,11 @@
 package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.net.URI;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 
@@ -32,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -85,9 +88,20 @@ public class TestShortCircuitLocalRead {
       }
     }
   }
+  
+  private static String getCurrentUser() throws IOException {
+    return UserGroupInformation.getCurrentUser().getShortUserName();
+  }
 
-  static void checkFileContent(FileSystem fs, Path name, byte[] expected,
-      int readOffset) throws IOException {
+  /** Check file content, reading as user {@code readingUser} */
+  static void checkFileContent(URI uri, Path name, byte[] expected,
+      int readOffset, String readingUser, Configuration conf,
+      boolean shortCircuitFails)
+      throws IOException, InterruptedException {
+    // Ensure short circuit is enabled
+    DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
+    assertTrue(fs.getClient().getShortCircuitLocalReads());
+    
     FSDataInputStream stm = fs.open(name);
     byte[] actual = new byte[expected.length-readOffset];
     stm.readFully(readOffset, actual);
@@ -112,6 +126,11 @@ public class TestShortCircuitLocalRead {
       nread += nbytes;
     }
     checkData(actual, readOffset, expected, "Read 3");
+    
+    if (shortCircuitFails) {
+      // short circuit should be disabled due to failure
+      assertFalse(fs.getClient().getShortCircuitLocalReads());
+    }
     stm.close();
   }
 
@@ -123,11 +142,15 @@ public class TestShortCircuitLocalRead {
     return arr;
   }
   
-  /**
-   * Verifies that reading a file with the direct read(ByteBuffer) api gives the expected set of bytes.
-   */
-  static void checkFileContentDirect(FileSystem fs, Path name, byte[] expected,
-      int readOffset) throws IOException {
+  /** Check the file content, reading as user {@code readingUser} */
+  static void checkFileContentDirect(URI uri, Path name, byte[] expected,
+      int readOffset, String readingUser, Configuration conf,
+      boolean shortCircuitFails)
+      throws IOException, InterruptedException {
+    // Ensure short circuit is enabled
+    DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
+    assertTrue(fs.getClient().getShortCircuitLocalReads());
+    
     DFSDataInputStream stm = (DFSDataInputStream)fs.open(name);
 
     ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
@@ -157,21 +180,33 @@ public class TestShortCircuitLocalRead {
       nread += nbytes;
     }
     checkData(arrayFromByteBuffer(actual), readOffset, expected, "Read 3");
+    if (shortCircuitFails) {
+      // short circuit should be disabled due to failure
+      assertFalse(fs.getClient().getShortCircuitLocalReads());
+    }
     stm.close();
   }
 
+  public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
+      int readOffset) throws IOException, InterruptedException {
+    String shortCircuitUser = getCurrentUser();
+    doTestShortCircuitRead(ignoreChecksum, size, readOffset, shortCircuitUser,
+        shortCircuitUser, false);
+  }
+  
   /**
    * Test that file data can be read by reading the block file
    * directly from the local store.
    */
   public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
-      int readOffset) throws IOException {
+      int readOffset, String shortCircuitUser, String readingUser,
+      boolean shortCircuitFails) throws IOException, InterruptedException {
     Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
         ignoreChecksum);
     conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        UserGroupInformation.getCurrentUser().getShortUserName());
+        shortCircuitUser);
     if (simulatedStorage) {
       SimulatedFSDataset.setFactory(conf);
     }
@@ -184,53 +219,88 @@ public class TestShortCircuitLocalRead {
       assertTrue("/ should be a directory", fs.getFileStatus(path)
           .isDirectory() == true);
       
-      byte[] fileData = AppendTestUtil.randomBytes(seed, size);
       // create a new file in home directory. Do not close it.
-      Path file1 = new Path("filelocal.dat");
+      byte[] fileData = AppendTestUtil.randomBytes(seed, size);
+      Path file1 = fs.makeQualified(new Path("filelocal.dat"));
       FSDataOutputStream stm = createFile(fs, file1, 1);
-
-      // write to file
       stm.write(fileData);
       stm.close();
-      checkFileContent(fs, file1, fileData, readOffset);
-      checkFileContentDirect(fs, file1, fileData, readOffset);
+      
+      URI uri = cluster.getURI();
+      checkFileContent(uri, file1, fileData, readOffset, readingUser, conf,
+          shortCircuitFails);
+      checkFileContentDirect(uri, file1, fileData, readOffset, readingUser,
+          conf, shortCircuitFails);
     } finally {
       fs.close();
       cluster.shutdown();
     }
   }
 
-  @Test
-  public void testFileLocalReadNoChecksum() throws IOException {
+  @Test(timeout=10000)
+  public void testFileLocalReadNoChecksum() throws Exception {
     doTestShortCircuitRead(true, 3*blockSize+100, 0);
   }
 
-  @Test
-  public void testFileLocalReadChecksum() throws IOException {
+  @Test(timeout=10000)
+  public void testFileLocalReadChecksum() throws Exception {
     doTestShortCircuitRead(false, 3*blockSize+100, 0);
   }
   
-  @Test
-  public void testSmallFileLocalRead() throws IOException {
+  @Test(timeout=10000)
+  public void testSmallFileLocalRead() throws Exception {
     doTestShortCircuitRead(false, 13, 0);
     doTestShortCircuitRead(false, 13, 5);
     doTestShortCircuitRead(true, 13, 0);
     doTestShortCircuitRead(true, 13, 5);
   }
   
-  @Test
-  public void testReadFromAnOffset() throws IOException {
+  /**
+   * Try a short circuit from a reader that is not allowed
+   * to use short circuit. The test ensures reader falls back to non
+   * shortcircuit reads when shortcircuit is disallowed.
+   */
+  @Test(timeout=10000)
+  public void testLocalReadFallback() throws Exception {
+    doTestShortCircuitRead(true, 13, 0, getCurrentUser(), "notallowed", true);
+  }
+  
+  @Test(timeout=10000)
+  public void testReadFromAnOffset() throws Exception {
     doTestShortCircuitRead(false, 3*blockSize+100, 777);
     doTestShortCircuitRead(true, 3*blockSize+100, 777);
   }
   
-  @Test
-  public void testLongFile() throws IOException {
+  @Test(timeout=10000)
+  public void testLongFile() throws Exception {
     doTestShortCircuitRead(false, 10*blockSize+100, 777);
     doTestShortCircuitRead(true, 10*blockSize+100, 777);
   }
    
-  @Test
+  private ClientDatanodeProtocol getProxy(UserGroupInformation ugi,
+      final DatanodeID dnInfo, final Configuration conf) throws IOException,
+      InterruptedException {
+    return ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+      @Override
+      public ClientDatanodeProtocol run() throws Exception {
+        return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000,
+            false);
+      }
+    });
+  }
+  
+  private static DistributedFileSystem getFileSystem(String user, final URI uri,
+      final Configuration conf) throws InterruptedException, IOException {
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+    return ugi.doAs(new PrivilegedExceptionAction<DistributedFileSystem>() {
+      @Override
+      public DistributedFileSystem run() throws Exception {
+        return (DistributedFileSystem)FileSystem.get(uri, conf);
+      }
+    });
+  }
+  
+  @Test(timeout=10000)
   public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
     final Configuration conf = new Configuration();
     conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
@@ -253,15 +323,7 @@ public class TestShortCircuitLocalRead {
       ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
       Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
       final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
-      ClientDatanodeProtocol proxy = aUgi1
-          .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
-            @Override
-            public ClientDatanodeProtocol run() throws Exception {
-              return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
-                  60000, false);
-            }
-          });
-      
+      ClientDatanodeProtocol proxy = getProxy(aUgi1, dnInfo, conf);
       // This should succeed
       BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
       Assert.assertEquals(
@@ -269,14 +331,7 @@ public class TestShortCircuitLocalRead {
           blpi.getBlockPath());
 
       // Try with the other allowed user
-      proxy = aUgi2
-          .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
-            @Override
-            public ClientDatanodeProtocol run() throws Exception {
-              return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
-                  60000, false);
-            }
-          });
+      proxy = getProxy(aUgi2, dnInfo, conf);
 
       // This should succeed as well
       blpi = proxy.getBlockLocalPathInfo(blk, token);
@@ -287,14 +342,7 @@ public class TestShortCircuitLocalRead {
       // Now try with a disallowed user
       UserGroupInformation bUgi = UserGroupInformation
           .createRemoteUser("notalloweduser");
-      proxy = bUgi
-          .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
-            @Override
-            public ClientDatanodeProtocol run() throws Exception {
-              return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
-                  60000, false);
-            }
-          });
+      proxy = getProxy(bUgi, dnInfo, conf);
       try {
         proxy.getBlockLocalPathInfo(blk, token);
         Assert.fail("The call should have failed as " + bUgi.getShortUserName()
@@ -309,14 +357,14 @@ public class TestShortCircuitLocalRead {
     }
   }
 
-  @Test
+  @Test(timeout=10000)
   public void testSkipWithVerifyChecksum() throws IOException {
     int size = blockSize;
     Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
     conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        UserGroupInformation.getCurrentUser().getShortUserName());
+        getCurrentUser());
     if (simulatedStorage) {
       SimulatedFSDataset.setFactory(conf);
     }
@@ -356,7 +404,7 @@ public class TestShortCircuitLocalRead {
   }
      
   /**
-   * Test to run benchmarks between shortcircuit read vs regular read with
+   * Test to run benchmarks between short circuit read vs regular read with
    * specified number of threads simultaneously reading.
    * <br>
    * Run this using the following command:
@@ -374,7 +422,7 @@ public class TestShortCircuitLocalRead {
     int threadCount = Integer.valueOf(args[2]);
 
     // Setup create a file
-    Configuration conf = new Configuration();
+    final Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit);
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
         checksum);
@@ -400,9 +448,13 @@ public class TestShortCircuitLocalRead {
         public void run() {
           for (int i = 0; i < iteration; i++) {
             try {
-              checkFileContent(fs, file1, dataToWrite, 0);
+              String user = getCurrentUser();
+              checkFileContent(fs.getUri(), file1, dataToWrite, 0, user, conf,
+                  true);
             } catch (IOException e) {
               e.printStackTrace();
+            } catch (InterruptedException e) {
+              e.printStackTrace();
             }
           }
         }



Mime
View raw message