hadoop-mapreduce-commits mailing list archives

From dhr...@apache.org
Subject svn commit: r938282 - in /hadoop/mapreduce/trunk: ./ src/contrib/raid/src/java/org/apache/hadoop/raid/ src/contrib/raid/src/test/org/apache/hadoop/raid/
Date Tue, 27 Apr 2010 01:09:45 GMT
Author: dhruba
Date: Tue Apr 27 01:09:44 2010
New Revision: 938282

URL: http://svn.apache.org/viewvc?rev=938282&view=rev
Log:
MAPREDUCE-1705. Archiving and Purging of HDFS parity files should 
handle globbed policies accurately. (Rodrigo Schmidt via dhruba)
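
The core of the change: RaidNode's purge and archive scans previously derived a single
parity destination from each policy's srcPath, which mishandled policies whose srcPath
is a glob. The committed code instead iterates over info.getSrcPathExpanded(). Below is
a minimal sketch of the assumed expansion step (the class and method names are
hypothetical stand-ins, not the committed implementation; it leans on Hadoop's standard
FileSystem.globStatus):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class GlobExpansionSketch {
      /**
       * Expand a (possibly globbed) srcPath prefix such as /user/test/raid*
       * into the concrete paths the purge/archive scans must visit.
       * Hypothetical stand-in for PolicyInfo.getSrcPathExpanded().
       */
      public static Path[] expandSrcPath(Configuration conf, Path srcPattern)
          throws IOException {
        FileSystem fs = srcPattern.getFileSystem(conf);
        FileStatus[] matches = fs.globStatus(srcPattern);
        if (matches == null) {
          return null; // pattern matched nothing; callers check for null
        }
        Path[] expanded = new Path[matches.length];
        for (int i = 0; i < matches.length; i++) {
          expanded[i] = fs.makeQualified(matches[i].getPath());
        }
        return expanded;
      }
    }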


Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=938282&r1=938281&r2=938282&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Apr 27 01:09:44 2010
@@ -590,6 +590,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1604. Add Forrest documentation for Job ACLs.
     (Amareshwari Sriramadasu via yhemanth)
 
+    MAPREDUCE-1705. Archiving and Purging of HDFS parity files should 
+    handle globbed policies accurately. (Rodrigo Schmidt via dhruba)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=938282&r1=938281&r2=938282&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Tue Apr 27 01:09:44 2010
@@ -1172,35 +1172,44 @@ public class RaidNode implements RaidPro
             try {
               // expand destination prefix path
               String destinationPrefix = getDestinationPath(conf, info);
-              Path destp = new Path(destinationPrefix.trim());
-              FileSystem destFs = FileSystem.get(destp.toUri(), conf);
-              destp = destp.makeQualified(destFs);
-              destinationPrefix = destp.toUri().getPath(); // strip host:port
-              destp = getOriginalParityFile(destp, info.getSrcPath());
+              Path destPref = new Path(destinationPrefix.trim());
+              FileSystem destFs = FileSystem.get(destPref.toUri(), conf);
+              destPref = destFs.makeQualified(destPref);
+
+              //get srcPaths
+              Path[] srcPaths = info.getSrcPathExpanded();
               
-              // if this destination path has already been processed as part
-              // of another policy, then nothing more to do
-              if (processed.contains(destp)) {
-                LOG.info("Obsolete parity files for policy " + 
-                        info.getName() + " has already been procesed.");
-                continue;
-              }
+              if ( srcPaths != null ){
+                for (Path srcPath: srcPaths) {
+                  // expand destination prefix
+                  Path destPath = getOriginalParityFile(destPref, srcPath);
+
+                  // if this destination path has already been processed as part
+                  // of another policy, then nothing more to do
+                  if (processed.contains(destPath)) {
+                    LOG.info("Obsolete parity files for policy " + 
+                            info.getName() + " have already been processed.");
+                    continue;
+                  }
+
+                  FileSystem srcFs = info.getSrcPath().getFileSystem(conf);
+                  FileStatus stat = null;
+                  try {
+                    stat = destFs.getFileStatus(destPath);
+                  } catch (FileNotFoundException e) {
+                    // do nothing, leave stat = null;
+                  }
+                  if (stat != null) {
+                    LOG.info("Purging obsolete parity files for policy " + 
+                              info.getName() + " " + destPath);
+                    recursePurge(srcFs, destFs, destPref.toUri().getPath(), stat);
+                  }
 
-              FileSystem srcFs = info.getSrcPath().getFileSystem(conf);
-              FileStatus stat = null;
-              try {
-                stat = destFs.getFileStatus(destp);
-              } catch (FileNotFoundException e) {
-                // do nothing, leave stat = null;
-              }
-              if (stat != null) {
-                LOG.info("Purging obsolete parity files for policy " + 
-                          info.getName() + " " + destp);
-                recursePurge(srcFs, destFs, destinationPrefix, stat);
-              }
+                  // this destination path has already been processed
+                  processed.add(destPath);
 
-              // this destination path has already been processed
-              processed.add(destp);
+                }
+              }
 
             } catch (Exception e) {
               LOG.warn("Ignoring Exception while processing policy " + 
@@ -1283,13 +1292,13 @@ public class RaidNode implements RaidPro
     long prevExec = 0;
     while (running) {
 
-      LOG.info("Started HAR thread");
       // The config may be reloaded by the TriggerMonitor. 
       // This thread uses whatever config is currently active.
       while(now() < prevExec + configMgr.getPeriodicity()){
         Thread.sleep(SLEEP_TIME);
       }
 
+      LOG.info("Started archive scan");
       prevExec = now();
       
       // fetch all categories
@@ -1310,28 +1319,33 @@ public class RaidNode implements RaidPro
           if (str != null) {
             try {
               long cutoff = now() - ( Long.parseLong(str) * 24L * 3600000L );
-              // expand destination prefix path
+
               String destinationPrefix = getDestinationPath(conf, info);
-              Path destp = new Path(destinationPrefix.trim());
-              FileSystem destFs = FileSystem.get(destp.toUri(), conf);
-              destp = destp.makeQualified(destFs);
-              destinationPrefix = destp.toUri().getPath(); // strip host:port
-              destp = getOriginalParityFile(destp, info.getSrcPath());
-
-              FileStatus stat = null;
-              try {
-                stat = destFs.getFileStatus(destp);
-              } catch (FileNotFoundException e) {
-                // do nothing, leave stat = null;
-              }
-              if (stat != null) {
-                LOG.info("Haring parity files for policy " + 
-                    info.getName() + " " + destp);
+              Path destPref = new Path(destinationPrefix.trim());
+              FileSystem destFs = destPref.getFileSystem(conf); 
+              destPref = destFs.makeQualified(destPref);
 
-                recurseHar(destFs, stat, cutoff, tmpHarPath);
+              //get srcPaths
+              Path[] srcPaths = info.getSrcPathExpanded();
+              
+              if ( srcPaths != null ){
+                for (Path srcPath: srcPaths) {
+                  // expand destination prefix
+                  Path destPath = getOriginalParityFile(destPref, srcPath);
+
+                  FileStatus stat = null;
+                  try {
+                    stat = destFs.getFileStatus(destPath);
+                  } catch (FileNotFoundException e) {
+                    // do nothing, leave stat = null;
+                  }
+                  if (stat != null) {
+                    LOG.info("Haring parity files for policy " + 
+                        info.getName() + " " + destPath);
+                    recurseHar(destFs, stat, cutoff, tmpHarPath);
+                  }
+                }
               }
-
-
             } catch (Exception e) {
               LOG.warn("Ignoring Exception while processing policy " + 
                   info.getName() + " " + 

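Note the role of the processed set in the purge hunk above: with globbing, several
policies can expand to overlapping source paths, so deduplication now happens per
expanded parity destination rather than per policy prefix. A hypothetical, simplified
distillation of that loop follows (getOriginalParityFile's real layout logic lives in
RaidNode; parityFor below is a stand-in, and the caller is assumed to share one
java.util.HashSet<Path> across all policies in a scan pass):

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.util.Set;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PurgeScanSketch {
      /** Stand-in for RaidNode.getOriginalParityFile(): parity for a source
       *  path is assumed to live under destPref at the same relative path. */
      static Path parityFor(Path destPref, Path srcPath) {
        return new Path(destPref, srcPath.toUri().getPath().substring(1));
      }

      /** Purge pass for one policy; 'processed' is shared across policies. */
      static void purgeOnePolicy(Configuration conf, Path destPref,
          Path[] srcPaths, Set<Path> processed) throws IOException {
        if (srcPaths == null) {
          return; // the policy's glob matched nothing
        }
        FileSystem destFs = destPref.getFileSystem(conf);
        destPref = destFs.makeQualified(destPref);
        for (Path srcPath : srcPaths) {
          Path destPath = parityFor(destPref, srcPath);
          if (!processed.add(destPath)) {
            continue; // already handled under another policy's expansion
          }
          FileStatus stat = null;
          try {
            stat = destFs.getFileStatus(destPath);
          } catch (FileNotFoundException e) {
            // no parity exists for this source path; nothing to purge
          }
          if (stat != null) {
            // recursively purge obsolete parity files under destPath here
          }
        }
      }
    }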
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java?rev=938282&r1=938281&r2=938282&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java Tue Apr 27 01:09:44 2010
@@ -101,12 +101,12 @@ public class TestRaidHar extends TestCas
   /**
    * create raid.xml file for RaidNode
    */
-  private void mySetup(long targetReplication,
-                long metaReplication, long stripeLength) throws Exception {
+  private void mySetup(String srcPath, long targetReplication,
+                long metaReplication, long stripeLength ) throws Exception {
     FileWriter fileWriter = new FileWriter(CONFIG_FILE);
     fileWriter.write("<?xml version=\"1.0\"?>\n");
     String str = "<configuration> " +
-                   "<srcPath prefix=\"/user/test/raidtest\"> " +
+                   "<srcPath prefix=\"" + srcPath + "\"> " +
                      "<policy name = \"RaidTest1\"> " +
                         "<destPath> /destraid</destPath> " +
                         "<property> " +
@@ -162,6 +162,7 @@ public class TestRaidHar extends TestCas
   public void testRaidHar() throws Exception {
     LOG.info("Test testRaidHar  started.");
 
+    String srcPaths    []  = { "/user/test/raidtest", "/user/test/raid*" };
     long blockSizes    []  = {1024L};
     long stripeLengths []  = {5};
     long targetReplication = 1;
@@ -171,11 +172,13 @@ public class TestRaidHar extends TestCas
 
     createClusters(true);
     try {
-      for (long blockSize : blockSizes) {
-        for (long stripeLength : stripeLengths) {
-           doTestHar(iter, targetReplication, metaReplication,
-                       stripeLength, blockSize, numBlock);
-           iter++;
+      for (String srcPath : srcPaths) {
+        for (long blockSize : blockSizes) {
+          for (long stripeLength : stripeLengths) {
+            doTestHar(iter, srcPath, targetReplication, metaReplication,
+                         stripeLength, blockSize, numBlock);
+            iter++;
+          }
         }
       }
     } finally {
@@ -188,12 +191,12 @@ public class TestRaidHar extends TestCas
    * Create parity file, delete original file and then validate that
    * parity file is automatically deleted.
    */
-  private void doTestHar(int iter, long targetReplication,
+  private void doTestHar(int iter, String srcPath, long targetReplication,
                           long metaReplication, long stripeLength,
                           long blockSize, int numBlock) throws Exception {
     LOG.info("doTestHar started---------------------------:" +  " iter " + iter +
              " blockSize=" + blockSize + " stripeLength=" + stripeLength);
-    mySetup(targetReplication, metaReplication, stripeLength);
+    mySetup(srcPath, targetReplication, metaReplication, stripeLength);
     RaidShell shell = null;
     Path dir = new Path("/user/test/raidtest/");
     Path file1 = new Path(dir + "/file" + iter);

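With the new srcPath parameter, mySetup writes the prefix it is given straight into the
policy file, so the second test iteration exercises a globbed policy. The generated
raid.xml for that iteration would begin roughly like this (remaining properties as in
the string built in mySetup, elided here):

    <configuration>
      <srcPath prefix="/user/test/raid*">
        <policy name="RaidTest1">
          <destPath> /destraid</destPath>
          ...
        </policy>
      </srcPath>
    </configuration>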
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java?rev=938282&r1=938281&r2=938282&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java Tue Apr 27 01:09:44 2010
@@ -106,12 +106,12 @@ public class TestRaidPurge extends TestC
   /**
    * create raid.xml file for RaidNode
    */
-  private void mySetup(long targetReplication,
+  private void mySetup(String srcPath, long targetReplication,
                 long metaReplication, long stripeLength) throws Exception {
     FileWriter fileWriter = new FileWriter(CONFIG_FILE);
     fileWriter.write("<?xml version=\"1.0\"?>\n");
     String str = "<configuration> " +
-                   "<srcPath prefix=\"/user/dhruba/raidtest\"> " +
+                   "<srcPath prefix=\"" + srcPath + "\"> " +
                      "<policy name = \"RaidTest1\"> " +
                         "<destPath> /destraid</destPath> " +
                         "<property> " +
@@ -160,6 +160,7 @@ public class TestRaidPurge extends TestC
   public void testPurge() throws Exception {
     LOG.info("Test testPurge  started.");
 
+    String srcPaths    []  = { "/user/dhruba/raidtest", "/user/dhruba/raid*" };
     long blockSizes    []  = {1024L};
     long stripeLengths []  = {5};
     long targetReplication = 1;
@@ -169,11 +170,13 @@ public class TestRaidPurge extends TestC
 
     createClusters(true);
     try {
-      for (long blockSize : blockSizes) {
-        for (long stripeLength : stripeLengths) {
-          doTestPurge(iter, targetReplication, metaReplication,
-              stripeLength, blockSize, numBlock);
-           iter++;
+      for (String srcPath : srcPaths ) {
+        for (long blockSize : blockSizes) {
+          for (long stripeLength : stripeLengths) {
+            doTestPurge(iter, srcPath, targetReplication, metaReplication,
+                stripeLength, blockSize, numBlock);
+            iter++;
+          }
         }
       }
     } finally {
@@ -186,12 +189,12 @@ public class TestRaidPurge extends TestC
    * Create parity file, delete original file and then validate that
    * parity file is automatically deleted.
    */
-  private void doTestPurge(int iter, long targetReplication,
+  private void doTestPurge(int iter, String srcPath, long targetReplication,
                           long metaReplication, long stripeLength,
                           long blockSize, int numBlock) throws Exception {
     LOG.info("doTestPurge started---------------------------:" +  " iter " + iter +
              " blockSize=" + blockSize + " stripeLength=" + stripeLength);
-    mySetup(targetReplication, metaReplication, stripeLength);
+    mySetup(srcPath, targetReplication, metaReplication, stripeLength);
     RaidShell shell = null;
     Path dir = new Path("/user/dhruba/raidtest/");
     Path file1 = new Path(dir + "/file" + iter);


