hadoop-hdfs-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1102509 [1/3] - in /hadoop/hdfs/branches/HDFS-1073: ./ ivy/ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/j...
Date: Thu, 12 May 2011 23:34:51 GMT
Author: todd
Date: Thu May 12 23:34:50 2011
New Revision: 1102509

URL: http://svn.apache.org/viewvc?rev=1102509&view=rev
Log:
Merge trunk into HDFS-1073.

Added:
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java
      - copied unchanged from r1102504, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/fs/viewfs/
      - copied from r1102504, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/fs/viewfs/
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java
      - copied unchanged from r1102504, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/fs/viewfs/TestViewFsFileStatusHdfs.java
      - copied unchanged from r1102504, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/fs/viewfs/TestViewFsFileStatusHdfs.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/fs/viewfs/TestViewFsHdfs.java
      - copied unchanged from r1102504, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/fs/viewfs/TestViewFsHdfs.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
      - copied, changed from r1102504, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
    hadoop/hdfs/branches/HDFS-1073/src/webapps/hdfs/decommission.jsp
      - copied unchanged from r1102504, hadoop/hdfs/trunk/src/webapps/hdfs/decommission.jsp
    hadoop/hdfs/branches/HDFS-1073/src/webapps/hdfs/decommission.xsl
      - copied unchanged from r1102504, hadoop/hdfs/trunk/src/webapps/hdfs/decommission.xsl
    hadoop/hdfs/branches/HDFS-1073/src/webapps/hdfs/dfsclusterhealth.jsp
      - copied unchanged from r1102504, hadoop/hdfs/trunk/src/webapps/hdfs/dfsclusterhealth.jsp
    hadoop/hdfs/branches/HDFS-1073/src/webapps/hdfs/dfsclusterhealth.xsl
      - copied unchanged from r1102504, hadoop/hdfs/trunk/src/webapps/hdfs/dfsclusterhealth.xsl
    hadoop/hdfs/branches/HDFS-1073/src/webapps/hdfs/dfsclusterhealth_utils.xsl
      - copied unchanged from r1102504, hadoop/hdfs/trunk/src/webapps/hdfs/dfsclusterhealth_utils.xsl
Modified:
    hadoop/hdfs/branches/HDFS-1073/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/CHANGES.txt
    hadoop/hdfs/branches/HDFS-1073/build.xml   (contents, props changed)
    hadoop/hdfs/branches/HDFS-1073/ivy/ivysettings.xml
    hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/contrib/hdfsproxy/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/contrib/hdfsproxy/build.xml
    hadoop/hdfs/branches/HDFS-1073/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
    hadoop/hdfs/branches/HDFS-1073/src/java/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/LeaseRenewer.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeMXBean.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
    hadoop/hdfs/branches/HDFS-1073/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
    hadoop/hdfs/branches/HDFS-1073/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java
    hadoop/hdfs/branches/HDFS-1073/src/test/aop/org/apache/hadoop/fi/Pipeline.java
    hadoop/hdfs/branches/HDFS-1073/src/test/aop/org/apache/hadoop/fi/PipelineTest.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestLease.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java
    hadoop/hdfs/branches/HDFS-1073/src/webapps/datanode/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/webapps/hdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/webapps/secondary/   (props changed)

Propchange: hadoop/hdfs/branches/HDFS-1073/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu May 12 23:34:50 2011
@@ -2,4 +2,4 @@
 /hadoop/hdfs/branches/HDFS-1052:987665-1095512
 /hadoop/hdfs/branches/HDFS-265:796829-820463
 /hadoop/hdfs/branches/branch-0.21:820487
-/hadoop/hdfs/trunk:1086482-1100841
+/hadoop/hdfs/trunk:1086482-1102504

Modified: hadoop/hdfs/branches/HDFS-1073/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/CHANGES.txt?rev=1102509&r1=1102508&r2=1102509&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1073/CHANGES.txt Thu May 12 23:34:50 2011
@@ -270,6 +270,12 @@ Trunk (unreleased changes)
     
     HDFS-1751. Intrinsic limits for HDFS files, directories (daryn via boryas).
 
+    HDFS-1873. Federation: Add cluster management web console.
+    (Tanping Wang via suresh)
+
+    HDFS 1911 HDFS tests for the newly added viewfs
+
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)
@@ -386,6 +392,18 @@ Trunk (unreleased changes)
     HDFS-1866. Document dfs.datanode.max.transfer.threads in hdfs-default.xml
     (Harsh J Chouraria via todd)
 
+    HDFS-1890. Improve the name, class and value type of the map
+    LeaseRenewer.pendingCreates.  (szetszwo)
+
+    HDFS-1865. Share LeaseRenewer among DFSClients so that there is only a
+    LeaseRenewer thread per namenode per user.  (szetszwo)
+
+    HDFS-1906. Remove logging exception stack trace in client logs when one of
+    the datanode targets to read from is not reachable. (suresh)
+
+    HDFS-1378. Edit log replay should track and report file offsets in case of
+    errors. (Aaron T. Myers and Todd Lipcon via todd)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
@@ -393,6 +411,9 @@ Trunk (unreleased changes)
 
     HDFS-1601. Pipeline ACKs are sent as lots of tiny TCP packets (todd)
 
+    HDFS-1826. NameNode should save image to name directories in parallel
+    during upgrade. (Matt Foley via hairong)
+
   BUG FIXES
 
     HDFS-1449. Fix test failures - ExtendedBlock must return 
@@ -499,15 +520,29 @@ Trunk (unreleased changes)
 
     HDFS-1889. incorrect path in start/stop dfs script. (John George via eli)
 
-    HDFS-1890. Improve the name, class and value type of the map
-    LeaseRenewer.pendingCreates.  (szetszwo)
-
     HDFS-1891. Disable IPV6 for junit tests to fix TestBackupNode failure.
     (suresh)
 
     HDFS-1898. Tests failing on trunk due to use of NameNode.format.
     (todd via eli)
 
+    HDFS-1902. Fix setrep path display for TestHDFSCLI.  (Daryn Sharp
+    via szetszwo)
+
+    HDFS-1827. Fix timeout problem in TestBlockReplacement.  (Matt Foley
+    via szetszwo)
+
+    HDFS-1908. Fix a NullPointerException in fi.DataTransferTestUtil.
+    (szetszwo)
+
+    HDFS-1912. Update tests for FsShell standardized error messages.
+    (Daryn Sharp via szetszwo)
+
+    HDFS-1903. Fix path display for rm/rmr in TestHDFSCLI and TestDFSShell.
+    (Daryn Sharp via szetszwo)
+
+    HDFS-1627. Fix NullPointerException in Secondary NameNode. (hairong)
+
 Release 0.22.0 - Unreleased
 
   NEW FEATURES
@@ -900,6 +935,18 @@ Release 0.22.0 - Unreleased
     HDFS-671. Documentation change for updated configuration keys. 
     (tomwhite via eli)
 
+    HDFS-1544. Ivy resolve force mode should be turned off by default.
+    (Luke Lu via tomwhite)
+
+    HDFS-1615. seek() on closed DFS input stream throws NullPointerException
+    (Scott Carey via todd)
+
+    HDFS-1897. Documentation refers to removed option dfs.network.script
+    (Andrew Whang via todd)
+
+    HDFS-1621. Fix references to hadoop-common-${version} in build.xml
+    (Jolly Chen via todd)
+
 Release 0.21.1 - Unreleased
     HDFS-1466. TestFcHdfsSymlink relies on /tmp/test not existing. (eli)
 

Modified: hadoop/hdfs/branches/HDFS-1073/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/build.xml?rev=1102509&r1=1102508&r2=1102509&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/build.xml (original)
+++ hadoop/hdfs/branches/HDFS-1073/build.xml Thu May 12 23:34:50 2011
@@ -1145,8 +1145,8 @@
     <copy todir="${system-test-build-dir}/${final.name}/conf">
       <fileset dir="${test.src.dir}/system/conf/"/>
     </copy>
-    <copy tofile="${system-test-build-dir}/${final.name}/lib/hadoop-common-${version}.jar"
-      file="${system-test-build-dir}/ivy/lib/${ant.project.name}/system/hadoop-common-${herriot.suffix}-${version}.jar"
+    <copy tofile="${system-test-build-dir}/${final.name}/lib/hadoop-common-${hadoop-common.version}.jar"
+      file="${system-test-build-dir}/ivy/lib/${ant.project.name}/system/hadoop-common-${herriot.suffix}-${hadoop-common.version}.jar"
       overwrite="true"/>
     <copy tofile="${system-test-build-dir}/${final.name}/${final.name}.jar"
       file="${system-test-build-dir}/${instrumented.final.name}.jar" overwrite="true"/>

Propchange: hadoop/hdfs/branches/HDFS-1073/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu May 12 23:34:50 2011
@@ -3,4 +3,4 @@
 /hadoop/hdfs/branches/HDFS-1052/build.xml:987665-1095512
 /hadoop/hdfs/branches/HDFS-265/build.xml:796829-820463
 /hadoop/hdfs/branches/branch-0.21/build.xml:820487
-/hadoop/hdfs/trunk/build.xml:1086482-1100841
+/hadoop/hdfs/trunk/build.xml:1086482-1102504

Modified: hadoop/hdfs/branches/HDFS-1073/ivy/ivysettings.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/ivy/ivysettings.xml?rev=1102509&r1=1102508&r2=1102509&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/ivy/ivysettings.xml (original)
+++ hadoop/hdfs/branches/HDFS-1073/ivy/ivysettings.xml Thu May 12 23:34:50 2011
@@ -35,6 +35,7 @@
   <property name="repo.dir" value="${user.home}/.m2/repository"/>
   <property name="maven2.pattern.ext"  value="${maven2.pattern}.[ext]"/>
   <property name="resolvers" value="default" override="false"/>
+  <property name="force-resolve" value="false" override="false"/>
   <settings defaultResolver="${resolvers}"/>
 
   <resolvers>
@@ -42,7 +43,7 @@
     <ibiblio name="apache-snapshot" root="${snapshot.apache.org}" m2compatible="true"
         checkmodified="true" changingPattern=".*SNAPSHOT"/>
 
-    <filesystem name="fs" m2compatible="true" force="true">
+    <filesystem name="fs" m2compatible="true" force="${force-resolve}">
        <artifact pattern="${repo.dir}/[organisation]/[module]/[revision]/[module]-[revision].[ext]"/>
        <ivy pattern="${repo.dir}/[organisation]/[module]/[revision]/[module]-[revision].pom"/>
     </filesystem>

Propchange: hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu May 12 23:34:50 2011
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/mapred/src/c++/libhdfs:713112
 /hadoop/core/trunk/src/c++/libhdfs:776175-784663
 /hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs:987665-1095512
-/hadoop/hdfs/trunk/src/c++/libhdfs:1086482-1100841
+/hadoop/hdfs/trunk/src/c++/libhdfs:1086482-1102504

Propchange: hadoop/hdfs/branches/HDFS-1073/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu May 12 23:34:50 2011
@@ -3,4 +3,4 @@
 /hadoop/hdfs/branches/HDFS-1052/src/contrib/hdfsproxy:987665-1095512
 /hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy:820487
-/hadoop/hdfs/trunk/src/contrib/hdfsproxy:1086482-1100841
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:1086482-1102504

Modified: hadoop/hdfs/branches/HDFS-1073/src/contrib/hdfsproxy/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/contrib/hdfsproxy/build.xml?rev=1102509&r1=1102508&r2=1102509&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/contrib/hdfsproxy/build.xml (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/contrib/hdfsproxy/build.xml Thu May 12 23:34:50 2011
@@ -126,7 +126,7 @@
               <include name="slf4j-log4j12-${slf4j-log4j12.version}.jar"/>
               <include name="xmlenc-${xmlenc.version}.jar"/>
               <include name="core-${core.vesion}.jar"/> 
-	      <include name="hadoop-common-${hadoop-version}.jar"/>
+	      <include name="hadoop-common-${hadoop-common.version}.jar"/>
 	    </lib>
 	    <classes dir="${proxy.conf.dir}">
 	    	<include name="hdfsproxy-default.xml"/>
@@ -153,7 +153,7 @@
         <include name="core-${core.vesion}.jar"/> 
 	    </lib>
 	    <lib dir="${hadoop.root}/lib">
-		<include name="hadoop-common-${hadoop-version}.jar"/>
+	        <include name="hadoop-common-${hadoop-common.version}.jar"/>
 	    </lib>
 	    <classes dir="${proxy.conf.dir}">
 	    	<include name="hdfsproxy-default.xml"/>
@@ -179,7 +179,7 @@
               <include name="slf4j-log4j12-${slf4j-log4j12.version}.jar"/>
               <include name="xmlenc-${xmlenc.version}.jar"/>
               <include name="core-${core.vesion}.jar"/> 
-	      <include name="hadoop-common-${hadoop-version}.jar"/>
+	      <include name="hadoop-common-${hadoop-common.version}.jar"/>
 	    </lib>
 	    <classes dir="${proxy.conf.test}" excludes="**/*.template **/*.sh"/>
 	    <classes dir="${build.classes}"/>

Modified: hadoop/hdfs/branches/HDFS-1073/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml?rev=1102509&r1=1102508&r2=1102509&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml Thu May 12 23:34:50 2011
@@ -480,7 +480,7 @@
       addition NameNode tries to place replicas of block on
       multiple racks for improved fault tolerance. Hadoop lets the
       cluster administrators decide which rack a node belongs to
-      through configuration variable <code>dfs.network.script</code>. When this
+      through configuration variable <code>net.topology.script.file.name</code>. When this
       script is configured, each node runs the script to determine its
       rack id. A default installation assumes all the nodes belong to
       the same rack. This feature and configuration is further described

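[Editor's note: for reference, a minimal sketch of the corrected key name in use. Assumes hadoop-common on the classpath; the script path is hypothetical. Rack awareness is configured through net.topology.script.file.name, and dfs.network.script is no longer consulted.]

    import org.apache.hadoop.conf.Configuration;

    public class TopologyScriptConfig {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Each node runs this script to resolve its rack id.
        conf.set("net.topology.script.file.name", "/etc/hadoop/topology.sh");
        System.out.println(conf.get("net.topology.script.file.name"));
      }
    }
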
Propchange: hadoop/hdfs/branches/HDFS-1073/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu May 12 23:34:50 2011
@@ -3,4 +3,4 @@
 /hadoop/hdfs/branches/HDFS-1052/src/java:987665-1095512
 /hadoop/hdfs/branches/HDFS-265/src/java:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/java:820487
-/hadoop/hdfs/trunk/src/java:1086482-1100841
+/hadoop/hdfs/trunk/src/java:1086482-1102504

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1102509&r1=1102508&r2=1102509&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java Thu May 12 23:34:50 2011
@@ -136,9 +136,14 @@ public class DFSClient implements FSCons
   final LeaseRenewer leaserenewer;
 
   /**
-   * The locking hierarchy is to first acquire lock on DFSClient object, followed by 
-   * lock on leasechecker, followed by lock on an individual DFSOutputStream.
+   * A map from file names to {@link DFSOutputStream} objects
+   * that are currently being written by this client.
+   * Note that a file can only be written by a single client.
    */
+  private final Map<String, DFSOutputStream> filesBeingWritten
+      = new HashMap<String, DFSOutputStream>();
+
+  /** Create a {@link NameNode} proxy */
   public static ClientProtocol createNamenode(Configuration conf) throws IOException {
     return createNamenode(NameNode.getAddress(conf), conf);
   }
@@ -249,13 +254,14 @@ public class DFSClient implements FSCons
 
     // The hdfsTimeout is currently the same as the ipc timeout 
     this.hdfsTimeout = Client.getTimeout(conf);
-    this.leaserenewer = new LeaseRenewer(this, hdfsTimeout);
-
     this.ugi = UserGroupInformation.getCurrentUser();
+    final String authority = nameNodeAddr == null? "null":
+        nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
+    this.leaserenewer = LeaseRenewer.getInstance(authority, ugi, this);
     
     String taskId = conf.get("mapred.task.id", "NONMAPREDUCE");
-    this.clientName = "DFSClient_" + taskId + "_" +
-                      r.nextInt() + "_" + Thread.currentThread().getId(); 
+    this.clientName = leaserenewer.getClientName(taskId);
+
     defaultBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
     defaultReplication = (short) 
       conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 
@@ -309,19 +315,77 @@ public class DFSClient implements FSCons
     }
   }
 
+  /** Put a file. */
+  void putFileBeingWritten(final String src, final DFSOutputStream out) {
+    synchronized(filesBeingWritten) {
+      filesBeingWritten.put(src, out);
+    }
+  }
+
+  /** Remove a file. */
+  void removeFileBeingWritten(final String src) {
+    synchronized(filesBeingWritten) {
+      filesBeingWritten.remove(src);
+    }
+  }
+
+  /** Is file-being-written map empty? */
+  boolean isFilesBeingWrittenEmpty() {
+    synchronized(filesBeingWritten) {
+      return filesBeingWritten.isEmpty();
+    }
+  }
+
+  /** Renew leases */
+  void renewLease() throws IOException {
+    if (clientRunning && !isFilesBeingWrittenEmpty()) {
+      namenode.renewLease(clientName);
+    }
+  }
+
+  /** Abort and release resources held.  Ignore all errors. */
+  void abort() {
+    clientRunning = false;
+    closeAllFilesBeingWritten(true);
+    RPC.stopProxy(rpcNamenode); // close connections to the namenode
+  }
+
+  /** Close/abort all files being written. */
+  private void closeAllFilesBeingWritten(final boolean abort) {
+    for(;;) {
+      final String src;
+      final DFSOutputStream out;
+      synchronized(filesBeingWritten) {
+        if (filesBeingWritten.isEmpty()) {
+          return;
+        }
+        src = filesBeingWritten.keySet().iterator().next();
+        out = filesBeingWritten.remove(src);
+      }
+      if (out != null) {
+        try {
+          if (abort) {
+            out.abort();
+          } else {
+            out.close();
+          }
+        } catch(IOException ie) {
+          LOG.error("Failed to " + (abort? "abort": "close") + " file " + src,
+              ie);
+        }
+      }
+    }
+  }
+
   /**
    * Close the file system, abandoning all of the leases and files being
    * created and close connections to the namenode.
    */
   public synchronized void close() throws IOException {
     if(clientRunning) {
-      leaserenewer.close();
+      closeAllFilesBeingWritten(false);
       clientRunning = false;
-      try {
-        leaserenewer.interruptAndJoin();
-      } catch (InterruptedException ie) {
-      }
-  
+      leaserenewer.closeClient(this);
       // close connections to the namenode
       RPC.stopProxy(rpcNamenode);
     }
@@ -633,7 +697,7 @@ public class DFSClient implements FSCons
         flag, createParent, replication, blockSize, progress, buffersize,
         conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 
                     DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
-    leaserenewer.put(src, result);
+    leaserenewer.put(src, result, this);
     return result;
   }
   
@@ -680,7 +744,7 @@ public class DFSClient implements FSCons
           flag, createParent, replication, blockSize, progress, buffersize,
           bytesPerChecksum);
     }
-    leaserenewer.put(src, result);
+    leaserenewer.put(src, result, this);
     return result;
   }
   
@@ -759,7 +823,7 @@ public class DFSClient implements FSCons
           + src + " on client " + clientName);
     }
     final DFSOutputStream result = callAppend(stat, src, buffersize, progress);
-    leaserenewer.put(src, result);
+    leaserenewer.put(src, result, this);
     return result;
   }
 

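[Editor's note: the new closeAllFilesBeingWritten() above uses a drain-then-close pattern worth calling out: remove one entry while holding the map lock, then perform the potentially slow close or abort outside the lock, so the map is never held locked during IO. A stripped-down, self-contained sketch with generic, hypothetical names:]

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    class DrainAndClose {
      private final Map<String, Closeable> open = new HashMap<String, Closeable>();

      void closeAll() {
        for (;;) {
          final String key;
          final Closeable c;
          synchronized (open) {
            if (open.isEmpty()) {
              return;
            }
            key = open.keySet().iterator().next();
            c = open.remove(key);
          }
          if (c != null) {
            try {
              c.close();             // IO happens outside the lock
            } catch (IOException e) {
              System.err.println("Failed to close " + key + ": " + e);
            }
          }
        }
      }
    }
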
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1102509&r1=1102508&r2=1102509&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Thu May 12 23:34:50 2011
@@ -410,8 +410,11 @@ public class DFSInputStream extends FSIn
           refetchToken--;
           fetchBlockAt(target);
         } else {
-          DFSClient.LOG.info("Failed to connect to " + targetAddr
-              + ", add to deadNodes and continue", ex);
+          DFSClient.LOG.warn("Failed to connect to " + targetAddr
+              + ", add to deadNodes and continue " + ex);
+          if (DFSClient.LOG.isDebugEnabled()) {
+            DFSClient.LOG.debug("Connection failure ", ex);
+          }
           // Put chosen node into dead list, continue
           addToDeadNodes(chosenNode);
         }
@@ -653,9 +656,11 @@ public class DFSInputStream extends FSIn
           fetchBlockAt(block.getStartOffset());
           continue;
         } else {
-          DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for file " + src
-              + " for block " + block.getBlock() + ":"
-              + StringUtils.stringifyException(e));
+          DFSClient.LOG.warn("Failed to connect to " + targetAddr + 
+              " for file " + src + " for block " + block.getBlock() + ":" + e);
+          if (DFSClient.LOG.isDebugEnabled()) {
+            DFSClient.LOG.debug("Connection failure ", e);
+          }
         }
       } finally {
         IOUtils.closeStream(reader);
@@ -736,6 +741,9 @@ public class DFSInputStream extends FSIn
     if (targetPos > getFileLength()) {
       throw new IOException("Cannot seek after EOF");
     }
+    if (closed) {
+      throw new IOException("Stream is closed!");
+    }
     boolean done = false;
     if (pos <= targetPos && targetPos <= blockEnd) {
       //

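[Editor's note: a caller-side sketch of the behavior HDFS-1615 targets: seeking a stream that has already been closed should surface a plain IOException rather than a NullPointerException. The file path is hypothetical and fs.defaultFS is assumed to point at an HDFS cluster.]

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SeekAfterClose {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataInputStream in = fs.open(new Path("/tmp/example.txt")); // hypothetical file
        in.close();
        try {
          in.seek(0);                // previously could throw NullPointerException
        } catch (IOException expected) {
          System.out.println("seek on closed stream: " + expected.getMessage());
        }
      }
    }
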
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1102509&r1=1102508&r2=1102509&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu May 12 23:34:50 2011
@@ -1639,7 +1639,7 @@ class DFSOutputStream extends FSOutputSu
       ExtendedBlock lastBlock = streamer.getBlock();
       closeThreads(false);
       completeFile(lastBlock);
-      dfsClient.leaserenewer.remove(src);
+      dfsClient.leaserenewer.closeFile(src, dfsClient);
     } finally {
       closed = true;
     }

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/LeaseRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/LeaseRenewer.java?rev=1102509&r1=1102508&r2=1102509&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/LeaseRenewer.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/LeaseRenewer.java Thu May 12 23:34:50 2011
@@ -18,33 +18,150 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 
-public class LeaseRenewer {
+/**
+ * <p>
+ * Used by {@link DFSClient} for renewing file-being-written leases
+ * on the namenode.
+ * When a file is opened for write (create or append),
+ * namenode stores a file lease for recording the identity of the writer.
+ * The writer (i.e. the DFSClient) is required to renew the lease periodically.
+ * When the lease is not renewed before it expires,
+ * the namenode considers the writer as failed and then it may either let
+ * another writer to obtain the lease or close the file.
+ * </p>
+ * <p>
+ * This class also provides the following functionality:
+ * <ul>
+ * <li>
+ * It maintains a map from (namenode, user) pairs to lease renewers. 
+ * The same {@link LeaseRenewer} instance is used for renewing lease
+ * for all the {@link DFSClient} to the same namenode and the same user.
+ * </li>
+ * <li>
+ * Each renewer maintains a list of {@link DFSClient}.
+ * Periodically the leases for all the clients are renewed.
+ * A client is removed from the list when the client is closed.
+ * </li>
+ * <li>
+ * A thread per namenode per user is used by the {@link LeaseRenewer}
+ * to renew the leases.
+ * </li>
+ * </ul>
+ * </p>
+ */
+class LeaseRenewer {
   private static final Log LOG = LogFactory.getLog(LeaseRenewer.class);
 
   static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
   static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
-  /** A map from src -> DFSOutputStream of files that are currently being
-   * written by this client.
+
+  /** Get a {@link LeaseRenewer} instance */
+  static LeaseRenewer getInstance(final String authority,
+      final UserGroupInformation ugi, final DFSClient dfsc) throws IOException {
+    return Factory.INSTANCE.get(authority, ugi, dfsc);
+  }
+
+  /** 
+   * A factory for sharing {@link LeaseRenewer} objects
+   * among {@link DFSClient} instances
+   * so that there is only one renewer per authority per user.
    */
-  private final Map<String, DFSOutputStream> filesBeingWritten
-      = new TreeMap<String, DFSOutputStream>();
+  private static class Factory {
+    private static final Factory INSTANCE = new Factory();
+
+    private static class Key {
+      /** Namenode info */
+      final String authority;
+      /** User info */
+      final UserGroupInformation ugi;
+
+      private Key(final String authority, final UserGroupInformation ugi) {
+        if (authority == null) {
+          throw new HadoopIllegalArgumentException("authority == null");
+        } else if (ugi == null) {
+          throw new HadoopIllegalArgumentException("ugi == null");
+        }
+
+        this.authority = authority;
+        this.ugi = ugi;
+      }
+
+      @Override
+      public int hashCode() {
+        return authority.hashCode() ^ ugi.hashCode();
+      }
+
+      @Override
+      public boolean equals(Object obj) {
+        if (obj == this) {
+          return true;
+        }
+        if (obj != null && obj instanceof Key) {
+          final Key that = (Key)obj;
+          return this.authority.equals(that.authority)
+                 && this.ugi.equals(that.ugi);
+        }
+        return false;        
+      }
+
+      @Override
+      public String toString() {
+        return ugi.getShortUserName() + "@" + authority;
+      }
+    }
+
+    /** A map for per user per namenode renewers. */
+    private final Map<Key, LeaseRenewer> renewers = new HashMap<Key, LeaseRenewer>();
+
+    /** Get a renewer. */
+    private synchronized LeaseRenewer get(final String authority,
+        final UserGroupInformation ugi, final DFSClient dfsc) {
+      final Key k = new Key(authority, ugi);
+      LeaseRenewer r = renewers.get(k);
+      if (r == null) {
+        r = new LeaseRenewer(k);
+        renewers.put(k, r);
+      }
+      r.addClient(dfsc);
+      return r;
+    }
+
+    /** Remove the given renewer. */
+    private synchronized void remove(final LeaseRenewer r) {
+      final LeaseRenewer stored = renewers.get(r.factorykey);
+      //Since a renewer may expire, the stored renewer can be different.
+      if (r == stored) {
+        if (!r.clientsRunning()) {
+          renewers.remove(r.factorykey);
+        }
+      }
+    }
+  }
+
+  private final String clienNamePostfix = DFSClient.r.nextInt()
+      + "_" + Thread.currentThread().getId();
+
   /** The time in milliseconds that the map became empty. */
   private long emptyTime = Long.MAX_VALUE;
   /** A fixed lease renewal time period in milliseconds */
-  private final long renewal;
+  private long renewal = FSConstants.LEASE_SOFTLIMIT_PERIOD/2;
 
   /** A daemon for renewing lease */
   private Daemon daemon = null;
@@ -54,7 +171,8 @@ public class LeaseRenewer {
   /** 
    * A period in milliseconds that the lease renewer thread should run
    * after the map became empty.
-   * If the map is empty for a time period longer than the grace period,
+   * In other words,
+   * if the map is empty for a time period longer than the grace period,
    * the renewer should terminate.  
    */
   private long gracePeriod;
@@ -62,26 +180,68 @@ public class LeaseRenewer {
    * The time period in milliseconds
    * that the renewer sleeps for each iteration. 
    */
-  private volatile long sleepPeriod;
+  private long sleepPeriod;
+
+  private final Factory.Key factorykey;
 
-  private final DFSClient dfsclient;
+  /** A list of clients corresponding to this renewer. */
+  private final List<DFSClient> dfsclients = new ArrayList<DFSClient>();
 
-  LeaseRenewer(final DFSClient dfsclient, final long timeout) {
-    this.dfsclient = dfsclient;
-    this.renewal = (timeout > 0 && timeout < FSConstants.LEASE_SOFTLIMIT_PERIOD)? 
-        timeout/2: FSConstants.LEASE_SOFTLIMIT_PERIOD/2;
+  private LeaseRenewer(Factory.Key factorykey) {
+    this.factorykey = factorykey;
     setGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
   }
 
+  /** @return the renewal time in milliseconds. */
+  private synchronized long getRenewalTime() {
+    return renewal;
+  }
+
+  /** @return the client name for the given id. */
+  String getClientName(final String id) {
+    return "DFSClient_" + id + "_" + clienNamePostfix;
+  }
+
+  /** Add a client. */
+  private synchronized void addClient(final DFSClient dfsc) {
+    for(DFSClient c : dfsclients) {
+      if (c == dfsc) {
+        //client already exists, nothing to do.
+        return;
+      }
+    }
+    //client not found, add it
+    dfsclients.add(dfsc);
+
+    //update renewal time
+    if (dfsc.hdfsTimeout > 0) {
+      final long half = dfsc.hdfsTimeout/2;
+      if (half < renewal) {
+        this.renewal = half;
+      }
+    }
+  }
+
+  private synchronized boolean clientsRunning() {
+    for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
+      if (!i.next().clientRunning) {
+        i.remove();
+      }
+    }
+    return !dfsclients.isEmpty();
+  }
+
+  private synchronized long getSleepPeriod() {
+    return sleepPeriod;    
+  }
+
   /** Set the grace period and adjust the sleep period accordingly. */
-  void setGraceSleepPeriod(final long gracePeriod) {
+  synchronized void setGraceSleepPeriod(final long gracePeriod) {
     if (gracePeriod < 100L) {
       throw new HadoopIllegalArgumentException(gracePeriod
           + " = gracePeriod < 100ms is too small.");
     }
-    synchronized(this) {
-      this.gracePeriod = gracePeriod;
-    }
+    this.gracePeriod = gracePeriod;
     final long half = gracePeriod/2;
     this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
         half: LEASE_RENEWER_SLEEP_DEFAULT;
@@ -98,9 +258,10 @@ public class LeaseRenewer {
         && System.currentTimeMillis() - emptyTime > gracePeriod;
   }
 
-  synchronized void put(String src, DFSOutputStream out) {
-    if (dfsclient.clientRunning) {
-      if (daemon == null || isRenewerExpired()) {
+  synchronized void put(final String src, final DFSOutputStream out,
+      final DFSClient dfsc) {
+    if (dfsc.clientRunning) {
+      if (!isRunning() || isRenewerExpired()) {
         //start a new deamon with a new id.
         final int id = ++currentId;
         daemon = new Daemon(new Runnable() {
@@ -113,24 +274,68 @@ public class LeaseRenewer {
                 LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
                     + " is interrupted.", e);
               }
+            } finally {
+              synchronized(LeaseRenewer.this) {
+                Factory.INSTANCE.remove(LeaseRenewer.this);
+              }
             }
           }
         });
         daemon.start();
       }
-      filesBeingWritten.put(src, out);
+      dfsc.putFileBeingWritten(src, out);
       emptyTime = Long.MAX_VALUE;
     }
   }
-  
-  synchronized void remove(String src) {
-    filesBeingWritten.remove(src);
-    if (filesBeingWritten.isEmpty() && emptyTime == Long.MAX_VALUE) {
-      //discover the first time that the map is empty.
-      emptyTime = System.currentTimeMillis();
+
+  /** Close a file. */
+  void closeFile(final String src, final DFSClient dfsc) {
+    dfsc.removeFileBeingWritten(src);
+
+    synchronized(this) {
+      //update emptyTime if necessary
+      if (emptyTime == Long.MAX_VALUE) {
+        for(DFSClient c : dfsclients) {
+          if (!c.isFilesBeingWrittenEmpty()) {
+            //found a non-empty file-being-written map
+            return;
+          }
+        }
+        //discover the first time that all file-being-written maps are empty.
+        emptyTime = System.currentTimeMillis();
+      }
+    }
+  }
+
+  /** Close the given client. */
+  synchronized void closeClient(final DFSClient dfsc) {
+    dfsclients.remove(dfsc);
+    if (dfsclients.isEmpty()) {
+      if (!isRunning() || isRenewerExpired()) {
+        Factory.INSTANCE.remove(LeaseRenewer.this);
+        return;
+      }
+      if (emptyTime == Long.MAX_VALUE) {
+        //discover the first time that the client list is empty.
+        emptyTime = System.currentTimeMillis();
+      }
+    }
+
+    //update renewal time
+    if (renewal == dfsc.hdfsTimeout/2) {
+      long min = FSConstants.LEASE_SOFTLIMIT_PERIOD;
+      for(DFSClient c : dfsclients) {
+        if (c.hdfsTimeout > 0) {
+          final long half = c.hdfsTimeout;
+          if (half < min) {
+            min = half;
+          }
+        }
+      }
+      renewal = min/2;
     }
   }
-  
+
   void interruptAndJoin() throws InterruptedException {
     Daemon daemonCopy = null;
     synchronized (this) {
@@ -148,53 +353,27 @@ public class LeaseRenewer {
     }
   }
 
-  void close() {
-    while (true) {
-      String src;
-      OutputStream out;
-      synchronized (this) {
-        if (filesBeingWritten.isEmpty()) {
-          return;
-        }
-        src = filesBeingWritten.keySet().iterator().next();
-        out = filesBeingWritten.remove(src);
-      }
-      if (out != null) {
-        try {
-          out.close();
-        } catch (IOException ie) {
-          LOG.error("Exception closing file " + src+ " : " + ie, ie);
-        }
-      }
-    }
-  }
-
-  /**
-   * Abort all open files. Release resources held. Ignore all errors.
-   */
-  synchronized void abort() {
-    dfsclient.clientRunning = false;
-    for(Map.Entry<String, DFSOutputStream> e : filesBeingWritten.entrySet()) {
-      final DFSOutputStream out = e.getValue();
-      if (out != null) {
-        try {
-          out.abort();
-        } catch (IOException ie) {
-          LOG.error("Failed to abort file " + e.getKey(), ie);
-        }
-      }
-    }
-    filesBeingWritten.clear();
-    RPC.stopProxy(dfsclient.rpcNamenode); // close connections to the namenode
-  }
-
   private void renew() throws IOException {
+    final List<DFSClient> copies;
     synchronized(this) {
-      if (filesBeingWritten.isEmpty()) {
-        return;
+      copies = new ArrayList<DFSClient>(dfsclients);
+    }
+    //sort the client names for finding out repeated names.
+    Collections.sort(copies, new Comparator<DFSClient>() {
+      @Override
+      public int compare(final DFSClient left, final DFSClient right) {
+        return left.clientName.compareTo(right.clientName);
+      }
+    });
+    String previousName = "";
+    for(int i = 0; i < copies.size(); i++) {
+      final DFSClient c = copies.get(i);
+      //skip if current client name is the same as the previous name.
+      if (!c.clientName.equals(previousName)) {
+        c.renewLease();
+        previousName = c.clientName;
       }
     }
-    dfsclient.namenode.renewLease(dfsclient.clientName);
   }
 
   /**
@@ -203,20 +382,25 @@ public class LeaseRenewer {
    */
   private void run(final int id) throws InterruptedException {
     for(long lastRenewed = System.currentTimeMillis();
-        dfsclient.clientRunning && !Thread.interrupted();
-        Thread.sleep(sleepPeriod)) {
-      if (System.currentTimeMillis() - lastRenewed >= renewal) {
+        clientsRunning() && !Thread.interrupted();
+        Thread.sleep(getSleepPeriod())) {
+      if (System.currentTimeMillis() - lastRenewed >= getRenewalTime()) {
         try {
           renew();
           lastRenewed = System.currentTimeMillis();
         } catch (SocketTimeoutException ie) {
-          LOG.warn("Failed to renew lease for " + dfsclient.clientName + " for "
-              + (renewal/1000) + " seconds.  Aborting ...", ie);
-          abort();
+          LOG.warn("Failed to renew lease for " + clientsString() + " for "
+              + (getRenewalTime()/1000) + " seconds.  Aborting ...", ie);
+          synchronized (this) {
+            for(DFSClient c : dfsclients) {
+              c.abort();
+            }
+          }
           break;
         } catch (IOException ie) {
-          LOG.warn("Failed to renew lease for " + dfsclient.clientName + " for "
-              + (renewal/1000) + " seconds.  Will retry shortly ...", ie);
+          LOG.warn("Failed to renew lease for " + clientsString() + " for "
+              + (getRenewalTime()/1000) + " seconds.  Will retry shortly ...",
+              ie);
         }
       }
 
@@ -229,13 +413,27 @@ public class LeaseRenewer {
     }
   }
 
-  /** {@inheritDoc} */
+  @Override
   public String toString() {
-    String s = getClass().getSimpleName();
+    String s = getClass().getSimpleName() + ":" + factorykey;
     if (LOG.isTraceEnabled()) {
-      return s + "@" + dfsclient + ": "
+      return s + ", clients=" +  clientsString() + ", "
              + StringUtils.stringifyException(new Throwable("for testing"));
     }
     return s;
   }
+
+  /** Get the names of all clients */
+  private synchronized String clientsString() {
+    if (dfsclients.isEmpty()) {
+      return "[]";
+    } else {
+      final StringBuilder b = new StringBuilder("[").append(
+          dfsclients.get(0).clientName);
+      for(int i = 1; i < dfsclients.size(); i++) {
+        b.append(", ").append(dfsclients.get(i).clientName);
+      }
+      return b.append("]").toString();
+    }
+  }
 }

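[Editor's note: to make the sharing scheme described in the new class javadoc concrete, here is a stripped-down, self-contained sketch of the keyed-factory idea used by LeaseRenewer.Factory: one renewer per (namenode authority, user) key, created lazily and handed back to every client that presents the same key. Names are hypothetical and a plain String stands in for UserGroupInformation and for the renewer itself.]

    import java.util.HashMap;
    import java.util.Map;

    class SharedRenewerFactory {
      // Mirrors LeaseRenewer.Factory.Key: namenode authority plus user identity.
      static final class Key {
        final String authority;
        final String user;
        Key(String authority, String user) {
          this.authority = authority;
          this.user = user;
        }
        @Override public int hashCode() { return authority.hashCode() ^ user.hashCode(); }
        @Override public boolean equals(Object o) {
          if (!(o instanceof Key)) return false;
          Key that = (Key) o;
          return authority.equals(that.authority) && user.equals(that.user);
        }
      }

      private final Map<Key, Object> renewers = new HashMap<Key, Object>();

      // Same key -> same shared renewer instance; a new key gets a fresh one.
      synchronized Object get(String authority, String user) {
        Key k = new Key(authority, user);
        Object r = renewers.get(k);
        if (r == null) {
          r = new Object();   // stands in for a LeaseRenewer with its own daemon thread
          renewers.put(k, r);
        }
        return r;
      }
    }
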
Propchange: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu May 12 23:34:50 2011
@@ -5,4 +5,4 @@
 /hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:987665-1095512
 /hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:820487
-/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:1086482-1100841
+/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:1086482-1102504

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1102509&r1=1102508&r2=1102509&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Thu May 12 23:34:50 2011
@@ -22,7 +22,10 @@ import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.EOFException;
+import java.io.FilterInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 
@@ -144,394 +147,421 @@ public class FSEditLogLoader {
         numOpRenewDelegationToken = 0, numOpCancelDelegationToken = 0, 
         numOpUpdateMasterKey = 0, numOpOther = 0;
 
+    // Keep track of the file offsets of the last several opcodes.
+    // This is handy when manually recovering corrupted edits files.
+    PositionTrackingInputStream tracker = new PositionTrackingInputStream(in);
+    in = new DataInputStream(tracker);
+    long recentOpcodeOffsets[] = new long[4];
+    Arrays.fill(recentOpcodeOffsets, -1);
+
     try {
       long txId = expectedStartingTxId - 1;
 
-      while (true) {
-        long timestamp = 0;
-        long mtime = 0;
-        long atime = 0;
-        long blockSize = 0;
-        FSEditLogOpCodes opCode;
-        try {
-          if (checksum != null) {
-            checksum.reset();
-          }
-          in.mark(1);
-          byte opCodeByte = in.readByte();
-          opCode = FSEditLogOpCodes.fromByte(opCodeByte);
-          if (opCode == FSEditLogOpCodes.OP_INVALID) {
-            in.reset(); // reset back to end of file if somebody reads it again
+      try {
+        while (true) {
+          long timestamp = 0;
+          long mtime = 0;
+          long atime = 0;
+          long blockSize = 0;
+          FSEditLogOpCodes opCode;
+          try {
+            if (checksum != null) {
+              checksum.reset();
+            }
+            in.mark(1);
+            byte opCodeByte = in.readByte();
+            opCode = FSEditLogOpCodes.fromByte(opCodeByte);
+            if (opCode == FSEditLogOpCodes.OP_INVALID) {
+              in.reset(); // reset back to end of file if somebody reads it again
+              break; // no more transactions
+            }
+          } catch (EOFException e) {
             break; // no more transactions
           }
-        } catch (EOFException e) {
-          break; // no more transactions
-        }
-
-        if (logVersion <= FSConstants.FIRST_STORED_TXIDS_VERSION) {
-          // Read the txid
-          long thisTxId = in.readLong();
-          if (thisTxId != txId + 1) {
-            throw new IOException("Expected transaction ID " +
-                (txId + 1) + " but got " + thisTxId);
-          }
-          txId = thisTxId;
-        }
-        
-        numEdits++;
-        switch (opCode) {
-        case OP_ADD:
-        case OP_CLOSE: {
-          // versions > 0 support per file replication
-          // get name and replication
-          int length = in.readInt();
-          if (-7 == logVersion && length != 3||
-              -17 < logVersion && logVersion < -7 && length != 4 ||
-              logVersion <= -17 && length != 5) {
-              throw new IOException("Incorrect data format."  +
-                                    " logVersion is " + logVersion +
-                                    " but writables.length is " +
-                                    length + ". ");
-          }
-          path = FSImageSerialization.readString(in);
-          short replication = fsNamesys.adjustReplication(readShort(in));
-          mtime = readLong(in);
-          if (logVersion <= -17) {
-            atime = readLong(in);
-          }
-          if (logVersion < -7) {
-            blockSize = readLong(in);
+          recentOpcodeOffsets[numEdits % recentOpcodeOffsets.length] =
+              tracker.getPos();
+          if (logVersion <= FSConstants.FIRST_STORED_TXIDS_VERSION) {
+            // Read the txid
+            long thisTxId = in.readLong();
+            if (thisTxId != txId + 1) {
+              throw new IOException("Expected transaction ID " +
+                  (txId + 1) + " but got " + thisTxId);
+            }
+            txId = thisTxId;
           }
-          // get blocks
-          boolean isFileUnderConstruction = (opCode == FSEditLogOpCodes.OP_ADD);
-          BlockInfo blocks[] = 
-            readBlocks(in, logVersion, isFileUnderConstruction, replication);
-
-          // Older versions of HDFS does not store the block size in inode.
-          // If the file has more than one block, use the size of the
-          // first block as the blocksize. Otherwise use the default
-          // block size.
-          if (-8 <= logVersion && blockSize == 0) {
-            if (blocks.length > 1) {
-              blockSize = blocks[0].getNumBytes();
+
+          numEdits++;
+          switch (opCode) {
+          case OP_ADD:
+          case OP_CLOSE: {
+            // versions > 0 support per file replication
+            // get name and replication
+            int length = in.readInt();
+            if (-7 == logVersion && length != 3||
+                -17 < logVersion && logVersion < -7 && length != 4 ||
+                logVersion <= -17 && length != 5) {
+                throw new IOException("Incorrect data format."  +
+                                      " logVersion is " + logVersion +
+                                      " but writables.length is " +
+                                      length + ". ");
+            }
+            path = FSImageSerialization.readString(in);
+            short replication = fsNamesys.adjustReplication(readShort(in));
+            mtime = readLong(in);
+            if (logVersion <= -17) {
+              atime = readLong(in);
+            }
+            if (logVersion < -7) {
+              blockSize = readLong(in);
+            }
+            // get blocks
+            boolean isFileUnderConstruction = (opCode == FSEditLogOpCodes.OP_ADD);
+            BlockInfo blocks[] = 
+              readBlocks(in, logVersion, isFileUnderConstruction, replication);
+  
+            // Older versions of HDFS does not store the block size in inode.
+            // If the file has more than one block, use the size of the
+            // first block as the blocksize. Otherwise use the default
+            // block size.
+            if (-8 <= logVersion && blockSize == 0) {
+              if (blocks.length > 1) {
+                blockSize = blocks[0].getNumBytes();
+              } else {
+                long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
+                blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
+              }
+            }
+             
+            PermissionStatus permissions = fsNamesys.getUpgradePermission();
+            if (logVersion <= -11) {
+              permissions = PermissionStatus.read(in);
+            }
+  
+            // clientname, clientMachine and block locations of last block.
+            if (opCode == FSEditLogOpCodes.OP_ADD && logVersion <= -12) {
+              clientName = FSImageSerialization.readString(in);
+              clientMachine = FSImageSerialization.readString(in);
+              if (-13 <= logVersion) {
+                readDatanodeDescriptorArray(in);
+              }
             } else {
-              long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
-              blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
+              clientName = "";
+              clientMachine = "";
+            }
+  
+            // The open lease transaction re-creates a file if necessary.
+            // Delete the file if it already exists.
+            if (FSNamesystem.LOG.isDebugEnabled()) {
+              FSNamesystem.LOG.debug(opCode + ": " + path + 
+                                     " numblocks : " + blocks.length +
+                                     " clientHolder " +  clientName +
+                                     " clientMachine " + clientMachine);
+            }
+  
+            fsDir.unprotectedDelete(path, mtime);
+  
+            // add to the file tree
+            INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
+                                                      path, permissions,
+                                                      blocks, replication, 
+                                                      mtime, atime, blockSize);
+            if (isFileUnderConstruction) {
+              numOpAdd++;
+              //
+              // Replace current node with a INodeUnderConstruction.
+              // Recreate in-memory lease record.
+              //
+              INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
+                                        node.getLocalNameBytes(),
+                                        node.getReplication(), 
+                                        node.getModificationTime(),
+                                        node.getPreferredBlockSize(),
+                                        node.getBlocks(),
+                                        node.getPermissionStatus(),
+                                        clientName, 
+                                        clientMachine, 
+                                        null);
+              fsDir.replaceNode(path, node, cons);
+              fsNamesys.leaseManager.addLease(cons.getClientName(), path);
             }
+            break;
+          } 
+          case OP_SET_REPLICATION: {
+            numOpSetRepl++;
+            path = FSImageSerialization.readString(in);
+            short replication = fsNamesys.adjustReplication(readShort(in));
+            fsDir.unprotectedSetReplication(path, replication, null);
+            break;
+          } 
+          case OP_CONCAT_DELETE: {
+            if (logVersion > -22) {
+              throw new IOException("Unexpected opCode " + opCode
+                  + " for version " + logVersion);
+            }
+            numOpConcatDelete++;
+            int length = in.readInt();
+            if (length < 3) { // trg, srcs.., timestam
+              throw new IOException("Incorrect data format. " 
+                                    + "Mkdir operation.");
+            }
+            String trg = FSImageSerialization.readString(in);
+            int srcSize = length - 1 - 1; //trg and timestamp
+            String [] srcs = new String [srcSize];
+            for(int i=0; i<srcSize;i++) {
+              srcs[i]= FSImageSerialization.readString(in);
+            }
+            timestamp = readLong(in);
+            fsDir.unprotectedConcat(trg, srcs);
+            break;
+          }
+          case OP_RENAME_OLD: {
+            numOpRenameOld++;
+            int length = in.readInt();
+            if (length != 3) {
+              throw new IOException("Incorrect data format. " 
+                                    + "Mkdir operation.");
+            }
+            String s = FSImageSerialization.readString(in);
+            String d = FSImageSerialization.readString(in);
+            timestamp = readLong(in);
+            HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
+            fsDir.unprotectedRenameTo(s, d, timestamp);
+            fsNamesys.changeLease(s, d, dinfo);
+            break;
+          }
+          case OP_DELETE: {
+            numOpDelete++;
+            int length = in.readInt();
+            if (length != 2) {
+              throw new IOException("Incorrect data format. " 
+                                    + "delete operation.");
+            }
+            path = FSImageSerialization.readString(in);
+            timestamp = readLong(in);
+            fsDir.unprotectedDelete(path, timestamp);
+            break;
+          }
+          case OP_MKDIR: {
+            numOpMkDir++;
+            PermissionStatus permissions = fsNamesys.getUpgradePermission();
+            int length = in.readInt();
+            if (-17 < logVersion && length != 2 ||
+                logVersion <= -17 && length != 3) {
+              throw new IOException("Incorrect data format. " 
+                                    + "Mkdir operation.");
+            }
+            path = FSImageSerialization.readString(in);
+            timestamp = readLong(in);
+  
+            // The disk format stores atimes for directories as well.
+            // However, currently this is not being updated/used because of
+            // performance reasons.
+            if (logVersion <= -17) {
+              atime = readLong(in);
+            }
+  
+            if (logVersion <= -11) {
+              permissions = PermissionStatus.read(in);
+            }
+            fsDir.unprotectedMkdir(path, permissions, timestamp);
+            break;
           }
-           
-          PermissionStatus permissions = fsNamesys.getUpgradePermission();
-          if (logVersion <= -11) {
-            permissions = PermissionStatus.read(in);
-          }
-
-          // clientname, clientMachine and block locations of last block.
-          if (opCode == FSEditLogOpCodes.OP_ADD && logVersion <= -12) {
-            clientName = FSImageSerialization.readString(in);
-            clientMachine = FSImageSerialization.readString(in);
-            if (-13 <= logVersion) {
-              readDatanodeDescriptorArray(in);
-            }
-          } else {
-            clientName = "";
-            clientMachine = "";
-          }
-
-          // The open lease transaction re-creates a file if necessary.
-          // Delete the file if it already exists.
-          if (FSNamesystem.LOG.isDebugEnabled()) {
-            FSNamesystem.LOG.debug(opCode + ": " + path + 
-                                   " numblocks : " + blocks.length +
-                                   " clientHolder " +  clientName +
-                                   " clientMachine " + clientMachine);
-          }
-
-          fsDir.unprotectedDelete(path, mtime);
-
-          // add to the file tree
-          INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
-                                                    path, permissions,
-                                                    blocks, replication, 
-                                                    mtime, atime, blockSize);
-          if (isFileUnderConstruction) {
-            numOpAdd++;
-            //
-            // Replace current node with a INodeUnderConstruction.
-            // Recreate in-memory lease record.
-            //
-            INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
-                                      node.getLocalNameBytes(),
-                                      node.getReplication(), 
-                                      node.getModificationTime(),
-                                      node.getPreferredBlockSize(),
-                                      node.getBlocks(),
-                                      node.getPermissionStatus(),
-                                      clientName, 
-                                      clientMachine, 
-                                      null);
-            fsDir.replaceNode(path, node, cons);
-            fsNamesys.leaseManager.addLease(cons.getClientName(), path);
-          }
-          break;
-        } 
-        case OP_SET_REPLICATION: {
-          numOpSetRepl++;
-          path = FSImageSerialization.readString(in);
-          short replication = fsNamesys.adjustReplication(readShort(in));
-          fsDir.unprotectedSetReplication(path, replication, null);
-          break;
-        } 
-        case OP_CONCAT_DELETE: {
-          if (logVersion > -22) {
-            throw new IOException("Unexpected opCode " + opCode
-                + " for version " + logVersion);
-          }
-          numOpConcatDelete++;
-          int length = in.readInt();
-          if (length < 3) { // trg, srcs.., timestam
-            throw new IOException("Incorrect data format. " 
-                                  + "Mkdir operation.");
-          }
-          String trg = FSImageSerialization.readString(in);
-          int srcSize = length - 1 - 1; //trg and timestamp
-          String [] srcs = new String [srcSize];
-          for(int i=0; i<srcSize;i++) {
-            srcs[i]= FSImageSerialization.readString(in);
-          }
-          timestamp = readLong(in);
-          fsDir.unprotectedConcat(trg, srcs);
-          break;
-        }
-        case OP_RENAME_OLD: {
-          numOpRenameOld++;
-          int length = in.readInt();
-          if (length != 3) {
-            throw new IOException("Incorrect data format. " 
-                                  + "Mkdir operation.");
-          }
-          String s = FSImageSerialization.readString(in);
-          String d = FSImageSerialization.readString(in);
-          timestamp = readLong(in);
-          HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
-          fsDir.unprotectedRenameTo(s, d, timestamp);
-          fsNamesys.changeLease(s, d, dinfo);
-          break;
-        }
-        case OP_DELETE: {
-          numOpDelete++;
-          int length = in.readInt();
-          if (length != 2) {
-            throw new IOException("Incorrect data format. " 
-                                  + "delete operation.");
-          }
-          path = FSImageSerialization.readString(in);
-          timestamp = readLong(in);
-          fsDir.unprotectedDelete(path, timestamp);
-          break;
-        }
-        case OP_MKDIR: {
-          numOpMkDir++;
-          PermissionStatus permissions = fsNamesys.getUpgradePermission();
-          int length = in.readInt();
-          if (-17 < logVersion && length != 2 ||
-              logVersion <= -17 && length != 3) {
-            throw new IOException("Incorrect data format. " 
-                                  + "Mkdir operation.");
-          }
-          path = FSImageSerialization.readString(in);
-          timestamp = readLong(in);
-
-          // The disk format stores atimes for directories as well.
-          // However, currently this is not being updated/used because of
-          // performance reasons.
-          if (logVersion <= -17) {
+          case OP_SET_GENSTAMP: {
+            numOpSetGenStamp++;
+            long lw = in.readLong();
+            fsNamesys.setGenerationStamp(lw);
+            break;
+          } 
+          case OP_DATANODE_ADD: {
+            numOpOther++;
+            //Datanodes are not persistent any more.
+            FSImageSerialization.DatanodeImage.skipOne(in);
+            break;
+          }
+          case OP_DATANODE_REMOVE: {
+            numOpOther++;
+            DatanodeID nodeID = new DatanodeID();
+            nodeID.readFields(in);
+            //Datanodes are not persistent any more.
+            break;
+          }
+          case OP_SET_PERMISSIONS: {
+            numOpSetPerm++;
+            if (logVersion > -11)
+              throw new IOException("Unexpected opCode " + opCode
+                                    + " for version " + logVersion);
+            fsDir.unprotectedSetPermission(
+                FSImageSerialization.readString(in), FsPermission.read(in));
+            break;
+          }
+          case OP_SET_OWNER: {
+            numOpSetOwner++;
+            if (logVersion > -11)
+              throw new IOException("Unexpected opCode " + opCode
+                                    + " for version " + logVersion);
+            fsDir.unprotectedSetOwner(FSImageSerialization.readString(in),
+                FSImageSerialization.readString_EmptyAsNull(in),
+                FSImageSerialization.readString_EmptyAsNull(in));
+            break;
+          }
+          case OP_SET_NS_QUOTA: {
+            if (logVersion > -16) {
+              throw new IOException("Unexpected opCode " + opCode
+                  + " for version " + logVersion);
+            }
+            fsDir.unprotectedSetQuota(FSImageSerialization.readString(in), 
+                                      readLongWritable(in), 
+                                      FSConstants.QUOTA_DONT_SET);
+            break;
+          }
+          case OP_CLEAR_NS_QUOTA: {
+            if (logVersion > -16) {
+              throw new IOException("Unexpected opCode " + opCode
+                  + " for version " + logVersion);
+            }
+            fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
+                                      FSConstants.QUOTA_RESET,
+                                      FSConstants.QUOTA_DONT_SET);
+            break;
+          }
+  
+          case OP_SET_QUOTA:
+            fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
+                                      readLongWritable(in),
+                                      readLongWritable(in));
+                                        
+            break;
+  
+          case OP_TIMES: {
+            numOpTimes++;
+            int length = in.readInt();
+            if (length != 3) {
+              throw new IOException("Incorrect data format. " 
+                                    + "times operation.");
+            }
+            path = FSImageSerialization.readString(in);
+            mtime = readLong(in);
             atime = readLong(in);
+            fsDir.unprotectedSetTimes(path, mtime, atime, true);
+            break;
           }
-
-          if (logVersion <= -11) {
-            permissions = PermissionStatus.read(in);
+          case OP_SYMLINK: {
+            numOpSymlink++;
+            int length = in.readInt();
+            if (length != 4) {
+              throw new IOException("Incorrect data format. " 
+                                    + "symlink operation.");
+            }
+            path = FSImageSerialization.readString(in);
+            String value = FSImageSerialization.readString(in);
+            mtime = readLong(in);
+            atime = readLong(in);
+            PermissionStatus perm = PermissionStatus.read(in);
+            fsDir.unprotectedSymlink(path, value, mtime, atime, perm);
+            break;
+          }
+          case OP_RENAME: {
+            if (logVersion > -21) {
+              throw new IOException("Unexpected opCode " + opCode
+                  + " for version " + logVersion);
+            }
+            numOpRename++;
+            int length = in.readInt();
+            if (length != 3) {
+              throw new IOException("Incorrect data format. " 
+                                    + "Mkdir operation.");
+            }
+            String s = FSImageSerialization.readString(in);
+            String d = FSImageSerialization.readString(in);
+            timestamp = readLong(in);
+            Rename[] options = readRenameOptions(in);
+            HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
+            fsDir.unprotectedRenameTo(s, d, timestamp, options);
+            fsNamesys.changeLease(s, d, dinfo);
+            break;
+          }
+          case OP_GET_DELEGATION_TOKEN: {
+            if (logVersion > -24) {
+              throw new IOException("Unexpected opCode " + opCode
+                  + " for version " + logVersion);
+            }
+            numOpGetDelegationToken++;
+            DelegationTokenIdentifier delegationTokenId = 
+                new DelegationTokenIdentifier();
+            delegationTokenId.readFields(in);
+            long expiryTime = readLong(in);
+            fsNamesys.getDelegationTokenSecretManager()
+                .addPersistedDelegationToken(delegationTokenId, expiryTime);
+            break;
+          }
+          case OP_RENEW_DELEGATION_TOKEN: {
+            if (logVersion > -24) {
+              throw new IOException("Unexpected opCode " + opCode
+                  + " for version " + logVersion);
+            }
+            numOpRenewDelegationToken++;
+            DelegationTokenIdentifier delegationTokenId = 
+                new DelegationTokenIdentifier();
+            delegationTokenId.readFields(in);
+            long expiryTime = readLong(in);
+            fsNamesys.getDelegationTokenSecretManager()
+                .updatePersistedTokenRenewal(delegationTokenId, expiryTime);
+            break;
+          }
+          case OP_CANCEL_DELEGATION_TOKEN: {
+            if (logVersion > -24) {
+              throw new IOException("Unexpected opCode " + opCode
+                  + " for version " + logVersion);
+            }
+            numOpCancelDelegationToken++;
+            DelegationTokenIdentifier delegationTokenId = 
+                new DelegationTokenIdentifier();
+            delegationTokenId.readFields(in);
+            fsNamesys.getDelegationTokenSecretManager()
+                .updatePersistedTokenCancellation(delegationTokenId);
+            break;
+          }
+          case OP_UPDATE_MASTER_KEY: {
+            if (logVersion > -24) {
+              throw new IOException("Unexpected opCode " + opCode
+                  + " for version " + logVersion);
+            }
+            numOpUpdateMasterKey++;
+            DelegationKey delegationKey = new DelegationKey();
+            delegationKey.readFields(in);
+            fsNamesys.getDelegationTokenSecretManager().updatePersistedMasterKey(
+                delegationKey);
+            break;
+          }
+          default: {
+            throw new IOException("Never seen opCode " + opCode);
+          }
+          }
+          validateChecksum(in, checksum, numEdits);
+        }
+      } catch (IOException ex) {
+        check203UpgradeFailure(logVersion, ex);
+      } finally {
+        if(closeOnExit)
+          in.close();
+      }
+    } catch (Throwable t) {
+      // Catch Throwable because in the case of a truly corrupt edits log, any
+      // sort of error might be thrown (NumberFormat, NullPointer, EOF, etc.)
+      StringBuilder sb = new StringBuilder();
+      sb.append("Error replaying edit log at offset " + tracker.getPos());
+      if (recentOpcodeOffsets[0] != -1) {
+        Arrays.sort(recentOpcodeOffsets);
+        sb.append("\nRecent opcode offsets:");
+        for (long offset : recentOpcodeOffsets) {
+          if (offset != -1) {
+            sb.append(' ').append(offset);
           }
-          fsDir.unprotectedMkdir(path, permissions, timestamp);
-          break;
-        }
-        case OP_SET_GENSTAMP: {
-          numOpSetGenStamp++;
-          long lw = in.readLong();
-          fsNamesys.setGenerationStamp(lw);
-          break;
-        } 
-        case OP_DATANODE_ADD: {
-          numOpOther++;
-          //Datanodes are not persistent any more.
-          FSImageSerialization.DatanodeImage.skipOne(in);
-          break;
-        }
-        case OP_DATANODE_REMOVE: {
-          numOpOther++;
-          DatanodeID nodeID = new DatanodeID();
-          nodeID.readFields(in);
-          //Datanodes are not persistent any more.
-          break;
-        }
-        case OP_SET_PERMISSIONS: {
-          numOpSetPerm++;
-          if (logVersion > -11)
-            throw new IOException("Unexpected opCode " + opCode
-                                  + " for version " + logVersion);
-          fsDir.unprotectedSetPermission(
-              FSImageSerialization.readString(in), FsPermission.read(in));
-          break;
-        }
-        case OP_SET_OWNER: {
-          numOpSetOwner++;
-          if (logVersion > -11)
-            throw new IOException("Unexpected opCode " + opCode
-                                  + " for version " + logVersion);
-          fsDir.unprotectedSetOwner(FSImageSerialization.readString(in),
-              FSImageSerialization.readString_EmptyAsNull(in),
-              FSImageSerialization.readString_EmptyAsNull(in));
-          break;
-        }
-        case OP_SET_NS_QUOTA: {
-          if (logVersion > -16) {
-            throw new IOException("Unexpected opCode " + opCode
-                + " for version " + logVersion);
-          }
-          fsDir.unprotectedSetQuota(FSImageSerialization.readString(in), 
-                                    readLongWritable(in), 
-                                    FSConstants.QUOTA_DONT_SET);
-          break;
-        }
-        case OP_CLEAR_NS_QUOTA: {
-          if (logVersion > -16) {
-            throw new IOException("Unexpected opCode " + opCode
-                + " for version " + logVersion);
-          }
-          fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
-                                    FSConstants.QUOTA_RESET,
-                                    FSConstants.QUOTA_DONT_SET);
-          break;
-        }
-
-        case OP_SET_QUOTA:
-          fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
-                                    readLongWritable(in),
-                                    readLongWritable(in));
-                                      
-          break;
-
-        case OP_TIMES: {
-          numOpTimes++;
-          int length = in.readInt();
-          if (length != 3) {
-            throw new IOException("Incorrect data format. " 
-                                  + "times operation.");
-          }
-          path = FSImageSerialization.readString(in);
-          mtime = readLong(in);
-          atime = readLong(in);
-          fsDir.unprotectedSetTimes(path, mtime, atime, true);
-          break;
-        }
-        case OP_SYMLINK: {
-          numOpSymlink++;
-          int length = in.readInt();
-          if (length != 4) {
-            throw new IOException("Incorrect data format. " 
-                                  + "symlink operation.");
-          }
-          path = FSImageSerialization.readString(in);
-          String value = FSImageSerialization.readString(in);
-          mtime = readLong(in);
-          atime = readLong(in);
-          PermissionStatus perm = PermissionStatus.read(in);
-          fsDir.unprotectedSymlink(path, value, mtime, atime, perm);
-          break;
-        }
-        case OP_RENAME: {
-          if (logVersion > -21) {
-            throw new IOException("Unexpected opCode " + opCode
-                + " for version " + logVersion);
-          }
-          numOpRename++;
-          int length = in.readInt();
-          if (length != 3) {
-            throw new IOException("Incorrect data format. " 
-                                  + "Mkdir operation.");
-          }
-          String s = FSImageSerialization.readString(in);
-          String d = FSImageSerialization.readString(in);
-          timestamp = readLong(in);
-          Rename[] options = readRenameOptions(in);
-          HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
-          fsDir.unprotectedRenameTo(s, d, timestamp, options);
-          fsNamesys.changeLease(s, d, dinfo);
-          break;
-        }
-        case OP_GET_DELEGATION_TOKEN: {
-          if (logVersion > -24) {
-            throw new IOException("Unexpected opCode " + opCode
-                + " for version " + logVersion);
-          }
-          numOpGetDelegationToken++;
-          DelegationTokenIdentifier delegationTokenId = 
-              new DelegationTokenIdentifier();
-          delegationTokenId.readFields(in);
-          long expiryTime = readLong(in);
-          fsNamesys.getDelegationTokenSecretManager()
-              .addPersistedDelegationToken(delegationTokenId, expiryTime);
-          break;
-        }
-        case OP_RENEW_DELEGATION_TOKEN: {
-          if (logVersion > -24) {
-            throw new IOException("Unexpected opCode " + opCode
-                + " for version " + logVersion);
-          }
-          numOpRenewDelegationToken++;
-          DelegationTokenIdentifier delegationTokenId = 
-              new DelegationTokenIdentifier();
-          delegationTokenId.readFields(in);
-          long expiryTime = readLong(in);
-          fsNamesys.getDelegationTokenSecretManager()
-              .updatePersistedTokenRenewal(delegationTokenId, expiryTime);
-          break;
-        }
-        case OP_CANCEL_DELEGATION_TOKEN: {
-          if (logVersion > -24) {
-            throw new IOException("Unexpected opCode " + opCode
-                + " for version " + logVersion);
-          }
-          numOpCancelDelegationToken++;
-          DelegationTokenIdentifier delegationTokenId = 
-              new DelegationTokenIdentifier();
-          delegationTokenId.readFields(in);
-          fsNamesys.getDelegationTokenSecretManager()
-              .updatePersistedTokenCancellation(delegationTokenId);
-          break;
-        }
-        case OP_UPDATE_MASTER_KEY: {
-          if (logVersion > -24) {
-            throw new IOException("Unexpected opCode " + opCode
-                + " for version " + logVersion);
-          }
-          numOpUpdateMasterKey++;
-          DelegationKey delegationKey = new DelegationKey();
-          delegationKey.readFields(in);
-          fsNamesys.getDelegationTokenSecretManager().updatePersistedMasterKey(
-              delegationKey);
-          break;
-        }
-        default: {
-          throw new IOException("Never seen opCode " + opCode);
-        }
         }
-        validateChecksum(in, checksum, numEdits);
       }
-    } catch (IOException ex) {
-      check203UpgradeFailure(logVersion, ex);
-    } finally {
-      if(closeOnExit)
-        in.close();
+      String errorMessage = sb.toString();
+      FSImage.LOG.error(errorMessage);
+      throw new IOException(errorMessage, t);
     }
     if (FSImage.LOG.isDebugEnabled()) {
       FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose 
@@ -693,4 +723,52 @@ public class FSEditLogLoader {
       throw ex;
     }
   }
+  
+  /**
+   * Stream wrapper that keeps track of the current file position.
+   */
+  private static class PositionTrackingInputStream extends FilterInputStream {
+    private long curPos = 0;
+    private long markPos = -1;
+
+    public PositionTrackingInputStream(InputStream is) {
+      super(is);
+    }
+
+    public int read() throws IOException {
+      int ret = super.read();
+      if (ret != -1) curPos++;
+      return ret;
+    }
+
+    public int read(byte[] data) throws IOException {
+      int ret = super.read(data);
+      if (ret > 0) curPos += ret;
+      return ret;
+    }
+
+    public int read(byte[] data, int offset, int length) throws IOException {
+      int ret = super.read(data, offset, length);
+      if (ret > 0) curPos += ret;
+      return ret;
+    }
+
+    public void mark(int limit) {
+      super.mark(limit);
+      markPos = curPos;
+    }
+
+    public void reset() throws IOException {
+      if (markPos == -1) {
+        throw new IOException("Not marked!");
+      }
+      super.reset();
+      curPos = markPos;
+      markPos = -1;
+    }
+
+    public long getPos() {
+      return curPos;
+    }
+  }
 }

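For context, a self-contained sketch of how a position-tracking wrapper like the one added above is typically used: every read advances a byte counter, and getPos() supplies the offset to report when a record fails to parse. The wrapper below is a trimmed stand-alone copy of the idea, and the class names, record layout, and sample bytes are illustrative only, not part of the patch.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class PositionTrackingDemo {
  /** Minimal stand-alone version of the position-tracking wrapper. */
  static class TrackingStream extends FilterInputStream {
    private long curPos = 0;
    TrackingStream(InputStream in) { super(in); }
    public int read() throws IOException {
      int b = super.read();
      if (b != -1) curPos++;
      return b;
    }
    public int read(byte[] buf, int off, int len) throws IOException {
      int n = super.read(buf, off, len);
      if (n > 0) curPos += n;
      return n;
    }
    long getPos() { return curPos; }
  }

  public static void main(String[] args) throws IOException {
    // A made-up record: 1-byte opcode, 4-byte length, then the payload.
    byte[] record = new byte[] {1, 0, 0, 0, 4, 'a', 'b', 'c', 'd'};
    TrackingStream tracker =
        new TrackingStream(new ByteArrayInputStream(record));
    DataInputStream in = new DataInputStream(tracker);

    int opCode = in.read();          // 1 byte
    int length = in.readInt();       // 4 bytes
    byte[] payload = new byte[length];
    in.readFully(payload);
    // getPos() reports how far into the stream we are; on a parse failure
    // this is the offset to include in the error message.
    System.out.println("read op " + opCode + ", now at offset "
        + tracker.getPos());
    in.close();
  }
}

Wrapping the stream once, before it is handed to DataInputStream, is what lets the loader report a byte-accurate position without threading a counter through every read site.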
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1102509&r1=1102508&r2=1102509&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Thu May 12 23:34:50 2011
@@ -365,29 +365,62 @@ public class FSImage implements NNStorag
     storage.cTime = now();  // generate new cTime for the state
     int oldLV = storage.getLayoutVersion();
     storage.layoutVersion = FSConstants.LAYOUT_VERSION;
+    
+    List<StorageDirectory> errorSDs =
+      Collections.synchronizedList(new ArrayList<StorageDirectory>());
+    List<Thread> saveThreads = new ArrayList<Thread>();
+    File curDir, prevDir, tmpDir;
     for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
-      LOG.info("Upgrading image directory " + sd.getRoot()
+      LOG.info("Starting upgrade of image directory " + sd.getRoot()
                + ".\n   old LV = " + oldLV
                + "; old CTime = " + oldCTime
                + ".\n   new LV = " + storage.getLayoutVersion()
                + "; new CTime = " + storage.getCTime());
-      File curDir = sd.getCurrentDir();
-      File prevDir = sd.getPreviousDir();
-      File tmpDir = sd.getPreviousTmp();
-      assert curDir.exists() : "Current directory must exist.";
-      assert !prevDir.exists() : "prvious directory must not exist.";
-      assert !tmpDir.exists() : "prvious.tmp directory must not exist.";
-      assert !editLog.isOpen() : "Edits log must not be open.";
-      // rename current to tmp
-      NNStorage.rename(curDir, tmpDir);
-      // save new image
-      saveCurrent(sd);
-      // rename tmp to previous
-      NNStorage.rename(tmpDir, prevDir);
-      isUpgradeFinalized = false;
+      try {
+        curDir = sd.getCurrentDir();
+        prevDir = sd.getPreviousDir();
+        tmpDir = sd.getPreviousTmp();
+        assert curDir.exists() : "Current directory must exist.";
+        assert !prevDir.exists() : "previous directory must not exist.";
+        assert !tmpDir.exists() : "previous.tmp directory must not exist.";
+        assert !editLog.isOpen() : "Edits log must not be open.";
+
+        // rename current to tmp
+        NNStorage.rename(curDir, tmpDir);
+        
+        // launch thread to save new image
+        FSImageSaver saver = new FSImageSaver(sd, errorSDs);
+        Thread saveThread = new Thread(saver, saver.toString());
+        saveThreads.add(saveThread);
+        saveThread.start();
+        
+      } catch (Exception e) {
+        LOG.error("Failed upgrade of image directory " + sd.getRoot(), e);
+        errorSDs.add(sd);
+        continue;
+      }
+    }
+    waitForThreads(saveThreads);
+    saveThreads.clear();
+
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      if (errorSDs.contains(sd)) continue;
+      try {
+        prevDir = sd.getPreviousDir();
+        tmpDir = sd.getPreviousTmp();
+        // rename tmp to previous
+        NNStorage.rename(tmpDir, prevDir);
+      } catch (IOException ioe) {
+        LOG.error("Unable to rename temp to previous for " + sd.getRoot(), ioe);
+        errorSDs.add(sd);
+        continue;
+      }
       LOG.info("Upgrade of " + sd.getRoot() + " is complete.");
     }
+    isUpgradeFinalized = false;
+    storage.reportErrorsOnDirectories(errorSDs);
     storage.initializeDistributedUpgrade();
     editLog.open();
   }

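The upgrade path above now renames each directory, saves the new image on a per-directory thread, and collects failures instead of aborting on the first one. FSImageSaver and waitForThreads are not shown in this hunk, so the sketch below only approximates that pattern with hypothetical stand-ins; the directory paths are illustrative.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ParallelSaveSketch {
  /** Hypothetical stand-in for writing the new image into one directory. */
  static void saveCurrent(String dir) throws Exception {
    // ... write the fsimage into <dir>/current ...
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> dirs = Arrays.asList("/data/1/name", "/data/2/name");
    final List<String> errorDirs =
        Collections.synchronizedList(new ArrayList<String>());
    List<Thread> saveThreads = new ArrayList<Thread>();

    for (final String dir : dirs) {
      Thread t = new Thread(new Runnable() {
        public void run() {
          try {
            saveCurrent(dir);
          } catch (Exception e) {
            errorDirs.add(dir);   // remember the failure, keep going
          }
        }
      }, "FSImageSaver for " + dir);
      saveThreads.add(t);
      t.start();
    }
    for (Thread t : saveThreads) {   // roughly what waitForThreads() does
      t.join();
    }
    // Failed directories are reported at the end instead of aborting the
    // upgrade of the healthy ones.
    System.out.println("directories that failed to save: " + errorDirs);
  }
}

The synchronized error list is the important detail: each saver thread appends to it independently, and only after all threads are joined are the bad directories reported in one place.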
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1102509&r1=1102508&r2=1102509&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu May 12 23:34:50 2011
@@ -5509,6 +5509,11 @@ public class FSNamesystem implements FSC
   }
 
   @Override // NameNodeMXBean
+  public long getNumberOfMissingBlocks() {
+    return getMissingBlocksCount();
+  }
+  
+  @Override // NameNodeMXBean
   public int getThreads() {
     return ManagementFactory.getThreadMXBean().getThreadCount();
   }
@@ -5519,11 +5524,15 @@ public class FSNamesystem implements FSC
    */
   @Override // NameNodeMXBean
   public String getLiveNodes() {
-    final Map<String, Object> info = new HashMap<String, Object>();
-    final ArrayList<DatanodeDescriptor> aliveNodeList =
-      this.getDatanodeListForReport(DatanodeReportType.LIVE); 
-    removeDecomNodeFromList(aliveNodeList);
-    for (DatanodeDescriptor node : aliveNodeList) {
+    final Map<String, Map<String,Object>> info = 
+      new HashMap<String, Map<String,Object>>();
+    final ArrayList<DatanodeDescriptor> liveNodeList = 
+      new ArrayList<DatanodeDescriptor>();
+    final ArrayList<DatanodeDescriptor> deadNodeList =
+      new ArrayList<DatanodeDescriptor>();
+    DFSNodesStatus(liveNodeList, deadNodeList);
+    removeDecomNodeFromList(liveNodeList);
+    for (DatanodeDescriptor node : liveNodeList) {
       final Map<String, Object> innerinfo = new HashMap<String, Object>();
       innerinfo.put("lastContact", getLastContact(node));
       innerinfo.put("usedSpace", getDfsUsed(node));
@@ -5539,9 +5548,14 @@ public class FSNamesystem implements FSC
    */
   @Override // NameNodeMXBean
   public String getDeadNodes() {
-    final Map<String, Object> info = new HashMap<String, Object>();
+    final Map<String, Map<String, Object>> info = 
+      new HashMap<String, Map<String, Object>>();
+    final ArrayList<DatanodeDescriptor> liveNodeList =
+    new ArrayList<DatanodeDescriptor>();
     final ArrayList<DatanodeDescriptor> deadNodeList =
-      this.getDatanodeListForReport(DatanodeReportType.DEAD); 
+    new ArrayList<DatanodeDescriptor>();
+    // we need to call DFSNodesStatus to filter out the dead datanodes
+    DFSNodesStatus(liveNodeList, deadNodeList);
     removeDecomNodeFromList(deadNodeList);
     for (DatanodeDescriptor node : deadNodeList) {
       final Map<String, Object> innerinfo = new HashMap<String, Object>();
@@ -5558,7 +5572,8 @@ public class FSNamesystem implements FSC
    */
   @Override // NameNodeMXBean
   public String getDecomNodes() {
-    final Map<String, Object> info = new HashMap<String, Object>();
+    final Map<String, Map<String, Object>> info = 
+      new HashMap<String, Map<String, Object>>();
     final ArrayList<DatanodeDescriptor> decomNodeList = 
       this.getDecommissioningNodes();
     for (DatanodeDescriptor node : decomNodeList) {

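The MXBean getters above now build a map of node name to a per-node attribute map, so the JSON the monitoring UI consumes nests one object per datanode rather than flattening attributes into a single map. A minimal sketch of that shape follows; the node name, attribute values, and the rendered JSON shown in the comment are illustrative only.

import java.util.HashMap;
import java.util.Map;

public class LiveNodesShapeSketch {
  public static void main(String[] args) {
    Map<String, Map<String, Object>> info =
        new HashMap<String, Map<String, Object>>();

    // One inner map per datanode, keyed by the node's name.
    Map<String, Object> innerinfo = new HashMap<String, Object>();
    innerinfo.put("lastContact", 3L);          // seconds since last heartbeat
    innerinfo.put("usedSpace", 123456789L);    // bytes
    info.put("datanode1.example.com:50010", innerinfo);

    // Serialized to JSON this reads roughly as:
    // {"datanode1.example.com:50010":{"lastContact":3,"usedSpace":123456789}}
    System.out.println(info);
  }
}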
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeMXBean.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeMXBean.java?rev=1102509&r1=1102508&r2=1102509&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeMXBean.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeMXBean.java Thu May 12 23:34:50 2011
@@ -119,6 +119,13 @@ public interface NameNodeMXBean {
   public long getTotalFiles();
   
   /**
+   * Gets the total number of missing blocks on the cluster.
+   * 
+   * @return the total number of missing blocks on the cluster
+   */
+  public long getNumberOfMissingBlocks();
+  
+  /**
    * Gets the number of threads.
    * 
    * @return the number of threads

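Because getNumberOfMissingBlocks() is part of the NameNodeMXBean interface, the value is exposed as a JMX attribute named NumberOfMissingBlocks. A minimal sketch of reading it in-process is below; the ObjectName string is an assumption (check the NameNode's registered beans, e.g. with jconsole, for the actual name), and a remote check would go through a JMXConnector instead of the platform MBean server.

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class MissingBlocksProbe {
  public static void main(String[] args) throws Exception {
    // Assumed ObjectName; verify against the NameNode's registered MBeans.
    ObjectName name =
        new ObjectName("Hadoop:service=NameNode,name=NameNodeInfo");
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    Object missing = mbs.getAttribute(name, "NumberOfMissingBlocks");
    System.out.println("Missing blocks: " + missing);
  }
}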
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1102509&r1=1102508&r2=1102509&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Thu May 12 23:34:50 2011
@@ -99,6 +99,7 @@ public class SecondaryNameNode implement
   private int imagePort;
   private String infoBindAddress;
 
+  private FSNamesystem namesystem;
   private Collection<URI> checkpointDirs;
   private Collection<URI> checkpointEditsDirs;
   private long checkpointPeriod;    // in seconds
@@ -479,8 +480,9 @@ public class SecondaryNameNode implement
    */
   private void doMerge(CheckpointSignature sig, boolean loadImage)
   throws IOException {
-    FSNamesystem namesystem = 
-            new FSNamesystem(checkpointImage, conf);
+    if (loadImage) {
+      namesystem = new FSNamesystem(checkpointImage, conf);
+    }
     assert namesystem.dir.fsImage == checkpointImage;
     checkpointImage.doMerge(sig, loadImage);
   }

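The change to doMerge keeps the FSNamesystem as a field and rebuilds it only when a fresh image is loaded, instead of constructing a new one on every merge. A minimal sketch of that reuse-unless-reload pattern is below; the class and method names are hypothetical stand-ins, not the actual SecondaryNameNode code.

public class MergeSketch {
  /** Hypothetical stand-in for the in-memory namespace state. */
  static class Namespace {
    Namespace(String imageDir) { /* load the image from imageDir */ }
  }

  private Namespace namesystem;   // kept across checkpoint cycles

  void doMerge(String imageDir, boolean loadImage) {
    if (loadImage) {
      // Only rebuild the namespace when a new image was downloaded;
      // otherwise keep applying edits to the instance from the last cycle.
      namesystem = new Namespace(imageDir);
    }
    // ... apply the downloaded edits on top of namesystem ...
  }
}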

