hadoop-hdfs-commits mailing list archives

From: a..@apache.org
Subject: svn commit: r1202013 [2/3] - in /hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/main/java/org/apache/hadoop/hdfs/...
Date: Tue, 15 Nov 2011 02:39:29 GMT
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Tue Nov 15 02:39:13 2011
@@ -56,14 +56,17 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
 import org.apache.hadoop.hdfs.web.resources.DelegationParam;
+import org.apache.hadoop.hdfs.web.resources.DoAsParam;
 import org.apache.hadoop.hdfs.web.resources.UserParam;
 import org.apache.hadoop.http.HtmlQuoting;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.VersionInfo;
 
@@ -117,7 +120,8 @@ public class JspHelper {
       return 0;
     }
   }
-  public static DatanodeInfo bestNode(LocatedBlocks blks) throws IOException {
+  public static DatanodeInfo bestNode(LocatedBlocks blks, Configuration conf)
+      throws IOException {
     HashMap<DatanodeInfo, NodeRecord> map =
       new HashMap<DatanodeInfo, NodeRecord>();
     for (LocatedBlock block : blks.getLocatedBlocks()) {
@@ -133,16 +137,17 @@ public class JspHelper {
     }
     NodeRecord[] nodes = map.values().toArray(new NodeRecord[map.size()]);
     Arrays.sort(nodes, new NodeRecordComparator());
-    return bestNode(nodes, false);
+    return bestNode(nodes, false, conf);
   }
 
-  public static DatanodeInfo bestNode(LocatedBlock blk) throws IOException {
+  public static DatanodeInfo bestNode(LocatedBlock blk, Configuration conf)
+      throws IOException {
     DatanodeInfo[] nodes = blk.getLocations();
-    return bestNode(nodes, true);
+    return bestNode(nodes, true, conf);
   }
 
-  public static DatanodeInfo bestNode(DatanodeInfo[] nodes, boolean doRandom)
-    throws IOException {
+  public static DatanodeInfo bestNode(DatanodeInfo[] nodes, boolean doRandom,
+      Configuration conf) throws IOException {
     TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
     DatanodeInfo chosenNode = null;
     int failures = 0;
@@ -169,7 +174,7 @@ public class JspHelper {
           chosenNode.getHost() + ":" + chosenNode.getInfoPort());
         
       try {
-        s = new Socket();
+        s = NetUtils.getDefaultSocketFactory(conf).createSocket();
         s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
         s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
       } catch (IOException e) {
@@ -191,27 +196,26 @@ public class JspHelper {
       long blockSize, long offsetIntoBlock, long chunkSizeToView,
       JspWriter out, Configuration conf) throws IOException {
     if (chunkSizeToView == 0) return;
-    Socket s = new Socket();
+    Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
     s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
     s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
       
-    long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
+    int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
       
       // Use the block name for file name. 
-    int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
-        DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
     String file = BlockReaderFactory.getFileName(addr, poolId, blockId);
-    BlockReader blockReader = BlockReaderFactory.newBlockReader(s, file,
+    BlockReader blockReader = BlockReaderFactory.newBlockReader(
+        conf, s, file,
         new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
-        offsetIntoBlock, amtToRead, bufferSize);
+        offsetIntoBlock, amtToRead);
         
     byte[] buf = new byte[(int)amtToRead];
     int readOffset = 0;
     int retries = 2;
     while ( amtToRead > 0 ) {
-      int numRead;
+      int numRead = amtToRead;
       try {
-        numRead = blockReader.readAll(buf, readOffset, (int)amtToRead);
+        blockReader.readFully(buf, readOffset, amtToRead);
       }
       catch (IOException e) {
         retries--;
@@ -534,9 +538,10 @@ public class JspHelper {
       final boolean tryUgiParameter) throws IOException {
     final UserGroupInformation ugi;
     final String usernameFromQuery = getUsernameFromQuery(request, tryUgiParameter);
+    final String doAsUserFromQuery = request.getParameter(DoAsParam.NAME);
 
     if(UserGroupInformation.isSecurityEnabled()) {
-      final String user = request.getRemoteUser();
+      final String remoteUser = request.getRemoteUser();
       String tokenString = request.getParameter(DELEGATION_PARAMETER_NAME);
       if (tokenString != null) {
         Token<DelegationTokenIdentifier> token = 
@@ -544,9 +549,8 @@ public class JspHelper {
         token.decodeFromUrlString(tokenString);
         String serviceAddress = getNNServiceAddress(context, request);
         if (serviceAddress != null) {
-          LOG.info("Setting service in token: "
-              + new Text(serviceAddress));
           token.setService(new Text(serviceAddress));
+          token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
         }
         ByteArrayInputStream buf = new ByteArrayInputStream(token
             .getIdentifier());
@@ -561,26 +565,36 @@ public class JspHelper {
           }
         }
         ugi = id.getUser();
-        checkUsername(ugi.getShortUserName(), usernameFromQuery);
-        checkUsername(ugi.getShortUserName(), user);
+        if (ugi.getRealUser() == null) {
+          //non-proxy case
+          checkUsername(ugi.getShortUserName(), usernameFromQuery);
+          checkUsername(null, doAsUserFromQuery);
+        } else {
+          //proxy case
+          checkUsername(ugi.getRealUser().getShortUserName(), usernameFromQuery);
+          checkUsername(ugi.getShortUserName(), doAsUserFromQuery);
+          ProxyUsers.authorize(ugi, request.getRemoteAddr(), conf);
+        }
         ugi.addToken(token);
         ugi.setAuthenticationMethod(AuthenticationMethod.TOKEN);
       } else {
-        if(user == null) {
+        if(remoteUser == null) {
           throw new IOException("Security enabled but user not " +
                                 "authenticated by filter");
         }
-        ugi = UserGroupInformation.createRemoteUser(user);
-        checkUsername(ugi.getShortUserName(), usernameFromQuery);
+        final UserGroupInformation realUgi = UserGroupInformation.createRemoteUser(remoteUser);
+        checkUsername(realUgi.getShortUserName(), usernameFromQuery);
         // This is not necessarily true, could have been auth'ed by user-facing
         // filter
-        ugi.setAuthenticationMethod(secureAuthMethod);
+        realUgi.setAuthenticationMethod(secureAuthMethod);
+        ugi = initUGI(realUgi, doAsUserFromQuery, request, true, conf);
       }
     } else { // Security's not on, pull from url
-      ugi = usernameFromQuery == null?
+      final UserGroupInformation realUgi = usernameFromQuery == null?
           getDefaultWebUser(conf) // not specified in request
           : UserGroupInformation.createRemoteUser(usernameFromQuery);
-      ugi.setAuthenticationMethod(AuthenticationMethod.SIMPLE);
+      realUgi.setAuthenticationMethod(AuthenticationMethod.SIMPLE);
+      ugi = initUGI(realUgi, doAsUserFromQuery, request, false, conf);
     }
     
     if(LOG.isDebugEnabled())
@@ -588,12 +602,34 @@ public class JspHelper {
     return ugi;
   }
 
+  private static UserGroupInformation initUGI(final UserGroupInformation realUgi,
+      final String doAsUserFromQuery, final HttpServletRequest request,
+      final boolean isSecurityEnabled, final Configuration conf
+      ) throws AuthorizationException {
+    final UserGroupInformation ugi;
+    if (doAsUserFromQuery == null) {
+      //non-proxy case
+      ugi = realUgi;
+    } else {
+      //proxy case
+      ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, realUgi);
+      ugi.setAuthenticationMethod(
+          isSecurityEnabled? AuthenticationMethod.PROXY: AuthenticationMethod.SIMPLE);
+      ProxyUsers.authorize(ugi, request.getRemoteAddr(), conf);
+    }
+    return ugi;
+  }
+
   /**
    * Expected user name should be a short name.
    */
   private static void checkUsername(final String expected, final String name
       ) throws IOException {
-    if (name == null) {
+    if (expected == null && name != null) {
+      throw new IOException("Usernames not matched: expecting null but name="
+          + name);
+    }
+    if (name == null) { //name is optional, null is okay
       return;
     }
     KerberosName u = new KerberosName(name);

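The getUGI() changes above add proxy-user ("doas") support to the JSP pages. Stripped of the servlet plumbing, the secure proxy path reduces to the sketch below; the variable names (remoteUser, doAsUserFromQuery, secureAuthMethod, conf) are taken from the diff, and the real logic lives in getUGI()/initUGI().

    // Minimal sketch of the proxy-user path, assuming security is enabled and
    // the request carried both a user name and a doas parameter.
    UserGroupInformation realUgi = UserGroupInformation.createRemoteUser(remoteUser);
    realUgi.setAuthenticationMethod(secureAuthMethod);

    UserGroupInformation ugi =
        UserGroupInformation.createProxyUser(doAsUserFromQuery, realUgi);
    ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);

    // Throws AuthorizationException unless the real user is configured as a
    // proxy for this effective user from this client address.
    ProxyUsers.authorize(ugi, request.getRemoteAddr(), conf);
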
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Tue Nov 15 02:39:13 2011
@@ -18,12 +18,15 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 
+import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -88,6 +91,18 @@ class BlockMetadataHeader {
     }
   }
   
+  /**
+   * Read the header at the beginning of the given block meta file.
+   * The current file position will be altered by this method.
+   * If an error occurs, the file is <em>not</em> closed.
+   */
+  static BlockMetadataHeader readHeader(RandomAccessFile raf) throws IOException {
+    byte[] buf = new byte[getHeaderSize()];
+    raf.seek(0);
+    raf.readFully(buf, 0, buf.length);
+    return readHeader(new DataInputStream(new ByteArrayInputStream(buf)));
+  }
+  
   // Version is already read.
   private static BlockMetadataHeader readHeader(short version, DataInputStream in) 
                                    throws IOException {

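The new readHeader(RandomAccessFile) overload is consumed later in this commit by ReplicaInPipeline.createStreams(). A minimal usage sketch, modeled on that call site (metaFile is assumed to be the replica's meta file):

    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
    boolean checkedMeta = false;
    try {
      BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
      DataChecksum diskChecksum = header.getChecksum();  // checksum the block is stored with
      checkedMeta = true;
    } finally {
      if (!checkedMeta) {
        IOUtils.closeStream(metaRAF);  // readHeader() leaves the file open on error
      }
    }
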
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Nov 15 02:39:13 2011
@@ -63,7 +63,15 @@ class BlockReceiver implements Closeable
   private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
   
   private DataInputStream in = null; // from where data are read
-  private DataChecksum checksum; // from where chunks of a block can be read
+  private DataChecksum clientChecksum; // checksum used by client
+  private DataChecksum diskChecksum; // checksum we write to disk
+  
+  /**
+   * In the case that the client is writing with a different
+   * checksum polynomial than the block is stored with on disk,
+   * the DataNode needs to recalculate checksums before writing.
+   */
+  private boolean needsChecksumTranslation;
   private OutputStream out = null; // to block file at local disk
   private FileDescriptor outFd;
  private OutputStream cout = null; // output stream for checksum file
@@ -177,33 +185,35 @@ class BlockReceiver implements Closeable
               " while receiving block " + block + " from " + inAddr);
         }
       }
-      // read checksum meta information
-      this.checksum = requestedChecksum;
-      this.bytesPerChecksum = checksum.getBytesPerChecksum();
-      this.checksumSize = checksum.getChecksumSize();
       this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
       this.syncBehindWrites = datanode.shouldSyncBehindWrites();
       
       final boolean isCreate = isDatanode || isTransfer 
           || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
-      streams = replicaInfo.createStreams(isCreate,
-          this.bytesPerChecksum, this.checksumSize);
-      if (streams != null) {
-        this.out = streams.dataOut;
-        if (out instanceof FileOutputStream) {
-          this.outFd = ((FileOutputStream)out).getFD();
-        } else {
-          LOG.warn("Could not get file descriptor for outputstream of class " +
-              out.getClass());
-        }
-        this.cout = streams.checksumOut;
-        this.checksumOut = new DataOutputStream(new BufferedOutputStream(
-            streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
-        // write data chunk header if creating a new replica
-        if (isCreate) {
-          BlockMetadataHeader.writeHeader(checksumOut, checksum);
-        } 
+      streams = replicaInfo.createStreams(isCreate, requestedChecksum);
+      assert streams != null : "null streams!";
+
+      // read checksum meta information
+      this.clientChecksum = requestedChecksum;
+      this.diskChecksum = streams.getChecksum();
+      this.needsChecksumTranslation = !clientChecksum.equals(diskChecksum);
+      this.bytesPerChecksum = diskChecksum.getBytesPerChecksum();
+      this.checksumSize = diskChecksum.getChecksumSize();
+
+      this.out = streams.dataOut;
+      if (out instanceof FileOutputStream) {
+        this.outFd = ((FileOutputStream)out).getFD();
+      } else {
+        LOG.warn("Could not get file descriptor for outputstream of class " +
+            out.getClass());
       }
+      this.cout = streams.checksumOut;
+      this.checksumOut = new DataOutputStream(new BufferedOutputStream(
+          streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
+      // write data chunk header if creating a new replica
+      if (isCreate) {
+        BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
+      } 
     } catch (ReplicaAlreadyExistsException bae) {
       throw bae;
     } catch (ReplicaNotFoundException bne) {
@@ -315,9 +325,9 @@ class BlockReceiver implements Closeable
     while (len > 0) {
       int chunkLen = Math.min(len, bytesPerChecksum);
       
-      checksum.update(dataBuf, dataOff, chunkLen);
+      clientChecksum.update(dataBuf, dataOff, chunkLen);
 
-      if (!checksum.compare(checksumBuf, checksumOff)) {
+      if (!clientChecksum.compare(checksumBuf, checksumOff)) {
         if (srcDataNode != null) {
           try {
             LOG.info("report corrupt block " + block + " from datanode " +
@@ -334,12 +344,32 @@ class BlockReceiver implements Closeable
                               "while writing " + block + " from " + inAddr);
       }
 
-      checksum.reset();
+      clientChecksum.reset();
       dataOff += chunkLen;
       checksumOff += checksumSize;
       len -= chunkLen;
     }
   }
+  
+    
+  /**
+   * Translate CRC chunks from the client's checksum implementation
+   * to the disk checksum implementation.
+   * 
+   * This does not verify the original checksums, under the assumption
+   * that they have already been validated.
+   */
+  private void translateChunks( byte[] dataBuf, int dataOff, int len, 
+                             byte[] checksumBuf, int checksumOff ) 
+                             throws IOException {
+    if (len == 0) return;
+    
+    int numChunks = (len - 1)/bytesPerChecksum + 1;
+    
+    diskChecksum.calculateChunkedSums(
+        ByteBuffer.wrap(dataBuf, dataOff, len),
+        ByteBuffer.wrap(checksumBuf, checksumOff, numChunks * checksumSize));
+  }
 
   /**
    * Makes sure buf.position() is zero without modifying buf.remaining().
@@ -583,9 +613,16 @@ class BlockReceiver implements Closeable
        * protocol includes acks and only the last datanode needs to verify 
        * checksum.
        */
-      if (mirrorOut == null || isDatanode) {
+      if (mirrorOut == null || isDatanode || needsChecksumTranslation) {
         verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+        if (needsChecksumTranslation) {
+          // overwrite the checksums in the packet buffer with the
+          // appropriate polynomial for the disk storage.
+          translateChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+        }
       }
+      
+      // by this point, the data in the buffer uses the disk checksum
 
       byte[] lastChunkChecksum;
       
@@ -807,7 +844,7 @@ class BlockReceiver implements Closeable
     // find offset of the beginning of partial chunk.
     //
     int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
-    int checksumSize = checksum.getChecksumSize();
+    int checksumSize = diskChecksum.getChecksumSize();
     blkoff = blkoff - sizePartialChunk;
     LOG.info("computePartialChunkCrc sizePartialChunk " + 
               sizePartialChunk +
@@ -832,7 +869,8 @@ class BlockReceiver implements Closeable
     }
 
     // compute crc of partial chunk from data read in the block file.
-    partialCrc = new PureJavaCrc32();
+    partialCrc = DataChecksum.newDataChecksum(
+        diskChecksum.getChecksumType(), diskChecksum.getBytesPerChecksum());
     partialCrc.update(buf, 0, sizePartialChunk);
     LOG.info("Read in partial CRC chunk from disk for block " + block);
 

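To make the chunk arithmetic in translateChunks() concrete (numbers assumed purely for illustration): for a 1000-byte packet with bytesPerChecksum = 512 and checksumSize = 4, numChunks = (1000 - 1)/512 + 1 = 2, so calculateChunkedSums() overwrites 2 * 4 = 8 checksum bytes in place in the packet buffer. From that point on the buffer carries checksums in the on-disk polynomial, matching the header written via BlockMetadataHeader.writeHeader(checksumOut, diskChecksum).
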
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Tue Nov 15 02:39:13 2011
@@ -124,7 +124,7 @@ public class DatanodeJspHelper {
         if (locations == null || locations.length == 0) {
           out.print("Empty file");
         } else {
-          DatanodeInfo chosenNode = JspHelper.bestNode(firstBlock);
+          DatanodeInfo chosenNode = JspHelper.bestNode(firstBlock, conf);
           String fqdn = InetAddress.getByName(chosenNode.getHost())
               .getCanonicalHostName();
           String datanodeAddr = chosenNode.getName();
@@ -299,7 +299,7 @@ public class DatanodeJspHelper {
     // URL for TAIL
     LocatedBlock lastBlk = blocks.get(blocks.size() - 1);
     try {
-      chosenNode = JspHelper.bestNode(lastBlk);
+      chosenNode = JspHelper.bestNode(lastBlk, conf);
     } catch (IOException e) {
       out.print(e.toString());
       dfs.close();
@@ -514,7 +514,7 @@ public class DatanodeJspHelper {
                 .getGenerationStamp());
             nextStartOffset = 0;
             nextBlockSize = nextBlock.getBlock().getNumBytes();
-            DatanodeInfo d = JspHelper.bestNode(nextBlock);
+            DatanodeInfo d = JspHelper.bestNode(nextBlock, conf);
             String datanodeAddr = d.getName();
             nextDatanodePort = Integer.parseInt(datanodeAddr.substring(
                 datanodeAddr.indexOf(':') + 1, datanodeAddr.length()));
@@ -569,7 +569,7 @@ public class DatanodeJspHelper {
             if (prevStartOffset < 0)
               prevStartOffset = 0;
             prevBlockSize = prevBlock.getBlock().getNumBytes();
-            DatanodeInfo d = JspHelper.bestNode(prevBlock);
+            DatanodeInfo d = JspHelper.bestNode(prevBlock, conf);
             String datanodeAddr = d.getName();
             prevDatanodePort = Integer.parseInt(datanodeAddr.substring(
                 datanodeAddr.indexOf(':') + 1, datanodeAddr.length()));
@@ -686,7 +686,7 @@ public class DatanodeJspHelper {
     long genStamp = lastBlk.getBlock().getGenerationStamp();
     DatanodeInfo chosenNode;
     try {
-      chosenNode = JspHelper.bestNode(lastBlk);
+      chosenNode = JspHelper.bestNode(lastBlk, conf);
     } catch (IOException e) {
       out.print(e.toString());
       dfs.close();

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Nov 15 02:39:13 2011
@@ -1258,7 +1258,7 @@ public class FSDataset implements FSData
   /**
    * Get File name for a given block.
    */
-  public synchronized File getBlockFile(String bpid, Block b)
+  public File getBlockFile(String bpid, Block b)
       throws IOException {
     File f = validateBlockFile(bpid, b);
     if(f == null) {
@@ -1271,16 +1271,44 @@ public class FSDataset implements FSData
   }
   
   @Override // FSDatasetInterface
-  public synchronized InputStream getBlockInputStream(ExtendedBlock b)
+  public InputStream getBlockInputStream(ExtendedBlock b)
       throws IOException {
-    return new FileInputStream(getBlockFile(b));
+    File f = getBlockFileNoExistsCheck(b);
+    try {
+      return new FileInputStream(f);
+    } catch (FileNotFoundException fnfe) {
+      throw new IOException("Block " + b + " is not valid. " +
+          "Expected block file at " + f + " does not exist.");
+    }
+  }
+  
+  /**
+   * Return the File associated with a block, without first
+   * checking that it exists. This should be used when the
+   * next operation is going to open the file for read anyway,
+   * and thus the exists check is redundant.
+   */
+  private File getBlockFileNoExistsCheck(ExtendedBlock b)
+      throws IOException {
+    File f = getFile(b.getBlockPoolId(), b.getLocalBlock());
+    if (f == null) {
+      throw new IOException("Block " + b + " is not valid");
+    }
+    return f;
   }
 
   @Override // FSDatasetInterface
-  public synchronized InputStream getBlockInputStream(ExtendedBlock b,
+  public InputStream getBlockInputStream(ExtendedBlock b,
       long seekOffset) throws IOException {
-    File blockFile = getBlockFile(b);
-    RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
+    File blockFile = getBlockFileNoExistsCheck(b);
+    RandomAccessFile blockInFile;
+    try {
+      blockInFile = new RandomAccessFile(blockFile, "r");
+    } catch (FileNotFoundException fnfe) {
+      throw new IOException("Block " + b + " is not valid. " +
+          "Expected block file at " + blockFile + " does not exist.");
+    }
+
     if (seekOffset > 0) {
       blockInFile.seek(seekOffset);
     }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Tue Nov 15 02:39:13 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 /**
@@ -158,15 +159,23 @@ public interface FSDatasetInterface exte
      static class BlockWriteStreams {
       OutputStream dataOut;
       OutputStream checksumOut;
-      BlockWriteStreams(OutputStream dOut, OutputStream cOut) {
+      DataChecksum checksum;
+      
+      BlockWriteStreams(OutputStream dOut, OutputStream cOut,
+          DataChecksum checksum) {
         dataOut = dOut;
         checksumOut = cOut;
+        this.checksum = checksum;
       }
       
       void close() throws IOException {
         IOUtils.closeStream(dataOut);
         IOUtils.closeStream(checksumOut);
       }
+      
+      DataChecksum getChecksum() {
+        return checksum;
+      }
     }
 
   /**

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java Tue Nov 15 02:39:13 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -169,7 +170,7 @@ class ReplicaInPipeline extends ReplicaI
   
   @Override // ReplicaInPipelineInterface
   public BlockWriteStreams createStreams(boolean isCreate, 
-      int bytesPerChunk, int checksumSize) throws IOException {
+      DataChecksum requestedChecksum) throws IOException {
     File blockFile = getBlockFile();
     File metaFile = getMetaFile();
     if (DataNode.LOG.isDebugEnabled()) {
@@ -180,30 +181,64 @@ class ReplicaInPipeline extends ReplicaI
     }
     long blockDiskSize = 0L;
     long crcDiskSize = 0L;
-    if (!isCreate) { // check on disk file
-      blockDiskSize = bytesOnDisk;
-      crcDiskSize = BlockMetadataHeader.getHeaderSize() +
-      (blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize;
-      if (blockDiskSize>0 && 
-          (blockDiskSize>blockFile.length() || crcDiskSize>metaFile.length())) {
-        throw new IOException("Corrupted block: " + this);
+    
+    // the checksum that should actually be used -- this
+    // may differ from requestedChecksum for appends.
+    DataChecksum checksum;
+    
+    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+    
+    if (!isCreate) {
+      // For append or recovery, we must enforce the existing checksum.
+      // Also, verify that the file has correct lengths, etc.
+      boolean checkedMeta = false;
+      try {
+        BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
+        checksum = header.getChecksum();
+        
+        if (checksum.getBytesPerChecksum() !=
+            requestedChecksum.getBytesPerChecksum()) {
+          throw new IOException("Client requested checksum " +
+              requestedChecksum + " when appending to an existing block " +
+              "with different chunk size: " + checksum);
+        }
+        
+        int bytesPerChunk = checksum.getBytesPerChecksum();
+        int checksumSize = checksum.getChecksumSize();
+        
+        blockDiskSize = bytesOnDisk;
+        crcDiskSize = BlockMetadataHeader.getHeaderSize() +
+          (blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize;
+        if (blockDiskSize>0 && 
+            (blockDiskSize>blockFile.length() || crcDiskSize>metaFile.length())) {
+          throw new IOException("Corrupted block: " + this);
+        }
+        checkedMeta = true;
+      } finally {
+        if (!checkedMeta) {
+          // clean up in case of exceptions.
+          IOUtils.closeStream(metaRAF);
+        }
       }
+    } else {
+      // for create, we can use the requested checksum
+      checksum = requestedChecksum;
     }
+    
     FileOutputStream blockOut = null;
     FileOutputStream crcOut = null;
     try {
       blockOut = new FileOutputStream(
           new RandomAccessFile( blockFile, "rw" ).getFD() );
-      crcOut = new FileOutputStream(
-          new RandomAccessFile( metaFile, "rw" ).getFD() );
+      crcOut = new FileOutputStream(metaRAF.getFD() );
       if (!isCreate) {
         blockOut.getChannel().position(blockDiskSize);
         crcOut.getChannel().position(crcDiskSize);
       }
-      return new BlockWriteStreams(blockOut, crcOut);
+      return new BlockWriteStreams(blockOut, crcOut, checksum);
     } catch (IOException e) {
       IOUtils.closeStream(blockOut);
-      IOUtils.closeStream(crcOut);
+      IOUtils.closeStream(metaRAF);
       throw e;
     }
   }

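As a worked example of the crcDiskSize check retained above (sizes assumed for illustration): when appending to a replica with bytesOnDisk = 1048576, bytesPerChunk = 512 and checksumSize = 4, the meta file must hold at least BlockMetadataHeader.getHeaderSize() header bytes plus (1048576 + 511)/512 * 4 = 2048 * 4 = 8192 bytes of CRCs, and the block file at least 1048576 bytes; otherwise createStreams() fails with "Corrupted block". The new chunk-size check just above additionally rejects appends whose requestedChecksum uses a different bytesPerChecksum than the checksum already on disk.
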
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java Tue Nov 15 02:39:13 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.da
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
+import org.apache.hadoop.util.DataChecksum;
 
 /** 
  * This defines the interface of a replica in Pipeline that's being written to
@@ -61,11 +62,10 @@ interface ReplicaInPipelineInterface ext
    * one for block file and one for CRC file
    * 
    * @param isCreate if it is for creation
-   * @param bytePerChunk number of bytes per CRC chunk
-   * @param checksumSize number of bytes per checksum
+   * @param requestedChecksum the checksum the writer would prefer to use
    * @return output streams for writing
    * @throws IOException if any error occurs
    */
   public BlockWriteStreams createStreams(boolean isCreate,
-      int bytesPerChunk, int checksumSize) throws IOException;
+      DataChecksum requestedChecksum) throws IOException;
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Tue Nov 15 02:39:13 2011
@@ -51,6 +51,7 @@ import org.apache.hadoop.fs.MD5MD5CRC32F
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.web.JsonUtil;
@@ -58,7 +59,9 @@ import org.apache.hadoop.hdfs.web.ParamF
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
 import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
+import org.apache.hadoop.hdfs.web.resources.DelegationParam;
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;
+import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
 import org.apache.hadoop.hdfs.web.resources.LengthParam;
 import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
@@ -69,7 +72,9 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
 import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 
 import com.sun.jersey.spi.container.ResourceFilters;
 
@@ -84,6 +89,29 @@ public class DatanodeWebHdfsMethods {
   private @Context ServletContext context;
   private @Context HttpServletResponse response;
 
+  private void init(final UserGroupInformation ugi, final DelegationParam delegation,
+      final UriFsPathParam path, final HttpOpParam<?> op,
+      final Param<?, ?>... parameters) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path
+          + ", ugi=" + ugi + Param.toSortedString(", ", parameters));
+    }
+
+    //clear content type
+    response.setContentType(null);
+    
+    if (UserGroupInformation.isSecurityEnabled()) {
+      //add a token for RPC.
+      final DataNode datanode = (DataNode)context.getAttribute("datanode");
+      final InetSocketAddress nnRpcAddr = NameNode.getAddress(datanode.getConf());
+      final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
+      token.decodeFromUrlString(delegation.getValue());
+      SecurityUtil.setTokenService(token, nnRpcAddr);
+      token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
+      ugi.addToken(token);
+    }
+  }
+
   /** Handle HTTP PUT request for the root. */
   @PUT
   @Path("/")
@@ -92,6 +120,8 @@ public class DatanodeWebHdfsMethods {
   public Response putRoot(
       final InputStream in,
       @Context final UserGroupInformation ugi,
+      @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
+          final DelegationParam delegation,
       @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
           final PutOpParam op,
       @QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
@@ -105,7 +135,7 @@ public class DatanodeWebHdfsMethods {
       @QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
           final BlockSizeParam blockSize
       ) throws IOException, InterruptedException {
-    return put(in, ugi, ROOT, op, permission, overwrite, bufferSize,
+    return put(in, ugi, delegation, ROOT, op, permission, overwrite, bufferSize,
         replication, blockSize);
   }
 
@@ -117,6 +147,8 @@ public class DatanodeWebHdfsMethods {
   public Response put(
       final InputStream in,
       @Context final UserGroupInformation ugi,
+      @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
+          final DelegationParam delegation,
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
           final PutOpParam op,
@@ -132,14 +164,8 @@ public class DatanodeWebHdfsMethods {
           final BlockSizeParam blockSize
       ) throws IOException, InterruptedException {
 
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(op + ": " + path + ", ugi=" + ugi
-          + Param.toSortedString(", ", permission, overwrite, bufferSize,
-              replication, blockSize));
-    }
-
-    //clear content type
-    response.setContentType(null);
+    init(ugi, delegation, path, op, permission, overwrite, bufferSize,
+        replication, blockSize);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
@@ -193,12 +219,14 @@ public class DatanodeWebHdfsMethods {
   public Response postRoot(
       final InputStream in,
       @Context final UserGroupInformation ugi,
+      @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
+          final DelegationParam delegation,
       @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
           final PostOpParam op,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
-    return post(in, ugi, ROOT, op, bufferSize);
+    return post(in, ugi, delegation, ROOT, op, bufferSize);
   }
 
   /** Handle HTTP POST request. */
@@ -209,6 +237,8 @@ public class DatanodeWebHdfsMethods {
   public Response post(
       final InputStream in,
       @Context final UserGroupInformation ugi,
+      @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
+          final DelegationParam delegation,
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
           final PostOpParam op,
@@ -216,13 +246,7 @@ public class DatanodeWebHdfsMethods {
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
 
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(op + ": " + path + ", ugi=" + ugi
-          + Param.toSortedString(", ", bufferSize));
-    }
-
-    //clear content type
-    response.setContentType(null);
+    init(ugi, delegation, path, op, bufferSize);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
@@ -265,6 +289,8 @@ public class DatanodeWebHdfsMethods {
   @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response getRoot(
       @Context final UserGroupInformation ugi,
+      @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
+          final DelegationParam delegation,
       @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
           final GetOpParam op,
       @QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
@@ -274,7 +300,7 @@ public class DatanodeWebHdfsMethods {
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
-    return get(ugi, ROOT, op, offset, length, bufferSize); 
+    return get(ugi, delegation, ROOT, op, offset, length, bufferSize); 
   }
 
   /** Handle HTTP GET request. */
@@ -283,6 +309,8 @@ public class DatanodeWebHdfsMethods {
   @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response get(
       @Context final UserGroupInformation ugi,
+      @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
+          final DelegationParam delegation,
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
           final GetOpParam op,
@@ -294,13 +322,7 @@ public class DatanodeWebHdfsMethods {
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
 
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(op + ": " + path + ", ugi=" + ugi
-          + Param.toSortedString(", ", offset, length, bufferSize));
-    }
-
-    //clear content type
-    response.setContentType(null);
+    init(ugi, delegation, path, op, offset, length, bufferSize);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
@@ -350,9 +372,7 @@ public class DatanodeWebHdfsMethods {
         }
       };
 
-      final int status = offset.getValue() == 0?
-          HttpServletResponse.SC_OK: HttpServletResponse.SC_PARTIAL_CONTENT;
-      return Response.status(status).entity(streaming).type(
+      return Response.ok(streaming).type(
           MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETFILECHECKSUM:

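The new init() here wires the delegation token from the query string into the request UGI so the doAs() block that follows can open a DFSClient to the NameNode. Condensed, the token plumbing is (names kept from the diff; sketch only):

    Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
    token.decodeFromUrlString(delegation.getValue());               // token arrived as a URL parameter
    SecurityUtil.setTokenService(token, nnRpcAddr);                 // point it at the NN RPC address
    token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
    ugi.addToken(token);                                            // picked up inside ugi.doAs(...)
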
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Nov 15 02:39:13 2011
@@ -4050,7 +4050,7 @@ public class FSNamesystem implements Nam
    * @throws IOException
    */
   Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
-      String startBlockAfter) throws IOException {
+	String[] cookieTab) throws IOException {
 
     readLock();
     try {
@@ -4059,23 +4059,27 @@ public class FSNamesystem implements Nam
                               "replication queues have not been initialized.");
       }
       checkSuperuserPrivilege();
-      long startBlockId = 0;
       // print a limited # of corrupt files per call
       int count = 0;
       ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>();
-      
-      if (startBlockAfter != null) {
-        startBlockId = Block.filename2id(startBlockAfter);
-      }
 
       final Iterator<Block> blkIterator = blockManager.getCorruptReplicaBlockIterator();
+
+      if (cookieTab == null) {
+        cookieTab = new String[] { null };
+      }
+      int skip = getIntCookie(cookieTab[0]);
+      for (int i = 0; i < skip && blkIterator.hasNext(); i++) {
+        blkIterator.next();
+      }
+
       while (blkIterator.hasNext()) {
         Block blk = blkIterator.next();
         INode inode = blockManager.getINode(blk);
+        skip++;
         if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) {
           String src = FSDirectory.getFullPathName(inode);
-          if (((startBlockAfter == null) || (blk.getBlockId() > startBlockId))
-              && (src.startsWith(path))) {
+          if (src.startsWith(path)){
             corruptFiles.add(new CorruptFileBlockInfo(src, blk));
             count++;
             if (count >= DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED)
@@ -4083,13 +4087,32 @@ public class FSNamesystem implements Nam
           }
         }
       }
+      cookieTab[0] = String.valueOf(skip);
       LOG.info("list corrupt file blocks returned: " + count);
       return corruptFiles;
     } finally {
       readUnlock();
     }
   }
-  
+
+  /**
+   * Convert string cookie to integer.
+   */
+  private static int getIntCookie(String cookie){
+    int c;
+    if(cookie == null){
+      c = 0;
+    } else {
+      try{
+        c = Integer.parseInt(cookie);
+      }catch (NumberFormatException e) {
+        c = 0;
+      }
+    }
+    c = Math.max(0, c);
+    return c;
+  }
+
   /**
    * Create delegation token secret manager
    */

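The cookie handed back through cookieTab[0] is just a skip count over the corrupt-replica iterator, so a caller pages through results roughly as in the sketch below (NamenodeFsck and NameNodeRpcServer further down are the real callers; namenode.getNamesystem() follows the NamenodeFsck usage):

    String[] cookieTab = new String[] { null };  // null or unparsable cookie => start at 0
    Collection<FSNamesystem.CorruptFileBlockInfo> batch;
    do {
      batch = namenode.getNamesystem().listCorruptFileBlocks("/", cookieTab);
      for (FSNamesystem.CorruptFileBlockInfo c : batch) {
        out.println(c.toString());  // at most DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED per call
      }
      // cookieTab[0] now holds the number of corrupt blocks already examined.
    } while (!batch.isEmpty());
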
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java Tue Nov 15 02:39:13 2011
@@ -52,7 +52,9 @@ public class FileDataServlet extends Dfs
     String scheme = request.getScheme();
     final LocatedBlocks blks = nnproxy.getBlockLocations(
         status.getFullPath(new Path(path)).toUri().getPath(), 0, 1);
-    final DatanodeID host = pickSrcDatanode(blks, status);
+    final Configuration conf = NameNodeHttpServer.getConfFromContext(
+        getServletContext());
+    final DatanodeID host = pickSrcDatanode(blks, status, conf);
     final String hostname;
     if (host instanceof DatanodeInfo) {
       hostname = ((DatanodeInfo)host).getHostName();
@@ -83,16 +85,17 @@ public class FileDataServlet extends Dfs
   /** Select a datanode to service this request.
    * Currently, this looks at no more than the first five blocks of a file,
    * selecting a datanode randomly from the most represented.
+   * @param conf 
    */
-  private DatanodeID pickSrcDatanode(LocatedBlocks blks, HdfsFileStatus i)
-      throws IOException {
+  private DatanodeID pickSrcDatanode(LocatedBlocks blks, HdfsFileStatus i,
+      Configuration conf) throws IOException {
     if (i.getLen() == 0 || blks.getLocatedBlocks().size() <= 0) {
       // pick a random datanode
       NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
           getServletContext());
       return NamenodeJspHelper.getRandomDatanode(nn);
     }
-    return JspHelper.bestNode(blks);
+    return JspHelper.bestNode(blks, conf);
   }
 
   /**

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Tue Nov 15 02:39:13 2011
@@ -744,17 +744,16 @@ class NameNodeRpcServer implements Namen
   public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
       throws IOException {
     nn.checkOperation(OperationCategory.READ);
+    String[] cookieTab = new String[] { cookie };
     Collection<FSNamesystem.CorruptFileBlockInfo> fbs =
-      namesystem.listCorruptFileBlocks(path, cookie);
-    
+      namesystem.listCorruptFileBlocks(path, cookieTab);
+
     String[] files = new String[fbs.size()];
-    String lastCookie = "";
     int i = 0;
     for(FSNamesystem.CorruptFileBlockInfo fb: fbs) {
       files[i++] = fb.path;
-      lastCookie = fb.block.getBlockName();
     }
-    return new CorruptFileBlocks(files, lastCookie);
+    return new CorruptFileBlocks(files, cookieTab[0]);
   }
 
   /**

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Tue Nov 15 02:39:13 2011
@@ -114,11 +114,11 @@ public class NamenodeFsck {
   // We return back N files that are corrupt; the list of files returned is
   // ordered by block id; to allow continuation support, pass in the last block
   // # from previous call
-  private String startBlockAfter = null;
-  
+  private String[] currentCookie = new String[] { null };
+
   private final Configuration conf;
   private final PrintWriter out;
-  
+
   /**
    * Filesystem checker.
    * @param conf configuration (namenode config)
@@ -156,11 +156,11 @@ public class NamenodeFsck {
         this.showCorruptFileBlocks = true;
       }
       else if (key.equals("startblockafter")) {
-        this.startBlockAfter = pmap.get("startblockafter")[0]; 
+        this.currentCookie[0] = pmap.get("startblockafter")[0];
       }
     }
   }
-  
+
   /**
    * Check files on DFS, starting from the indicated path.
    */
@@ -216,19 +216,20 @@ public class NamenodeFsck {
       out.close();
     }
   }
- 
+
   private void listCorruptFileBlocks() throws IOException {
     Collection<FSNamesystem.CorruptFileBlockInfo> corruptFiles = namenode.
-      getNamesystem().listCorruptFileBlocks(path, startBlockAfter);
+      getNamesystem().listCorruptFileBlocks(path, currentCookie);
     int numCorruptFiles = corruptFiles.size();
     String filler;
     if (numCorruptFiles > 0) {
       filler = Integer.toString(numCorruptFiles);
-    } else if (startBlockAfter == null) {
+    } else if (currentCookie[0].equals("0")) {
       filler = "no";
     } else {
       filler = "no more";
     }
+    out.println("Cookie:\t" + currentCookie[0]);
     for (FSNamesystem.CorruptFileBlockInfo c : corruptFiles) {
       out.println(c.toString());
     }
@@ -509,8 +510,9 @@ public class NamenodeFsck {
         
         String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
             block.getBlockId());
-        blockReader = BlockReaderFactory.newBlockReader(s, file, block, lblock
-            .getBlockToken(), 0, -1, conf.getInt("io.file.buffer.size", 4096));
+        blockReader = BlockReaderFactory.newBlockReader(
+            conf, s, file, block, lblock
+            .getBlockToken(), 0, -1);
         
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java Tue Nov 15 02:39:13 2011
@@ -42,14 +42,13 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.ResponseBuilder;
-import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -68,9 +67,9 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
 import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
 import org.apache.hadoop.hdfs.web.resources.DelegationParam;
-import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
 import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
 import org.apache.hadoop.hdfs.web.resources.DestinationParam;
+import org.apache.hadoop.hdfs.web.resources.DoAsParam;
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;
 import org.apache.hadoop.hdfs.web.resources.GroupParam;
 import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
@@ -87,6 +86,7 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
 import org.apache.hadoop.hdfs.web.resources.RenewerParam;
 import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
+import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
 import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
 import org.apache.hadoop.hdfs.web.resources.UserParam;
 import org.apache.hadoop.net.NodeBase;
@@ -117,9 +117,24 @@ public class NamenodeWebHdfsMethods {
   private @Context HttpServletRequest request;
   private @Context HttpServletResponse response;
 
+  private void init(final UserGroupInformation ugi,
+      final DelegationParam delegation,
+      final UserParam username, final DoAsParam doAsUser,
+      final UriFsPathParam path, final HttpOpParam<?> op,
+      final Param<?, ?>... parameters) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path
+          + ", ugi=" + ugi + ", " + username + ", " + doAsUser
+          + Param.toSortedString(", ", parameters));
+    }
+
+    //clear content type
+    response.setContentType(null);
+  }
+
   private static DatanodeInfo chooseDatanode(final NameNode namenode,
-      final String path, final HttpOpParam.Op op, final long openOffset
-      ) throws IOException {
+      final String path, final HttpOpParam.Op op, final long openOffset,
+      Configuration conf) throws IOException {
     if (op == GetOpParam.Op.OPEN
         || op == GetOpParam.Op.GETFILECHECKSUM
         || op == PostOpParam.Op.APPEND) {
@@ -139,7 +154,7 @@ public class NamenodeWebHdfsMethods {
         final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
         final int count = locations.locatedBlockCount();
         if (count > 0) {
-          return JspHelper.bestNode(locations.get(0));
+          return JspHelper.bestNode(locations.get(0), conf);
         }
       }
     } 
@@ -153,24 +168,25 @@ public class NamenodeWebHdfsMethods {
       final NameNode namenode, final UserGroupInformation ugi,
       final String renewer) throws IOException {
     final Credentials c = DelegationTokenSecretManager.createCredentials(
-        namenode, ugi,
-        renewer != null? renewer: request.getUserPrincipal().getName());
+        namenode, ugi, renewer != null? renewer: ugi.getShortUserName());
     final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
     t.setKind(WebHdfsFileSystem.TOKEN_KIND);
-    SecurityUtil.setTokenService(t, namenode.getNameNodeAddress());
+    SecurityUtil.setTokenService(t, namenode.getHttpAddress());
     return t;
   }
 
   private URI redirectURI(final NameNode namenode,
       final UserGroupInformation ugi, final DelegationParam delegation,
+      final UserParam username, final DoAsParam doAsUser,
       final String path, final HttpOpParam.Op op, final long openOffset,
       final Param<?, ?>... parameters) throws URISyntaxException, IOException {
-    final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset);
+    final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
+    final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset, conf);
 
     final String delegationQuery;
     if (!UserGroupInformation.isSecurityEnabled()) {
       //security disabled
-      delegationQuery = "";
+      delegationQuery = Param.toSortedString("&", doAsUser, username);
     } else if (delegation.getValue() != null) {
       //client has provided a token
       delegationQuery = "&" + delegation;
@@ -180,8 +196,7 @@ public class NamenodeWebHdfsMethods {
           namenode, ugi, request.getUserPrincipal().getName());
       delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
     }
-    final String query = op.toQueryString()
-        + '&' + new UserParam(ugi) + delegationQuery
+    final String query = op.toQueryString() + delegationQuery
         + Param.toSortedString("&", parameters);
     final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;
 
@@ -202,6 +217,10 @@ public class NamenodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
+      @QueryParam(UserParam.NAME) @DefaultValue(UserParam.DEFAULT)
+          final UserParam username,
+      @QueryParam(DoAsParam.NAME) @DefaultValue(DoAsParam.DEFAULT)
+          final DoAsParam doAsUser,
       @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
           final PutOpParam op,
       @QueryParam(DestinationParam.NAME) @DefaultValue(DestinationParam.DEFAULT)
@@ -226,12 +245,13 @@ public class NamenodeWebHdfsMethods {
           final AccessTimeParam accessTime,
       @QueryParam(RenameOptionSetParam.NAME) @DefaultValue(RenameOptionSetParam.DEFAULT)
           final RenameOptionSetParam renameOptions,
-      @QueryParam(TokenArgumentParam.NAME) @DefaultValue(TokenArgumentParam.DEFAULT) 
+      @QueryParam(TokenArgumentParam.NAME) @DefaultValue(TokenArgumentParam.DEFAULT)
           final TokenArgumentParam delegationTokenArgument
       ) throws IOException, InterruptedException {
-    return put(ugi, delegation, ROOT, op, destination, owner, group,
-        permission, overwrite, bufferSize, replication, blockSize,
-        modificationTime, accessTime, renameOptions, delegationTokenArgument);
+    return put(ugi, delegation, username, doAsUser, ROOT, op, destination,
+        owner, group, permission, overwrite, bufferSize, replication,
+        blockSize, modificationTime, accessTime, renameOptions,
+        delegationTokenArgument);
   }
 
   /** Handle HTTP PUT request. */
@@ -243,6 +263,10 @@ public class NamenodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
+      @QueryParam(UserParam.NAME) @DefaultValue(UserParam.DEFAULT)
+          final UserParam username,
+      @QueryParam(DoAsParam.NAME) @DefaultValue(DoAsParam.DEFAULT)
+          final DoAsParam doAsUser,
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
           final PutOpParam op,
@@ -268,19 +292,13 @@ public class NamenodeWebHdfsMethods {
           final AccessTimeParam accessTime,
       @QueryParam(RenameOptionSetParam.NAME) @DefaultValue(RenameOptionSetParam.DEFAULT)
           final RenameOptionSetParam renameOptions,
-      @QueryParam(TokenArgumentParam.NAME) @DefaultValue(TokenArgumentParam.DEFAULT) 
+      @QueryParam(TokenArgumentParam.NAME) @DefaultValue(TokenArgumentParam.DEFAULT)
           final TokenArgumentParam delegationTokenArgument
       ) throws IOException, InterruptedException {
 
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(op + ": " + path + ", ugi=" + ugi
-          + Param.toSortedString(", ", destination, owner, group, permission,
-              overwrite, bufferSize, replication, blockSize,
-              modificationTime, accessTime, renameOptions));
-    }
-
-    //clear content type
-    response.setContentType(null);
+    init(ugi, delegation, username, doAsUser, path, op, destination, owner,
+        group, permission, overwrite, bufferSize, replication, blockSize,
+        modificationTime, accessTime, renameOptions, delegationTokenArgument);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
@@ -296,8 +314,8 @@ public class NamenodeWebHdfsMethods {
     switch(op.getValue()) {
     case CREATE:
     {
-      final URI uri = redirectURI(namenode, ugi, delegation, fullpath,
-          op.getValue(), -1L,
+      final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
+          fullpath, op.getValue(), -1L,
           permission, overwrite, bufferSize, replication, blockSize);
       return Response.temporaryRedirect(uri).build();
     } 
@@ -324,8 +342,7 @@ public class NamenodeWebHdfsMethods {
     {
       final boolean b = np.setReplication(fullpath, replication.getValue(conf));
       final String js = JsonUtil.toJsonString("boolean", b);
-      final ResponseBuilder r = b? Response.ok(): Response.status(Status.FORBIDDEN);
-      return r.entity(js).type(MediaType.APPLICATION_JSON).build();
+      return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
     }
     case SETOWNER:
     {
@@ -381,12 +398,16 @@ public class NamenodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
+      @QueryParam(UserParam.NAME) @DefaultValue(UserParam.DEFAULT)
+          final UserParam username,
+      @QueryParam(DoAsParam.NAME) @DefaultValue(DoAsParam.DEFAULT)
+          final DoAsParam doAsUser,
       @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
           final PostOpParam op,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
-    return post(ugi, delegation, ROOT, op, bufferSize);
+    return post(ugi, delegation, username, doAsUser, ROOT, op, bufferSize);
   }
 
   /** Handle HTTP POST request. */
@@ -398,6 +419,10 @@ public class NamenodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
+      @QueryParam(UserParam.NAME) @DefaultValue(UserParam.DEFAULT)
+          final UserParam username,
+      @QueryParam(DoAsParam.NAME) @DefaultValue(DoAsParam.DEFAULT)
+          final DoAsParam doAsUser,
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
           final PostOpParam op,
@@ -405,13 +430,7 @@ public class NamenodeWebHdfsMethods {
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
 
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(op + ": " + path + ", ugi=" + ugi
-          + Param.toSortedString(", ", bufferSize));
-    }
-
-    //clear content type
-    response.setContentType(null);
+    init(ugi, delegation, username, doAsUser, path, op, bufferSize);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
@@ -425,8 +444,8 @@ public class NamenodeWebHdfsMethods {
     switch(op.getValue()) {
     case APPEND:
     {
-      final URI uri = redirectURI(namenode, ugi, delegation, fullpath,
-          op.getValue(), -1L, bufferSize);
+      final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
+          fullpath, op.getValue(), -1L, bufferSize);
       return Response.temporaryRedirect(uri).build();
     }
     default:
@@ -448,6 +467,10 @@ public class NamenodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
+      @QueryParam(UserParam.NAME) @DefaultValue(UserParam.DEFAULT)
+          final UserParam username,
+      @QueryParam(DoAsParam.NAME) @DefaultValue(DoAsParam.DEFAULT)
+          final DoAsParam doAsUser,
       @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
           final GetOpParam op,
       @QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
@@ -459,7 +482,8 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
       ) throws IOException, URISyntaxException, InterruptedException {
-    return get(ugi, delegation, ROOT, op, offset, length, renewer, bufferSize);
+    return get(ugi, delegation, username, doAsUser, ROOT, op,
+        offset, length, renewer, bufferSize);
   }
 
   /** Handle HTTP GET request. */
@@ -470,6 +494,10 @@ public class NamenodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
+      @QueryParam(UserParam.NAME) @DefaultValue(UserParam.DEFAULT)
+          final UserParam username,
+      @QueryParam(DoAsParam.NAME) @DefaultValue(DoAsParam.DEFAULT)
+          final DoAsParam doAsUser,
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
           final GetOpParam op,
@@ -483,13 +511,8 @@ public class NamenodeWebHdfsMethods {
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
 
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(op + ": " + path + ", ugi=" + ugi
-          + Param.toSortedString(", ", offset, length, renewer, bufferSize));
-    }
-
-    //clear content type
-    response.setContentType(null);
+    init(ugi, delegation, username, doAsUser, path, op,
+        offset, length, renewer, bufferSize);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
@@ -504,11 +527,11 @@ public class NamenodeWebHdfsMethods {
     switch(op.getValue()) {
     case OPEN:
     {
-      final URI uri = redirectURI(namenode, ugi, delegation, fullpath,
-          op.getValue(), offset.getValue(), offset, length, bufferSize);
+      final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
+          fullpath, op.getValue(), offset.getValue(), offset, length, bufferSize);
       return Response.temporaryRedirect(uri).build();
     }
-    case GETFILEBLOCKLOCATIONS:
+    case GET_BLOCK_LOCATIONS:
     {
       final long offsetValue = offset.getValue();
       final Long lengthValue = length.getValue();
@@ -540,17 +563,28 @@ public class NamenodeWebHdfsMethods {
     }
     case GETFILECHECKSUM:
     {
-      final URI uri = redirectURI(namenode, ugi, delegation, fullpath,
-          op.getValue(), -1L);
+      final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
+          fullpath, op.getValue(), -1L);
       return Response.temporaryRedirect(uri).build();
     }
     case GETDELEGATIONTOKEN:
     {
+      if (delegation.getValue() != null) {
+        throw new IllegalArgumentException(delegation.getName()
+            + " parameter is not null.");
+      }
       final Token<? extends TokenIdentifier> token = generateDelegationToken(
           namenode, ugi, renewer.getValue());
       final String js = JsonUtil.toJsonString(token);
       return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
     }
+    case GETHOMEDIRECTORY:
+    {
+      final String js = JsonUtil.toJsonString(
+          org.apache.hadoop.fs.Path.class.getSimpleName(),
+          WebHdfsFileSystem.getHomeDirectoryString(ugi));
+      return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
+    }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
     }    
@@ -580,8 +614,8 @@ public class NamenodeWebHdfsMethods {
       @Override
       public void write(final OutputStream outstream) throws IOException {
         final PrintStream out = new PrintStream(outstream);
-        out.println("{\"" + HdfsFileStatus.class.getSimpleName() + "es\":{\""
-            + HdfsFileStatus.class.getSimpleName() + "\":[");
+        out.println("{\"" + FileStatus.class.getSimpleName() + "es\":{\""
+            + FileStatus.class.getSimpleName() + "\":[");
 
         final HdfsFileStatus[] partial = first.getPartialListing();
         if (partial.length > 0) {
@@ -612,12 +646,18 @@ public class NamenodeWebHdfsMethods {
   @Produces(MediaType.APPLICATION_JSON)
   public Response deleteRoot(
       @Context final UserGroupInformation ugi,
+      @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
+          final DelegationParam delegation,
+      @QueryParam(UserParam.NAME) @DefaultValue(UserParam.DEFAULT)
+          final UserParam username,
+      @QueryParam(DoAsParam.NAME) @DefaultValue(DoAsParam.DEFAULT)
+          final DoAsParam doAsUser,
       @QueryParam(DeleteOpParam.NAME) @DefaultValue(DeleteOpParam.DEFAULT)
           final DeleteOpParam op,
       @QueryParam(RecursiveParam.NAME) @DefaultValue(RecursiveParam.DEFAULT)
           final RecursiveParam recursive
       ) throws IOException, InterruptedException {
-    return delete(ugi, ROOT, op, recursive);
+    return delete(ugi, delegation, username, doAsUser, ROOT, op, recursive);
   }
 
   /** Handle HTTP DELETE request. */
@@ -626,6 +666,12 @@ public class NamenodeWebHdfsMethods {
   @Produces(MediaType.APPLICATION_JSON)
   public Response delete(
       @Context final UserGroupInformation ugi,
+      @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
+          final DelegationParam delegation,
+      @QueryParam(UserParam.NAME) @DefaultValue(UserParam.DEFAULT)
+          final UserParam username,
+      @QueryParam(DoAsParam.NAME) @DefaultValue(DoAsParam.DEFAULT)
+          final DoAsParam doAsUser,
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(DeleteOpParam.NAME) @DefaultValue(DeleteOpParam.DEFAULT)
           final DeleteOpParam op,
@@ -633,13 +679,7 @@ public class NamenodeWebHdfsMethods {
           final RecursiveParam recursive
       ) throws IOException, InterruptedException {
 
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(op + ": " + path + ", ugi=" + ugi
-          + Param.toSortedString(", ", recursive));
-    }
-
-    //clear content type
-    response.setContentType(null);
+    init(ugi, delegation, username, doAsUser, path, op, recursive);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
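
The NamenodeWebHdfsMethods changes above add the UserParam and DoAsParam query
parameters (assumed here to appear as user.name and doas on the wire) to every
handler, fold the repeated trace logging and content-type reset into a shared
init(), and carry both parameters into the datanode redirect URI. As a rough
client-side illustration only (the host, port, path and user names below are
invented), a proxy-user request against an unsecured cluster might look like
the sketch below; on a secure cluster the same request would instead carry a
delegation token or go through Kerberos, as the delegationQuery branches above
show.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Illustrative sketch only: issue a WebHDFS GETFILESTATUS call that names both
// the authenticated caller (user.name) and the user being impersonated (doas).
// Host, port, path and the two user names are placeholders.
public class WebHdfsDoAsSketch {
  public static void main(String[] args) throws Exception {
    final URL url = new URL("http://namenode.example.com:50070/webhdfs/v1/tmp/file"
        + "?op=GETFILESTATUS&user.name=alice&doas=bob");
    final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    final BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream()));
    try {
      for (String line; (line = in.readLine()) != null; ) {
        System.out.println(line);  // JSON response body
      }
    } finally {
      in.close();
      conn.disconnect();
    }
  }
}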

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java Tue Nov 15 02:39:13 2011
@@ -145,14 +145,15 @@ public class DFSck extends Configured im
       throws IOException {
     int errCode = -1;
     int numCorrupt = 0;
-    String lastBlock = null;
+    int cookie = 0;
     final String noCorruptLine = "has no CORRUPT files";
     final String noMoreCorruptLine = "has no more CORRUPT files";
+    final String cookiePrefix = "Cookie:";
     boolean allDone = false;
     while (!allDone) {
       final StringBuffer url = new StringBuffer(baseUrl);
-      if (lastBlock != null) {
-        url.append("&startblockafter=").append(lastBlock);
+      if (cookie > 0) {
+        url.append("&startblockafter=").append(String.valueOf(cookie));
       }
       URL path = new URL(url.toString());
       SecurityUtil.fetchServiceTicket(path);
@@ -163,29 +164,31 @@ public class DFSck extends Configured im
       try {
         String line = null;
         while ((line = input.readLine()) != null) {
-          if ((line.endsWith(noCorruptLine)) || 
+          if (line.startsWith(cookiePrefix)){
+            try{
+              cookie = Integer.parseInt(line.split("\t")[1]);
+            } catch (Exception e){
+              allDone = true;
+              break;
+            }
+            continue;
+          }
+          if ((line.endsWith(noCorruptLine)) ||
               (line.endsWith(noMoreCorruptLine)) ||
               (line.endsWith(NamenodeFsck.NONEXISTENT_STATUS))) {
             allDone = true;
             break;
           }
           if ((line.isEmpty())
-              || (line.startsWith("FSCK started by")) 
+              || (line.startsWith("FSCK started by"))
               || (line.startsWith("The filesystem under path")))
             continue;
           numCorrupt++;
           if (numCorrupt == 1) {
-            out.println("The list of corrupt files under path '" 
+            out.println("The list of corrupt files under path '"
                 + dir + "' are:");
           }
           out.println(line);
-          try {
-            // Get the block # that we need to send in next call
-            lastBlock = line.split("\t")[0];
-          } catch (Exception e) {
-            allDone = true;
-            break;
-          }
         }
       } finally {
         input.close();
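
The DFSck change above pages through the corrupt-file report with an integer
cookie that the namenode prints on a line starting with "Cookie:" (and which is
sent back as startblockafter on the next request), instead of re-parsing the
last block name. A self-contained sketch of just that parsing step, using
invented sample output, is:

import java.io.BufferedReader;
import java.io.StringReader;

// Illustrative sketch of the cookie-based paging loop: scan fsck output line
// by line, remember the cookie for the next request, and stop on the
// terminating marker. The sample text is made up.
public class CorruptFileCookieSketch {
  public static void main(String[] args) throws Exception {
    final String sample = "Cookie:\t42\n"
        + "/user/alice/part-00000\n"
        + "The filesystem under path '/' has no more CORRUPT files\n";
    final BufferedReader input = new BufferedReader(new StringReader(sample));

    int cookie = 0;
    boolean allDone = false;
    String line;
    while (!allDone && (line = input.readLine()) != null) {
      if (line.startsWith("Cookie:")) {
        cookie = Integer.parseInt(line.split("\t")[1]);  // next startblockafter value
        continue;
      }
      if (line.endsWith("has no more CORRUPT files")) {
        allDone = true;
        continue;
      }
      System.out.println("corrupt: " + line);
    }
    System.out.println("next cookie: " + cookie);
  }
}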

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/AuthFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/AuthFilter.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/AuthFilter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/AuthFilter.java Tue Nov 15 02:39:13 2011
@@ -18,6 +18,12 @@
 package org.apache.hadoop.hdfs.web;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import javax.servlet.FilterChain;
@@ -26,6 +32,7 @@ import javax.servlet.ServletException;
 import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletRequestWrapper;
 
 import org.apache.hadoop.hdfs.web.resources.DelegationParam;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -67,15 +74,77 @@ public class AuthFilter extends Authenti
   @Override
   public void doFilter(ServletRequest request, ServletResponse response,
       FilterChain filterChain) throws IOException, ServletException {
-    HttpServletRequest httpRequest = (HttpServletRequest) request;
-    String tokenString = httpRequest
-        .getParameter(DelegationParam.NAME);
+    final HttpServletRequest httpRequest = toLowerCase((HttpServletRequest)request);
+    final String tokenString = httpRequest.getParameter(DelegationParam.NAME);
     if (tokenString != null) {
       //Token is present in the url, therefore token will be used for
       //authentication, bypass kerberos authentication.
       filterChain.doFilter(httpRequest, response);
       return;
     }
-    super.doFilter(request, response, filterChain);
+    super.doFilter(httpRequest, response, filterChain);
+  }
+
+  private static HttpServletRequest toLowerCase(final HttpServletRequest request) {
+    @SuppressWarnings("unchecked")
+    final Map<String, String[]> original = (Map<String, String[]>)request.getParameterMap();
+    if (!ParamFilter.containsUpperCase(original.keySet())) {
+      return request;
+    }
+
+    final Map<String, List<String>> m = new HashMap<String, List<String>>();
+    for(Map.Entry<String, String[]> entry : original.entrySet()) {
+      final String key = entry.getKey().toLowerCase();
+      List<String> strings = m.get(key);
+      if (strings == null) {
+        strings = new ArrayList<String>();
+        m.put(key, strings);
+      }
+      for(String v : entry.getValue()) {
+        strings.add(v);
+      }
+    }
+
+    return new HttpServletRequestWrapper(request) {
+      private Map<String, String[]> parameters = null;
+
+      @Override
+      public Map<String, String[]> getParameterMap() {
+        if (parameters == null) {
+          parameters = new HashMap<String, String[]>();
+          for(Map.Entry<String, List<String>> entry : m.entrySet()) {
+            final List<String> a = entry.getValue();
+            parameters.put(entry.getKey(), a.toArray(new String[a.size()]));
+          }
+        }
+       return parameters;
+      }
+
+      @Override
+      public String getParameter(String name) {
+        final List<String> a = m.get(name);
+        return a == null? null: a.get(0);
+      }
+      
+      @Override
+      public String[] getParameterValues(String name) {
+        return getParameterMap().get(name);
+      }
+
+      @Override
+      public Enumeration<String> getParameterNames() {
+        final Iterator<String> i = m.keySet().iterator();
+        return new Enumeration<String>() {
+          @Override
+          public boolean hasMoreElements() {
+            return i.hasNext();
+          }
+          @Override
+          public String nextElement() {
+            return i.next();
+          }
+        };
+      }
+    };
   }
 }
\ No newline at end of file
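
The AuthFilter change above wraps the incoming request so that query-parameter
names are effectively case-insensitive before the delegation-token lookup,
reusing ParamFilter.containsUpperCase (made package-private in a later hunk) to
decide whether wrapping is needed at all. The key-normalization step can be
sketched on a plain map, independent of the servlet API:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of lower-casing parameter names while keeping every
// value; keys that collide after lower-casing have their values merged, as in
// the request wrapper above.
public class LowerCaseParamsSketch {
  static Map<String, List<String>> toLowerCase(Map<String, String[]> original) {
    final Map<String, List<String>> m = new HashMap<String, List<String>>();
    for (Map.Entry<String, String[]> entry : original.entrySet()) {
      final String key = entry.getKey().toLowerCase();
      List<String> values = m.get(key);
      if (values == null) {
        values = new ArrayList<String>();
        m.put(key, values);
      }
      for (String v : entry.getValue()) {
        values.add(v);
      }
    }
    return m;
  }

  public static void main(String[] args) {
    final Map<String, String[]> params = new HashMap<String, String[]>();
    params.put("DELEGATION", new String[] {"abc"});
    params.put("op", new String[] {"OPEN"});
    System.out.println(toLowerCase(params));  // {delegation=[abc], op=[OPEN]} (order may vary)
  }
}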

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java Tue Nov 15 02:39:13 2011
@@ -28,6 +28,7 @@ import java.util.TreeMap;
 
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -134,6 +135,14 @@ public class JsonUtil {
     return new FsPermission(Short.parseShort(s, 8));
   }
 
+  static enum PathType {
+    FILE, DIRECTORY, SYMLINK;
+    
+    static PathType valueOf(HdfsFileStatus status) {
+      return status.isDir()? DIRECTORY: status.isSymlink()? SYMLINK: FILE;
+    }
+  }
+
   /** Convert a HdfsFileStatus object to a Json string. */
   public static String toJsonString(final HdfsFileStatus status,
       boolean includeType) {
@@ -141,14 +150,13 @@ public class JsonUtil {
       return null;
     }
     final Map<String, Object> m = new TreeMap<String, Object>();
-    m.put("localName", status.getLocalName());
-    m.put("isDir", status.isDir());
-    m.put("isSymlink", status.isSymlink());
+    m.put("pathSuffix", status.getLocalName());
+    m.put("type", PathType.valueOf(status));
     if (status.isSymlink()) {
       m.put("symlink", status.getSymlink());
     }
 
-    m.put("len", status.getLen());
+    m.put("length", status.getLen());
     m.put("owner", status.getOwner());
     m.put("group", status.getGroup());
     m.put("permission", toString(status.getPermission()));
@@ -156,8 +164,7 @@ public class JsonUtil {
     m.put("modificationTime", status.getModificationTime());
     m.put("blockSize", status.getBlockSize());
     m.put("replication", status.getReplication());
-    return includeType ? toJsonString(HdfsFileStatus.class, m) : 
-      JSON.toString(m);
+    return includeType ? toJsonString(FileStatus.class, m): JSON.toString(m);
   }
 
   /** Convert a Json map to a HdfsFileStatus object. */
@@ -167,14 +174,13 @@ public class JsonUtil {
     }
 
     final Map<?, ?> m = includesType ? 
-        (Map<?, ?>)json.get(HdfsFileStatus.class.getSimpleName()) : json;
-    final String localName = (String) m.get("localName");
-    final boolean isDir = (Boolean) m.get("isDir");
-    final boolean isSymlink = (Boolean) m.get("isSymlink");
-    final byte[] symlink = isSymlink?
-        DFSUtil.string2Bytes((String)m.get("symlink")): null;
+        (Map<?, ?>)json.get(FileStatus.class.getSimpleName()) : json;
+    final String localName = (String) m.get("pathSuffix");
+    final PathType type = PathType.valueOf((String) m.get("type"));
+    final byte[] symlink = type != PathType.SYMLINK? null
+        : DFSUtil.string2Bytes((String)m.get("symlink"));
 
-    final long len = (Long) m.get("len");
+    final long len = (Long) m.get("length");
     final String owner = (String) m.get("owner");
     final String group = (String) m.get("group");
     final FsPermission permission = toFsPermission((String) m.get("permission"));
@@ -182,8 +188,8 @@ public class JsonUtil {
     final long mTime = (Long) m.get("modificationTime");
     final long blockSize = (Long) m.get("blockSize");
     final short replication = (short) (long) (Long) m.get("replication");
-    return new HdfsFileStatus(len, isDir, replication, blockSize, mTime, aTime,
-        permission, owner, group,
+    return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication,
+        blockSize, mTime, aTime, permission, owner, group,
         symlink, DFSUtil.string2Bytes(localName));
   }
 

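The JsonUtil changes above rename the file-status JSON fields to match the
public FileStatus API: localName becomes pathSuffix, the isDir/isSymlink
booleans collapse into a single type enum, len becomes length, and the
enclosing key changes from HdfsFileStatus to FileStatus. A minimal sketch of
the new key layout is below; the values are invented, and the real map also
carries owner, group, permission, access/modification times, blockSize and
replication, as the hunks show.

import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch of the renamed key layout for one file-status entry.
// The values are invented; JsonUtil serializes a map like this under the
// "FileStatus" key.
public class FileStatusJsonSketch {
  enum PathType { FILE, DIRECTORY, SYMLINK }

  public static void main(String[] args) {
    final Map<String, Object> m = new TreeMap<String, Object>();
    m.put("pathSuffix", "part-00000");   // was "localName"
    m.put("type", PathType.FILE);        // replaces the isDir/isSymlink booleans
    m.put("length", 1024L);              // was "len"
    System.out.println(m);
  }
}
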
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ParamFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ParamFilter.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ParamFilter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ParamFilter.java Tue Nov 15 02:39:13 2011
@@ -59,7 +59,7 @@ public class ParamFilter implements Reso
   }
 
   /** Do the strings contain upper case letters? */
-  private static boolean containsUpperCase(final Iterable<String> strings) {
+  static boolean containsUpperCase(final Iterable<String> strings) {
     for(String s : strings) {
       for(int i = 0; i < s.length(); i++) {
         if (Character.isUpperCase(s.charAt(i))) {


