hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhr...@apache.org
Subject svn commit: r627663 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Date Thu, 14 Feb 2008 05:35:21 GMT
Author: dhruba
Date: Wed Feb 13 21:35:14 2008
New Revision: 627663

URL: http://svn.apache.org/viewvc?rev=627663&view=rev
Log:
HADOOP-2345.  New HDFS transactions to support appending
to files.  Disk layout version changed from -11 to -12. (dhruba)


Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Feb 13 21:35:14 2008
@@ -7,6 +7,9 @@
 
     HADOOP-2786.  Move hbase out of hadoop core
 
+    HADOOP-2345.  New HDFS transactions to support appending 
+    to files.  Disk layout version changed from -11 to -12. (dhruba)
+
   NEW FEATURES
 
     HADOOP-1398.  Add HBase in-memory block cache.  (tomwhite)

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Wed Feb 13 21:35:14 2008
@@ -35,9 +35,9 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 23 : added setOwner(...) and setPermission(...); changed create(...) and mkdir(...)
+   * 24 : added fsync
    */
-  public static final long versionID = 23L;
+  public static final long versionID = 24L;
   
   ///////////////////////////////////////
   // File contents
@@ -437,4 +437,12 @@
    * @return size of directory subtree in bytes
    */
   public long getContentLength(String src) throws IOException;
+
+  /**
+   * Write all metadata for this file into persistent storage.
+   * The file must be currently open for writing.
+   * @param src The string representation of the path
+   * @param client The string representation of the client
+   */
+  public void fsync(String src, String client) throws IOException;
 }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Wed Feb 13 21:35:14 2008
@@ -183,7 +183,7 @@
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -11;
+  public static final int LAYOUT_VERSION = -12;
   // Current version: 
-  // Added permission information to INode.
+  // Introduce OPEN, CLOSE and GENSTAMP transactions for supporting appends
 }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Wed Feb 13 21:35:14 2008
@@ -124,7 +124,8 @@
                 long preferredBlockSize,
                 String clientName,
                 String clientMachine,
-                DatanodeDescriptor clientNode) 
+                DatanodeDescriptor clientNode,
+                long generationStamp) 
                 throws IOException {
     waitForReady();
 
@@ -134,7 +135,8 @@
         modTime)) {
       return null;
     }
-    INodeFile newNode = new INodeFileUnderConstruction(permissions,replication,
+    INodeFileUnderConstruction newNode = new INodeFileUnderConstruction(
+                                 permissions,replication,
                                  preferredBlockSize, modTime, clientName, 
                                  clientMachine, clientNode);
     synchronized (rootDir) {
@@ -153,8 +155,10 @@
                                    +" to the file system");
       return null;
     }
-    // add create file record to log
-    fsImage.getEditLog().logCreateFile(path, newNode);
+    // add create file record to log, record new generation stamp
+    fsImage.getEditLog().logOpenFile(path, newNode);
+    fsImage.getEditLog().logGenerationStamp(generationStamp);
+
     NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
                                   +path+" is added to the file system");
     return newNode;
@@ -171,9 +175,9 @@
     INode newNode;
     if (blocks == null)
       newNode = new INodeDirectory(permissions, modificationTime);
