hadoop-mapreduce-commits mailing list archives

From sc...@apache.org
Subject svn commit: r1028581 - in /hadoop/mapreduce/trunk: CHANGES.txt src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
Date Fri, 29 Oct 2010 01:29:29 GMT
Author: schen
Date: Fri Oct 29 01:29:29 2010
New Revision: 1028581

URL: http://svn.apache.org/viewvc?rev=1028581&view=rev
Log:
MAPREDUCE-2099.  RaidNode recreates outdated parity HARs. (Ramkumar Vadali via
schen)

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1028581&r1=1028580&r2=1028581&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Oct 29 01:29:29 2010
@@ -359,6 +359,9 @@ Trunk (unreleased changes)
     MAPREDUCE-2150.  RaidNode periodically fixes corrupt blocks. (Ramkumar Vadali via
     schen)
 
+    MAPREDUCE-2099.  RaidNode recreates outdated parity HARs. (Ramkumar Vadali
+    via schen)
+
 Release 0.21.1 - Unreleased
 
   NEW FEATURES

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1028581&r1=1028580&r2=1028581&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
Fri Oct 29 01:29:29 2010
@@ -105,6 +105,7 @@ public class RaidNode implements RaidPro
   Daemon triggerThread = null;
 
   /** Daemon thread to delete obsolete parity files */
+  PurgeMonitor purgeMonitor = null;
   Daemon purgeThread = null;
   
   /** Daemon thread to har raid directories */
@@ -289,7 +290,8 @@ public class RaidNode implements RaidPro
     this.triggerThread.start();
 
     // start the thread that deletes obsolete parity files
-    this.purgeThread = new Daemon(new PurgeMonitor());
+    this.purgeMonitor = new PurgeMonitor();
+    this.purgeThread = new Daemon(purgeMonitor);
     this.purgeThread.start();
 
     // start the thread that creates HAR files
@@ -899,12 +901,22 @@ public class RaidNode implements RaidPro
       LOG.debug("Checking " + destPath + " prefix " + destPrefix);
 
       // Verify if it is a har file
-      if (destStr.endsWith(HAR_SUFFIX)) {
-        String destParentStr = destPath.getParent().toUri().getPath();
-        String src = destParentStr.replaceFirst(destPrefix, "");
-        Path srcPath = new Path(src);
-        if (!srcFs.exists(srcPath)) {
-          destFs.delete(destPath, true);
+      if (dest.isDirectory() && destStr.endsWith(HAR_SUFFIX)) {
+        try {
+          int harUsedPercent =
+            usefulHar(srcFs, destFs, destPath, destPrefix, conf);
+          LOG.info("Useful percentage of " + destStr + " " + harUsedPercent);
+          // Delete the har if its usefulness reaches a threshold.
+          if (harUsedPercent <= conf.getInt("raid.har.usage.threshold", 0)) {
+            LOG.info("Purging " + destStr + " at usage " + harUsedPercent);
+            boolean done = destFs.delete(destPath, true);
+            if (!done) {
+              LOG.error("Could not purge " + destPath);
+            }
+          }
+        } catch (IOException e) {
+          LOG.warn("Error during purging " + destStr + " " +
+              StringUtils.stringifyException(e));
         }
         return;
       }
@@ -919,14 +931,9 @@ public class RaidNode implements RaidPro
       if (dest.isDirectory()) {
         FileStatus[] files = null;
         files = destFs.listStatus(destPath);
-        if (files != null) {
-          for (FileStatus one:files) {
-            recursePurge(srcFs, destFs, destPrefix, one);
-          }
-        }
-        files = destFs.listStatus(destPath);
         if (files == null || files.length == 0){
-          boolean done = destFs.delete(destPath,false);
+          boolean done = destFs.delete(destPath,true); // ideal is false, but
+                  // DFSClient only deletes directories if it is recursive
           if (done) {
             LOG.info("Purged directory " + destPath );
           }
@@ -934,6 +941,13 @@ public class RaidNode implements RaidPro
             LOG.info("Unable to purge directory " + destPath);
           }
         }
+        if (files != null) {
+          for (FileStatus one:files) {
+            recursePurge(srcFs, destFs, destPrefix, one);
+          }
+        }
+        // If the directory is empty now, it will be purged the next time this
+        // thread runs.
         return; // the code below does the file checking
       }
       
@@ -955,7 +969,53 @@ public class RaidNode implements RaidPro
     } 
   }
 
-  
+  //
+  // Returns the number of up-to-date files in the har as a percentage of the
+  // total number of files in the har.
+  //
+  protected static int usefulHar(
+    FileSystem srcFs, FileSystem destFs,
+    Path harPath, String destPrefix, Configuration conf) throws IOException {
+
+    FileSystem fsHar = new HarFileSystem(destFs);
+    String harURIPath = harPath.toUri().getPath();
+    Path qualifiedPath = new Path("har://", harURIPath +
+      Path.SEPARATOR + harPath.getParent().toUri().getPath());
+    fsHar.initialize(qualifiedPath.toUri(), conf);
+    FileStatus[] filesInHar = fsHar.listStatus(qualifiedPath);
+    if (filesInHar.length == 0) {
+      return 0;
+    }
+    int numUseless = 0;
+    for (FileStatus one: filesInHar) {
+      Path parityPath = one.getPath();
+      String parityStr = parityPath.toUri().getPath();
+      if (parityStr.startsWith("har:/")) {
+        LOG.error("Unexpected prefix har:/ for " + parityStr);
+        continue;
+      }
+      String prefixToReplace = harURIPath + destPrefix;
+      if (!parityStr.startsWith(prefixToReplace)) {
+        continue;
+      }
+      String src = parityStr.substring(prefixToReplace.length());
+      try {
+        FileStatus srcStatus = srcFs.getFileStatus(new Path(src));
+        if (srcStatus == null) {
+          numUseless++;
+        } else if (one.getModificationTime() !=
+                  srcStatus.getModificationTime()) {
+          numUseless++;
+        }
+      } catch (FileNotFoundException e) {
+        LOG.info("File not found: " + e);
+        numUseless++;
+      }
+    }
+    int uselessPercent = numUseless * 100 / filesInHar.length;
+    return 100 - uselessPercent;
+  }
+
   private void doHar() throws IOException, InterruptedException {
     
     PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();

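For reference, a minimal sketch (not part of this patch) of the prefix
arithmetic in usefulHar() above: an entry listed through HarFileSystem
carries the HAR's own URI path followed by the archived parity path, so
stripping harURIPath + destPrefix recovers the source file whose
modification time is compared against the entry's. The concrete paths
below are illustrative, borrowed from the layout used in the test change
that follows.

public class HarEntryMappingSketch {
  public static void main(String[] args) {
    // Where the parity HAR lives on the destination file system
    // (the ".har" name here is hypothetical).
    String harURIPath = "/destraid/user/dhruba/raidtest/raidtest.har";
    // The parity root, "/destraid" in the test below.
    String destPrefix = "/destraid";
    // One entry inside the HAR: the HAR path followed by the archived
    // parity path, which itself starts with destPrefix.
    String parityStr = harURIPath + "/destraid/user/dhruba/raidtest/file";

    String prefixToReplace = harURIPath + destPrefix;
    if (parityStr.startsWith(prefixToReplace)) {
      String src = parityStr.substring(prefixToReplace.length());
      // Prints /user/dhruba/raidtest/file -- the source whose
      // modification time decides whether the HAR entry is stale.
      System.out.println(src);
    }
  }
}
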
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java?rev=1028581&r1=1028580&r2=1028581&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
Fri Oct 29 01:29:29 2010
@@ -47,6 +47,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hdfs.TestRaidDfs;
 import org.apache.hadoop.raid.protocol.PolicyInfo;
 import org.apache.hadoop.raid.protocol.PolicyList;
 
@@ -107,7 +108,14 @@ public class TestRaidPurge extends TestC
    * create raid.xml file for RaidNode
    */
   private void mySetup(String srcPath, long targetReplication,
-                long metaReplication, long stripeLength) throws Exception {
+                long metaReplication, long stripeLength)
+    throws Exception {
+    mySetup(srcPath, targetReplication, metaReplication, stripeLength, 1);
+  }
+
+  private void mySetup(String srcPath, long targetReplication,
+                long metaReplication, long stripeLength, int harDelay)
+    throws Exception {
     FileWriter fileWriter = new FileWriter(CONFIG_FILE);
     fileWriter.write("<?xml version=\"1.0\"?>\n");
     String str = "<configuration> " +
@@ -139,6 +147,12 @@ public class TestRaidPurge extends TestC
                                          "a candidate for RAIDing " +
                           "</description> " + 
                         "</property> " +
+                        "<property> " +
+                          "<name>time_before_har</name> " +
+                          "<value> " + harDelay + "</value> " +
+                          "<description> time before har'ing parity files" +
+                          "</description> " +
+                        "</property> " +
                      "</policy>" +
                    "</srcPath>" +
                  "</configuration>";
@@ -260,4 +274,74 @@ public class TestRaidPurge extends TestC
     LOG.info("doTestPurge completed:" + " blockSize=" + blockSize +
              " stripeLength=" + stripeLength);
   }
+
+  /**
+   * Create a file, wait for parity file to get HARed. Then modify the file,
+   * wait for the HAR to get purged.
+   */
+  public void testPurgeHar() throws Exception {
+    LOG.info("testPurgeHar started");
+    int harDelay = 0;
+    createClusters(true);
+    mySetup("/user/dhruba/raidtest", 1, 1, 5, harDelay);
+    Path dir = new Path("/user/dhruba/raidtest/");
+    Path destPath = new Path("/destraid/user/dhruba/raidtest");
+    Path file1 = new Path(dir + "/file");
+    RaidNode cnode = null;
+    try {
+      TestRaidNode.createOldFile(fileSys, file1, 1, 8, 8192L);
+      LOG.info("testPurgeHar created test files");
+
+      // create an instance of the RaidNode
+      Configuration localConf = new Configuration(conf);
+      localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+      cnode = RaidNode.createRaidNode(null, localConf);
+
+      // Wait till har is created.
+      while (true) {
+        try {
+          FileStatus[] listPaths = fileSys.listStatus(destPath);
+          if (listPaths != null && listPaths.length == 1) {
+            FileStatus s = listPaths[0];
+            LOG.info("testPurgeHar found path " + s.getPath());
+            if (s.getPath().toString().endsWith(".har")) {
+              break;
+            }
+          }
+        } catch (FileNotFoundException e) {
+          //ignore
+        }
+        Thread.sleep(1000);                  // keep waiting
+      }
+
+      // Set an old timestamp.
+      fileSys.setTimes(file1, 0, 0);
+
+      boolean found = false;
+      FileStatus[] listPaths = null;
+      while (!found || listPaths == null || listPaths.length > 1) {
+        listPaths = fileSys.listStatus(destPath);
+        if (listPaths != null) {
+          for (FileStatus s: listPaths) {
+            LOG.info("testPurgeHar waiting for parity file to be recreated" +
+              " and har to be deleted found " + s.getPath());
+            if (s.getPath().toString().endsWith("file") &&
+                s.getModificationTime() == 0) {
+              found = true;
+            }
+          }
+        }
+        Thread.sleep(1000);
+      }
+    } catch (Exception e) {
+      LOG.info("testPurgeHar Exception " + e +
+          StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      if (cnode != null) { cnode.stop(); cnode.join(); }
+      fileSys.delete(dir, true);
+      fileSys.delete(destPath, true);
+      stopClusters();
+    }
+  }
 }



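For reference, a minimal sketch (not part of this patch) of the purge
decision added to recursePurge() above. Only Configuration.getInt and the
key "raid.har.usage.threshold" come from the patch; the class and method
names here are hypothetical.

import org.apache.hadoop.conf.Configuration;

public class HarPurgeDecisionSketch {
  // A HAR is purged when the share of up-to-date entries, as computed
  // by usefulHar(), falls to the configured threshold. The default of 0
  // means only completely stale archives are deleted.
  static boolean shouldPurge(Configuration conf, int harUsedPercent) {
    return harUsedPercent <= conf.getInt("raid.har.usage.threshold", 0);
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    System.out.println(shouldPurge(conf, 0));  // true: every entry stale
    System.out.println(shouldPurge(conf, 40)); // false: 40% still useful
  }
}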