hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r555360 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/tools/Logalyzer.java src/java/org/apache/hadoop/util/CopyFiles.java src/test/org/apache/hadoop/fs/TestCopyFiles.java
Date Wed, 11 Jul 2007 18:38:05 GMT
Author: omalley
Date: Wed Jul 11 11:38:00 2007
New Revision: 555360

URL: http://svn.apache.org/viewvc?view=rev&rev=555360
Log:
HADOOP-1533. Add persistent logging of errors to distcp.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
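
This change threads a log path through CopyFiles.copy() and into the job
configuration. A minimal, hypothetical caller of the updated API (the
namenode address and paths below are placeholders, not part of this
commit) might look like:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.util.CopyFiles;

    public class DistcpWithLogs {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Explicit log directory; passing null instead selects a default
        // _distcp_logs_<timestamp> directory under the destination.
        Path logPath = new Path("hdfs://namenode:8020/distcp-logs");
        CopyFiles.copy(conf, "hdfs://namenode:8020/srcdat",
                       "hdfs://namenode:8020/destdat", logPath,
                       false /* srcAsList */, false /* ignoreReadFailures */);
      }
    }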

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=555360&r1=555359&r2=555360
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jul 11 11:38:00 2007
@@ -286,6 +286,9 @@
  88. HADOOP-1554.  Log killed tasks to the job history and display them on the
      web/ui. (Devaraj Das via omalley)
 
+ 89. HADOOP-1533.  Add persistent error logging for distcp. The logs are stored
+    in a specified HDFS directory. (Senthil Subramanian via omalley)
+
 Release 0.13.0 - 2007-06-08
 
  1. HADOOP-1047.  Fix TestReplication to succeed more reliably.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java?view=diff&rev=555360&r1=555359&r2=555360
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java Wed Jul 11 11:38:00 2007
@@ -177,7 +177,7 @@
   {
     String destURL = "hdfs://" + fsConfig.get("fs.default.name", "local") + 
                          archiveDirectory;
-    CopyFiles.copy(fsConfig, logListURI, destURL, true, false);
+    CopyFiles.copy(fsConfig, logListURI, destURL, null, true, false);
   }
   
   /**

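Logalyzer now passes null for the new logPath parameter, deferring to the
default that copy() derives from the destination. Roughly, per the change
to CopyFiles.java below:

    // Inside CopyFiles.copy(): a null logPath falls back to a timestamped
    // log directory under the destination path.
    if (logPath == null) {
      logPath = new Path(toURI(destPath).getPath() + "/_distcp_logs_" +
                         System.currentTimeMillis());
    }

So for a destination of hdfs://host/archive, the errors would land under a
directory such as /archive/_distcp_logs_1184179080000 (timestamp varies).
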
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?view=diff&rev=555360&r1=555359&r2=555360
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Wed Jul 11 11:38:00 2007
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -54,7 +55,6 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 
 /**
  * A Map-reduce program to recursively copy directories between
@@ -65,7 +65,7 @@
   private static final String S3 = "s3";
   
   private static final String usage = "distcp "+
-    "[-i] <srcurl> | -f <urilist_uri> <desturl>";
+    "[-i] <srcurl> | -f <urilist_uri> <desturl> [-log <logpath>]";
   
   private static final long MIN_BYTES_PER_MAP = 1L << 28;
   private static final int MAX_NUM_MAPS = 10000;
@@ -93,11 +93,13 @@
      * @param jobConf : The handle to the jobConf object to be initialized.
      * @param srcPaths : The source paths.
      * @param destPath : The destination path.
+     * @param logPath : The log path.
      * @param ignoreReadFailures : Ignore read failures?
      * @throws IOException
      */
     public abstract void setup(Configuration conf, JobConf jobConf, 
-                               String[] srcPaths, String destPath, boolean ignoreReadFailures)
+                               String[] srcPaths, String destPath,
+                               Path logPath, boolean ignoreReadFailures) 
       throws IOException;
     
     /**
@@ -198,7 +200,8 @@
       // open source file
       Path srcFile = new Path(srcPath, src);
       FSDataInputStream in = srcFileSys.open(srcFile);
-      long totalBytes = srcFileSys.getLength(srcFile);
+      FileStatus srcFileStatus = srcFileSys.getFileStatus(srcFile);
+      long totalBytes = srcFileStatus.getLen();
       
       // create directories to hold destination file and create destFile
       Path destFile = new Path(destPath, src);
@@ -244,11 +247,12 @@
      * @param jobConf : The handle to the jobConf object to be initialized.
      * @param srcPaths : The source URIs.
      * @param destPath : The destination URI.
+     * @param logPath : The log Path.
      * @param ignoreReadFailures : Ignore read failures?
      */
     public void setup(Configuration conf, JobConf jobConf, 
                       String[] srcPaths, String destPath, 
-                      boolean ignoreReadFailures) 
+                      Path logPath, boolean ignoreReadFailures) 
       throws IOException
     {
       URI srcURI = toURI(srcPaths[0]);
@@ -284,20 +288,15 @@
       jobConf.setSpeculativeExecution(false);
       jobConf.setInputFormat(SequenceFileInputFormat.class);
       
-      jobConf.setOutputKeyClass(Text.class);
-      jobConf.setOutputValueClass(Text.class);
-      jobConf.setOutputFormat(SequenceFileOutputFormat.class);
-      
       jobConf.setMapperClass(FSCopyFilesMapper.class);
       
-      jobConf.setNumReduceTasks(1);
+      jobConf.setNumReduceTasks(0);
       jobConf.setBoolean(readFailuresAttribute, ignoreReadFailures);
       
       Random r = new Random();
       Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_"
                                    + Integer.toString(r.nextInt(Integer.MAX_VALUE), 36));
       Path inDir = new Path(jobDirectory, "in");
-      Path fakeOutDir = new Path(jobDirectory, "out");
       FileSystem fileSys = FileSystem.get(jobConf);
       if (!fileSys.mkdirs(inDir)) {
         throw new IOException("Mkdirs failed to create " +
@@ -306,7 +305,7 @@
       jobConf.set("distcp.job.dir", jobDirectory.toString());
       
       jobConf.setInputPath(inDir);
-      jobConf.setOutputPath(fakeOutDir);
+      jobConf.setOutputPath(logPath);
       
       // create new sequence-files for holding paths
       ArrayList<Path> pathList = new ArrayList<Path>();
@@ -317,7 +316,7 @@
       while(!pathList.isEmpty()) {
         Path top = pathList.remove(0);
         if (srcfs.isFile(top)) {
-          totalBytes += srcfs.getLength(top);
+          totalBytes += srcfs.getFileStatus(top).getLen();
           top = makeRelative(rootPath, top);
           finalPathList.add(top.toString());
         } else {
@@ -406,6 +405,8 @@
       try {
         copy(src, reporter);
       } catch (IOException except) {
+        out.collect(null, new Text("Failed to copy " + src + " : " +
+                           StringUtils.stringifyException(except)));
         if (ignoreReadFailures) {
           reporter.setStatus("Failed to copy " + src + " : " + 
                              StringUtils.stringifyException(except));
@@ -441,11 +442,12 @@
      * @param jobConf : The handle to the jobConf object to be initialized.
      * @param srcPaths : The source URI.
      * @param destPath : The destination URI.
+     * @param logPath : The log Path.
      * @param ignoreReadFailures : Ignore read failures?
      */
     public void setup(Configuration conf, JobConf jobConf, 
                       String[] srcPaths, String destPath, 
-                      boolean ignoreReadFailures) 
+                      Path logPath, boolean ignoreReadFailures) 
       throws IOException
     {
       //Destination
@@ -453,16 +455,12 @@
       jobConf.set("copy.dest.fs", destURI.toString());
       destPath = destURI.getPath();
       jobConf.set("copy.dest.path", destPath);
-      
+
       //Setup the MR-job configuration
       jobConf.setSpeculativeExecution(false);
       
       jobConf.setInputFormat(SequenceFileInputFormat.class);
       
-      jobConf.setOutputKeyClass(Text.class);
-      jobConf.setOutputValueClass(Text.class);
-      jobConf.setOutputFormat(SequenceFileOutputFormat.class);
-      
       jobConf.setMapperClass(HTTPCopyFilesMapper.class);
       
       JobClient client = new JobClient(jobConf);
@@ -481,8 +479,7 @@
       jobConf.setInputPath(jobInputDir);
       
       jobConf.set("distcp.job.dir", jobDirectory.toString());
-      Path jobOutputDir = new Path(jobDirectory, "out");
-      jobConf.setOutputPath(jobOutputDir);
+      jobConf.setOutputPath(logPath);
       
       for(int i=0; i < srcPaths.length; ++i) {
         Path ipFile = new Path(jobInputDir, "part" + i);
@@ -514,8 +511,7 @@
       
       try {
         //Destination
-        destFileSys = 
-          FileSystem.getNamed(job.get("copy.dest.fs", "local"), job);
+        destFileSys = FileSystem.get(URI.create(job.get("copy.dest.fs", "file:///")), job);
         destPath = new Path(job.get("copy.dest.path", "/"));
         if (!destFileSys.exists(destPath)) {
           return;
@@ -577,7 +573,7 @@
     /* handle exceptions */
     private void handleException( Reporter reporter, Text key, Throwable e )
     throws IOException {
-      String errMsg = "Failed to copy from: " + (Text)key;
+      String errMsg = "Failed to copy from: " + key;
       reporter.setStatus(errMsg);
       if ( !ignoreReadFailures ) {
         throw new IOException(errMsg);
@@ -700,10 +696,12 @@
    * @param conf Configuration
    * @param srcPath Source path URL
    * @param destPath Destination path URL
+   * @param logPath the log path
    * @param srcAsList List of source URLs to copy.
    * @param ignoreReadFailures True if we are to ignore read failures.
    */
-  public static void copy(Configuration conf, String srcPath, String destPath,
+  public static void copy(Configuration conf, String srcPath,
+                          String destPath, Path logPath,
                           boolean srcAsList, boolean ignoreReadFailures) 
     throws IOException
   {
@@ -715,6 +713,12 @@
     URI srcURI = toURI(srcPath);
     toURI(destPath);
   
+    // default logPath
+    if (logPath == null) {
+      logPath = new Path(toURI(destPath).getPath() + "/_distcp_logs_" +
+                         System.currentTimeMillis());
+    }
+
     //Create the task-specific mapper 
     CopyFilesMapper mapper = null;
     String[] srcPaths = null;
@@ -728,7 +732,7 @@
       String[] dfsUrls = parseInputFile(HDFS, srcPaths);
       if (dfsUrls != null) {
         for(int i=0; i < dfsUrls.length; ++i) {
-          copy(conf, dfsUrls[i], destPath, false, ignoreReadFailures);
+          copy(conf, dfsUrls[i], destPath, logPath, false, ignoreReadFailures);
         }
       }
       
@@ -736,7 +740,7 @@
       String[] localUrls = parseInputFile("file", srcPaths);
       if (localUrls != null) {
         for(int i=0; i < localUrls.length; ++i) {
-          copy(conf, localUrls[i], destPath, false, ignoreReadFailures);
+          copy(conf, localUrls[i], destPath, logPath, false, ignoreReadFailures);
         }
       }
       
@@ -766,7 +770,7 @@
     }
     
     //Initialize the mapper
-    mapper.setup(conf, jobConf, srcPaths, destPath, ignoreReadFailures);
+    mapper.setup(conf, jobConf, srcPaths, destPath, logPath, ignoreReadFailures);
     
     //We are good to go!
     try {
@@ -787,6 +791,7 @@
   public int run(String[] args) throws Exception {
     String srcPath = null;
     String destPath = null;
+    Path logPath = null;
     boolean ignoreReadFailures = false;
     boolean srcAsList = false;
     
@@ -799,6 +804,8 @@
         srcPath = args[idx];
       } else if (destPath == null) {
         destPath = args[idx];
+      } else if ("-log".equals(args[idx])) {
+        logPath = new Path(args[++idx]);
       } else {
         System.out.println(usage);
         return -1;
@@ -810,9 +817,41 @@
       System.out.println(usage);
       return -1;
     }
-    
+  
+    // default logPath
+    if (logPath == null) {
+      logPath = new Path(toURI(destPath).getPath() + "/_distcp_logs_" +
+                         System.currentTimeMillis());
+      System.out.println("Using default logPath: " + logPath);
+    }
+
+    // verify that srcPath and destPath are valid, and that logPath is valid and does not exist
+    try {
+      URI srcURI = toURI(srcPath);
+      FileSystem srcfs = FileSystem.get(srcURI, conf);
+      if (!srcfs.exists(new Path(srcPath))) {
+        System.out.println(srcPath + " does not exist.");
+        return -1;
+      }
+
+      URI destURI = toURI(destPath);
+      FileSystem destfs = FileSystem.get(destURI, conf);
+      if (destfs.exists(new Path(destPath))) {
+        System.out.println("WARNING: " + destPath + " already exists.");
+      }
+
+      FileSystem logfs = FileSystem.get(logPath.toUri(), conf);
+      if (logfs.exists(logPath)) {
+        System.out.println("ERROR: " + logPath + " already exists.");
+        return -1;
+      }
+    } catch (Exception e) {
+      System.err.println("Copy failed: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+
     try {
-      copy(conf, srcPath, destPath, srcAsList, ignoreReadFailures);
+      copy(conf, srcPath, destPath, logPath, srcAsList, ignoreReadFailures);
     } catch (Exception e) {
       System.err.println("Copy failed: "+StringUtils.stringifyException(e));
       return -1;

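With reduces set to zero and the job output path pointed at logPath, each
map's collected failure records land as plain part-* files under the log
directory. A minimal sketch of reading them back (the single part file and
namenode address are illustrative assumptions):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadDistcpLog {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path logPath = new Path("hdfs://namenode:8020/distcp-logs");
        FileSystem logFs = FileSystem.get(logPath.toUri(), conf);
        // Assumes a single map wrote part-00000; real jobs may produce
        // one part file per map task.
        BufferedReader in = new BufferedReader(new InputStreamReader(
            logFs.open(new Path(logPath, "part-00000"))));
        String line;
        while ((line = in.readLine()) != null) {
          System.out.println(line);  // e.g. "Failed to copy <src> : <trace>"
        }
        in.close();
      }
    }
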
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?view=diff&rev=555360&r1=555359&r2=555360
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Wed Jul 11 11:38:00 2007
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.util.Random;
+import java.net.URI;
 import junit.framework.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.MiniDFSCluster;
@@ -182,11 +183,17 @@
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");
         new CopyFiles().doMain(conf, new String[] {"hdfs://"+namenode+"/srcdat",
-                                                   "hdfs://"+namenode+"/destdat"});
+                                                   "hdfs://"+namenode+"/destdat",
+                                                   "-log",
+                                                   "hdfs://"+namenode+"/logs"});
         assertTrue("Source and destination directories do not match.",
                    checkFiles(namenode, "/destdat", files));
+        FileSystem fs = FileSystem.get(URI.create("hdfs://"+namenode+"/logs"), conf);
+        assertTrue("Log directory doesnot exist.",
+                    fs.exists(new Path("hdfs://"+namenode+"/logs")));
         deldir(namenode, "/destdat");
         deldir(namenode, "/srcdat");
+        deldir(namenode, "/logs");
       }
     } finally {
       if (cluster != null) { cluster.shutdown(); }
@@ -204,10 +211,16 @@
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
         new CopyFiles().doMain(conf, new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
-                                                   "hdfs://"+namenode+"/destdat"});
+                                                   "hdfs://"+namenode+"/destdat",
+                                                   "-log",
+                                                   "hdfs://"+namenode+"/logs"});
         assertTrue("Source and destination directories do not match.",
                    checkFiles(namenode, "/destdat", files));
+        FileSystem fs = FileSystem.get(URI.create("hdfs://"+namenode+"/logs"), conf);
+        assertTrue("Log directory doesnot exist.",
+                    fs.exists(new Path("hdfs://"+namenode+"/logs")));
         deldir(namenode, "/destdat");
+        deldir(namenode, "/logs");
         deldir("local", TEST_ROOT_DIR+"/srcdat");
       }
     } finally {
@@ -226,10 +239,16 @@
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");
         new CopyFiles().doMain(conf, new String[] {"hdfs://"+namenode+"/srcdat",
-                                                   "file://"+TEST_ROOT_DIR+"/destdat"});
+                                                   "file://"+TEST_ROOT_DIR+"/destdat",
+                                                   "-log",
+                                                   TEST_ROOT_DIR+"/logs"});
         assertTrue("Source and destination directories do not match.",
                    checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
+        FileSystem fs = FileSystem.get(URI.create("hdfs://"+namenode+"/logs"), conf);
+        assertTrue("Log directory doesnot exist.",
+                    fs.exists(new Path(TEST_ROOT_DIR+"/logs")));
         deldir("local", TEST_ROOT_DIR+"/destdat");
+        deldir("local", TEST_ROOT_DIR+"/logs");
         deldir(namenode, "/srcdat");
       }
     } finally {

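Note the argument order these tests exercise: run() fills srcPath and
destPath from the first two non-flag arguments before it checks for -log,
so the flag must follow the destination URL. A hypothetical invocation
mirroring the tests (namenode address is a placeholder):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.CopyFiles;

    public class DistcpLogCli {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // -log must follow <srcurl> and <desturl>; placed earlier, it
        // would be consumed as a positional source or destination.
        new CopyFiles().doMain(conf, new String[] {
            "hdfs://namenode:8020/srcdat",
            "hdfs://namenode:8020/destdat",
            "-log", "hdfs://namenode:8020/logs" });
      }
    }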

