hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r922395 - in /hadoop/mapreduce/trunk: CHANGES.txt src/test/mapred/org/apache/hadoop/tools/TestHadoopArchives.java src/tools/org/apache/hadoop/tools/HadoopArchives.java
Date Fri, 12 Mar 2010 18:56:39 GMT
Author: szetszwo
Date: Fri Mar 12 18:56:39 2010
New Revision: 922395

URL: http://svn.apache.org/viewvc?rev=922395&view=rev
Log:
MAPREDUCE-1579. archive: check and possibly replace the space charater in source paths.

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestHadoopArchives.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=922395&r1=922394&r2=922395&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Mar 12 18:56:39 2010
@@ -1379,3 +1379,6 @@ Release 0.21.0 - Unreleased
 
     MAPREDUCE-890. After HADOOP-4491, the user who started mapred system is 
     not able to run job. (Ravi Gummadi via vinodkv)
+
+    MAPREDUCE-1579. archive: check and possibly replace the space charater
+    in source paths.  (szetszwo)

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestHadoopArchives.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestHadoopArchives.java?rev=922395&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestHadoopArchives.java
(added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestHadoopArchives.java
Fri Mar 12 18:56:39 2010
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Level;
+
+/**
+ * test {@link HadoopArchives}
+ */
+public class TestHadoopArchives extends TestCase {
+  {
+    ((Log4JLogger)LogFactory.getLog(org.apache.hadoop.security.Groups.class)
+        ).getLogger().setLevel(Level.OFF);
+    ((Log4JLogger)org.apache.hadoop.ipc.Server.LOG
+        ).getLogger().setLevel(Level.OFF);
+    ((Log4JLogger)org.apache.hadoop.util.AsyncDiskService.LOG
+        ).getLogger().setLevel(Level.OFF);
+  }
+
+  private static final String inputDir = "input";
+
+  private Path inputPath;
+  private MiniDFSCluster dfscluster;
+  private MiniMRCluster mapred;
+  private FileSystem fs;
+  private Path archivePath;
+  
+  static private Path createFile(Path dir, String filename, FileSystem fs
+      ) throws IOException {
+    final Path f = new Path(dir, filename);
+    final FSDataOutputStream out = fs.create(f); 
+    out.write(filename.getBytes());
+    out.close();
+    return f;
+  }
+  
+  protected void setUp() throws Exception {
+    super.setUp();
+    dfscluster = new MiniDFSCluster(new Configuration(), 2, true, null);
+    fs = dfscluster.getFileSystem();
+    mapred = new MiniMRCluster(2, fs.getUri().toString(), 1);
+    inputPath = new Path(fs.getHomeDirectory(), inputDir); 
+    archivePath = new Path(fs.getHomeDirectory(), "archive");
+    fs.mkdirs(inputPath);
+    createFile(inputPath, "a", fs);
+    createFile(inputPath, "b", fs);
+    createFile(inputPath, "c", fs);
+  }
+  
+  protected void tearDown() throws Exception {
+    try {
+      if (mapred != null) {
+        mapred.shutdown();
+      }
+      if (dfscluster != null) {
+        dfscluster.shutdown();
+      }
+    } catch(Exception e) {
+      System.err.println(e);
+    }
+    super.tearDown();
+  }
+  
+  
+  public void testPathWithSpaces() throws Exception {
+    fs.delete(archivePath, true);
+
+    //create files/directories with spaces
+    createFile(inputPath, "c c", fs);
+    final Path sub1 = new Path(inputPath, "sub 1");
+    fs.mkdirs(sub1);
+    createFile(sub1, "file x y z", fs);
+    createFile(sub1, "file", fs);
+    createFile(sub1, "x", fs);
+    createFile(sub1, "y", fs);
+    createFile(sub1, "z", fs);
+    final Path sub2 = new Path(inputPath, "sub 2");
+    fs.mkdirs(sub2);
+    final Configuration conf = mapred.createJobConf();
+    final FsShell shell = new FsShell(conf);
+
+    final String inputPathStr = inputPath.toUri().getPath();
+    System.out.println("inputPathStr = " + inputPathStr);
+
+    final List<String> originalPaths = lsr(shell, inputPathStr);
+    final URI uri = fs.getUri();
+    final String prefix = "har://hdfs-" + uri.getHost() +":" + uri.getPort()
+        + archivePath.toUri().getPath() + Path.SEPARATOR;
+
+    {//space replacement is not enabled
+      final String[] args = {
+          "-archiveName",
+          "fail.har",
+          "-p",
+          inputPathStr,
+          "*",
+          archivePath.toString()
+      };
+      final HadoopArchives har = new HadoopArchives(mapred.createJobConf());
+      assertEquals(-1, ToolRunner.run(har, args));
+    }
+
+    {//Enable space replacement
+      final String harName = "foo.har";
+      final String[] args = {
+          "-D" + HadoopArchives.SPACE_REPLACE_LABEL + "=true",
+          "-archiveName",
+          harName,
+          "-p",
+          inputPathStr,
+          "*",
+          archivePath.toString()
+      };
+      final HadoopArchives har = new HadoopArchives(mapred.createJobConf());
+      assertEquals(0, ToolRunner.run(har, args));
+
+      //compare results
+      final List<String> harPaths = lsr(shell, prefix + harName);
+      final List<String> replaced = replace(originalPaths, HadoopArchives.SPACE_REPLACEMENT_DEFAULT);
+      assertEquals(replaced, harPaths);
+    }
+
+    {//Replace space with space
+      final String[] args = {
+          "-D" + HadoopArchives.SPACE_REPLACE_LABEL + "=true",
+          "-D" + HadoopArchives.SPACE_REPLACEMENT_LABEL + "=p q",
+          "-Dhar.space.replace.enable=true",
+          "-archiveName",
+          "fail.har",
+          "-p",
+          inputPathStr,
+          "*",
+          archivePath.toString()
+      };
+      final HadoopArchives har = new HadoopArchives(mapred.createJobConf());
+      try {
+        ToolRunner.run(har, args);
+        fail();
+      } catch(IllegalArgumentException iae) {
+        System.out.println("GOOD");
+        iae.printStackTrace();
+      }
+    }
+
+    {//Replace space with Path.SEPARATOR
+      final String[] args = {
+          "-D" + HadoopArchives.SPACE_REPLACE_LABEL + "=true",
+          "-D" + HadoopArchives.SPACE_REPLACEMENT_LABEL + "=" + Path.SEPARATOR,
+          "-Dhar.space.replace.enable=true",
+          "-archiveName",
+          "fail.har",
+          "-p",
+          inputPathStr,
+          "*",
+          archivePath.toString()
+      };
+      final HadoopArchives har = new HadoopArchives(mapred.createJobConf());
+      try {
+        ToolRunner.run(har, args);
+        fail();
+      } catch(IllegalArgumentException iae) {
+        System.out.println("GOOD");
+        iae.printStackTrace();
+      }
+    }
+
+    {//Replace space with a valid replacement
+      final String harName = "bar.har";
+      final String replacement = "+-";
+      final String[] args = {
+          "-D" + HadoopArchives.SPACE_REPLACE_LABEL + "=true",
+          "-D" + HadoopArchives.SPACE_REPLACEMENT_LABEL + "=" + replacement,
+          "-Dhar.space.replace.enable=true",
+          "-archiveName",
+          harName,
+          "-p",
+          inputPathStr,
+          "*",
+          archivePath.toString()
+      };
+      final HadoopArchives har = new HadoopArchives(mapred.createJobConf());
+      assertEquals(0, ToolRunner.run(har, args));
+
+      //compare results
+      final List<String> harPaths = lsr(shell, prefix + harName);
+      final List<String> replaced = replace(originalPaths, replacement);
+      assertEquals(replaced, harPaths);
+    }
+  }
+
+  private static List<String> replace(List<String> paths, String replacement)
{
+    final List<String> replaced = new ArrayList<String>();
+    for(int i = 0; i < paths.size(); i++) {
+      replaced.add(paths.get(i).replace(" ", replacement));
+    }
+    Collections.sort(replaced);
+    return replaced;   
+  }
+      
+  private static List<String> lsr(final FsShell shell, String dir
+      ) throws Exception {
+    System.out.println("lsr root=" + dir);
+    final ByteArrayOutputStream bytes = new ByteArrayOutputStream(); 
+    final PrintStream out = new PrintStream(bytes);
+    final PrintStream oldOut = System.out;
+    final PrintStream oldErr = System.err;
+    System.setOut(out);
+    System.setErr(out);
+    final String results;
+    try {
+      assertEquals(0, shell.run(new String[]{"-lsr", dir}));
+      results = bytes.toString();
+    } finally {
+      IOUtils.closeStream(out);
+      System.setOut(oldOut);
+      System.setErr(oldErr);
+    }
+    System.out.println("lsr results:\n" + results);
+
+    final String dirname = dir.substring(dir.lastIndexOf(Path.SEPARATOR));
+    final List<String> paths = new ArrayList<String>();
+    for(StringTokenizer t = new StringTokenizer(results, "\n");
+        t.hasMoreTokens(); ) {
+      final String s = t.nextToken();
+      final int i = s.indexOf(dirname);
+      if (i >= 0) {
+        paths.add(s.substring(i + dirname.length()));
+      }
+    }
+    Collections.sort(paths);
+    System.out.println("lsr paths = " + paths.toString().replace(", ", ",\n  "));
+    return paths;
+  }
+}

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java?rev=922395&r1=922394&r2=922395&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java Fri Mar 12
18:56:39 2010
@@ -18,9 +18,10 @@
 
 package org.apache.hadoop.tools;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -45,6 +46,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -87,6 +89,24 @@ public class HadoopArchives implements T
   static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
   static final String DST_HAR_LABEL = NAME + ".archive.name";
   static final String SRC_PARENT_LABEL = NAME + ".parent.path";
+
+  static final String SPACE_REPLACE_LABEL = NAME + ".space.replace.enable";
+  static final boolean SPACE_REPLACE_DEFAULT = false;
+  static final String SPACE_REPLACEMENT_LABEL = NAME + ".space.replacement";
+  static final String SPACE_REPLACEMENT_DEFAULT = "_";
+  static final String SPACE_REPLACEMENT_DESCRIPTION
+    = HadoopArchives.class.getSimpleName() + " (version=" + VERSION
+    + ") does not support source paths with the space character."
+    + "\n\nThere is a space replacement option, which can be enabled by"
+    + "\n  -D" + SPACE_REPLACE_LABEL + "=true"
+    + "\nThe space replacement string can be specified by"
+    + "\n  -D" + SPACE_REPLACEMENT_LABEL + "=<REPLACEMENT_STRING>"
+    + "\nThe default <REPLACEMENT_STRING> is \""
+    + SPACE_REPLACEMENT_DEFAULT
+    + "\".\n*** Note that the original paths will not be changed"
+    + " by the space replacement option."
+    + "  The resulted har contains only the replaced paths.";
+
   // size of each part file
   // its fixed for now.
   static final long partSize = 2 * 1024 * 1024 * 1024l;
@@ -97,6 +117,11 @@ public class HadoopArchives implements T
   
  
   private JobConf conf;
+  private String spaceReplacement = null;
+
+  private boolean isSpaceReplaceEnabled() {
+    return spaceReplacement != null;
+  }
 
   public void setConf(Configuration conf) {
     if (conf instanceof JobConf) {
@@ -149,12 +174,56 @@ public class HadoopArchives implements T
     }
   }
 
+  /** HarEntry is used in the {@link HArchivesMapper} as the input value. */
+  private static class HarEntry implements Writable {
+    String path;
+    String[] children;
+
+    HarEntry() {}
+    
+    HarEntry(String path, String[] children) {
+      this.path = path;
+      this.children = children;
+    }
+
+    boolean isDir() {
+      return children != null;      
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      path = Text.readString(in);
+
+      if (in.readBoolean()) {
+        children = new String[in.readInt()];
+        for(int i = 0; i < children.length; i++) {
+          children[i] = Text.readString(in);
+        }
+      } else {
+        children = null;
+      }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, path);
+
+      final boolean dir = isDir();
+      out.writeBoolean(dir);
+      if (dir) {
+        out.writeInt(children.length);
+        for(String c : children) {
+          Text.writeString(out, c);
+        }
+      }
+    }
+  }
+
   /**
    * Input format of a hadoop archive job responsible for 
    * generating splits of the file list
    */