-    else
+    else 
       newNode = new INodeFile(permissions, blocks.length, replication,
-          modificationTime, preferredBlockSize);
+                              modificationTime, preferredBlockSize);
     synchronized (rootDir) {
       try {
         newNode = rootDir.addNode(path, newNode);
@@ -220,20 +224,30 @@
   /**
    * Persist the block list for the inode.
    */
-  void persistBlocks(String path, INode file) throws IOException {
+  void persistBlocks(String path, INodeFileUnderConstruction file) 
+                     throws IOException {
     waitForReady();
 
     synchronized (rootDir) {
-      INodeFile fileNode = (INodeFile) file;
+      fsImage.getEditLog().logOpenFile(path, file);
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.persistBlocks: "
+                                    +path+" with "+ file.getBlocks().length 
+                                    +" blocks is persisted to the file system");
+    }
+  }
 
-      // create two transactions. The first one deletes the empty
-      // file and the second transaction recreates the same file
-      // with the appropriate set of blocks.
-      fsImage.getEditLog().logDelete(path, fileNode.getModificationTime());
+  /**
+   * Close file.
+   */
+  void closeFile(String path, INode file) throws IOException {
+    waitForReady();
 
-      // re-add create file record to log
-      fsImage.getEditLog().logCreateFile(path, fileNode);
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
+    synchronized (rootDir) {
+      INodeFile fileNode = (INodeFile) file;
+
+      // file is closed
+      fsImage.getEditLog().logCloseFile(path, fileNode);
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
                                     +path+" with "+ fileNode.getBlocks().length 
                                     +" blocks is persisted to the file system");
     }
@@ -242,26 +256,20 @@
   /**
    * Remove a block to the file.
    */
-  boolean removeBlock(String path, INode file, Block block) throws IOException {
+  boolean removeBlock(String path, INodeFileUnderConstruction fileNode, 
+                      Block block) throws IOException {
     waitForReady();
 
     synchronized (rootDir) {
-      INodeFile fileNode = (INodeFile) file;
-      if (fileNode == null) {
-        throw new IOException("Unknown file: " + path);
-      }
-
       // modify file-> block and blocksMap
       fileNode.removeBlock(block);
       namesystem.blocksMap.removeINode(block);
 
-      // create two transactions. The first one deletes the empty
-      // file and the second transaction recreates the same file
-      // with the appropriate set of blocks.
-      fsImage.getEditLog().logDelete(path, fileNode.getModificationTime());
+      // Remove the block locations for the last block.
+      fileNode.setLastBlockLocations(new DatanodeDescriptor[0]);
 
-      // re-add create file record to log
-      fsImage.getEditLog().logCreateFile(path, fileNode);
+      // write modified block locations to log
+      fsImage.getEditLog().logOpenFile(path, fileNode);
       NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
                                     +path+" with "+block
                                     +" block is added to the file system");
@@ -433,20 +441,22 @@
   /**
    * Remove the file from management, return blocks
    */
-  public Block[] delete(String src) {
+  public INode delete(String src, Collection<Block> deletedBlocks) {
     NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: "
                                   +src);
     waitForReady();
     long now = FSNamesystem.now();
-    Block[] blocks = unprotectedDelete(src, now); 
-    if (blocks != null)
+    INode deletedNode = unprotectedDelete(src, now, deletedBlocks); 
+    if (deletedNode != null) {
       fsImage.getEditLog().logDelete(src, now);
-    return blocks;
+    }
+    return deletedNode;
   }
 
   /**
    */
-  Block[] unprotectedDelete(String src, long modificationTime) {
+  INode unprotectedDelete(String src, long modificationTime, 
+                          Collection<Block> deletedBlocks) {
     synchronized (rootDir) {
       INode targetNode = rootDir.getNode(src);
       if (targetNode == null) {
@@ -472,8 +482,11 @@
           totalInodes -= filesRemoved;
           for (Block b : v) {
             namesystem.blocksMap.removeINode(b);
+            if (deletedBlocks != null) {
+              deletedBlocks.add(b);
+            }
           }
-          return v.toArray(new Block[v.size()]);
+          return targetNode;
         }
       }
     }
@@ -567,7 +580,6 @@
    * Get {@link INode} associated with the file.
    */
   INodeFile getFileINode(String src) {
-    waitForReady();
     synchronized (rootDir) {
       INode inode = rootDir.getNode(src);
       if (inode == null || inode.isDirectory())

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Wed Feb 13 21:35:14 2008
@@ -39,15 +39,17 @@
  */
 class FSEditLog {
   private static final byte OP_ADD = 0;
-  private static final byte OP_RENAME = 1;
-  private static final byte OP_DELETE = 2;
-  private static final byte OP_MKDIR = 3;
-  private static final byte OP_SET_REPLICATION = 4;
-  //the following two are used only for backword compatibility :
+  private static final byte OP_RENAME = 1;  // rename
+  private static final byte OP_DELETE = 2;  // delete
+  private static final byte OP_MKDIR = 3;   // create directory
+  private static final byte OP_SET_REPLICATION = 4; // set replication
+  //the following two are used only for backward compatibility :
   @Deprecated private static final byte OP_DATANODE_ADD = 5;
   @Deprecated private static final byte OP_DATANODE_REMOVE = 6;
   private static final byte OP_SET_PERMISSIONS = 7;
   private static final byte OP_SET_OWNER = 8;
+  private static final byte OP_CLOSE = 9;    // close after write
+  private static final byte OP_SET_GENSTAMP = 10;    // store genstamp
   private static int sizeFlushBuffer = 512*1024;
 
   private ArrayList<EditLogOutputStream> editStreams = null;
@@ -377,6 +379,10 @@
     FSDirectory fsDir = fsNamesys.dir;
     int numEdits = 0;
     int logVersion = 0;
+    INode old = null;
+    String clientName = null;
+    String clientMachine = null;
+	DatanodeDescriptor lastLocations[] = null;
     
     if (edits != null) {
       DataInputStream in = new DataInputStream(
@@ -420,8 +426,10 @@
           }
           numEdits++;
           switch (opcode) {
-          case OP_ADD: {
+          case OP_ADD:
+          case OP_CLOSE: {
             UTF8 name = new UTF8();
+            String path = null;
             ArrayWritable aw = null;
             Writable writables[];
             // version 0 does not support per file replication
@@ -441,6 +449,7 @@
                                         writables.length + ". ");
               }
               name = (UTF8) writables[0];
+              path = name.toString();
               replication = Short.parseShort(
                                              ((UTF8)writables[1]).toString());
               replication = adjustReplication(replication);
@@ -470,16 +479,77 @@
                 blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
               }
             }
+             
             PermissionStatus permissions = fsNamesys.getUpgradePermission();
             if (logVersion <= -11) {
               permissions = PermissionStatus.read(in);
             }
 
+            // clientname, clientMachine and block locations of last block.
+            clientName = null;
+            clientMachine = null;
+            if (opcode == OP_ADD && logVersion <= -12) {
+              UTF8 uu = new UTF8();
+              UTF8 cl = new UTF8();
+              aw = new ArrayWritable(DatanodeDescriptor.class);
+              uu.readFields(in);
+              cl.readFields(in);
+              aw.readFields(in);
+              clientName = uu.toString();
+              clientMachine = cl.toString();
+              writables = aw.get();
+              lastLocations = new DatanodeDescriptor[writables.length];
+              System.arraycopy(writables, 0, lastLocations, 0, writables.length);
+            }
+  
+            // The open lease transaction re-creates a file if necessary.
+            // Delete the file if it already exists.
+            if (FSNamesystem.LOG.isDebugEnabled()) {
+              FSNamesystem.LOG.debug(opcode + ": " + name.toString() + 
+                                     " numblocks : " + blocks.length +
+                                     " clientHolder " +  
+                                     ((clientName != null) ? clientName : "") +
+                                     " clientMachine " +
+                                     ((clientMachine != null) ? clientMachine : ""));
+            }
+
+            old = fsDir.unprotectedDelete(path, mtime, null);
+
             // add to the file tree
-            fsDir.unprotectedAddFile(name.toString(), permissions,
-                blocks, replication, mtime, blockSize);
+            INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
+                                                      path, permissions,
+                                                      blocks, replication, 
+                                                      mtime, blockSize);
+            if (opcode == OP_ADD) {
+              //
+              // Replace current node with a INodeUnderConstruction.
+              // Recreate in-memory lease record.
+              //
+              INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
+                                        INode.string2Bytes(node.getLocalName()),
+                                        node.getReplication(), 
+                                        node.getModificationTime(),
+                                        node.getPreferredBlockSize(),
+                                        node.getBlocks(),
+                                        node.getPermissionStatus(),
+                                        clientName, 
+                                        clientMachine, 
+                                        null, 
+                                        lastLocations);
+              fsDir.replaceNode(path, node, cons);
+              fsNamesys.addLease(path, clientName);
+            } else if (opcode == OP_CLOSE) {
+              //
+              // Remove lease if it exists.
+              //
+              if (old.isUnderConstruction()) {
+                INodeFileUnderConstruction cons = (INodeFileUnderConstruction)
+                                                     old;
+                fsNamesys.removeLease(path, cons.getClientName());
+              }
+            }
             break;
-          }
+          } 
           case OP_SET_REPLICATION: {
             UTF8 src = new UTF8();
             UTF8 repl = new UTF8();
@@ -534,7 +604,11 @@
               src = (UTF8) writables[0];
               timestamp = Long.parseLong(((UTF8)writables[1]).toString());
             }
-            fsDir.unprotectedDelete(src.toString(), timestamp);
+            old = fsDir.unprotectedDelete(src.toString(), timestamp, null);
+            if (old != null && old.isUnderConstruction()) {
+              INodeFileUnderConstruction cons = (INodeFileUnderConstruction)old;
+              fsNamesys.removeLease(src.toString(), cons.getClientName());
+            }
             break;
           }
           case OP_MKDIR: {
@@ -563,6 +637,12 @@
             fsDir.unprotectedMkdir(src.toString(),permissions,false,timestamp);
             break;
           }
+          case OP_SET_GENSTAMP: {
+            LongWritable aw = new LongWritable();
+            aw.readFields(in);
+            fsDir.namesystem.setGenerationStamp(aw.get());
+            break;
+          } 
           case OP_DATANODE_ADD: {
             if (logVersion > -3)
               throw new IOException("Unexpected opcode " + opcode 
@@ -657,7 +737,7 @@
     //
     // record the transactionId when new data was written to the edits log
     //
-    TransactionId id = (TransactionId)myTransactionId.get();
+    TransactionId id = myTransactionId.get();
     id.txid = txid;
 
     // update statistics
@@ -675,7 +755,7 @@
     long syncStart = 0;
 
     // Fetch the transactionId of this thread. 
-    TransactionId id = (TransactionId)myTransactionId.get();
+    TransactionId id = myTransactionId.get();
     long mytxid = id.txid;
 
     synchronized (this) {
@@ -766,9 +846,14 @@
   }
 
   /** 
-   * Add create file record to edit log
+   * Add open lease record to edit log. 
+   * Records the block locations of the last block.
    */
-  void logCreateFile(String path, INodeFile newNode) {
+  void logOpenFile(String path, INodeFileUnderConstruction newNode) 
+                   throws IOException {
+
+    DatanodeDescriptor[] locations = newNode.getLastBlockLocations();
+
     UTF8 nameReplicationPair[] = new UTF8[] { 
       new UTF8(path), 
       FSEditLog.toLogReplication(newNode.getReplication()),
@@ -777,6 +862,24 @@
     logEdit(OP_ADD,
             new ArrayWritable(UTF8.class, nameReplicationPair), 
             new ArrayWritable(Block.class, newNode.getBlocks()),
+            newNode.getPermissionStatus(),
+            new UTF8(newNode.getClientName()),
+            new UTF8(newNode.getClientMachine()),
+            new ArrayWritable(DatanodeDescriptor.class, locations));
+  }
+
+  /** 
+   * Add close lease record to edit log.
+   */
+  void logCloseFile(String path, INodeFile newNode) {
+    UTF8 nameReplicationPair[] = new UTF8[] {
+      new UTF8(path),
+      FSEditLog.toLogReplication(newNode.getReplication()),
+      FSEditLog.toLogLong(newNode.getModificationTime()),
+      FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
+    logEdit(OP_CLOSE,
+            new ArrayWritable(UTF8.class, nameReplicationPair),
+            new ArrayWritable(Block.class, newNode.getBlocks()),
             newNode.getPermissionStatus());
   }
   
@@ -833,6 +936,13 @@
       new UTF8(src),
       FSEditLog.toLogLong(timestamp)};
     logEdit(OP_DELETE, new ArrayWritable(UTF8.class, info));
+  }
+
+  /** 
+   * Add generation stamp record to edit log
+   */
+  void logGenerationStamp(long genstamp) {
+    logEdit(OP_SET_GENSTAMP, new LongWritable(genstamp));
   }
   
   static UTF8 toLogReplication(short replication) {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Wed Feb 13 21:35:14 2008
@@ -41,6 +41,7 @@
 import org.apache.hadoop.dfs.FSConstants.NodeType;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
 
 /**
  * FSImage handles checkpointing and logging of the namespace edits.
@@ -668,8 +669,9 @@
       // read image version: first appeared in version -1
       int imgVersion = in.readInt();
       // read namespaceID: first appeared in version -2
-      if (imgVersion <= -2)
+      if (imgVersion <= -2) {
         this.namespaceID = in.readInt();
+      }
       // read number of files
       int numFiles = 0;
       // version 0 does not store version #
@@ -681,6 +683,11 @@
         numFiles = in.readInt();
       }
       this.layoutVersion = imgVersion;
+      // read in the last generation stamp.
+      if (imgVersion <= -12) {
+        long genstamp = in.readLong();
+        fsNamesys.setGenerationStamp(genstamp); 
+      }
 
       needToSave = (imgVersion != FSConstants.LAYOUT_VERSION);
 
@@ -737,6 +744,9 @@
       
       // load datanode info
       this.loadDatanodes(imgVersion, in);
+
+      // load Files Under Construction
+      this.loadFilesUnderConstruction(imgVersion, in, fsNamesys);
     } finally {
       in.close();
     }
@@ -776,8 +786,9 @@
       out.writeInt(FSConstants.LAYOUT_VERSION);
       out.writeInt(namespaceID);
       out.writeInt(fsDir.rootDir.numItemsInTree() - 1);
+      out.writeLong(fsNamesys.getGenerationStamp());
       saveImage("", fsDir.rootDir, out);
-      saveDatanodes(out);
+      fsNamesys.saveFilesUnderConstruction(out);
     } finally {
       out.close();
     }
@@ -882,21 +893,12 @@
     }
   }
 
-  /**
-   * Earlier version used to store all the known datanodes.
-   * DFS don't store datanodes anymore.
-   * 
-   * @param out output stream
-   * @throws IOException
-   */
-  void saveDatanodes(DataOutputStream out) throws IOException {
-    // we don't store datanodes anymore.
-    out.writeInt(0);    
-  }
-
   void loadDatanodes(int version, DataInputStream in) throws IOException {
     if (version > -3) // pre datanode image version
       return;
+    if (version <= -12) {
+      return; // new versions do not store the datanodes any more.
+    }
     int size = in.readInt();
     for(int i = 0; i < size; i++) {
       DatanodeImage nodeImage = new DatanodeImage();
@@ -905,6 +907,104 @@
     }
   }
 
+  private void loadFilesUnderConstruction(int version, DataInputStream in, 
+                                  FSNamesystem fs) throws IOException {
+
+    FSDirectory fsDir = fs.dir;
+    if (version > -12) // pre lease image version
+      return;
+    int size = in.readInt();
+
+    for (int i = 0; i < size; i++) {
+      INodeFileUnderConstruction cons = readINodeUnderConstruction(in);
+
+      // verify that file exists in namespace
+      String path = cons.getLocalName();
+      INode old = fsDir.getFileINode(path);
+      if (old == null) {
+        throw new IOException("Found lease for non-existent file " + path);
+      }
+      if (old.isDirectory()) {
+        throw new IOException("Found lease for directory " + path);
+      }
+      INodeFile oldnode = (INodeFile) old;
+      fsDir.replaceNode(path, oldnode, cons);
+      fs.addLease(path, cons.getClientName()); 
+    }
+    if (fs.countLease() != size) {
+      throw new IOException("Created " + size + " leases but found " +
+                            fs.countLease());
+    }
+  }
+
+  // Helper function that reads in an INodeUnderConstruction
+  // from the input stream
+  //
+  static INodeFileUnderConstruction readINodeUnderConstruction(
+                            DataInputStream in) throws IOException {
+    UTF8 src = new UTF8();
+    src.readFields(in);
+    byte[] name = src.getBytes();
+    short blockReplication = in.readShort();
+    long modificationTime = in.readLong();
+    long preferredBlockSize = in.readLong();
+    int numBlocks = in.readInt();
+    BlockInfo[] blocks = new BlockInfo[numBlocks];
+    for (int i = 0; i < numBlocks; i++) {
+      blocks[i].readFields(in);
+    }
+
+    PermissionStatus perm = PermissionStatus.read(in);
+    UTF8 clientName = new UTF8();
+    clientName.readFields(in);
+    UTF8 clientMachine = new UTF8();
+    clientMachine.readFields(in);
+
+    int numLocs = in.readInt();
+    DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocs];
+    for (int i = 0; i < numLocs; i++) {
+      locations[i].readFields(in);
+    }
+
+    return new INodeFileUnderConstruction(name, 
+                                          blockReplication, 
+                                          modificationTime,
+                                          preferredBlockSize,
+                                          blocks,
+                                          perm,
+                                          clientName.toString(),
+                                          clientMachine.toString(),
+                                          null,
+                                          locations);
+
+  }
+
+  // Helper function that writes an INodeUnderConstruction
+  // into the input stream
+  //
+  static void writeINodeUnderConstruction(DataOutputStream out,
+                                           INodeFileUnderConstruction cons) 
+                                           throws IOException {
+    new UTF8(cons.getAbsoluteName()).write(out);
+    out.writeShort(cons.getReplication());
+    out.writeLong(cons.getModificationTime());
+    out.writeLong(cons.getPreferredBlockSize());
+    int nrBlocks = cons.getBlocks().length;
+    out.writeInt(nrBlocks);
+    for (int i = 0; i < nrBlocks; i++) {
+      cons.getBlocks()[i].write(out);
+    }
+    cons.getPermissionStatus().write(out);
+    new UTF8(cons.getClientName()).write(out);
+    new UTF8(cons.getClientMachine()).write(out);
+
+    int numLocs = cons.getLastBlockLocations().length;
+    out.writeInt(numLocs);
+    for (int i = 0; i < numLocs; i++) {
+      cons.getLastBlockLocations()[i].write(out);
+    }
+  }
+
   /**
    * Moves fsimage.ckpt to fsImage and edits.new to edits
    * Reopens the new edits file.
@@ -1118,4 +1218,5 @@
         + um.getUpgradeVersion() + " to current LV " 
         + FSConstants.LAYOUT_VERSION + " is initialized.");
   }
+
 }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Feb 13 21:35:14 2008
@@ -38,6 +38,7 @@
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.io.DataOutputStream;
 import java.net.InetSocketAddress;
 import java.util.*;
 import java.util.Map.Entry;
@@ -221,6 +222,12 @@
 
   private long maxFsObjects = 0;          // maximum number of fs objects
 
+  /**
+   * The global generation stamp for this file system. 
+   * Valid values start from 1000.
+   */
+  private GenerationStamp generationStamp = new GenerationStamp(1000);
+
   private long softLimit = LEASE_SOFTLIMIT_PERIOD;
   private long hardLimit = LEASE_HARDLIMIT_PERIOD;
 
@@ -1029,27 +1036,19 @@
       DatanodeDescriptor clientNode = 
         host2DataNodeMap.getDatanodeByHost(clientMachine);
 
-      synchronized (sortedLeases) {
-        Lease lease = getLease(holder);
-        if (lease == null) {
-          lease = new Lease(holder);
-          putLease(holder, lease);
-          sortedLeases.add(lease);
-        } else {
-          sortedLeases.remove(lease);
-          lease.renew();
-          sortedLeases.add(lease);
-        }
-        lease.startedCreate(src);
-      }
+      addLease(src, holder);
 
       //
       // Now we can add the name to the filesystem. This file has no
       // blocks associated with it.
       //
       checkFsObjectLimit();
+
+      // increment global generation stamp
+      long genstamp = generationStamp.nextStamp();
+
       INode newNode = dir.addFile(src, permissions,
-          replication, blockSize, holder, clientMachine, clientNode);
+          replication, blockSize, holder, clientMachine, clientNode, genstamp);
       if (newNode == null) {
         throw new IOException("DIR* NameSystem.startFile: " +
                               "Unable to add file to namespace.");
@@ -1106,28 +1105,30 @@
       blockSize = pendingFile.getPreferredBlockSize();
       clientNode = pendingFile.getClientNode();
       replication = (int)pendingFile.getReplication();
-      newBlock = allocateBlock(src, pendingFile);
     }
 
+    // choose targets for the new block tobe allocated.
     DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
                                                            clientNode,
                                                            null,
                                                            blockSize);
     if (targets.length < this.minReplication) {
-      // if we could not find any targets, remove this block from file
-      synchronized (this) {
-        INodeFile iFile = dir.getFileINode(src);
-        if (iFile != null && iFile.isUnderConstruction()) {
-          INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
-          if (pendingFile.getClientName().equals(clientName)) {
-            dir.removeBlock(src, pendingFile, newBlock);
-          }
-        }
-      }
       throw new IOException("File " + src + " could only be replicated to " +
                             targets.length + " nodes, instead of " +
                             minReplication);
     }
+
+    // Allocate a new block and record it in the INode. 
+    synchronized (this) {
+      INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
+      if (!checkFileProgress(pendingFile, false)) {
+        throw new NotReplicatedYetException("Not replicated yet:" + src);
+      }
+
+      // allocate new block record block locations in INode.
+      newBlock = allocateBlock(src, pendingFile);
+      pendingFile.setLastBlockLocations(targets);
+    }
         
     // Create next block
     return new LocatedBlock(newBlock, targets, fileLength);
@@ -1143,7 +1144,7 @@
     //
     NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
                                   +b.getBlockName()+"of file "+src);
-    INode file = checkLease(src, holder);
+    INodeFileUnderConstruction file = checkLease(src, holder);
     dir.removeBlock(src, file, b);
     NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
                                     + b.getBlockName()
@@ -1234,22 +1235,13 @@
     INodeFile newFile = pendingFile.convertToInodeFile();
     dir.replaceNode(src, pendingFile, newFile);
 
-    // persist block allocations for this file
-    dir.persistBlocks(src, newFile);
+    // close file and persist block allocations for this file
+    dir.closeFile(src, newFile);
 
     NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
                                   + " blocklist persisted");
 
-    synchronized (sortedLeases) {
-      Lease lease = getLease(holder);
-      if (lease != null) {
-        lease.completedCreate(src);
-        if (!lease.hasLocks()) {
-          removeLease(holder);
-          sortedLeases.remove(lease);
-        }
-      }
-    }
+    removeLease(src, holder);
 
     //
     // REMIND - mjc - this should be done only after we wait a few secs.
@@ -1463,23 +1455,26 @@
       checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
     }
 
-    Block deletedBlocks[] = dir.delete(src);
-    if (deletedBlocks != null) {
-      for (int i = 0; i < deletedBlocks.length; i++) {
-        Block b = deletedBlocks[i];
-                
-        for (Iterator<DatanodeDescriptor> it = 
-               blocksMap.nodeIterator(b); it.hasNext();) {
-          DatanodeDescriptor node = it.next();
-          addToInvalidates(b, node);
-          NameNode.stateChangeLog.info("BLOCK* NameSystem.delete: "
-                                        + b.getBlockName() + " is added to invalidSet of "
-                                        + node.getName());
-        }
+    ArrayList<Block> deletedBlocks = new ArrayList<Block>();
+    INode old = dir.delete(src, deletedBlocks);
+    if (old == null) {
+      return false;
+    }
+    for (Block b : deletedBlocks) {
+      for (Iterator<DatanodeDescriptor> it = 
+            blocksMap.nodeIterator(b); it.hasNext();) {
+        DatanodeDescriptor node = it.next();
+        addToInvalidates(b, node);
+        NameNode.stateChangeLog.info("BLOCK* NameSystem.delete: "
+                                      + b.getBlockName() + " is added to invalidSet of "
+                                      + node.getName());
       }
     }
-
-    return (deletedBlocks != null);
+    if (old.isUnderConstruction()) {
+      INodeFileUnderConstruction cons = (INodeFileUnderConstruction) old;
+      removeLease(src, cons.getClientName());
+    }
+    return true;
   }
 
   /**
@@ -1586,6 +1581,24 @@
     return dir.getContentLength(src);
   }
 
+  /** Persist all metadata about this file.
+   * @param src The string representation of the path
+   * @param clientName The string representation of the client
+   * @throws IOException if path does not exist
+   */
+  void fsync(String src, String clientName) throws IOException {
+
+    NameNode.stateChangeLog.info("BLOCK* NameSystem.fsync: file "
+                                  + src + " for " + clientName);
+    synchronized (this) {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot fsync file " + src, safeMode);
+      }
+      INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
+      dir.persistBlocks(src, pendingFile);
+    }
+  }
+
   /************************************************************
    * A Lease governs all the locks held by a single client.
    * For each client there's a corresponding lease, whose
@@ -1689,6 +1702,10 @@
     String getHolder() throws IOException {
       return holder.getString();
     }
+
+    Collection<StringBytesWritable> getPaths() throws IOException {
+      return creates;
+    }
   }
   
   /******************************************************
@@ -1778,8 +1795,8 @@
     INodeFile newFile = pendingFile.convertToInodeFile();
     dir.replaceNode(src, pendingFile, newFile);
 
-    // persist block allocations for this file
-    dir.persistBlocks(src, newFile);
+    // close file and persist block allocations for this file
+    dir.closeFile(src, newFile);
   
     NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: " + 
                                   src + " is no longer written to by " + 
@@ -4049,7 +4066,6 @@
     this.lmthread.interrupt();
   }
 
-
   public long getFilesTotal() {
     return this.dir.totalInodes();
   }
@@ -4121,5 +4137,92 @@
       }
     }
     return numDead;
+  }
+
+  /**
+   * Sets the generation stamp for this filesystem
+   */
+  void setGenerationStamp(long stamp) {
+    generationStamp.setStamp(stamp);
+  }
+
+  /**
+   * Gets the generation stamp for this filesystem
+   */
+  long getGenerationStamp() {
+    return generationStamp.getStamp();
+  }
+
+  /**
+   * deletes the lease for the specified file
+   */
+  void removeLease(String src, String holder) throws IOException {
+    synchronized (sortedLeases) {
+      Lease lease = getLease(holder);
+      if (lease != null) {
+        lease.completedCreate(src);
+        if (!lease.hasLocks()) {
+          removeLease(holder);
+          sortedLeases.remove(lease);
+        }
+      }
+    }
+  }
+
+  /**
+   * Adds (or re-adds) the lease for the specified file.
+   */
+  void addLease(String src, String holder) throws IOException {
+    synchronized (sortedLeases) {
+      Lease lease = getLease(holder);
+      if (lease == null) {
+        lease = new Lease(holder);
+        putLease(holder, lease);
+        sortedLeases.add(lease);
+      } else {
+        sortedLeases.remove(lease);
+        lease.renew();
+        sortedLeases.add(lease);
+      }
+      lease.startedCreate(src);
+    }
+  }
+
+  /**
+   * Returns the number of leases currently in the system
+   */
+  int countLease() {
+    synchronized (sortedLeases) {
+      return sortedLeases.size();
+    }
+  }
+
+  /** 
+   * Serializes leases
+   */
+  void saveFilesUnderConstruction(DataOutputStream out) throws IOException {
+    synchronized (sortedLeases) {
+      out.writeInt(sortedLeases.size()); // write the size
+      for (Iterator<Lease> it = sortedLeases.iterator(); it.hasNext();) {
+        Lease lease = it.next();        
+        Collection<StringBytesWritable> files = lease.getPaths();
+        for (Iterator<StringBytesWritable> i = files.iterator(); i.hasNext();){
+          String path = i.next().getString();
+          
+          // verify that path exists in namespace
+          INode node = dir.getFileINode(path);
+          if (node == null) {
+            throw new IOException("saveLeases found path " + path +
+                                  " but no matching entry in namespace.");
+          }
+          if (!node.isUnderConstruction()) {
+            throw new IOException("saveLeases found path " + path +
+                                  " but is not under construction.");
+          }
+          INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
+          FSImage.writeINodeUnderConstruction(out, cons);
+        }
+      }
+    }
   }
 }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java Wed Feb 13 21:35:14 2008
