hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r395000 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/mapred/
Date Tue, 18 Apr 2006 18:58:51 GMT
Author: cutting
Date: Tue Apr 18 11:58:49 2006
New Revision: 395000

URL: http://svn.apache.org/viewcvs?rev=395000&view=rev
Log:
Fix HADOOP-118.  Improved cleanup of abandoned file creations in DFS.  Contributed by Owen.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=395000&r1=394999&r2=395000&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Apr 18 11:58:49 2006
@@ -40,6 +40,9 @@
 10. Fix HADOOP-143.  Do not line-wrap stack-traces in web ui.
     (omalley via cutting)
 
+11. Fix HADOOP-118.  In DFS, improve clean up of abandoned file
+    creations.  (omalley via cutting)
+
 
 Release 0.1.1 - 2006-04-08
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?rev=395000&r1=394999&r2=395000&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Tue Apr 18 11:58:49 2006
@@ -95,8 +95,11 @@
      *
      * Any blocks that have been written for the file will be 
      * garbage-collected.
+     * @param src The filename
+     * @param holder The datanode holding the lease
      */
-    public void abandonFileInProgress(String src) throws IOException;
+    public void abandonFileInProgress(String src, 
+                                      String holder) throws IOException;
 
     /**
      * The client is done writing data to the given filename, and would 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=395000&r1=394999&r2=395000&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue Apr 18 11:58:49 2006
@@ -48,7 +48,48 @@
     String clientName;
     Daemon leaseChecker;
     private Configuration conf;
+    
+    /**
+     * A map from name -> DFSOutputStream of files that are currently being
+     * written by this client.
+     */
+    private TreeMap pendingCreates = new TreeMap();
+    
+    /**
+     * A class to track the list of DFS clients, so that they can be closed
+     * on exit.
+     * @author Owen O'Malley
+     */
+    private static class ClientFinalizer extends Thread {
+      private List clients = new ArrayList();
+
+      public synchronized void addClient(DFSClient client) {
+        clients.add(client);
+      }
+
+      public synchronized void run() {
+        Iterator itr = clients.iterator();
+        while (itr.hasNext()) {
+          DFSClient client = (DFSClient) itr.next();
+          if (client.running) {
+            try {
+              client.close();
+            } catch (IOException ie) {
+              System.err.println("Error closing client");
+              ie.printStackTrace();
+            }
+          }
+        }
+      }
+    }
+
+    // add a cleanup thread
+    private static ClientFinalizer clientFinalizer = new ClientFinalizer();
+    static {
+      Runtime.getRuntime().addShutdownHook(clientFinalizer);
+    }
 
+        
     /** 
      * Create a new DFSClient connected to the given namenode server.
      */
@@ -70,14 +111,40 @@
         this.leaseChecker.start();
     }
 
+    private void checkOpen() throws IOException {
+      if (!running) {
+        IOException result = new IOException("Filesystem closed");
+        throw result;
+      }
+    }
+    
     /**
+     * Close the file system, abadoning all of the leases and files being
+     * created.
      */
     public void close() throws IOException {
+      // synchronize in here so that we don't need to change the API
+      synchronized (this) {
+        checkOpen();
+        synchronized (pendingCreates) {
+          Iterator file_itr = pendingCreates.keySet().iterator();
+          while (file_itr.hasNext()) {
+            String name = (String) file_itr.next();
+            try {
+              namenode.abandonFileInProgress(name, clientName);
+            } catch (IOException ie) {
+              System.err.println("Exception abandoning create lock on " + name);
+              ie.printStackTrace();
+            }
+          }
+          pendingCreates.clear();
+        }
         this.running = false;
         try {
             leaseChecker.join();
         } catch (InterruptedException ie) {
         }
+      }
     }
 
     /**
@@ -96,7 +163,8 @@
      * work.
      */
     public FSInputStream open(UTF8 src) throws IOException {
-        // Get block info from namenode
+        checkOpen();
+        //    Get block info from namenode
         return new DFSInputStream(src.toString());
     }
 
@@ -129,7 +197,12 @@
                                   boolean overwrite, 
                                   short replication
                                 ) throws IOException {
-        return new DFSOutputStream(src, overwrite, replication);
+      checkOpen();
+      FSOutputStream result = new DFSOutputStream(src, overwrite, replication);
+      synchronized (pendingCreates) {
+        pendingCreates.put(src.toString(), result);
+      }
+      return result;
     }
 
     /**
@@ -137,6 +210,7 @@
      * there.
      */
     public boolean rename(UTF8 src, UTF8 dst) throws IOException {
+        checkOpen();
         return namenode.rename(src.toString(), dst.toString());
     }
 
@@ -145,24 +219,28 @@
      * there.
      */
     public boolean delete(UTF8 src) throws IOException {
+        checkOpen();
         return namenode.delete(src.toString());
     }
 
     /**
      */
     public boolean exists(UTF8 src) throws IOException {
+        checkOpen();
         return namenode.exists(src.toString());
     }
 
     /**
      */
     public boolean isDirectory(UTF8 src) throws IOException {
+        checkOpen();
         return namenode.isDir(src.toString());
     }
 
     /**
      */
     public DFSFileInfo[] listPaths(UTF8 src) throws IOException {
+        checkOpen();
         return namenode.getListing(src.toString());
     }
 