-
-  static class HArchiveInputFormat implements InputFormat<LongWritable, Text> {
+  static class HArchiveInputFormat implements InputFormat<LongWritable, HarEntry> {
 
     //generate input splits from the src file lists
     public InputSplit[] getSplits(JobConf jconf, int numSplits)
@@ -174,7 +243,7 @@ public class HadoopArchives implements T
       FileStatus fstatus = fs.getFileStatus(src);
       ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
       LongWritable key = new LongWritable();
-      Text value = new Text();
+      final HarEntry value = new HarEntry();
       SequenceFile.Reader reader = null;
       // the remaining bytes in the file split
       long remaining = fstatus.getLen();
@@ -211,9 +280,10 @@ public class HadoopArchives implements T
       return splits.toArray(new FileSplit[splits.size()]);
     }
 
-    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
+    @Override
+    public RecordReader<LongWritable, HarEntry> getRecordReader(InputSplit split,
         JobConf job, Reporter reporter) throws IOException {
-      return new SequenceFileRecordReader<LongWritable, Text>(job,
+      return new SequenceFileRecordReader<LongWritable, HarEntry>(job,
                  (FileSplit)split);
     }
   }
@@ -244,12 +314,13 @@ public class HadoopArchives implements T
    * @param fullPath the full path
    * @param root the prefix root to be truncated
    * @return the relative path
+   * @throws IOException 
    */
-  private Path relPathToRoot(Path fullPath, Path root) {
+  private String relPathToRoot(Path fullPath, Path root) throws IOException {
     // just take some effort to do it 
     // rather than just using substring 
     // so that we do not break sometime later
-    Path justRoot = new Path(Path.SEPARATOR);
+    final String justRoot = Path.SEPARATOR;
     if (fullPath.depth() == root.depth()) {
       return justRoot;
     }
@@ -260,7 +331,7 @@ public class HadoopArchives implements T
         retPath = new Path(parent.getName(), retPath);
         parent = parent.getParent();
       }
-      return new Path(justRoot, retPath);
+      return new Path(justRoot, retPath.toString()).toString();
     }
     return null;
   }