@@ -24,10 +24,15 @@
 import java.util.Arrays;
 import java.util.List;
 import java.io.IOException;
+import java.io.DataOutput;
+import java.io.DataInput;
+import java.io.DataOutputStream;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.UTF8;
 
 /**
  * We keep an in-memory representation of the file/block hierarchy.
@@ -67,9 +72,10 @@
     }
   }
 
-  protected INode(String name, PermissionStatus permissions) {
-    this(permissions, 0L);
-    setLocalName(name);
+  protected INode() {
+    name = null;
+    parent = null;
+    modificationTime = 0;
   }
 
   INode(PermissionStatus permissions, long mTime) {
@@ -79,6 +85,11 @@
     setPermissionStatus(permissions);
   }
 
+  protected INode(String name, PermissionStatus permissions) {
+    this(permissions, 0L);
+    setLocalName(name);
+  }
+
   /** Set the {@link PermissionStatus} */
   protected void setPermissionStatus(PermissionStatus ps) {
     setUser(ps.getUserName());
@@ -145,6 +156,13 @@
     this.name = string2Bytes(name);
   }
 
+  /**
+   * Set local file name
+   */
+  void setLocalName(byte[] name) {
+    this.name = name;
+  }
+
   /** {@inheritDoc} */
   public String toString() {
     return "\"" + getLocalName() + "\":" + getPermissionStatus();
@@ -570,7 +588,7 @@
 class INodeFile extends INode {
   static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
 
-  private BlockInfo blocks[] = null;
+  protected BlockInfo blocks[] = null;
   protected short blockReplication;
   protected long preferredBlockSize;
 
@@ -581,6 +599,12 @@
         modificationTime, preferredBlockSize);
   }
 
