hadoop-common-commits mailing list archives

From: s..@apache.org
Subject: svn commit: r761439 - in /hadoop/core/trunk: CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java src/test/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
Date: Thu, 02 Apr 2009 21:53:51 GMT
Author: shv
Date: Thu Apr  2 21:53:49 2009
New Revision: 761439

URL: http://svn.apache.org/viewvc?rev=761439&view=rev
Log:
HADOOP-4045. Fix processing of IO errors in EditsLog. Contributed by Boris Shkolnik.
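
Below is a minimal, self-contained sketch of the error-handling pattern this patch adopts, for readers skimming the diff. It is illustrative only: the class and member names here are invented stand-ins for the real FSEditLog/FSImage pair. The idea, visible throughout the diff, is that callers collect every stream that fails during an operation and hand the whole batch to processIOError(streams, propagate); with propagate set to true the failed streams' storage directories are dropped as well, and the delegated call is made with propagate set to false so the edit log and the image never call back into each other.

import java.io.IOException;
import java.util.ArrayList;

/**
 * Minimal sketch (not the committed code) of batched IO-error handling with a
 * propagate flag, as introduced by HADOOP-4045.
 */
class EditLogErrorHandlingSketch {
  static class Stream {
    final String storageDir;                      // directory backing this stream
    Stream(String storageDir) { this.storageDir = storageDir; }
    void write(String op) throws IOException {
      if (storageDir.startsWith("/failed")) throw new IOException("disk error");
    }
    void close() { /* release the underlying file */ }
  }

  private final ArrayList<Stream> editStreams = new ArrayList<Stream>();
  private final ArrayList<String> storageDirs = new ArrayList<String>();

  /** Log one edit to every stream, batching up the streams that fail. */
  void logEdit(String op) {
    ArrayList<Stream> errorStreams = null;
    for (Stream s : editStreams) {
      try {
        s.write(op);
      } catch (IOException e) {
        if (errorStreams == null) errorStreams = new ArrayList<Stream>(1);
        errorStreams.add(s);                      // keep logging to the healthy streams
      }
    }
    processIOError(errorStreams, true);           // handle all failures in one pass
  }

  /** Close and remove the failed streams; optionally drop their directories. */
  void processIOError(ArrayList<Stream> errorStreams, boolean propagate) {
    if (errorStreams == null || errorStreams.isEmpty()) return;   // nothing to do
    for (Stream s : errorStreams) {
      s.close();
      editStreams.remove(s);
      if (propagate) storageDirs.remove(s.storageDir);  // stand-in for FSImage.processIOError(dirs, false)
    }
  }
}

In the patch itself the directory removal is done by handing the collected StorageDirectory list to FSImage.processIOError(dirs, false), and FSImage performs the mirror-image delegation back to FSEditLog.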

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=761439&r1=761438&r2=761439&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Apr  2 21:53:49 2009
@@ -356,6 +356,9 @@
     HADOOP-2413. Remove the static variable FSNamesystem.fsNamesystemObject.
     (Konstantin Shvachko via szetszwo)
 
+    HADOOP-4045. Fix processing of IO errors in EditsLog.
+    (Boris Shkolnik via shv)
+
 Release 0.20.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=761439&r1=761438&r2=761439&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Apr  2 21:53:49 2009
@@ -26,12 +26,14 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.lang.Math;
-import java.nio.channels.FileChannel;
-import java.nio.ByteBuffer;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
@@ -44,9 +46,14 @@
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+import org.mortbay.log.Log;
 
 /**
  * FSEditLog maintains a log of the namespace modifications.
@@ -360,20 +367,30 @@
     numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
     if (editStreams == null)
       editStreams = new ArrayList<EditLogOutputStream>();
+    
+    ArrayList<StorageDirectory> al = null;
     for (Iterator<StorageDirectory> it = 
            fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
       StorageDirectory sd = it.next();
       File eFile = getEditFile(sd);
       try {
-        EditLogOutputStream eStream = new EditLogFileOutputStream(eFile);
-        editStreams.add(eStream);
+        addNewEditLogStream(eFile);
       } catch (IOException e) {
         FSNamesystem.LOG.warn("Unable to open edit log file " + eFile);
         // Remove the directory from list of storage directories
-        fsimage.removedStorageDirs.add(sd);
-        it.remove();
+        if(al == null) al = new ArrayList<StorageDirectory>(1);
+        al.add(sd);
+        
       }
     }
+    
+    if(al != null) fsimage.processIOError(al, false);
+  }
+  
+  
+  public synchronized void addNewEditLogStream(File eFile) throws IOException {
+    EditLogOutputStream eStream = new EditLogFileOutputStream(eFile);
+    editStreams.add(eStream);
   }
 
   public synchronized void createEditLogFile(File name) throws IOException {
@@ -424,7 +441,7 @@
         errorStreams.add(eStream);
       }
     }
-    processIOError(errorStreams);
+    processIOError(errorStreams, true);
     editStreams.clear();
   }
 
@@ -441,80 +458,101 @@
   }
 
   /**
-   * If there is an IO Error on any log operations, remove that
-   * directory from the list of directories.
-   * If no more directories remain, then exit.
-   */
-  synchronized void processIOError(int index) {
+   * The specified streams have IO errors. Close and remove them.
+   * If propagate is true, also remove the corresponding StorageDirectories.
+   * (Called with propagate set to true from everywhere
+   *  except FSImage.processIOError.)
+   */
+  synchronized void processIOError(
+      ArrayList<EditLogOutputStream> errorStreams,
+      boolean propagate) {
+    
+    String lsd = fsimage.listStorageDirectories();
+    FSNamesystem.LOG.info("current list of storage dirs:" + lsd);
+    
+    if (errorStreams == null || errorStreams.size() == 0) {
+      return;                       // nothing to do
+    }
+
+    //EditLogOutputStream
     if (editStreams == null || editStreams.size() <= 1) {
       FSNamesystem.LOG.fatal(
       "Fatal Error : All storage directories are inaccessible."); 
       Runtime.getRuntime().exit(-1);
     }
-    assert(index < getNumEditStreams());
 
-    EditLogOutputStream eStream = editStreams.get(index);
-    removeStream(index);
+    ArrayList<StorageDirectory> al = null;
+    for (EditLogOutputStream eStream : errorStreams) {
+      FSNamesystem.LOG.error("Unable to log edits to " + eStream.getName()
+          + "; removing it");     
+
+      StorageDirectory storageDir;
+      if(propagate && eStream.getType() == JournalType.FILE && //find SD
+          (storageDir = getStorage(eStream)) != null) {
+        FSNamesystem.LOG.info("about to remove corresponding storage:" 
+            + storageDir.getRoot().getAbsolutePath());
+        // remove corresponding storage dir
+        if(al == null) al = new ArrayList<StorageDirectory>(1);
+        al.add(storageDir);
+      }
+      Iterator<EditLogOutputStream> ies = editStreams.iterator();
+      while (ies.hasNext()) {
+        EditLogOutputStream es = ies.next();
+        if (es == eStream) {  
+          try { eStream.close(); } catch (IOException e) {
+            // nothing to do.
+            FSNamesystem.LOG.warn("Failed to close eStream " + eStream.getName()
+                + " before removing it (might be ok)");
+          }
+          ies.remove();
+          break;
+        }
+      } 
+    }
+    
+    // remove the failed storage directories
+    if(propagate && al != null) fsimage.processIOError(al, false);
+    
+    //for the rest of the streams
+    if(propagate) incrementCheckpointTime();
+    
+    lsd = fsimage.listStorageDirectories();
+    FSNamesystem.LOG.info("at the end current list of storage dirs:" + lsd);
+  }
 
-    if(!(eStream instanceof EditLogFileOutputStream))
-      return; // non file streams don't have associated storage directories
 
-    EditLogFileOutputStream eFStream = (EditLogFileOutputStream)eStream;
-    File parentStorageDir = eFStream.getFile().getParentFile().getParentFile();
-    //
-    // Invoke the ioerror routine of the fsimage
-    //
-    fsimage.processIOError(parentStorageDir);
-  }
-  
   /**
-   * If there is an IO Error on any log operations on storage directory,
-   * remove any stream associated with that directory 
+   * Get the StorageDirectory corresponding to an edits stream.
+   * @param es - the stream whose storage directory is sought
+   * @return the matching storage directory, or null if none matches
    */
-  synchronized void processIOError(StorageDirectory sd) {
-    // Try to remove stream only if one should exist
-    if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
-      return;
-    if (editStreams == null || editStreams.size() <= 1) {
-      FSNamesystem.LOG.fatal(
-          "Fatal Error : All storage directories are inaccessible."); 
-      Runtime.getRuntime().exit(-1);
-    }
-    for (int idx = 0; idx < editStreams.size(); idx++) {
-      File parentStorageDir = ((EditLogFileOutputStream)editStreams
-                                       .get(idx)).getFile()
-                                       .getParentFile().getParentFile();
-      if (parentStorageDir.getName().equals(sd.getRoot().getName()))
-        removeStream(idx);
+  public StorageDirectory getStorage(EditLogOutputStream es) {
+    String parentStorageDir = ((EditLogFileOutputStream)es).getFile()
+    .getParentFile().getParentFile().getAbsolutePath();
+
+    Iterator<StorageDirectory> it = fsimage.dirIterator(); 
+    while (it.hasNext()) {
+      StorageDirectory sd = it.next();
+      FSNamesystem.LOG.info("comparing: " + parentStorageDir + " and " + sd.getRoot().getAbsolutePath());

+      if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
+        return sd;
     }
+    return null;
   }
   
   /**
-   * The specified streams have IO errors. Remove them from logging
-   * new transactions.
-   */
-  synchronized void processIOError(ArrayList<EditLogOutputStream> errorStreams) {
-    if (errorStreams == null) {
-      return;                       // nothing to do
-    }
-    for (int idx = 0; idx < errorStreams.size(); idx++) {
-      EditLogOutputStream eStream = errorStreams.get(idx);
-      int j = 0;
-      int numEditStreams = editStreams.size();
-      for (j = 0; j < numEditStreams; j++) {
-        if (editStreams.get(j) == eStream) {
-          FSNamesystem.LOG.error("Unable to log edits to " + eStream.getName());
-          break;
-        }
-      }
-      if (j == numEditStreams) {
-          FSNamesystem.LOG.error("Unable to find sync log on which " +
-                                 " IO error occured.");
-          continue;
-      }
-      processIOError(j);
-    }
-    incrementCheckpointTime();
+   * Get the edits stream corresponding to a storage directory.
+   * @param sd - the storage directory whose stream is sought
+   * @return the matching stream, or null if none matches
+   */
+  public EditLogOutputStream getEditsStream(StorageDirectory sd) {
+    for (EditLogOutputStream es : editStreams) {
+      File parentStorageDir = ((EditLogFileOutputStream)es).getFile()
+          .getParentFile().getParentFile();
+      if (parentStorageDir.getName().equals(sd.getRoot().getName()))
+        return es;
+    }
+    return null;
   }
 
   /**
@@ -905,6 +943,7 @@
     ArrayList<EditLogOutputStream> errorStreams = null;
     long start = FSNamesystem.now();
     for(EditLogOutputStream eStream : editStreams) {
+      Log.debug("logging edits into " + eStream.getName()  + " stream");
       if(!eStream.isOperationSupported(op))
         continue;
       try {
@@ -916,7 +955,7 @@
         errorStreams.add(eStream);
       }
     }
-    processIOError(errorStreams);
+    processIOError(errorStreams, true);
     recordTransaction(start);
   }
 
@@ -1001,7 +1040,7 @@
     long elapsed = FSNamesystem.now() - start;
 
     synchronized (this) {
-       processIOError(errorStreams);
+       processIOError(errorStreams, true);
        synctxid = syncStart;
        isSyncRunning = false;
        this.notifyAll();
@@ -1178,6 +1217,7 @@
     assert getNumEditsDirs() <= getNumEditStreams() : 
         "Number of edits directories should not exceed the number of streams.";
     long size = 0;
+    ArrayList<EditLogOutputStream> al = null;
     for (int idx = 0; idx < getNumEditStreams(); idx++) {
       EditLogOutputStream es = editStreams.get(idx);
       try {
@@ -1188,11 +1228,21 @@
       } catch (IOException e) {
         FSImage.LOG.warn("getEditLogSize: editstream.length failed. removing editlog (" +
             idx + ") " + es.getName());
-        processIOError(idx);
+        if(al==null) al = new ArrayList<EditLogOutputStream>(1);
+        al.add(es);
       }
     }
+    if(al!=null) processIOError(al, true);
     return size;
   }
+  
+  public String listEditsStreams() {
+    StringBuffer buf = new StringBuffer();
+    for (EditLogOutputStream os : editStreams) {
+      buf.append(os.getName()  + ";");
+    }
+    return buf.toString();
+  }
 
   /**
    * Closes the current edit log and opens edits.new. 
@@ -1256,7 +1306,7 @@
         errorStreams.add(eStream);
       }
     }
-    processIOError(errorStreams);
+    processIOError(errorStreams, true);
   }
 
   /**
@@ -1316,18 +1366,21 @@
         errorStreams.add(eStream);
       }
     }
-    processIOError(errorStreams);
+    processIOError(errorStreams, true);
   }
 
   /**
    * Return the name of the edit file
    */
   synchronized File getFsEditName() {
-    StorageDirectory sd = null;
+    StorageDirectory sd = null;   
     for (Iterator<StorageDirectory> it = 
-           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();)
-      sd = it.next();
-    return getEditFile(sd);
+      fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      sd = it.next();   
+      if(sd.getRoot().canRead())
+        return getEditFile(sd);
+    }
+    return null;
   }
 
   /**
@@ -1463,7 +1516,7 @@
         errorStreams.add(eStream);
       }
     }
-    processIOError(errorStreams);
+    processIOError(errorStreams, true);
     recordTransaction(start);
   }
 
@@ -1552,7 +1605,7 @@
     }
     assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
       "Not a backup node corresponds to a backup stream";
-    processIOError(errorStreams);
+    processIOError(errorStreams, true);
   }
 
   synchronized boolean checkBackupRegistration(
@@ -1579,7 +1632,7 @@
     }
     assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
       "Not a backup node corresponds to a backup stream";
-    processIOError(errorStreams);
+    processIOError(errorStreams, true);
     return regAllowed;
   }
 }

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=761439&r1=761438&r2=761439&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java Thu Apr  2 21:53:49 2009
@@ -718,34 +718,48 @@
       } catch(IOException e) {
         // Close any edits stream associated with this dir and remove directory
         LOG.warn("incrementCheckpointTime failed on " + sd.getRoot().getPath() + ";type="+sd.getStorageDirType());
-        if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
-          editLog.processIOError(sd);
-
-        //add storage to the removed list
-        removedStorageDirs.add(sd);
-        it.remove();
       }
     }
   }
-  
+
   /**
-   * Remove storage directory given directory
-   */
-  
-  void processIOError(File dirName) {
-    for (Iterator<StorageDirectory> it = 
-      dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      if (sd.getRoot().getPath().equals(dirName.getPath())) {
-        //add storage to the removed list
-        LOG.warn("FSImage:processIOError: removing storage: " + dirName.getPath());
-        try {
-          sd.unlock(); //try to unlock before removing (in case it is restored)
-        } catch (Exception e) {}
-        removedStorageDirs.add(sd);
-        it.remove();
+   * @param sds - list of storage directories to process
+   * @param propagate - if true, also call the corresponding EditLog stream's
+   * processIOError function.
+   */
+  void processIOError(ArrayList<StorageDirectory> sds, boolean propagate) {
+    ArrayList<EditLogOutputStream> al = null;
+    for(StorageDirectory sd:sds) {
+      // if it has a stream associated with it - remove it too
+      if (propagate && sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+        EditLogOutputStream eStream = editLog.getEditsStream(sd);
+        if(al == null) al = new ArrayList<EditLogOutputStream>(1);
+        al.add(eStream);
+      }
+      
+      for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
+        StorageDirectory sd1 = it.next();
+        if (sd.equals(sd1)) {
+          //add storage to the removed list
+          LOG.warn("FSImage:processIOError: removing storage: "
+              + sd.getRoot().getPath());
+          try {
+            sd1.unlock(); //unlock before removing (in case it will be restored)
+          } catch (Exception e) {
+            // nothing
+          }
+          removedStorageDirs.add(sd1);
+          it.remove();
+          break;
+        }
       }
     }
+    // if there are some edit log streams to remove		
+    if(propagate && al != null) 
+      editLog.processIOError(al, false);
+    
+    // if called from the edits log, then it will increment the checkpoint time there
+    if(propagate) incrementCheckpointTime(); 
   }
 
   public FSEditLog getEditLog() {
@@ -1421,8 +1435,9 @@
    * Renames new image
    */
   void renameCheckpoint() {
+    ArrayList<StorageDirectory> al = null;
     for (Iterator<StorageDirectory> it = 
-                       dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+      dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
       StorageDirectory sd = it.next();
       File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
       File curFile = getImageFile(sd, NameNodeFile.IMAGE);
@@ -1433,17 +1448,13 @@
         if (!curFile.delete() || !ckpt.renameTo(curFile)) {
           LOG.warn("renaming  " + ckpt.getAbsolutePath() + " to "  + 
               curFile.getAbsolutePath() + " FAILED");
-          
-          // Close edit stream, if this directory is also used for edits
-          if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
-            editLog.processIOError(sd);
-          
-          // add storage to the removed list
-          removedStorageDirs.add(sd);
-          it.remove();
+
+          if(al == null) al = new ArrayList<StorageDirectory> (1);
+          al.add(sd);
         }
       }
     }
+    if(al != null) processIOError(al, true);
   }
 
   /**
@@ -1453,6 +1464,8 @@
     this.layoutVersion = FSConstants.LAYOUT_VERSION;
     if(renewCheckpointTime)
       this.checkpointTime = FSNamesystem.now();
+    
+    ArrayList<StorageDirectory> al = null;
     for (Iterator<StorageDirectory> it = 
                            dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
@@ -1474,14 +1487,12 @@
         sd.write();
       } catch (IOException e) {
         LOG.error("Cannot write file " + sd.getRoot(), e);
-        // Close edit stream, if this directory is also used for edits
-        if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
-          editLog.processIOError(sd);
-      //add storage to the removed list
-        removedStorageDirs.add(sd);
-        it.remove();
+        
+        if(al == null) al = new ArrayList<StorageDirectory> (1);
+        al.add(sd);       
       }
     }
+    if(al != null) processIOError(al, true);
     ckptState = FSImage.CheckpointStates.START;
   }
 
@@ -1625,18 +1636,21 @@
    * Return the name of the image file.
    */
   File getFsImageName() {
-  StorageDirectory sd = null;
-  for (Iterator<StorageDirectory> it = 
-              dirIterator(NameNodeDirType.IMAGE); it.hasNext();)
-    sd = it.next();
-  return getImageFile(sd, NameNodeFile.IMAGE); 
+    StorageDirectory sd = null;
+    for (Iterator<StorageDirectory> it = 
+      dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+      sd = it.next();
+      if(sd.getRoot().canRead())
+        return getImageFile(sd, NameNodeFile.IMAGE); 
+    }
+    return null;
   }
 
   /**
   * See if any of the removed storages is "writable" again, and can be returned
   * to service
    */
-  void attemptRestoreRemovedStorage() {   
+  synchronized void attemptRestoreRemovedStorage() {   
     // if directory is "alive" - copy the images there...
     if(!restoreFailedStorage || removedStorageDirs.size() == 0) 
       return; //nothing to restore
@@ -1653,6 +1667,10 @@
         if(root.exists() && root.canWrite()) { 
           format(sd);
           LOG.info("restoring dir " + sd.getRoot().getAbsolutePath());
+          if(sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+            File eFile = getEditFile(sd);
+            editLog.addNewEditLogStream(eFile);
+          }
           this.addStorageDir(sd); // restore
           it.remove();
         }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java?rev=761439&r1=761438&r2=761439&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java Thu Apr  2 21:53:49 2009
@@ -18,8 +18,14 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 
+import java.io.DataInputStream;
+import java.io.EOFException;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Random;
 
@@ -38,7 +44,6 @@
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
-import org.apache.hadoop.util.StringUtils;
 
 
 /**
@@ -80,14 +85,14 @@
       throw new IOException("Could not delete hdfs directory '" + hdfsDir + "'");
     }
     
-    hdfsDir.mkdir();
+    hdfsDir.mkdirs();
     path1 = new File(hdfsDir, "name1");
     path2 = new File(hdfsDir, "name2");
     path3 = new File(hdfsDir, "name3");
     
     path1.mkdir(); path2.mkdir(); path3.mkdir();
     if(!path2.exists() ||  !path3.exists() || !path1.exists()) {
-      throw new IOException("Couldn't create dfs.name dirs");
+      throw new IOException("Couldn't create dfs.name dirs in " + hdfsDir.getAbsolutePath());
     }
     
     String dfs_name_dir = new String(path1.getPath() + "," + path2.getPath());
@@ -117,11 +122,20 @@
   }
   
   /**
-   * invalidate storage by removing current directories
+   * invalidate storage by removing storage directories
    */
   public void invalidateStorage(FSImage fi) throws IOException {
-    fi.getEditLog().processIOError(2); //name3
-    fi.getEditLog().processIOError(1); // name2
+    ArrayList<StorageDirectory> al = new ArrayList<StorageDirectory>(2);
+    Iterator<StorageDirectory> it = fi.dirIterator();
+    while(it.hasNext()) {
+      StorageDirectory sd = it.next();
+      if(sd.getRoot().getAbsolutePath().equals(path2.getAbsolutePath()) ||
+          sd.getRoot().getAbsolutePath().equals(path3.getAbsolutePath())) {
+        al.add(sd);
+      }
+    }
+    // simulate an error
+    fi.processIOError(al, true);
   }
   
   /**
@@ -143,10 +157,57 @@
     }
   }
   
+  
+  /**
+   * This function returns an MD5 hash of a file.
+   * 
+   * @param file the file to hash
+   * @return the MD5 hash as a hex string
+   */
+  public String getFileMD5(File file) throws Exception {
+    String res = new String();
+    MessageDigest mD = MessageDigest.getInstance("MD5");
+    DataInputStream dis = new DataInputStream(new FileInputStream(file));
+
+    try {
+      while(true) {
+        mD.update(dis.readByte());
+      }
+    } catch (EOFException eof) {}
+
+    BigInteger bigInt = new BigInteger(1, mD.digest());
+    res = bigInt.toString(16);
+    dis.close();
+
+    return res;
+  }
+
+  
+  /**
+   * read currentCheckpointTime directly from the file
+   * @param currDir the storage directory's current directory
+   * @return the stored checkpoint time, or 0 if the time file cannot be read
+   * @throws IOException
+   */
+  long readCheckpointTime(File currDir) throws IOException {
+    File timeFile = new File(currDir, NameNodeFile.TIME.getName()); 
+    long timeStamp = 0L;
+    if (timeFile.exists() && timeFile.canRead()) {
+      DataInputStream in = new DataInputStream(new FileInputStream(timeFile));
+      try {
+        timeStamp = in.readLong();
+      } finally {
+        in.close();
+      }
+    }
+    return timeStamp;
+  }
+  
   /**
    *  check if files exist/not exist
+   * @throws IOException 
    */
-  public void checkFiles(boolean valid) {
+  public void checkFiles(boolean valid) throws IOException {
     //look at the valid storage
     File fsImg1 = new File(path1, Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.IMAGE.getName());
     File fsImg2 = new File(path2, Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.IMAGE.getName());
@@ -155,13 +216,29 @@
     File fsEdits1 = new File(path1, Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS.getName());
     File fsEdits2 = new File(path2, Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS.getName());
     File fsEdits3 = new File(path3, Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS.getName());
-
+    
+    long chkPt1 = readCheckpointTime(new File(path1, Storage.STORAGE_DIR_CURRENT));
+    long chkPt2 = readCheckpointTime(new File(path2, Storage.STORAGE_DIR_CURRENT));
+    long chkPt3 = readCheckpointTime(new File(path3, Storage.STORAGE_DIR_CURRENT));
+    
+    String md5_1 = null,md5_2 = null,md5_3 = null;
+    try {
+      md5_1 = getFileMD5(fsEdits1);
+      md5_2 = getFileMD5(fsEdits2);
+      md5_3 = getFileMD5(fsEdits3);
+    } catch (Exception e) {
+      System.err.println("md5 calculation failed: " + e.getLocalizedMessage());
+    }
     this.printStorages(cluster.getNameNode().getFSImage());
     
     LOG.info("++++ image files = "+fsImg1.getAbsolutePath() + "," + fsImg2.getAbsolutePath()
+ ","+ fsImg3.getAbsolutePath());
     LOG.info("++++ edits files = "+fsEdits1.getAbsolutePath() + "," + fsEdits2.getAbsolutePath()
+ ","+ fsEdits3.getAbsolutePath());
     LOG.info("checkFiles compares lengths: img1=" + fsImg1.length()  + ",img2=" + fsImg2.length()
 + ",img3=" + fsImg3.length());
     LOG.info("checkFiles compares lengths: edits1=" + fsEdits1.length()  + ",edits2=" + fsEdits2.length()
 + ",edits3=" + fsEdits3.length());
+    LOG.info("checkFiles compares chkPts: name1=" + chkPt1  + ",name2=" + chkPt2  + ",name3="
+ chkPt3);
+    LOG.info("checkFiles compares md5s: " + fsEdits1.getAbsolutePath() + 
+        "="+ md5_1  + "," + fsEdits2.getAbsolutePath() + "=" + md5_2  + "," +
+        fsEdits3.getAbsolutePath() + "=" + md5_3);  
     
     if(valid) {
       // should be the same
@@ -169,12 +246,26 @@
       assertTrue(0 == fsImg3.length()); //shouldn't be created
       assertTrue(fsEdits1.length() == fsEdits2.length());
       assertTrue(fsEdits1.length() == fsEdits3.length());
+      assertTrue(md5_1.equals(md5_2));
+      assertTrue(md5_1.equals(md5_3));
+      
+      // checkpoint times
+      assertTrue(chkPt1 == chkPt2);
+      assertTrue(chkPt1 == chkPt3);
     } else {
       // should be different
       //assertTrue(fsImg1.length() != fsImg2.length());
       //assertTrue(fsImg1.length() != fsImg3.length());
       assertTrue(fsEdits1.length() != fsEdits2.length());
       assertTrue(fsEdits1.length() != fsEdits3.length());
+      
+      assertTrue(!md5_1.equals(md5_2));
+      assertTrue(!md5_1.equals(md5_3));
+      
+      
+      // checkpoint times
+      assertTrue(chkPt1 > chkPt2);
+      assertTrue(chkPt1 > chkPt3);
     }
   }
   
@@ -222,6 +313,13 @@
     
     checkFiles(true);
     System.out.println("****testStorageRestore: second Checkpoint done and checkFiles(true)
run");
+    
+    // verify that all the logs are active
+    path = new Path("/", "test2");
+    writeFile(fs, path, 2);
+    System.out.println("****testStorageRestore: wrote a file and checkFiles(true) run");
+    checkFiles(true);
+    
     secondary.shutdown();
     cluster.shutdown();
   }