@@ -331,19 +402,36 @@ public class HadoopArchives implements T
     }
     Set<Map.Entry<String, HashSet<String>>> keyVals = allpaths.entrySet();
     for (Map.Entry<String, HashSet<String>> entry : keyVals) {
-      Path relPath = relPathToRoot(new Path(entry.getKey()), parentPath);
+      final String relPath = relPathToRoot(new Path(entry.getKey()), parentPath);
       if (relPath != null) {
-        String toWrite = relPath + " dir ";
-        HashSet<String> children = entry.getValue();
-        StringBuffer sbuff = new StringBuffer();
-        sbuff.append(toWrite);
-        for (String child: children) {
-          sbuff.append(child + " ");
+        final String[] children = new String[entry.getValue().size()];
+        int i = 0;
+        for(String child: entry.getValue()) {
+          children[i++] = child;
         }
-        toWrite = sbuff.toString();
-        srcWriter.append(new LongWritable(0L), new Text(toWrite));
+        append(srcWriter, 0L, relPath, children);
+      }
+    }
+  }
+
+  //check whether the path contains the space character.
+  private void checkSpace(String p) throws IOException {
+    //check only if space replacement is disabled.
+    if (!isSpaceReplaceEnabled() && p.indexOf(' ') >= 0) {
+      throw new IOException("Source \"" + p + "\" contains the space character.  "
+          + SPACE_REPLACEMENT_DESCRIPTION);
+    }
+  }
+
+  private void append(SequenceFile.Writer srcWriter, long len,
+      String path, String[] children) throws IOException {
+    checkSpace(path);
+    if (children != null) {
+      for(String child: children) {
+        checkSpace(child);
       }
     }
+    srcWriter.append(new LongWritable(len), new HarEntry(path, children));
   }
     
   /**
@@ -431,7 +519,7 @@ public class HadoopArchives implements T
     Path srcFiles = new Path(jobDirectory, "_har_src_files");
     conf.set(SRC_LIST_LABEL, srcFiles.toString());
     SequenceFile.Writer srcWriter = SequenceFile.createWriter(jobfs, conf,
-        srcFiles, LongWritable.class, Text.class, 
+        srcFiles, LongWritable.class, HarEntry.class, 
         SequenceFile.CompressionType.NONE);
     // get the list of files 
     // create single list of files and dirs
@@ -451,24 +539,21 @@ public class HadoopArchives implements T
         recursivels(fs, fdir, allFiles);
         for (FileStatusDir statDir: allFiles) {
           FileStatus stat = statDir.getFileStatus();
-          String toWrite = "";
           long len = stat.isDir()? 0:stat.getLen();
+          final String path = relPathToRoot(stat.getPath(), parentPath);
+          final String[] children;
           if (stat.isDir()) {
-            toWrite = "" + relPathToRoot(stat.getPath(), parentPath) + " dir ";
             //get the children 
             FileStatus[] list = statDir.getChildren();
-            StringBuffer sbuff = new StringBuffer();
-            sbuff.append(toWrite);
-            for (FileStatus stats: list) {
-              sbuff.append(stats.getPath().getName() + " ");
+            children = new String[list.length];
+            for (int i = 0; i < list.length; i++) {
+              children[i] = list[i].getPath().getName();
             }
-            toWrite = sbuff.toString();
           }
           else {
-            toWrite +=  relPathToRoot(stat.getPath(), parentPath) + " file ";
+            children = null;
           }
-          srcWriter.append(new LongWritable(len), new 
-              Text(toWrite));
+          append(srcWriter, len, path, children);
           srcWriter.sync();
           numFiles++;
           totalSize += len;
@@ -505,8 +590,10 @@ public class HadoopArchives implements T
   }
 
   static class HArchivesMapper 
-  implements Mapper<LongWritable, Text, IntWritable, Text> {
+  implements Mapper<LongWritable, HarEntry, IntWritable, Text> {
     private JobConf conf = null;
+    private String spaceReplacement;
+
     int partId = -1 ; 
     Path tmpOutputDir = null;
     Path tmpOutput = null;
@@ -523,6 +610,9 @@ public class HadoopArchives implements T
     // tmp files. 
     public void configure(JobConf conf) {
       this.conf = conf;
+      this.spaceReplacement = conf.get(SPACE_REPLACEMENT_LABEL,
+          SPACE_REPLACEMENT_DEFAULT);
+
       // this is tightly tied to map reduce
       // since it does not expose an api 
       // to get the partition
@@ -565,28 +655,6 @@ public class HadoopArchives implements T
         fsin.close();
       }
     }
-       
-    static class MapStat {
-      private String pathname;
-      private boolean isDir;
-      private List<String> children;
-      public MapStat(String line) {
-        String[] splits = line.split(" ");
-        pathname = splits[0];
-        if ("dir".equals(splits[1])) {
-          isDir = true;
-        }
-        else {
-          isDir = false;
-        }
-        if (isDir) {
-          children = new ArrayList<String>();
-          for (int i = 2; i < splits.length; i++) {
-            children.add(splits[i]);
-          }
-        }
-      }
-    }
     
     /**
      * get rid of / in the beginning of path
@@ -601,27 +669,29 @@ public class HadoopArchives implements T
       return new Path(parent, new Path(p.toString().substring(1)));
     }
 
+    private String replaceSpaces(String s) {
+      return s.replace(" ", spaceReplacement);
+    }
+
     // read files from the split input 
     // and write it onto the part files.
     // also output hash(name) and string 
     // for reducer to create index 
     // and masterindex files.
-    public void map(LongWritable key, Text value,
+    public void map(LongWritable key, HarEntry value,
         OutputCollector<IntWritable, Text> out,
         Reporter reporter) throws IOException {
-      String line  = value.toString();
-      MapStat mstat = new MapStat(line);
-      Path relPath = new Path(mstat.pathname);
+      Path relPath = new Path(value.path);
       int hash = HarFileSystem.getHarHash(relPath);
-      String towrite = null;
+      String towrite = replaceSpaces(relPath.toString());
       Path srcPath = realPath(relPath, rootPath);
       long startPos = partStream.getPos();
-      if (mstat.isDir) { 
-        towrite = relPath.toString() + " " + "dir none " + 0 + " " + 0 + " ";
+      if (value.isDir()) { 
+        towrite += " dir none " + 0 + " " + 0 + " ";
         StringBuffer sbuff = new StringBuffer();
         sbuff.append(towrite);
-        for (String child: mstat.children) {
-          sbuff.append(child + " ");
+        for (String child: value.children) {
+          sbuff.append(replaceSpaces(child) + " ");
         }
         towrite = sbuff.toString();
         //reading directories is also progress
@@ -634,7 +704,7 @@ public class HadoopArchives implements T
         reporter.setStatus("Copying file " + srcStatus.getPath() + 
             " to archive.");
         copyData(srcStatus.getPath(), input, partStream, reporter);
-        towrite = relPath.toString() + " file " + partname + " " + startPos
+        towrite += " file " + partname + " " + startPos
         + " " + srcStatus.getLen() + " ";
       }
       out.collect(new IntWritable(hash), new Text(towrite));
@@ -801,6 +871,23 @@ public class HadoopArchives implements T
           }
         }
       }
+
+      //process space replacement configuration
+      if (conf.getBoolean(SPACE_REPLACE_LABEL, SPACE_REPLACE_DEFAULT)) {
+        spaceReplacement = conf.get(SPACE_REPLACEMENT_LABEL,
+            SPACE_REPLACEMENT_DEFAULT);
+        if (spaceReplacement.indexOf(' ') >= 0) {
+          throw new IllegalArgumentException("spaceReplacement = \""
+              + spaceReplacement + "\" cannot contain the space character.");
+        }
+        if (spaceReplacement.indexOf(Path.SEPARATOR) >= 0) {
+          throw new IllegalArgumentException("spaceReplacement = \""
+              + spaceReplacement + "\" cannot contain the path separator \""
+              + Path.SEPARATOR + "\".");
+        }
+        LOG.info(SPACE_REPLACEMENT_LABEL + " = " + spaceReplacement);
+      }
+
       archive(parentPath, globPaths, archiveName, destPath);
     } catch(IOException ie) {
       System.err.println(ie.getLocalizedMessage());



Mime
View raw message