+  protected INodeFile() {
+    blocks = null;
+    blockReplication = 0;
+    preferredBlockSize = 0;
+  }
+
   protected INodeFile(PermissionStatus permissions, BlockInfo[] blklist,
                       short replication, long modificationTime,
                       long preferredBlockSize) {
@@ -711,6 +735,14 @@
   protected StringBytesWritable clientName;         // lease holder
   protected StringBytesWritable clientMachine;
   protected DatanodeDescriptor clientNode; // if client is a cluster node too.
+  protected DatanodeDescriptor[] targets;  // locations for last block
+
+  INodeFileUnderConstruction() {
+    clientName = null;
+    clientMachine = null;
+    clientNode = null;
+    targets = null;
+  }
 
   INodeFileUnderConstruction(PermissionStatus permissions,
                              short replication,
@@ -725,6 +757,27 @@
     this.clientName = new StringBytesWritable(clientName);
     this.clientMachine = new StringBytesWritable(clientMachine);
     this.clientNode = clientNode;
+    this.targets = new DatanodeDescriptor[0];
+  }
+
+  INodeFileUnderConstruction(byte[] name,
+                             short blockReplication,
+                             long modificationTime,
+                             long preferredBlockSize,
+                             BlockInfo[] blocks,
+                             PermissionStatus perm,
+                             String clientName,
+                             String clientMachine,
+                             DatanodeDescriptor clientNode,
+                             DatanodeDescriptor[] targets) 
+                             throws IOException {
+    super(perm, blocks, blockReplication, modificationTime, 
+          preferredBlockSize);
+    setLocalName(name);
+    this.clientName = new StringBytesWritable(clientName);
+    this.clientMachine = new StringBytesWritable(clientMachine);
+    this.clientNode = clientNode;
+    this.targets = targets;
   }
 
   String getClientName() throws IOException {
@@ -737,6 +790,14 @@
 
   DatanodeDescriptor getClientNode() {
     return clientNode;
+  }
+
+  void setLastBlockLocations(DatanodeDescriptor[] targets) {
+    this.targets = targets;
+  }
+
+  DatanodeDescriptor[] getLastBlockLocations() {
+    return this.targets;
   }
 
   /**

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Wed Feb 13 21:35:14 2008
@@ -531,6 +531,11 @@
     return namesystem.getContentLength(src);
   }
 
+  /** {@inheritDoc} */
+  public void fsync(String src, String clientName) throws IOException {
+    namesystem.fsync(src, clientName);
+  }
+
   ////////////////////////////////////////////////////////////////
   // DatanodeProtocol
   ////////////////////////////////////////////////////////////////

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java Wed Feb 13 21:35:14
2008
@@ -38,8 +38,8 @@
   static final int numDatanodes = 1;
 
   // This test creates numThreads threads and each thread does
-  // numberTransactions Transactions concurrently.
-  int numberTransactions = 1000;
+  // 2 * numberTransactions Transactions concurrently.
+  int numberTransactions = 100;
   int numThreads = 100;
 
   //
@@ -62,9 +62,16 @@
           ).createFsOwnerPermissions(new FsPermission((short)0777));
 
       for (int i = 0; i < numTransactions; i++) {
-        INodeFile inode = new INodeFile(p, 0, replication, 0, blockSize);
-        editLog.logCreateFile("/filename" + i, inode);
-        editLog.logSync();
+        try {
+          INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
+                              p, replication, blockSize, 0, "", "", null);
+          editLog.logOpenFile("/filename" + i, inode);
+          editLog.logCloseFile("/filename" + i, inode);
+          editLog.logSync();
+        } catch (IOException e) {
+          System.out.println("Transaction " + i + " encountered exception " +
+                             e);
+        }
       }
     }
   }