@@ -187,6 +265,7 @@
     /**
      */
     public boolean mkdirs(UTF8 src) throws IOException {
+        checkOpen();
         return namenode.mkdirs(src.toString());
     }
 
@@ -456,6 +535,7 @@
          * Close it down!
          */
         public synchronized void close() throws IOException {
+            checkOpen();
             if (closed) {
                 throw new IOException("Stream closed");
             }
@@ -473,6 +553,7 @@
          * Basic read()
          */
         public synchronized int read() throws IOException {
+            checkOpen();
             if (closed) {
                 throw new IOException("Stream closed");
             }
@@ -493,6 +574,7 @@
          * Read the entire buffer.
          */
         public synchronized int read(byte buf[], int off, int len) throws IOException {
+            checkOpen();
             if (closed) {
                 throw new IOException("Stream closed");
             }
@@ -646,7 +728,8 @@
                     } catch (InterruptedException iex) {
                     }
                     if (firstTime) {
-                        namenode.abandonFileInProgress(src.toString());
+                        namenode.abandonFileInProgress(src.toString(), 
+                                                       clientName);
                     } else {
                         namenode.abandonBlock(block, src.toString());
                     }
@@ -684,6 +767,7 @@
          * Writes the specified byte to this output stream.
          */
         public synchronized void write(int b) throws IOException {
+            checkOpen();
             if (closed) {
                 throw new IOException("Stream closed");
             }
@@ -701,6 +785,7 @@
          */
       public synchronized void write(byte b[], int off, int len)
         throws IOException {
+            checkOpen();
             if (closed) {
                 throw new IOException("Stream closed");
             }
@@ -724,6 +809,7 @@
          * Flush the buffer, getting a stream to a new block if necessary.
          */
         public synchronized void flush() throws IOException {
+            checkOpen();
             if (closed) {
                 throw new IOException("Stream closed");
             }
@@ -854,20 +940,22 @@
          * resources associated with this stream.
          */
         public synchronized void close() throws IOException {
-            if (closed) {
-                throw new IOException("Stream closed");
-            }
-
+          checkOpen();
+          if (closed) {
+              throw new IOException("Stream closed");
+          }
+          
+          try {
             flush();
             if (filePos == 0 || bytesWrittenToBlock != 0) {
               try {
                 endBlock();
               } catch (IOException e) {
-                namenode.abandonFileInProgress(src.toString());
+                namenode.abandonFileInProgress(src.toString(), clientName);
                 throw e;
               }
             }
-
+            
             backupStream.close();
             backupFile.delete();
 
@@ -880,18 +968,23 @@
             long localstart = System.currentTimeMillis();
             boolean fileComplete = false;
             while (! fileComplete) {
-                fileComplete = namenode.complete(src.toString(), clientName.toString());
-                if (!fileComplete) {
-                    try {
-                        Thread.sleep(400);
-                        if (System.currentTimeMillis() - localstart > 5000) {
-                            LOG.info("Could not complete file, retrying...");
-                        }
-                    } catch (InterruptedException ie) {
-                    }
+              fileComplete = namenode.complete(src.toString(), clientName.toString());
+              if (!fileComplete) {
+                try {
+                  Thread.sleep(400);
+                  if (System.currentTimeMillis() - localstart > 5000) {
+                    LOG.info("Could not complete file, retrying...");
+                  }
+                } catch (InterruptedException ie) {
                 }
+              }
             }
             closed = true;
+          } finally {
+            synchronized (pendingCreates) {
+              pendingCreates.remove(src.toString());
+            }
+          }
         }
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=395000&r1=394999&r2=395000&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue Apr 18 11:58:49 2006
@@ -280,6 +280,7 @@
 
         // Reserve space for this pending file
         pendingCreates.put(src, new FileUnderConstruction( replication ));
+        LOG.fine("Adding " + src + " to pendingCreates for " + holder);
         synchronized (leases) {
             Lease lease = (Lease) leases.get(holder);
             if (lease == null) {
@@ -314,7 +315,8 @@
      */
     public synchronized Object[] getAdditionalBlock(UTF8 src, UTF8 clientMachine) {
         Object results[] = null;
-        FileUnderConstruction pendingFile = (FileUnderConstruction) pendingCreates.get(src);
+        FileUnderConstruction pendingFile = 
+          (FileUnderConstruction) pendingCreates.get(src);
         if (dir.getFile(src) == null && pendingFile != null) {
             results = new Object[2];
 
@@ -360,8 +362,11 @@
     /**
      * Abandon the entire file in progress
      */
-    public synchronized void abandonFileInProgress(UTF8 src) throws IOException {
-        internalReleaseCreate(src);
+    public synchronized void abandonFileInProgress(UTF8 src, 
+                                                   UTF8 holder
+                                                   ) throws IOException {
+      LOG.info("abandoning file in progress on " + src.toString());
+      internalReleaseCreate(src, holder);
     }
 
     /**
@@ -416,6 +421,8 @@
 
         // The file is no longer pending
         pendingCreates.remove(src);
+        LOG.fine("Removing " + src + " from pendingCreates for " + holder +
+                 ". (complete)");
         for (int i = 0; i < nrBlocks; i++) {
             pendingCreateBlocks.remove(pendingBlocks[i]);
         }
@@ -462,7 +469,8 @@
      */
     synchronized Block allocateBlock(UTF8 src) {
         Block b = new Block();
-        FileUnderConstruction v = (FileUnderConstruction) pendingCreates.get(src);
+        FileUnderConstruction v = 
+          (FileUnderConstruction) pendingCreates.get(src);
         v.add(b);
         pendingCreateBlocks.add(b);
         return b;
@@ -473,7 +481,8 @@
      * replicated.  If not, return false.
      */
     synchronized boolean checkFileProgress(UTF8 src) {
-        FileUnderConstruction v = (FileUnderConstruction) pendingCreates.get(src);
+        FileUnderConstruction v = 
+          (FileUnderConstruction) pendingCreates.get(src);
 
         for (Iterator it = v.iterator(); it.hasNext(); ) {
             Block b = (Block) it.next();
@@ -652,8 +661,8 @@
         public void startedCreate(UTF8 src) {
             creates.add(src);
         }
-        public void completedCreate(UTF8 src) {
-            creates.remove(src);
+        public boolean completedCreate(UTF8 src) {
+            return creates.remove(src);
         }
         public boolean hasLocks() {
             return (locks.size() + creates.size()) > 0;
@@ -666,7 +675,7 @@
             locks.clear();
             for (Iterator it = creates.iterator(); it.hasNext(); ) {
                 UTF8 src = (UTF8) it.next();
-                internalReleaseCreate(src);
+                internalReleaseCreate(src, holder);
             }
             creates.clear();
         }
@@ -674,7 +683,8 @@
         /**
          */
         public String toString() {
-            return "[Lease.  Holder: " + holder.toString() + ", heldlocks: " + locks.size() + ", pendingcreates: " + creates.size() + "]";
+            return "[Lease.  Holder: " + holder.toString() + ", heldlocks: " +
+                   locks.size() + ", pendingcreates: " + creates.size() + "]";
         }
 
         /**
@@ -771,12 +781,41 @@
     private int internalReleaseLock(UTF8 src, UTF8 holder) {
         return dir.releaseLock(src, holder);
     }
-    private void internalReleaseCreate(UTF8 src) {
-        FileUnderConstruction v = (FileUnderConstruction) pendingCreates.remove(src);
-        for (Iterator it2 = v.iterator(); it2.hasNext(); ) {
-            Block b = (Block) it2.next();
-            pendingCreateBlocks.remove(b);
-        }
+
+    /**
+     * Release a pending file creation lock.
+     * @param src The filename
+     * @param holder The datanode that was creating the file
+     */
+    private void internalReleaseCreate(UTF8 src, UTF8 holder) {
+      // find the lease
+      Lease lease = (Lease) leases.get(holder);
+      if (lease != null) {
+        // remove the file from the lease
+        if (lease.completedCreate(src)) {
+          // if we found the file in the lease, remove it from pendingCreates
+          FileUnderConstruction v = 
+            (FileUnderConstruction) pendingCreates.remove(src);
+          if (v != null) {
+            LOG.info("Removing " + src + " from pendingCreates for " + 
+                     holder + " (failure)");
+            for (Iterator it2 = v.iterator(); it2.hasNext(); ) {
+              Block b = (Block) it2.next();
+              pendingCreateBlocks.remove(b);
+            }
+          } else {
+            LOG.info("Attempt to release a create lock on " + src.toString() +
+                     " that was not in pendingCreates");
+          }
+        } else {
+          LOG.info("Attempt by " + holder.toString() + 
+                   " to release someone else's create lock on " + 
+                   src.toString());
+        }
+      } else {
+        LOG.info("Attempt to release a lock from an unknown lease holder "
+                 + holder.toString() + " for " + src.toString());
+      }
     }
 
     /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=395000&r1=394999&r2=395000&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Tue Apr 18 11:58:49 2006
@@ -208,8 +208,9 @@
     }
     /**
      */
-    public void abandonFileInProgress(String src) throws IOException {
-        namesystem.abandonFileInProgress(new UTF8(src));
+    public void abandonFileInProgress(String src, 
+                                      String holder) throws IOException {
+        namesystem.abandonFileInProgress(new UTF8(src), new UTF8(holder));
     }
     /**
      */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=395000&r1=394999&r2=395000&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Tue Apr 18 11:58:49 2006
@@ -196,10 +196,6 @@
     /**
      */
     public synchronized void close() throws IOException {
-        if (fs != null) {
-            fs.close();
-            fs = null;
-        }
     }
 
     /**



Mime
View raw message