hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: cdoug...@apache.org
Subject: svn commit: r644505 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/util/CopyFiles.java src/test/org/apache/hadoop/fs/TestCopyFiles.java
Date: Thu, 03 Apr 2008 21:08:55 GMT
Author: cdouglas
Date: Thu Apr  3 14:08:53 2008
New Revision: 644505

URL: http://svn.apache.org/viewvc?rev=644505&view=rev
Log:
HADOOP-3099. Add an option to distcp to preserve user, group, and
permission information. Contributed by Tsz Wo (Nicholas), SZE.


Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
    hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=644505&r1=644504&r2=644505&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Apr  3 14:08:53 2008
@@ -188,6 +188,9 @@
     HADOOP-3106. Adds documentation in forrest for debugging.
     (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-3099. Add an option to distcp to preserve user, group, and
+    permission information. (Tsz Wo (Nicholas), SZE via cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-2790.  Fixed inefficient method hasSpeculativeTask by removing

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?rev=644505&r1=644504&r2=644505&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Thu Apr  3 14:08:53 2008
@@ -20,11 +20,9 @@
 
 import java.io.BufferedReader;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.text.DecimalFormat;
 import java.util.*;
 
 import org.apache.commons.logging.Log;
@@ -36,6 +34,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -65,7 +64,13 @@
   private static final String usage = NAME
     + " [OPTIONS] <srcurl>* <desturl>" +
     "\n\nOPTIONS:" +
-    "\n-p                     Preserve status" +
+    "\n-p[rbugp]              Preserve status" +
+    "\n                       r: replication number" +
+    "\n                       b: block size" +
+    "\n                       u: user" + 
+    "\n                       g: group" +
+    "\n                       p: permission" +
+    "\n                       -p alone is equivalent to -prbugp" +
     "\n-i                     Ignore failures" +
     "\n-log <logdir>          Write logs to <logdir>" +
     "\n-overwrite             Overwrite destination" +
@@ -86,7 +91,7 @@
   static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
   static enum Options {
     IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"),
-    PRESERVE_STATUS("-p", NAME + ".preserve.status.info"),
+    PRESERVE_STATUS("-p", NAME + ".preserve.status"),
     OVERWRITE("-overwrite", NAME + ".overwrite.always"),
     UPDATE("-update", NAME + ".overwrite.ifnewer");
 
@@ -97,6 +102,38 @@
       this.propertyname = propertyname;
     }
   }
+  static enum FileAttribute {
+    BLOCK_SIZE, REPLICATION, USER, GROUP, PERMISSION;
+
+    final char symbol;
+
+    private FileAttribute() {symbol = toString().toLowerCase().charAt(0);}
+    
+    static EnumSet<FileAttribute> parse(String s) {
+      if (s == null || s.length() == 0) {
+        return EnumSet.allOf(FileAttribute.class);
+      }
+
+      EnumSet<FileAttribute> set = EnumSet.noneOf(FileAttribute.class);
+      FileAttribute[] attributes = values();
+      for(char c : s.toCharArray()) {
+        int i = 0;
+        for(; i < attributes.length && c != attributes[i].symbol; i++);
+        if (i < attributes.length) {
+          if (!set.contains(attributes[i])) {
+            set.add(attributes[i]);
+          } else {
+            throw new IllegalArgumentException("There are more than one '"
+                + attributes[i].symbol + "' in " + s); 
+          }
+        } else {
+          throw new IllegalArgumentException("'" + c + "' in " + s
+              + " is undefined.");
+        }
+      }
+      return set;
+    }
+  }
 
   static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
   static final String DST_DIR_LABEL = NAME + ".dest.path";
@@ -104,6 +141,9 @@
   static final String SRC_LIST_LABEL = NAME + ".src.list";
   static final String SRC_COUNT_LABEL = NAME + ".src.count";
   static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
+  static final String DST_DIR_LIST_LABEL = NAME + ".dst.dir.list";
+  static final String PRESERVE_STATUS_LABEL
+      = Options.PRESERVE_STATUS.propertyname + ".value";
 
   private JobConf conf;
 
@@ -128,22 +168,22 @@
    */
   static class FilePair implements Writable {
     FileStatus input = new FileStatus();
-    Path output;
+    String output;
     FilePair() { }
-    FilePair(FileStatus input, Path output) {
+    FilePair(FileStatus input, String output) {
       this.input = input;
       this.output = output;
     }
     public void readFields(DataInput in) throws IOException {
       input.readFields(in);
-      output = new Path(Text.readString(in));
+      output = Text.readString(in);
     }
     public void write(DataOutput out) throws IOException {
       input.write(out);
-      Text.writeString(out, output.toString());
+      Text.writeString(out, output);
     }
     public String toString() {
-      return input.toString() + " : " + output.toString();
+      return input + " : " + output;
     }
   }
 
@@ -224,13 +264,14 @@
   /**
    * FSCopyFilesMapper: The mapper for copying files between FileSystems.
    */
-  public static class FSCopyFilesMapper
+  static class CopyFilesMapper
       implements Mapper<LongWritable, FilePair, WritableComparable, Text> {
     // config
     private int sizeBuf = 128 * 1024;
     private FileSystem destFileSys = null;
     private boolean ignoreReadFailures;
     private boolean preserve_status;
+    private EnumSet<FileAttribute> preseved;
     private boolean overwrite;
     private boolean update;
     private Path destPath = null;
@@ -238,14 +279,16 @@
     private JobConf job;
 
     // stats
-    private static final DecimalFormat pcntfmt = new DecimalFormat("0.00");
     private int failcount = 0;
     private int skipcount = 0;
     private int copycount = 0;
 
+    private String getCountString() {
+      return "Copied: " + copycount + " Skipped: " + skipcount
+          + " Failed: " + failcount;
+    }
     private void updateStatus(Reporter reporter) {
-      reporter.setStatus("Copied: " + copycount + " Skipped: " + skipcount +
-                    " Failed: " + failcount);
+      reporter.setStatus(getCountString());
     }
 
     /**
@@ -257,6 +300,22 @@
     private boolean needsUpdate(FileStatus src, FileStatus dst) {
       return update && src.getLen() != dst.getLen();
     }
+    
+    private FSDataOutputStream create(Path f, Reporter reporter,
+        FileStatus srcstat) throws IOException {
+      if (!preserve_status) {
+        return destFileSys.create(f, reporter);
+      }
+
+      FsPermission permission = preseved.contains(FileAttribute.PERMISSION)?
+          srcstat.getPermission(): null;
+      short replication = preseved.contains(FileAttribute.REPLICATION)?
+          srcstat.getReplication(): destFileSys.getDefaultReplication();
+      long blockSize = preseved.contains(FileAttribute.BLOCK_SIZE)?
+          srcstat.getBlockSize(): destFileSys.getDefaultBlockSize();
+      return destFileSys.create(f, permission, true, sizeBuf, replication,
+          blockSize, reporter);
+    }
 
     /**
      * Copy a file to a destination.
@@ -306,17 +365,14 @@
         in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
         reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
         // open tmp file
-        out = preserve_status
-          ? destFileSys.create(tmpfile, true, sizeBuf, srcstat.getReplication(),
-             srcstat.getBlockSize(), reporter)
-          : destFileSys.create(tmpfile, reporter);
+        out = create(tmpfile, reporter, srcstat);
         // copy file
-        int cbread;
-        while ((cbread = in.read(buffer)) >= 0) {
+        for(int cbread; (cbread = in.read(buffer)) >= 0; ) {
           out.write(buffer, 0, cbread);
           cbcopied += cbread;
-          reporter.setStatus(pcntfmt.format(100.0 * cbcopied / srcstat.getLen())
-              + " " + absdst + " [ " +
+          reporter.setStatus(
+              String.format("%.2f ", cbcopied*100.0/srcstat.getLen())
+              + absdst + " [ " +
               StringUtils.humanReadableInt(cbcopied) + " / " +
               StringUtils.humanReadableInt(srcstat.getLen()) + " ]");
         }
@@ -350,7 +406,8 @@
         if (!destFileSys.mkdirs(absdst.getParent())) {
           throw new IOException("Failed to craete parent dir: " + absdst.getParent());
         }
-        rename(destFileSys, tmpfile, absdst);
+        rename(tmpfile, absdst);
+        updatePermissions(srcstat, destFileSys.getFileStatus(absdst));
       }
 
       // report at least once for each file
@@ -361,23 +418,28 @@
     }
     
     /** rename tmp to dst, delete dst if already exists */
-    private void rename(FileSystem fs, Path tmp, Path dst) throws IOException {
+    private void rename(Path tmp, Path dst) throws IOException {
       try {
-        if (fs.exists(dst)) {
-          fs.delete(dst, true);
+        if (destFileSys.exists(dst)) {
+          destFileSys.delete(dst, true);
         }
-        if (!fs.rename(tmp, dst)) {
+        if (!destFileSys.rename(tmp, dst)) {
           throw new IOException();
         }
       }
       catch(IOException cause) {
-        IOException ioe = new IOException("Fail to rename tmp file (=" + tmp 
-            + ") to destination file (=" + dst + ")");
-        ioe.initCause(cause);
-        throw ioe;
+        throw (IOException)new IOException("Fail to rename tmp file (=" + tmp 
+            + ") to destination file (=" + dst + ")").initCause(cause);
       }
     }
-    
+
+    private void updatePermissions(FileStatus src, FileStatus dst
+        ) throws IOException {
+      if (preserve_status) {
+        CopyFiles.updatePermissions(src, dst, preseved, destFileSys);
+      }
+    }
+
     static String bytesString(long b) {
       return b + " bytes (" + StringUtils.humanReadableInt(b) + ")";
     }
@@ -399,6 +461,9 @@
       buffer = new byte[sizeBuf];
       ignoreReadFailures = job.getBoolean(Options.IGNORE_READ_FAILURES.propertyname, false);
       preserve_status = job.getBoolean(Options.PRESERVE_STATUS.propertyname, false);
+      if (preserve_status) {
+        preseved = FileAttribute.parse(job.get(PRESERVE_STATUS_LABEL));
+      }
       update = job.getBoolean(Options.UPDATE.propertyname, false);
       overwrite = !update && job.getBoolean(Options.OVERWRITE.propertyname, false);
       this.job = job;
@@ -415,7 +480,7 @@
                     OutputCollector<WritableComparable, Text> out,
                     Reporter reporter) throws IOException {
       FileStatus srcstat = value.input;
-      Path dstpath = value.output;
+      Path dstpath = new Path(value.output);
       try {
         copy(srcstat, dstpath, out, reporter);
       } catch (IOException e) {
@@ -451,10 +516,8 @@
       if (0 == failcount || ignoreReadFailures) {
         return;
       }
-      throw new IOException("Copied: " + copycount + " Skipped: " + skipcount +
-          " Failed: " + failcount);
+      throw new IOException(getCountString());
     }
-
   }
 
   private static List<Path> fetchFileList(Configuration conf, Path srcList)
@@ -490,7 +553,7 @@
     EnumSet<Options> flags = ignoreReadFailures
       ? EnumSet.of(Options.IGNORE_READ_FAILURES)
       : EnumSet.noneOf(Options.class);
-    copy(conf, tmp, new Path(destPath), logPath, flags);
+    copy(conf, tmp, new Path(destPath), logPath, flags, null);
   }
 
   /** Sanity check for srcPath */
@@ -515,18 +578,23 @@
    * @param logPath Log output directory
    * @param flags Command-line flags
    */
-  public static void copy(Configuration conf, List<Path> srcPaths,
-      Path destPath, Path logPath,
-      EnumSet<Options> flags) throws IOException {
+  static void copy(Configuration conf, List<Path> srcPaths,
+      Path destPath, Path logPath, EnumSet<Options> flags,
+      String presevedAttributes) throws IOException {
     LOG.info("srcPaths=" + srcPaths);
     LOG.info("destPath=" + destPath);
     checkSrcPath(conf, srcPaths);
 
     JobConf job = createJobConf(conf);
+    if (presevedAttributes != null) {
+      job.set(PRESERVE_STATUS_LABEL, presevedAttributes);
+    }
+    
     //Initialize the mapper
     try {
       setup(conf, job, srcPaths, destPath, logPath, flags);
       JobClient.runJob(job);
+      finalize(conf, job, destPath, presevedAttributes);
     } finally {
       //delete tmp
       fullyDelete(job.get(TMP_DIR_LABEL), job);
@@ -535,6 +603,124 @@
     }
   }
 
+  private static void updatePermissions(FileStatus src, FileStatus dst,
+      EnumSet<FileAttribute> preseved, FileSystem destFileSys
+      ) throws IOException {
+    String owner = null;
+    String group = null;
+    if (preseved.contains(FileAttribute.USER)
+        && !src.getOwner().equals(dst.getOwner())) {
+      owner = src.getOwner();
+    }
+    if (preseved.contains(FileAttribute.GROUP)
+        && !src.getGroup().equals(dst.getGroup())) {
+      group = src.getGroup();
+    }
+    if (owner != null || group != null) {
+      destFileSys.setOwner(dst.getPath(), owner, group);
+    }
+    if (preseved.contains(FileAttribute.PERMISSION)
+        && !src.getPermission().equals(dst.getPermission())) {
+      destFileSys.setPermission(dst.getPath(), src.getPermission());
+    }
+  }
+
+  static private void finalize(Configuration conf, JobConf jobconf,
+      final Path destPath, String presevedAttributes) throws IOException {
+    if (presevedAttributes == null) {
+      return;
+    }
+    EnumSet<FileAttribute> preseved = FileAttribute.parse(presevedAttributes);
+    if (!preseved.contains(FileAttribute.USER)
+        && !preseved.contains(FileAttribute.GROUP)
+        && !preseved.contains(FileAttribute.PERMISSION)) {
+      return;
+    }
+
+    FileSystem dstfs = destPath.getFileSystem(conf);
+    Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL));
+    SequenceFile.Reader in = null;
+    try {
+      in = new SequenceFile.Reader(dstdirlist.getFileSystem(jobconf),
+          dstdirlist, jobconf);
+      Text dsttext = new Text();
+      FilePair pair = new FilePair(); 
+      for(; in.next(dsttext, pair); ) {
+        Path absdst = new Path(destPath, pair.output);
+        updatePermissions(pair.input, dstfs.getFileStatus(absdst),
+            preseved, dstfs);
+      }
+    } finally {
+      checkAndClose(in);
+    }
+  }
+
+  static private class CommandArgument {
+    final List<Path> srcs;
+    final Path dst;
+    final Path log;
+    final EnumSet<Options> flags;
+    final String presevedAttributes;
+    
+    CommandArgument(List<Path> srcs, Path dst, Path log,
+        EnumSet<Options> flags, String presevedAttributes) {
+      this.srcs = srcs;
+      this.dst = dst;
+      this.log = log;
+      this.flags = flags;
+      this.presevedAttributes = presevedAttributes;      
+    }
+
+    static CommandArgument valueOf(String[] args, Configuration conf
+        ) throws IOException {
+      List<Path> srcs = new ArrayList<Path>();
+      Path dst = null;
+      Path log = null;
+      EnumSet<Options> flags = EnumSet.noneOf(Options.class);
+      String presevedAttributes = null;
+
+      for (int idx = 0; idx < args.length; idx++) {
+        Options[] opt = Options.values();
+        int i = 0;
+        for(; i < opt.length && !args[idx].startsWith(opt[i].cmd); i++);
+
+        if (i < opt.length) {
+          flags.add(opt[i]);
+          if (opt[i] == Options.PRESERVE_STATUS) {
+            presevedAttributes =  args[idx].substring(2);         
+            FileAttribute.parse(presevedAttributes); //validation
+          }
+        } else if ("-f".equals(args[idx])) {
+          if (++idx ==  args.length) {
+            throw new IllegalArgumentException("urilist_uri not specified in -f");
+          }
+          srcs.addAll(fetchFileList(conf, new Path(args[idx])));
+        } else if ("-log".equals(args[idx])) {
+          if (++idx ==  args.length) {
+            throw new IllegalArgumentException("logdir not specified in -log");
+          }
+          log = new Path(args[idx]);
+        } else if ('-' == args[idx].codePointAt(0)) {
+          throw new IllegalArgumentException("Invalid switch " + args[idx]);
+        } else if (idx == args.length -1) {
+          dst = new Path(args[idx]);
+        } else {
+          srcs.add(new Path(args[idx]));
+        }
+      }
+      // mandatory command-line parameters
+      if (srcs.isEmpty() || dst == null) {
+        throw new IllegalArgumentException("Missing "
+            + (dst == null ? "dst path" : "src"));
+      }
+      // incompatible command-line flags
+      if (flags.contains(Options.OVERWRITE) && flags.contains(Options.UPDATE)) {
+        throw new IllegalArgumentException("Conflicting overwrite policies");
+      }
+      return new CommandArgument(srcs, dst, log, flags, presevedAttributes);
+    }
+  }
+
   /**
    * This is the main driver for recursively copying directories
    * across file systems. It takes at least two cmdline parameters. A source
@@ -544,59 +730,14 @@
    * reduce is empty.
    */
   public int run(String[] args) throws Exception {
-    List<Path> srcPath = new ArrayList<Path>();
-    Path destPath = null;
-    Path logPath = null;
-    EnumSet<Options> flags = EnumSet.noneOf(Options.class);
-
-    for (int idx = 0; idx < args.length; idx++) {
-      Options[] opt = Options.values();
-      int i = 0;
-      for(; i < opt.length && !opt[i].cmd.equals(args[idx]); i++);
-
-      if (i < opt.length) {
-        flags.add(opt[i]);
-      }        
-      else if ("-f".equals(args[idx])) {
-        if (++idx ==  args.length) {
-          System.out.println("urilist_uri not specified");
-          System.out.println(usage);
-          return -1;
-        }
-        srcPath.addAll(fetchFileList(conf, new Path(args[idx])));
-      } else if ("-log".equals(args[idx])) {
-        if (++idx ==  args.length) {
-          System.out.println("logdir not specified");
-          System.out.println(usage);
-          return -1;
-        }
-        logPath = new Path(args[idx]);
-      } else if ('-' == args[idx].codePointAt(0)) {
-        System.out.println("Invalid switch " + args[idx]);
-        System.out.println(usage);
-        ToolRunner.printGenericCommandUsage(System.out);
-        return -1;
-      } else if (idx == args.length -1) {
-        destPath = new Path(args[idx]);
-      } else {
-        srcPath.add(new Path(args[idx]));
-      }
-    }
-    // mandatory command-line parameters
-    if (srcPath.isEmpty() || destPath == null) {
-      System.out.println("Missing " + (destPath == null ? "dst path" : "src"));
-      System.out.println(usage);
-      ToolRunner.printGenericCommandUsage(System.out);
-      return -1;
-    }
-    // incompatible command-line flags
-    if (flags.contains(Options.OVERWRITE) && flags.contains(Options.UPDATE)) {
-      System.out.println("Conflicting overwrite policies");
-      System.out.println(usage);
-      return -1;
-    }
     try {
-      copy(conf, srcPath, destPath, logPath, flags);
+      CommandArgument p = CommandArgument.valueOf(args, conf);
+      copy(conf, p.srcs, p.dst, p.log, p.flags, p.presevedAttributes);
+      return 0;
+    } catch (IllegalArgumentException e) {
+      System.err.println(StringUtils.stringifyException(e) + "\n" + usage);
+      ToolRunner.printGenericCommandUsage(System.err);
+      return -1;
     } catch (DuplicationException e) {
       System.err.println(StringUtils.stringifyException(e));
       return DuplicationException.ERROR_CODE;
@@ -604,9 +745,8 @@
       System.err.println("With failures, global counters are inaccurate; " +
           "consider running with -i");
       System.err.println("Copy failed: " + StringUtils.stringifyException(e));
-      return -1;
+      return -999;
     }
-    return 0;
   }
 
   public static void main(String[] args) throws Exception {
@@ -621,28 +761,26 @@
    * absPath is always assumed to descend from root.
    * Otherwise returned path is null.
    */
-  public static Path makeRelative(Path root, Path absPath) {
-    if (!absPath.isAbsolute()) { return absPath; }
-    String sRoot = root.toUri().getPath();
-    String sPath = absPath.toUri().getPath();
-    Enumeration<Object> rootTokens = new StringTokenizer(sRoot, "/");
-    ArrayList rList = Collections.list(rootTokens);
-    Enumeration<Object> pathTokens = new StringTokenizer(sPath, "/");
-    ArrayList pList = Collections.list(pathTokens);
-    Iterator rIter = rList.iterator();
-    Iterator pIter = pList.iterator();
-    while (rIter.hasNext()) {
-      String rElem = (String) rIter.next();
-      String pElem = (String) pIter.next();
-      if (!rElem.equals(pElem)) { return null; }
-    }
-    StringBuffer sb = new StringBuffer();
-    while (pIter.hasNext()) {
-      String pElem = (String) pIter.next();
-      sb.append(pElem);
-      if (pIter.hasNext()) { sb.append("/"); }
+  static String makeRelative(Path root, Path absPath) {
+    if (!absPath.isAbsolute()) {
+      throw new IllegalArgumentException("!absPath.isAbsolute(), absPath="
+          + absPath);
+    }
+    String p = absPath.toUri().getPath();
+
+    StringTokenizer pathTokens = new StringTokenizer(p, "/");
+    for(StringTokenizer rootTokens = new StringTokenizer(
+        root.toUri().getPath(), "/"); rootTokens.hasMoreTokens(); ) {
+      if (!rootTokens.nextToken().equals(pathTokens.nextToken())) {
+        return null;
+      }
+    }
+    StringBuilder sb = new StringBuilder();
+    for(; pathTokens.hasMoreTokens(); ) {
+      sb.append(pathTokens.nextToken());
+      if (pathTokens.hasMoreTokens()) { sb.append(Path.SEPARATOR); }
     }
-    return new Path(sb.toString());
+    return sb.length() == 0? ".": sb.toString();
   }
 
   /**
@@ -681,7 +819,7 @@
     jobconf.setOutputKeyClass(Text.class);
     jobconf.setOutputValueClass(Text.class);
 
-    jobconf.setMapperClass(FSCopyFilesMapper.class);
+    jobconf.setMapperClass(CopyFilesMapper.class);
     jobconf.setNumReduceTasks(0);
     return jobconf;
   }
@@ -735,7 +873,9 @@
       String filename = "_distcp_logs_" + randomId;
       if (!dstExists || !dstIsDir) {
         Path parent = destPath.getParent();
-        dstfs.mkdirs(parent);
+        if (!dstfs.exists(parent)) {
+          dstfs.mkdirs(parent);
+        }
         logPath = new Path(parent, filename);
       } else {
         logPath = new Path(destPath, filename);
@@ -757,35 +897,41 @@
         dstfilelist, Text.class, Text.class,
         SequenceFile.CompressionType.NONE);
 
+    Path dstdirlist = new Path(jobDirectory, "_distcp_dst_dirs");
+    jobConf.set(DST_DIR_LIST_LABEL, dstdirlist.toString());
+    SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobfs, jobConf,
+        dstdirlist, Text.class, FilePair.class,
+        SequenceFile.CompressionType.NONE);
+
     // handle the case where the destination directory doesn't exist
     // and we've only a single src directory OR we're updating/overwriting
     // the contents of the destination directory.
     final boolean special =
       (srcPaths.size() == 1 && !dstExists) || updateORoverwrite;
-    int srcCount = 0, cnsyncf = 0;
+    int srcCount = 0, cnsyncf = 0, dirsyn = 0;
     long cbsize = 0L, cbsyncs = 0L;
     try {
-      for (Path p : srcPaths) {
-        FileSystem fs = p.getFileSystem(conf);
-        boolean pIsDir = fs.getFileStatus(p).isDir();
-        Path root = special && pIsDir? p: p.getParent();
-        if (pIsDir) {
+      for (Path src : srcPaths) {
+        FileSystem fs = src.getFileSystem(conf);
+        FileStatus srcfilestat = fs.getFileStatus(src);
+        Path root = special && srcfilestat.isDir()? src: src.getParent();
+        if (srcfilestat.isDir()) {
           ++srcCount;
         }
 
-        Stack<Path> pathstack = new Stack<Path>();
-        pathstack.push(p);
-        while (!pathstack.empty()) {
-          for (FileStatus stat : fs.listStatus(pathstack.pop())) {
+        Stack<FileStatus> pathstack = new Stack<FileStatus>();
+        for(pathstack.push(srcfilestat); !pathstack.empty(); ) {
+          FileStatus cur = pathstack.pop();
+          for(FileStatus child : fs.listStatus(cur.getPath())) {
             ++srcCount;
 
-            if (stat.isDir()) {
-              pathstack.push(stat.getPath());
+            if (child.isDir()) {
+              pathstack.push(child);
             }
             else {
               ++cnsyncf;
-              cbsyncs += stat.getLen();
-              cbsize += stat.getLen();
+              cbsyncs += child.getLen();
+              cbsize += child.getLen();
 
               if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
                 src_writer.sync();
@@ -795,17 +941,27 @@
               }
             }
 
-            Path dst = makeRelative(root, stat.getPath());
-            src_writer.append(new LongWritable(stat.isDir()? 0: stat.getLen()),
-                new FilePair(stat, dst));
-            dst_writer.append(new Text(dst.toString()),
-                new Text(stat.getPath().toString()));
+            String dst = makeRelative(root, child.getPath());
+            src_writer.append(new LongWritable(child.isDir()? 0: child.getLen()),
+                new FilePair(child, dst));
+            dst_writer.append(new Text(dst),
+                new Text(child.getPath().toString()));
+          }
+
+          if (cur.isDir()) {
+            String dst = makeRelative(root, cur.getPath());
+            dir_writer.append(new Text(dst), new FilePair(cur, dst));
+            if (++dirsyn > SYNC_FILE_MAX) {
+              dirsyn = 0;
+              dir_writer.sync();                
+            }
           }
         }
       }
     } finally {
       checkAndClose(src_writer);
       checkAndClose(dst_writer);
+      checkAndClose(dir_writer);
     }
 
     // create dest path dir if copying > 1 file
@@ -814,7 +970,7 @@
         throw new IOException("Failed to create" + destPath);
       }
     }
-    
+
     checkDuplication(jobfs, dstfilelist,
         new Path(jobDirectory, "_distcp_sorted"), conf);
 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=644505&r1=644504&r2=644505&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Thu Apr  3 14:08:53 2008
@@ -26,6 +26,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.CopyFiles;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -444,6 +445,78 @@
       deldir("file:///", TEST_ROOT_DIR+"/destdat");
       deldir("file:///", TEST_ROOT_DIR+"/dest2");
       deldir("file:///", TEST_ROOT_DIR+"/srcdat");
+    }
+  }
+
+  public void testPreserveOption() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster(conf, 2, true, null);
+      String nnUri = FileSystem.getDefaultUri(conf).toString();
+      FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
+
+      {//test preserving user
+        MyFile[] files = createFiles(URI.create(nnUri), "/srcdat");
+        FileStatus[] srcstat = getFileStatus(nnUri, "/srcdat", files);
+        for(int i = 0; i < srcstat.length; i++) {
+          fs.setOwner(srcstat[i].getPath(), "u" + i, null);
+        }
+        ToolRunner.run(new CopyFiles(conf),
+            new String[]{"-pu", nnUri+"/srcdat", nnUri+"/destdat"});
+        assertTrue("Source and destination directories do not match.",
+                   checkFiles(nnUri, "/destdat", files));
+        
+        FileStatus[] dststat = getFileStatus(nnUri, "/destdat", files);
+        for(int i = 0; i < dststat.length; i++) {
+          assertEquals("i=" + i, "u" + i, dststat[i].getOwner());
+        }
+        deldir(nnUri, "/destdat");
+        deldir(nnUri, "/srcdat");
+      }
+
+      {//test preserving group
+        MyFile[] files = createFiles(URI.create(nnUri), "/srcdat");
+        FileStatus[] srcstat = getFileStatus(nnUri, "/srcdat", files);
+        for(int i = 0; i < srcstat.length; i++) {
+          fs.setOwner(srcstat[i].getPath(), null, "g" + i);
+        }
+        ToolRunner.run(new CopyFiles(conf),
+            new String[]{"-pg", nnUri+"/srcdat", nnUri+"/destdat"});
+        assertTrue("Source and destination directories do not match.",
+                   checkFiles(nnUri, "/destdat", files));
+        
+        FileStatus[] dststat = getFileStatus(nnUri, "/destdat", files);
+        for(int i = 0; i < dststat.length; i++) {
+          assertEquals("i=" + i, "g" + i, dststat[i].getGroup());
+        }
+        deldir(nnUri, "/destdat");
+        deldir(nnUri, "/srcdat");
+      }
+
+      {//test preserving mode
+        MyFile[] files = createFiles(URI.create(nnUri), "/srcdat");
+        FileStatus[] srcstat = getFileStatus(nnUri, "/srcdat", files);
+        FsPermission[] permissions = new FsPermission[srcstat.length];
+        for(int i = 0; i < srcstat.length; i++) {
+          permissions[i] = new FsPermission((short)(i & 0666));
+          fs.setPermission(srcstat[i].getPath(), permissions[i]);
+        }
+
+        ToolRunner.run(new CopyFiles(conf),
+            new String[]{"-pp", nnUri+"/srcdat", nnUri+"/destdat"});
+        assertTrue("Source and destination directories do not match.",
+                   checkFiles(nnUri, "/destdat", files));
+  
+        FileStatus[] dststat = getFileStatus(nnUri, "/destdat", files);
+        for(int i = 0; i < dststat.length; i++) {
+          assertEquals("i=" + i, permissions[i], dststat[i].getPermission());
+        }
+        deldir(nnUri, "/destdat");
+        deldir(nnUri, "/srcdat");
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
     }
   }
 }



Mime
View raw message