@@ -132,10 +139,16 @@
       File editFile = fsimage.getEditFile(i);
       System.out.println("Verifying file: " + editFile);
       int numEdits = editLog.loadFSEdits(editFile);
+      System.out.println("Number of outstanding leases " +
+                         FSNamesystem.getFSNamesystem().countLease());
+
+      assertTrue("Found " + FSNamesystem.getFSNamesystem().countLease() +
+                 " leases but expected 0",
+                 FSNamesystem.getFSNamesystem().countLease() == 0);
       assertTrue("Verification for " + editFile + " failed. " +
-                 "Expected " + (numThreads * numberTransactions) + " transactions. "+
+                 "Expected " + (numThreads * 2 * numberTransactions) + " transactions. "+
                  "Found " + numEdits + " transactions.",
-                 numEdits == numThreads * numberTransactions);
+                 numEdits == numThreads * 2 * numberTransactions);
 
     }
   }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java Wed Feb 13 21:35:14
2008
@@ -331,6 +331,64 @@
     }
   }
 
+  /**
+   * Test that file leases are persisted across namenode restarts.
+   */
+  public void testFileCreationNamenodeRestart() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt("heartbeat.recheck.interval", 1000);
+    conf.setInt("dfs.heartbeat.interval", 1);
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    // create cluster
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    cluster.waitActive();
+    int nnport = cluster.getNameNodePort();
+    InetSocketAddress addr = new InetSocketAddress("localhost", nnport);
+
+    try {
+
+      // create a new file.
+      //
+      Path file1 = new Path("/filestatus.dat");
+      FSDataOutputStream stm = createFile(fs, file1, 1);
+      System.out.println("testFileCreationNamenodeRestart: "
+                         + "Created file filestatus.dat with one "
+                         + " replicas.");
+
+      // restart cluster with the same namenode port as before.
+      cluster.shutdown();
+      try {
+        Thread.sleep(5000);
+      } catch (InterruptedException e) {
+      }
+      cluster = new MiniDFSCluster(nnport, conf, 1, false, true, 
+                                   null, null, null);
+      cluster.waitActive();
+
+      // write 1 byte to file.  This should succeed because the 
+      // namenode should have persisted leases.
+      byte[] buffer = new byte[1];
+      Random rand = new Random(seed);
+      rand.nextBytes(buffer);
+      stm.write(buffer);
+      stm.close();
+
+      // verify that new block is associated with this file
+      DFSClient client = new DFSClient(addr, conf);
+      LocatedBlocks locations = client.namenode.getBlockLocations(
+                                  file1.toString(), 0, Long.MAX_VALUE);
+      System.out.println("locations = " + locations.locatedBlockCount());
+      assertTrue("Error blocks were not cleaned up",
+                 locations.locatedBlockCount() == 1);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
 /**
  * Test that file data becomes available before file is closed.
  */



Mime
View raw message