hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r447626 [1/3] - in /lucene/hadoop/trunk: ./ src/contrib/ src/contrib/streaming/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/
Date: Mon, 18 Sep 2006 23:08:08 GMT
Author: cutting
Date: Mon Sep 18 16:08:06 2006
New Revision: 447626

URL: http://svn.apache.org/viewvc?view=rev&rev=447626
Log:
HADOOP-542.  Omnibus patch for contrib/streaming.  Contributed by Michel.

Added:
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/CompoundDirSpec.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MuxOutputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/build.xml
    lucene/hadoop/trunk/src/contrib/build-contrib.xml
    lucene/hadoop/trunk/src/contrib/streaming/build.xml
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UniqApp.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Sep 18 16:08:06 2006
@@ -23,6 +23,10 @@
 6. HADOOP-527.  Permit specification of the local address that various
    Hadoop daemons should bind to.  (Philippe Gassmann via cutting)
 
+7. HADOOP-542.  Updates to contrib/streaming: reformatted source code,
+   on-the-fly merge sort, a fix for HADOOP-540, etc.
+   (Michel Tourn via cutting)
+
 
 Release 0.6.2 (unreleased)
 

Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Mon Sep 18 16:08:06 2006
@@ -317,7 +317,7 @@
 
   </target>   
 
-  <target name="test-contrib" depends="compile-core">
+  <target name="test-contrib" depends="compile-core, compile-core-test">
     <subant target="test">
         <fileset file="${basedir}/src/contrib/build.xml"/>
     </subant>  	

Modified: lucene/hadoop/trunk/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/build-contrib.xml?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/build-contrib.xml (original)
+++ lucene/hadoop/trunk/src/contrib/build-contrib.xml Mon Sep 18 16:08:06 2006
@@ -43,7 +43,7 @@
     <fileset refid="lib.jars"/>
     <pathelement location="${hadoop.root}/build/classes"/>
     <fileset dir="${hadoop.root}/lib">
-      <include name="*.jar" />
+      <include name="**/*.jar" />
     </fileset>
   </path>
 
@@ -131,7 +131,7 @@
   <!-- ================================================================== -->
   <target name="test" depends="compile-test, deploy" if="test.available">
     <echo message="contrib: ${name}"/>
-    <junit 
+    <junit
       printsummary="withOutAndErr" haltonfailure="no" fork="yes"
       errorProperty="tests.failed" failureProperty="tests.failed">
       
@@ -143,6 +143,9 @@
       -->
       <sysproperty key="user.dir" value="${build.test}/data"/>
       
+      <sysproperty key="fs.default.name" value="${fs.default.name}"/>
+      <sysproperty key="hadoop.test.localoutputfile" value="${hadoop.test.localoutputfile}"/>
+   
       <classpath refid="test.classpath"/>
       <formatter type="plain" />
       <batchtest todir="${build.test}" unless="testcase">

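The two new <sysproperty> entries forward fs.default.name and hadoop.test.localoutputfile from the Ant build into the forked JUnit JVM, so contrib tests can read them as ordinary Java system properties. A minimal sketch of what that looks like on the test side (illustrative only, not part of this patch; the default values are assumptions):

  // Illustrative sketch (not part of this patch): reading the properties that
  // the <sysproperty> entries above forward into the forked test JVM.
  public class SyspropExample {
    public static void main(String[] args) {
      String fsName = System.getProperty("fs.default.name", "local");         // default is an assumption
      String localOutput = System.getProperty("hadoop.test.localoutputfile"); // null if not set by Ant
      System.out.println("fs.default.name=" + fsName + " localoutputfile=" + localOutput);
    }
  }
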
Modified: lucene/hadoop/trunk/src/contrib/streaming/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/build.xml?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/build.xml (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/build.xml Mon Sep 18 16:08:06 2006
@@ -1,5 +1,9 @@
 <?xml version="1.0"?>
 
+<!-- 
+Before you can run these subtargets directly, you need 
+to call at top-level: ant deploy-contrib compile-core-test
+-->
 <project name="streaming" default="jar">
 
   <import file="../build-contrib.xml"/>
@@ -15,5 +19,23 @@
 	</manifest>
     </jar>
   </target>
-  
+
+  <!-- Run only pure-Java unit tests. superdottest -->
+  <target name="test">
+   <antcall target="hadoopbuildcontrib.test"> 
+     <param name="test.exclude" value="TestStreamedMerge"/>
+   </antcall>
+  </target>  
+ 
+  <!-- Run all unit tests
+  This is not called as part of the nightly build
+  because it will only run on platforms that have standard 
+  Unix utilities available. 
+  -->
+ <target name="test-unix">
+   <antcall target="hadoopbuildcontrib.test">
+   </antcall>
+ </target>  
+
+
 </project>

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/CompoundDirSpec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/CompoundDirSpec.java?view=auto&rev=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/CompoundDirSpec.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/CompoundDirSpec.java Mon Sep 18 16:08:06 2006
@@ -0,0 +1,270 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+
+/** Parses a -input &lt;spec> that determines the DFS paths that will 
+ be accessed by a MergedInputFormat.<br>
+ CompoundDirSpec.getPaths() is a 2-D ragged array of DFS paths.<br>
+ One of the paths is the <b>primary</b> and can contain a globbing pattern
+ to match multiple files.<br>
+ The other paths are <b>secondary</b> and must indicate either a directory or a single file.
+ During execution secondary files are computed to be the secondary path 
+ plus the primary non-qualified filename.
+ Example: <tt>
+ -input "/filter/colsx NULL | +/batch1/colsx/* /batch1/colsy/"
+ -input "/filter/colsx NULL | +/batch2/colsx/* /batch2/colsy/"
+ </tt>
+ Files and contents:<tt>
+ /filter/colsx/part-00000:
+ /batch1/colsx/part-00000:
+ /batch1/colsy/part-00000:
+ /batch2/colsx/part-00000:
+ /batch2/colsy/part-00000:
+ </tt>
+ Mapper input:<tt>
+ </tt>
+ Side-effect outputs with Identity "mapper":<tt>
+
+ </tt>
+ @author Michel Tourn
+ */
+class CompoundDirSpec {
+
+  // Keep the Usage messages and docs in sync!
+  public final static String MERGEGLOB_PREFIX = "||";
+  public final static String MERGE_SEP = "|";
+  public final static String COL_SEP = " ";
+  public final static String PRIMARY_PREFIX = "+";
+
+  CompoundDirSpec(String argSpec, boolean isInputSpec) {
+    argSpec_ = argSpec;
+    isInputSpec_ = isInputSpec;
+
+    direction_ = isInputSpec_ ? "input" : "output";
+    parse();
+  }
+
+  public void parse() throws IllegalStateException {
+    String[] mergerSpecs = argSpec_.split(StreamUtil.regexpEscape(MERGE_SEP));
+
+    int msup = mergerSpecs.length;
+    paths_ = new String[msup][];
+
+    if (msup == 0) {
+      throw new IllegalStateException("A -" + direction_ + " spec needs at list one path");
+    }
+    if (false == isInputSpec_) {
+      if (msup > 1) {
+        throw new IllegalStateException("A -output spec cannot use merged streams ('" + MERGE_SEP
+            + "' delimiter)");
+      }
+    }
+    for (int m = 0; m < msup; m++) {
+      String merged = mergerSpecs[m];
+      merged = merged.trim();
+      String[] colSpecs = merged.split(StreamUtil.regexpEscape(COL_SEP));
+      int csup = colSpecs.length;
+      if (csup == 0) {
+        throw new IllegalStateException("A -input spec needs at list one path spec per |<column>|");
+      }
+      paths_[m] = new String[csup];
+      for (int c = 0; c < csup; c++) {
+        String spec = colSpecs[c];
+        if (spec.startsWith(PRIMARY_PREFIX)) {
+          // for (!isInputSpec_) the tuple paths should be symmetric.
+          // but we still allow user to specify one in case setOutputDir makes a difference
+          if (prow_ != NA) {
+            throwBadNumPrimaryInputSpecs();
+          }
+          spec = spec.substring(PRIMARY_PREFIX.length());
+          prow_ = m;
+          pcol_ = c;
+        }
+        paths_[m][c] = spec;
+      }
+    }
+    if (prow_ == NA) {
+      if (!isInputSpec_) {
+        // pick an 'arbitrary' one -- the tuple paths should be symmetric.
+        prow_ = 0;
+        pcol_ = 0;
+      } else if (msup == 1 && paths_[0].length == 1) {
+        // pick the only one available. That's also bw-compatible syntax
+        prow_ = 0;
+        pcol_ = 0;
+      } else {
+        throwBadNumPrimaryInputSpecs();
+      }
+    }
+  }
+
+  void throwBadNumPrimaryInputSpecs() throws IllegalStateException {
+    String msg = "A compound -input spec needs exactly one primary path prefixed with "
+        + PRIMARY_PREFIX;
+    msg += ":\n";
+    msg += toTableString();
+    throw new IllegalStateException(msg);
+  }
+
+  // TBD need to decide early whether they are dirs or files or globs?
+  public void validatePaths(FileSystem fs) {
+    int rsup = paths_.length;
+    for (int r = 0; r < rsup; r++) {
+      int csup = paths_[r].length;
+      for (int c = 0; c < csup; c++) {
+        String path = paths_[r][c];
+      }
+    }
+  }
+
+  public int primaryRow() {
+    return prow_;
+  }
+
+  public int primaryCol() {
+    return pcol_;
+  }
+
+  public String primarySpec() {
+    return paths_[prow_][pcol_];
+  }
+  
+  /*
+   Example input spec in table form:
+   <1 +[/input/part-00] 
+   <2  [/input/part-01] 
+   <3  [/input/part-02] 
+   Example output spec in table form:
+   +[/my.output] 
+   */
+  public String toTableString() {
+    StringBuffer buf = new StringBuffer();
+    int maxWid = 0;
+    for (int pass = 1; pass <= 2; pass++) {
+      int rsup = paths_.length;
+      for (int r = 0; r < rsup; r++) {
+        int csup = paths_[r].length;
+        for (int c = 0; c < csup; c++) {
+          String cell = "[" + paths_[r][c] + "]";
+          if (r == prow_ && c == pcol_) {
+            cell = PRIMARY_PREFIX + cell;
+          } else {
+            cell = StreamUtil.rjustify(cell, cell.length() + PRIMARY_PREFIX.length());
+          }
+          if (isInputSpec_) {
+            // channels are for tagged input streams: r-based
+            if (rsup > 1) {
+              String channel = "<" + (r + 1);
+              cell = channel + " " + cell;
+            }
+          } else {
+            // channels are for columns (multiple files) c-based
+            if (csup > 1) {
+              String channel = ">" + (c + 1);
+              cell = channel + " " + cell;
+            }
+          }
+          if (pass == 2) {
+            cell = StreamUtil.ljustify(cell, maxWid);
+            buf.append(cell);
+            buf.append(" ");
+          } else {
+            if (cell.length() > maxWid) {
+              maxWid = cell.length();
+            }
+          }
+        }
+        if (pass == 2) {
+          buf.append("\n");
+        }
+      }
+    }
+    return buf.toString();
+  }
+
+  /** 
+   @see #primaryRow 
+   @see #primaryCol
+   */
+  public String[][] getPaths() {
+    return paths_;
+  }
+
+  // ==== Static helpers that depend on a JobConf. ====
+  
+  // Unlike CompoundDirSpec.parse() which is reexecuted at Task runtime,
+  // this is expanded once in advance and relies on client-side DFS access.
+  // Main reason is we need to choose a primary input file at submission time. 
+  public static String expandGlobInputSpec(String inputSpec, JobConf job)
+  {
+    inputSpec = inputSpec.trim();
+    if(!inputSpec.startsWith(MERGEGLOB_PREFIX)) {
+      return inputSpec;
+    }
+    inputSpec = inputSpec.substring(MERGEGLOB_PREFIX.length());
+    // TODO use upcoming DFSShell wildcarding code..
+    return inputSpec;
+  }
+  
+  // find the -input statement that contains the job's split
+  // TODO test with globbing / directory /single file
+  public static CompoundDirSpec findInputSpecForPrimary(String primary, JobConf job) {
+    int num = job.getInt("stream.numinputspecs", -1);
+    for (int s = 0; s < num; s++) {
+      String specStr = job.get("stream.inputspecs." + s);
+      CompoundDirSpec spec = new CompoundDirSpec(specStr, true);
+      if (pathsMatch(spec.primarySpec(), primary, job)) {
+        return spec;
+      }
+    }
+    return null;
+  }
+
+  // There can be only one output spec but this provides some server-side validation
+  public static CompoundDirSpec findOutputSpecForPrimary(String primary, JobConf job) {
+    String specStr = job.get("stream.outputspec");
+    CompoundDirSpec spec = new CompoundDirSpec(specStr, false);
+    if (pathsMatch(spec.primarySpec(), primary, job)) {
+      return spec;
+    }
+    return spec;
+  }
+
+  static boolean pathsMatch(String s1, String s2, JobConf job) {
+    boolean isLocalFS = job.get("fs.default.name", "").equals("local");
+    if (isLocalFS) {
+      s1 = StreamUtil.safeGetCanonicalPath(new File(s1));
+      s2 = StreamUtil.safeGetCanonicalPath(new File(s2));
+    }
+    return (s1.equals(s2));
+  }
+
+  final static int NA = -1;
+
+  String argSpec_;
+  boolean isInputSpec_;
+
+  String direction_;
+  String[][] paths_;
+  int prow_ = NA;
+  int pcol_ = NA;
+}

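As described in the class javadoc, a compound -input spec is split on '|' into merged streams and on spaces into columns, with exactly one path carrying the '+' primary prefix. A minimal sketch of the resulting structure, using the javadoc's own example spec (illustrative only, not part of this patch):

  package org.apache.hadoop.streaming;

  // Illustrative sketch (not part of this patch): parsing one of the compound
  // -input specs from the CompoundDirSpec javadoc above.
  public class CompoundDirSpecExample {
    public static void main(String[] args) {
      CompoundDirSpec spec =
          new CompoundDirSpec("/filter/colsx NULL | +/batch1/colsx/* /batch1/colsy/", true);
      String[][] paths = spec.getPaths();  // {{"/filter/colsx", "NULL"}, {"/batch1/colsx/*", "/batch1/colsy/"}}
      String primary = spec.primarySpec(); // "/batch1/colsx/*" -- the path marked with '+'
      System.err.println(spec.toTableString());
    }
  }
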
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java Mon Sep 18 16:08:06 2006
@@ -23,89 +23,78 @@
 /*
  * If we move to Java 1.5, we can get rid of this class and just use System.getenv
  */
-public class Environment extends Properties
-{
-   public Environment()
-      throws IOException
-   {
-      // Extend this code to fit all operating
-      // environments that you expect to run in
-      // http://lopica.sourceforge.net/os.html
-      String command = null;
-      String OS = System.getProperty("os.name");
-      String lowerOs = OS.toLowerCase();
-      if (OS.indexOf("Windows") > -1) {
-         command = "cmd /C set";
-      } else if (lowerOs.indexOf("ix") > -1 || lowerOs.indexOf("linux") > -1 
-        || lowerOs.indexOf("freebsd") > -1
-        || lowerOs.indexOf("sunos") > -1 || lowerOs.indexOf("solaris") > -1
-        || lowerOs.indexOf("hp-ux") > -1) {
-         command = "env";
-      } else if(lowerOs.startsWith("mac os x")) {
-         command = "env";
-      } else {
-         // Add others here
-      }
+public class Environment extends Properties {
 
-      if (command == null) {
-         throw new RuntimeException("Operating system " + OS
-            + " not supported by this class");
-      }
+  public Environment() throws IOException {
+    // Extend this code to fit all operating
+    // environments that you expect to run in
+    // http://lopica.sourceforge.net/os.html
+    String command = null;
+    String OS = System.getProperty("os.name");
+    String lowerOs = OS.toLowerCase();
+    if (OS.indexOf("Windows") > -1) {
+      command = "cmd /C set";
+    } else if (lowerOs.indexOf("ix") > -1 || lowerOs.indexOf("linux") > -1
+        || lowerOs.indexOf("freebsd") > -1 || lowerOs.indexOf("sunos") > -1
+        || lowerOs.indexOf("solaris") > -1 || lowerOs.indexOf("hp-ux") > -1) {
+      command = "env";
+    } else if (lowerOs.startsWith("mac os x")) {
+      command = "env";
+    } else {
+      // Add others here
+    }
 
-      // Read the environment variables
+    if (command == null) {
+      throw new RuntimeException("Operating system " + OS + " not supported by this class");
+    }
 
-      Process pid = Runtime.getRuntime().exec(command);
-      BufferedReader in =
-         new BufferedReader(
-         new InputStreamReader(
-         pid.getInputStream()));
-      while(true) {
-         String line = in.readLine();
-         if (line == null)
-            break;
-         int p = line.indexOf("=");
-         if (p != -1) {
-            String name = line.substring(0, p);
-            String value = line.substring(p+1);
-            setProperty(name, value);
-         }
+    // Read the environment variables
+
+    Process pid = Runtime.getRuntime().exec(command);
+    BufferedReader in = new BufferedReader(new InputStreamReader(pid.getInputStream()));
+    while (true) {
+      String line = in.readLine();
+      if (line == null) break;
+      int p = line.indexOf("=");
+      if (p != -1) {
+        String name = line.substring(0, p);
+        String value = line.substring(p + 1);
+        setProperty(name, value);
       }
-      in.close();
+    }
+    in.close();
+    try {
+      pid.waitFor();
+    } catch (InterruptedException e) {
+      throw new IOException(e.getMessage());
+    }
+  }
+
+  // to be used with Runtime.exec(String[] cmdarray, String[] envp) 
+  String[] toArray() {
+    String[] arr = new String[super.size()];
+    Enumeration it = super.keys();
+    int i = -1;
+    while (it.hasMoreElements()) {
+      String key = (String) it.nextElement();
+      String val = (String) get(key);
+      i++;
+      arr[i] = key + "=" + val;
+    }
+    return arr;
+  }
+
+  public String getHost() {
+    String host = getProperty("HOST");
+    if (host == null) {
+      // HOST isn't always in the environment
       try {
-         pid.waitFor();
-      }
-      catch (InterruptedException e) {
-         throw new IOException(e.getMessage());
+        host = InetAddress.getLocalHost().getHostName();
+      } catch (IOException io) {
+        io.printStackTrace();
       }
-   }
-   
-   // to be used with Runtime.exec(String[] cmdarray, String[] envp) 
-   String[] toArray()
-   {
-     String[] arr = new String[super.size()];
-     Enumeration it = super.keys();
-     int i = -1;
-     while(it.hasMoreElements()) {
-        String key = (String)it.nextElement();
-        String val = (String)get(key);
-        i++;   
-        arr[i] = key + "=" + val;
-     }     
-     return arr;
-   }
-   
-   public String getHost()
-   {
-     String host = getProperty("HOST");
-     if(host == null) {
-       // HOST isn't always in the environment
-       try {
-         host = InetAddress.getLocalHost().getHostName();
-       } catch(IOException io) {
-         io.printStackTrace();
-       }
-     }
-     return host;
-   }
-   
-} 
+    }
+    return host;
+  }
+
+}

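Environment still works by exec-ing the platform's environment-dump command ("env" or "cmd /C set") and exposing the result as java.util.Properties. A minimal usage sketch (illustrative only, not part of this patch):

  package org.apache.hadoop.streaming;

  import java.io.IOException;

  // Illustrative sketch (not part of this patch): using the reformatted
  // Environment helper to read environment variables on a pre-1.5 JVM.
  public class EnvironmentExample {
    public static void main(String[] args) throws IOException {
      Environment env = new Environment();   // runs "env" or "cmd /C set"
      String path = env.getProperty("PATH"); // any environment variable
      String host = env.getHost();           // HOST, else InetAddress.getLocalHost()
      String[] pairs = env.toArray();        // "NAME=value" pairs for Runtime.exec
      System.out.println(host + " PATH=" + path + " (" + pairs.length + " vars)");
    }
  }
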
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java Mon Sep 18 16:08:06 2006
@@ -19,13 +19,12 @@
 import java.io.IOException;
 
 /** The main entrypoint. Usually invoked with the script bin/hadoopStreaming
-*/
-public class HadoopStreaming 
-{
-  public static void main(String[] args) throws IOException
-  {
+ */
+public class HadoopStreaming {
+
+  public static void main(String[] args) throws IOException {
     boolean mayExit = true;
     StreamJob job = new StreamJob(args, mayExit);
     job.go();
-  }  
+  }
 }

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java Mon Sep 18 16:08:06 2006
@@ -21,190 +21,175 @@
 import java.util.jar.*;
 import java.util.zip.ZipException;
 
-public class JarBuilder
-{
-    public JarBuilder()
-    {
-    }
-
-    public void setVerbose(boolean v)
-    {
-        this.verbose = v;
-    }
-    
-    public void merge(List srcNames, List srcUnjar, String dstJar) 
-        throws IOException
-    {
-        String source = null;
-        JarOutputStream jarOut = null;
-        JarFile jarSource = null;
-        jarOut = new JarOutputStream(new FileOutputStream(dstJar));
-        boolean throwing = false;
-        try {
-          if(srcNames != null) {
-            Iterator iter = srcNames.iterator(); 
-            while(iter.hasNext()) {
-                source = (String)iter.next();
-                File fsource = new File(source);
-                String base = getBasePathInJarOut(source);
-                if(!fsource.exists()) {
-                    throwing = true;
-                    throw new FileNotFoundException(fsource.getAbsolutePath());
-                }
-                if(fsource.isDirectory()) {
-                    addDirectory(jarOut, base, fsource, 0);
-                } else {
-                    addFileStream(jarOut, base, fsource);
-                }
-            }
+public class JarBuilder {
+
+  public JarBuilder() {
+  }
+
+  public void setVerbose(boolean v) {
+    this.verbose = v;
+  }
+
+  public void merge(List srcNames, List srcUnjar, String dstJar) throws IOException {
+    String source = null;
+    JarOutputStream jarOut = null;
+    JarFile jarSource = null;
+    jarOut = new JarOutputStream(new FileOutputStream(dstJar));
+    boolean throwing = false;
+    try {
+      if (srcNames != null) {
+        Iterator iter = srcNames.iterator();
+        while (iter.hasNext()) {
+          source = (String) iter.next();
+          File fsource = new File(source);
+          String base = getBasePathInJarOut(source);
+          if (!fsource.exists()) {
+            throwing = true;
+            throw new FileNotFoundException(fsource.getAbsolutePath());
           }
-          if(srcUnjar != null) {
-            Iterator iter = srcUnjar.iterator(); 
-            while(iter.hasNext()) {
-                source = (String)iter.next();
-                jarSource = new JarFile(source);
-                addJarEntries(jarOut, jarSource);
-                jarSource.close();
-            }
-          
+          if (fsource.isDirectory()) {
+            addDirectory(jarOut, base, fsource, 0);
+          } else {
+            addFileStream(jarOut, base, fsource);
           }
-        } finally {
-            try {
-              jarOut.close();
-            } catch(ZipException z) {
-                if(! throwing) {
-                    throw new IOException(z.toString());
-                }
-            }
         }
+      }
+      if (srcUnjar != null) {
+        Iterator iter = srcUnjar.iterator();
+        while (iter.hasNext()) {
+          source = (String) iter.next();
+          jarSource = new JarFile(source);
+          addJarEntries(jarOut, jarSource);
+          jarSource.close();
+        }
+
+      }
+    } finally {
+      try {
+        jarOut.close();
+      } catch (ZipException z) {
+        if (!throwing) {
+          throw new IOException(z.toString());
+        }
+      }
     }
+  }
 
-    protected String fileExtension(String file)
-    {
-    	int leafPos = file.lastIndexOf('/');
-    	if(leafPos == file.length()-1) return "";
-        String leafName = file.substring(leafPos+1);
-    	int dotPos = leafName.lastIndexOf('.');
-    	if(dotPos == -1) return "";
-        String ext = leafName.substring(dotPos+1);
-    	return ext;
-    }
-    
-    /** @return empty or a jar base path. Must not start with '/' */
-    protected String getBasePathInJarOut(String sourceFile)
-    {
-        // TaskRunner will unjar and append to classpath: .:classes/:lib/*    	
-    	String ext = fileExtension(sourceFile);
-    	if(ext.equals("class")) {
-    		return "classes/"; // or ""
-        } else if(ext.equals("jar") || ext.equals("zip")) {
-    		return "lib/";
-    	} else {
-            return "";
-        }
-    }
-    
-    private void addJarEntries(JarOutputStream dst, JarFile src)
-        throws IOException
-    {
-        Enumeration entries = src.entries();
-        JarEntry entry = null;
-        while(entries.hasMoreElements()) {
-            entry = (JarEntry)entries.nextElement();
-            //if(entry.getName().startsWith("META-INF/")) continue; 
-            InputStream in = src.getInputStream(entry);
-            addNamedStream(dst, entry.getName(), in);
-        }
-    }
-    
-    /** @param name path in jar for this jar element. Must not start with '/' */
-    void addNamedStream(JarOutputStream dst, String name, InputStream in) 
-        throws IOException
-    {
-        if(verbose) {
-            System.err.println("JarBuilder.addNamedStream " + name);
-        }
-        try {
-          dst.putNextEntry(new JarEntry(name));
-          int bytesRead = 0;
-          while((bytesRead = in.read(buffer, 0, BUFF_SIZE)) != -1) {
-              dst.write(buffer, 0, bytesRead);
-          }
-        } catch(ZipException ze) {
-            if(ze.getMessage().indexOf("duplicate entry") >= 0) {
-              if(verbose) {
-                  System.err.println(ze + " Skip duplicate entry " + name);
-              }
-            } else {
-                throw ze;
-            }
-        } finally {
-          in.close();
-          dst.flush();
-          dst.closeEntry();        
+  protected String fileExtension(String file) {
+    int leafPos = file.lastIndexOf('/');
+    if (leafPos == file.length() - 1) return "";
+    String leafName = file.substring(leafPos + 1);
+    int dotPos = leafName.lastIndexOf('.');
+    if (dotPos == -1) return "";
+    String ext = leafName.substring(dotPos + 1);
+    return ext;
+  }
+
+  /** @return empty or a jar base path. Must not start with '/' */
+  protected String getBasePathInJarOut(String sourceFile) {
+    // TaskRunner will unjar and append to classpath: .:classes/:lib/*    	
+    String ext = fileExtension(sourceFile);
+    if (ext.equals("class")) {
+      return "classes/"; // or ""
+    } else if (ext.equals("jar") || ext.equals("zip")) {
+      return "lib/";
+    } else {
+      return "";
+    }
+  }
+
+  private void addJarEntries(JarOutputStream dst, JarFile src) throws IOException {
+    Enumeration entries = src.entries();
+    JarEntry entry = null;
+    while (entries.hasMoreElements()) {
+      entry = (JarEntry) entries.nextElement();
+      //if(entry.getName().startsWith("META-INF/")) continue; 
+      InputStream in = src.getInputStream(entry);
+      addNamedStream(dst, entry.getName(), in);
+    }
+  }
+
+  /** @param name path in jar for this jar element. Must not start with '/' */
+  void addNamedStream(JarOutputStream dst, String name, InputStream in) throws IOException {
+    if (verbose) {
+      System.err.println("JarBuilder.addNamedStream " + name);
+    }
+    try {
+      dst.putNextEntry(new JarEntry(name));
+      int bytesRead = 0;
+      while ((bytesRead = in.read(buffer, 0, BUFF_SIZE)) != -1) {
+        dst.write(buffer, 0, bytesRead);
+      }
+    } catch (ZipException ze) {
+      if (ze.getMessage().indexOf("duplicate entry") >= 0) {
+        if (verbose) {
+          System.err.println(ze + " Skip duplicate entry " + name);
+        }
+      } else {
+        throw ze;
+      }
+    } finally {
+      in.close();
+      dst.flush();
+      dst.closeEntry();
+    }
+  }
+
+  void addFileStream(JarOutputStream dst, String jarBaseName, File file) throws IOException {
+    FileInputStream in = new FileInputStream(file);
+    String name = jarBaseName + file.getName();
+    addNamedStream(dst, name, in);
+    in.close();
+  }
+
+  void addDirectory(JarOutputStream dst, String jarBaseName, File dir, int depth) throws IOException {
+    File[] contents = dir.listFiles();
+    if (contents != null) {
+      for (int i = 0; i < contents.length; i++) {
+        File f = contents[i];
+        String fBaseName = (depth == 0) ? "" : dir.getName();
+        if (jarBaseName.length() > 0) {
+          fBaseName = jarBaseName + "/" + fBaseName;
         }
+        if (f.isDirectory()) {
+          addDirectory(dst, fBaseName, f, depth + 1);
+        } else {
+          addFileStream(dst, fBaseName + "/", f);
+        }
+      }
     }
+  }
 
-    void addFileStream(JarOutputStream dst, String jarBaseName, File file) 
-        throws IOException 
-    {
-    	FileInputStream in = new FileInputStream(file);
-    	String name = jarBaseName + file.getName();
-    	addNamedStream(dst, name, in);
-    	in.close();
-    }
-    
-    void addDirectory(JarOutputStream dst, String jarBaseName, File dir, int depth) 
-        throws IOException
-    {
-    	File[] contents = dir.listFiles();
-    	if(contents != null) {
-    		for(int i=0; i<contents.length; i++) {
-    			File f = contents[i];
-    			String fBaseName = (depth==0) ? "" : dir.getName();
-    			if(jarBaseName.length()>0) {
-    				fBaseName = jarBaseName + "/" + fBaseName;
-    			}
-    			if(f.isDirectory()) {
-    				addDirectory(dst, fBaseName, f, depth+1);
-    			} else {
-    				addFileStream(dst, fBaseName+"/", f);
-    			}
-    		}
-    	}
-    }
-
-    /** Test program */    
-    public static void main(String args[])
-    {
-        // args = new String[] { "C:/Temp/merged.jar", "C:/jdk1.5.0/jre/lib/ext/dnsns.jar",  "/Temp/addtojar2.log", "C:/jdk1.5.0/jre/lib/ext/mtest.jar", "C:/Temp/base"};
-        if(args.length < 2) {
-            System.err.println("Usage: JarFiles merged.jar [src.jar | dir | file ]+");
+  /** Test program */
+  public static void main(String args[]) {
+    // args = new String[] { "C:/Temp/merged.jar", "C:/jdk1.5.0/jre/lib/ext/dnsns.jar",  "/Temp/addtojar2.log", "C:/jdk1.5.0/jre/lib/ext/mtest.jar", "C:/Temp/base"};
+    if (args.length < 2) {
+      System.err.println("Usage: JarFiles merged.jar [src.jar | dir | file ]+");
+    } else {
+      JarBuilder jarFiles = new JarBuilder();
+      List names = new ArrayList();
+      List unjar = new ArrayList();
+      for (int i = 1; i < args.length; i++) {
+        String f = args[i];
+        String ext = jarFiles.fileExtension(f);
+        boolean expandAsJar = ext.equals("jar") || ext.equals("zip");
+        if (expandAsJar) {
+          unjar.add(f);
         } else {
-            JarBuilder jarFiles = new JarBuilder();
-            List names = new ArrayList();
-            List unjar = new ArrayList();
-            for(int i = 1; i < args.length; i++) {
-                String f = args[i];
-                String ext = jarFiles.fileExtension(f);
-                boolean expandAsJar = ext.equals("jar") || ext.equals("zip");                
-                if(expandAsJar) {
-                    unjar.add(f);
-                } else {
-                    names.add(f);
-                }                
-            }
-            try {
-                jarFiles.merge(names, unjar, args[0]);
-                Date lastMod = new Date(new File(args[0]).lastModified());
-                System.out.println("Merge done to " + args[0] + " " + lastMod);
-            } catch(Exception ge) {
-                ge.printStackTrace(System.err);
-            }
-        }
-    }
-    
-    private static final int BUFF_SIZE = 32*1024;
-    private byte buffer[] = new byte[BUFF_SIZE];
-    protected boolean verbose = false;
+          names.add(f);
+        }
+      }
+      try {
+        jarFiles.merge(names, unjar, args[0]);
+        Date lastMod = new Date(new File(args[0]).lastModified());
+        System.out.println("Merge done to " + args[0] + " " + lastMod);
+      } catch (Exception ge) {
+        ge.printStackTrace(System.err);
+      }
+    }
+  }
+
+  private static final int BUFF_SIZE = 32 * 1024;
+  private byte buffer[] = new byte[BUFF_SIZE];
+  protected boolean verbose = false;
 }

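JarBuilder merges plain files, directories, and existing jars into one output jar, placing .class files under classes/ and nested jars or zips under lib/ so TaskRunner can put them on the classpath. A minimal programmatic sketch (illustrative only, not part of this patch; the paths are placeholders):

  package org.apache.hadoop.streaming;

  import java.util.ArrayList;
  import java.util.List;

  // Illustrative sketch (not part of this patch): building a job jar the same
  // way JarBuilder.main above does, with placeholder input paths.
  public class JarBuilderExample {
    public static void main(String[] args) throws Exception {
      JarBuilder builder = new JarBuilder();
      builder.setVerbose(true);
      List names = new ArrayList();           // files and directories, copied as-is
      List unjar = new ArrayList();           // jars/zips whose entries are expanded
      names.add("build/classes");             // placeholder
      unjar.add("lib/some-dependency.jar");   // placeholder
      builder.merge(names, unjar, "build/streaming-job.jar");
    }
  }
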
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java?view=auto&rev=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java Mon Sep 18 16:08:06 2006
@@ -0,0 +1,334 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import org.apache.lucene.util.PriorityQueue;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+
+/**
+ Eventually will be fed TupleInputFormats. 
+ For now will be fed primitive InputFormats.
+ @author Michel Tourn
+ */
+public class MergerInputFormat extends InputFormatBase {
+
+  public MergerInputFormat() {
+  }
+
+  void checkReady(FileSystem fs, JobConf job) {
+    if (ready_) {
+      // could complain if fs / job changes
+      return;
+    }
+    fs_ = fs;
+    job_ = job;
+    debug_ = (job.get("stream.debug") != null);
+
+    String someInputSpec = job_.get("stream.inputspecs.0");
+    CompoundDirSpec someSpec = new CompoundDirSpec(someInputSpec, true);
+    fmts_ = new ArrayList();
+    int n = someSpec.paths_.length;
+    inputTagged_ = job.getBoolean("stream.inputtagged", false);
+    //  0 is primary
+    //  Curr. secondaries are NOT used for getSplits(), only as RecordReader factory
+    for (int i = 0; i < n; i++) {
+      // this ignores -inputreader.. 
+      // That's why if hasSimpleInputSpecs_=true (n=1) then StreamJob will set
+      // the top-level format to StreamInputFormat rather than MergeInputFormat.
+      // So we only support custom -inputformat for n=1. 
+      // Probably OK for now since custom inputformats would be constrained (no \t and \n in payload) 
+      fmts_.add(new StreamInputFormat()); // will be TupleInputFormat
+    }
+    primary_ = (InputFormat) fmts_.get(0);
+    ready_ = true;
+  }
+
+  /** This implementation always returns true. */
+  public boolean[] areValidInputDirectories(FileSystem fileSys, Path[] inputDirs) throws IOException {
+    // must do this without JobConf...
+    boolean[] b = new boolean[inputDirs.length];
+    for (int i = 0; i < inputDirs.length; ++i) {
+      b[i] = true;
+    }
+    return b;
+  }
+
+  /** Delegate to the primary InputFormat. 
+   Force full-file splits since there's no index to sync secondaries.
+   (and if there was, this index may need to be created for the first time
+   full file at a time...    )
+   */
+  public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits) throws IOException {
+    checkReady(fs, job);
+    return ((StreamInputFormat) primary_).getFullFileSplits(fs, job);
+  }
+
+  /**
+   */
+  public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) throws IOException {
+    checkReady(fs, job);
+
+    reporter.setStatus(split.toString());
+
+    ArrayList readers = new ArrayList();
+    String primary = split.getPath().toString();
+    CompoundDirSpec spec = CompoundDirSpec.findInputSpecForPrimary(primary, job);
+    if (spec == null) {
+      throw new IOException("Did not find -input spec in JobConf for primary:" + primary);
+    }
+    for (int i = 0; i < fmts_.size(); i++) {
+      InputFormat f = (InputFormat) fmts_.get(i);
+      Path path = new Path(spec.getPaths()[i][0]);
+      FileSplit fsplit = makeFullFileSplit(path);
+      RecordReader r = f.getRecordReader(fs, fsplit, job, reporter);
+      readers.add(r);
+    }
+
+    return new MergedRecordReader(readers);
+  }
+
+  private FileSplit makeFullFileSplit(Path path) throws IOException {
+    long len = fs_.getLength(path);
+    return new FileSplit(path, 0, len);
+  }
+
+  /*
+   private FileSplit relatedSplit(FileSplit primarySplit, int i, CompoundDirSpec spec) throws IOException
+   {
+   if(i == 0) {
+   return primarySplit;
+   }
+
+   // TODO based on custom JobConf (or indirectly: InputFormat-s?)
+   String path = primarySplit.getFile().getAbsolutePath();
+   Path rpath = new Path(path + "." + i);
+
+   long rlength = fs_.getLength(rpath);
+   FileSplit related = new FileSplit(rpath, 0, rlength);
+   return related;    
+   }*/
+
+  class MergedRecordReader implements RecordReader {
+
+    MergedRecordReader(ArrayList/*<RecordReader>*/readers) throws IOException {
+      try {
+        readers_ = readers;
+        primaryReader_ = (RecordReader) readers.get(0);
+        q_ = new MergeQueue(readers.size(), debug_);
+        for (int i = 0; i < readers_.size(); i++) {
+          RecordReader reader = (RecordReader) readers.get(i);
+          WritableComparable k = (WritableComparable) job_.getInputKeyClass().newInstance();
+          Writable v = (Writable) job_.getInputValueClass().newInstance();
+          MergeRecordStream si = new MergeRecordStream(i, reader, k, v);
+          if (si.next()) {
+            q_.add(si);
+          }
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
+        throw new IOException(e.toString());
+      }
+    }
+
+    // 1. implements RecordReader
+
+    public boolean next(Writable key, Writable value) throws IOException {
+      boolean more = (q_.size() > 0);
+      if (!more) return false;
+
+      MergeRecordStream ms = (MergeRecordStream) q_.top();
+      int keyTag = inputTagged_ ? (ms.index_ + 1) : NOTAG;
+      assignTaggedWritable(key, ms.k_, keyTag);
+      assignTaggedWritable(value, ms.v_, NOTAG);
+
+      if (ms.next()) { // has another entry
+        q_.adjustTop();
+      } else {
+        q_.pop(); // done with this file
+        if (ms.reader_ == primaryReader_) {
+          primaryClosed_ = true;
+          primaryLastPos_ = primaryReader_.getPos();
+        }
+        ms.reader_.close();
+      }
+      return true;
+    }
+
+    public long getPos() throws IOException {
+      if (primaryClosed_) {
+        return primaryLastPos_;
+      } else {
+        return primaryReader_.getPos();
+      }
+    }
+
+    public void close() throws IOException {
+      IOException firstErr = null;
+
+      for (int i = 0; i < readers_.size(); i++) {
+        RecordReader r = (RecordReader) readers_.get(i);
+        try {
+          r.close();
+        } catch (IOException io) {
+          io.printStackTrace();
+          if (firstErr == null) {
+            firstErr = io;
+          }
+        }
+      }
+      if (firstErr != null) {
+        throw firstErr;
+      }
+    }
+
+    public WritableComparable createKey() {
+      return new Text();
+    }
+
+    public Writable createValue() {
+      return new Text();
+    }
+
+    // 2. utilities
+
+    final static int NOTAG = -1;
+
+    private void assignTaggedWritable(Writable dst, Writable src, int tag) {
+      try {
+        outBuf.reset();
+        if (tag != NOTAG) {
+          if (src instanceof UTF8) {
+            src = new UTF8(">" + tag + "\t" + src.toString()); // breaks anything?
+          } else if (src instanceof Text) {
+            src = new Text(">" + tag + "\t" + src.toString()); // breaks anything?
+          } else {
+            throw new UnsupportedOperationException("Cannot use with tags with key class "
+                + src.getClass());
+          }
+        }
+        src.write(outBuf);
+        inBuf.reset(outBuf.getData(), outBuf.getLength());
+        dst.readFields(inBuf); // throws..
+      } catch (IOException io) {
+        // streams are backed by buffers, but buffers can run out
+        throw new IllegalStateException(io);
+      }
+    }
+
+    private DataInputBuffer inBuf = new DataInputBuffer();
+    private DataOutputBuffer outBuf = new DataOutputBuffer();
+
+    ArrayList/*<RecordReader>*/readers_;
+
+    RecordReader primaryReader_;
+    boolean primaryClosed_;
+    long primaryLastPos_;
+
+    MergeQueue q_;
+
+  }
+
+  boolean ready_;
+  FileSystem fs_;
+  JobConf job_;
+  boolean debug_;
+
+  // we need the JobConf: the other delegated InputFormat-s 
+  // will only be created in the delegator RecordReader
+  InputFormat primary_;
+  boolean inputTagged_;
+  ArrayList/*<InputFormat>*/fmts_;
+}
+
+class MergeQueue extends PriorityQueue // <MergeRecordStream>
+{
+
+  private boolean done;
+  private boolean debug;
+
+  public void add(MergeRecordStream reader) throws IOException {
+    super.put(reader);
+  }
+
+  public MergeQueue(int size, boolean debug) throws IOException {
+    initialize(size);
+    this.debug = debug;
+  }
+
+  protected boolean lessThan(Object a, Object b) {
+    MergeRecordStream ra = (MergeRecordStream) a;
+    MergeRecordStream rb = (MergeRecordStream) b;
+    int cp = ra.k_.compareTo(rb.k_);
+    if (debug) {
+      System.err.println("MergerInputFormat:lessThan " + ra.k_ + ", " + rb.k_ + " cp=" + cp);
+    }
+    if (cp == 0) {
+      return (ra.index_ < rb.index_);
+    } else {
+      return (cp < 0);
+    }
+  }
+
+  public void close() throws IOException {
+    IOException firstErr = null;
+    MergeRecordStream mr;
+    while ((mr = (MergeRecordStream) pop()) != null) {
+      try {
+        mr.reader_.close();
+      } catch (IOException io) {
+        io.printStackTrace();
+        if (firstErr == null) {
+          firstErr = io;
+        }
+      }
+    }
+    if (firstErr != null) {
+      throw firstErr;
+    }
+  }
+}
+
+class MergeRecordStream {
+
+  int index_;
+  RecordReader reader_;
+  WritableComparable k_;
+  Writable v_;
+
+  public MergeRecordStream(int index, RecordReader reader, WritableComparable k, Writable v)
+      throws IOException {
+    index_ = index;
+    reader_ = reader;
+    k_ = k;
+    v_ = v;
+  }
+
+  public boolean next() throws IOException {
+    boolean more = reader_.next(k_, v_);
+    return more;
+  }
+}

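MergedRecordReader always hands out the smallest current key across all open streams, breaking ties by stream index, and when stream.inputtagged is set it prefixes each key with a ">"+channel+TAB tag. A small self-contained sketch of that ordering rule on two already-sorted key lists instead of RecordReaders (illustrative only, not part of this patch):

  // Illustrative sketch (not part of this patch): the merge order produced by
  // MergeQueue.lessThan above -- smallest key first, lower stream index on ties --
  // demonstrated on plain String arrays.
  public class MergeOrderExample {
    public static void main(String[] args) {
      String[][] streams = { { "a", "c", "c" }, { "b", "c" } };
      int[] pos = new int[streams.length];
      while (true) {
        int best = -1;
        for (int i = 0; i < streams.length; i++) {
          if (pos[i] >= streams[i].length) continue;
          if (best == -1 || streams[i][pos[i]].compareTo(streams[best][pos[best]]) < 0) {
            best = i; // on equal keys the lower index, found first, is kept
          }
        }
        if (best == -1) break;
        // with stream.inputtagged=true the key is emitted as ">" + (index+1) + "\t" + key
        System.out.println(">" + (best + 1) + "\t" + streams[best][pos[best]]);
        pos[best]++;
      }
      // prints: >1 a, >2 b, >1 c, >1 c, >2 c (tag TAB key per line)
    }
  }
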
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java Mon Sep 18 16:08:06 2006
@@ -30,71 +30,60 @@
  * From man chmod: If no user specs are given, the effect is as if `a' were given. 
  * 
  */
-public class MustangFile extends File
-{
+public class MustangFile extends File {
 
-    public MustangFile(File parent, String child)
-    {
-      super(parent, child);
-    }
-
-    public MustangFile(String pathname)
-    {
-      super(pathname);
-    }
-
-    public MustangFile(String parent, String child) 
-    {
-      super(parent, child);
-    }
-
-    public boolean setReadable(boolean readable, boolean ownerOnly) 
-    {
-      chmod("r", readable, ownerOnly);
-      return SUCCESS;
-    }
-
-    public boolean setReadable(boolean readable)
-    {
-      chmod("r", readable, false);
-      return SUCCESS;
-    }
-
-    public boolean setWritable(boolean writable, boolean ownerOnly) 
-    {
-      chmod("w", writable, ownerOnly);
-      return SUCCESS;
-    }
-    
-    public boolean setWritable(boolean writable) 
-    {
-      chmod("w", writable, false);
-      return SUCCESS;
-    }
-
-    public boolean setExecutable(boolean executable, boolean ownerOnly) 
-    {
-      chmod("x", executable, ownerOnly);
-      return SUCCESS;
-    }
-    
-    public boolean setExecutable(boolean executable)
-    {
-      chmod("x", executable, false);
-      return SUCCESS;
-    }
-    
-    void chmod(String perms, boolean plus, boolean ownerOnly)
-    {
-       String[] argv = new String[3];
-       argv[0] = "/bin/chmod";
-       String spec = ownerOnly ? "u" : "ugoa";
-       spec += (plus ? "+" : "-");
-       spec += perms;
-       argv[1] = spec;
-       argv[2] = getAbsolutePath();
-       StreamUtil.exec(argv, System.err);
-    }
-    
-    final static boolean SUCCESS = true;
-}    
+  public MustangFile(File parent, String child) {
+    super(parent, child);
+  }
+
+  public MustangFile(String pathname) {
+    super(pathname);
+  }
+
+  public MustangFile(String parent, String child) {
+    super(parent, child);
+  }
+
+  public boolean setReadable(boolean readable, boolean ownerOnly) {
+    chmod("r", readable, ownerOnly);
+    return SUCCESS;
+  }
+
+  public boolean setReadable(boolean readable) {
+    chmod("r", readable, false);
+    return SUCCESS;
+  }
+
+  public boolean setWritable(boolean writable, boolean ownerOnly) {
+    chmod("w", writable, ownerOnly);
+    return SUCCESS;
+  }
+
+  public boolean setWritable(boolean writable) {
+    chmod("w", writable, false);
+    return SUCCESS;
+  }
+
+  public boolean setExecutable(boolean executable, boolean ownerOnly) {
+    chmod("x", executable, ownerOnly);
+    return SUCCESS;
+  }
+
+  public boolean setExecutable(boolean executable) {
+    chmod("x", executable, false);
+    return SUCCESS;
+  }
+
+  void chmod(String perms, boolean plus, boolean ownerOnly) {
+    String[] argv = new String[3];
+    argv[0] = "/bin/chmod";
+    String spec = ownerOnly ? "u" : "ugoa";
+    spec += (plus ? "+" : "-");
+    spec += perms;
+    argv[1] = spec;
+    argv[2] = getAbsolutePath();
+    StreamUtil.exec(argv, System.err);
+  }
+
+  final static boolean SUCCESS = true;
+}

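MustangFile backfills the Java 6 ("Mustang") File.setReadable/setWritable/setExecutable methods by shelling out to /bin/chmod, so streaming can adjust permissions on side-effect files with a pre-1.6 JVM. A minimal usage sketch (illustrative only, not part of this patch; the path is a placeholder):

  package org.apache.hadoop.streaming;

  // Illustrative sketch (not part of this patch): marking a placeholder script
  // readable and executable through the chmod-backed setters above.
  public class MustangFileExample {
    public static void main(String[] args) {
      MustangFile script = new MustangFile("/tmp/streaming-mapper.sh"); // placeholder
      script.setReadable(true);           // /bin/chmod ugoa+r <path>
      script.setExecutable(true, false);  // /bin/chmod ugoa+x <path>
      System.out.println("exists=" + script.exists());
    }
  }
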
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MuxOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MuxOutputFormat.java?view=auto&rev=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MuxOutputFormat.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MuxOutputFormat.java Mon Sep 18 16:08:06 2006
@@ -0,0 +1,172 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A multiplexed OutputFormat. The channel choice is encoded within the key.
+ * If channels are fed at the same rate then the data can be read back in 
+ * with a TupleInputFormat. (in a different Job)
+ * @see TupleInputFormat 
+ * @author Michel Tourn
+ */
+public class MuxOutputFormat implements OutputFormat {
+
+  public RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progr) throws IOException {
+    fs_ = fs;
+    job_ = job;
+
+    String primary = job.getOutputPath().toString();
+    CompoundDirSpec spec = CompoundDirSpec.findOutputSpecForPrimary(primary, job);
+    if (spec == null) {
+      throw new IOException("Did not find -output spec in JobConf for primary:" + primary);
+    }
+    String[] outPaths = spec.getPaths()[0];
+    int n = outPaths.length;
+    RecordWriter[] writers = new RecordWriter[n];
+    Path[] paths = new Path[n];
+    for (int i = 0; i < n; i++) {
+      OutputFormat f = new StreamOutputFormat(); // the only one supported
+      writers[i] = f.getRecordWriter(fs, job, name, progr);
+      paths[i] = new Path(outPaths[i], name); // same leaf name in different dir
+    }
+    return new MuxRecordWriter(writers, paths);
+  }
+
+  class MuxRecordWriter implements RecordWriter {
+
+    MuxRecordWriter(RecordWriter[] writers, Path[] paths) throws IOException {
+      writers_ = writers;
+      paths_ = paths;
+      numChannels_ = writers_.length;
+      out_ = new FSDataOutputStream[numChannels_];
+      for (int i = 0; i < out_.length; i++) {
+        System.err.println("MuxRecordWriter [" + i + "] create: " + paths[i]);
+        out_[i] = fs_.create(paths[i]);
+      }
+    }
+
+    final static int ONE_BASED = 1;
+    final static char CHANOUT = '>';
+    final static char CHANIN = '<';
+    final static String BADCHANOUT = "Invalid output channel spec: ";
+
+    int parseOutputChannel(String s, int max) throws IOException {
+      try {
+        if (s.charAt(s.length() - 1) != CHANOUT) {
+          throw new IOException(BADCHANOUT + s);
+        }
+        String s1 = s.substring(0, s.length() - 1);
+        int c = Integer.parseInt(s1);
+        if (c < 1 || c > max) {
+          String msg = "Output channel '" + s + "': must be an integer between 1 and " + max
+              + " followed by '" + CHANOUT + "' and TAB";
+          throw new IndexOutOfBoundsException(msg);
+        }
+        return c;
+      } catch (Exception e) {
+        throw new IOException(BADCHANOUT + s + " cause:" + e);
+      }
+    }
+
+    // TODO after Text patch, share code with StreamLineRecordReader.next()
+    void splitFirstTab(String input, UTF8 first, UTF8 second) {
+      int tab = input.indexOf('\t');
+      if (tab == -1) {
+        ((UTF8) first).set(input);
+        ((UTF8) second).set("");
+      } else {
+        ((UTF8) first).set(input.substring(0, tab));
+        ((UTF8) second).set(input);
+      }
+
+    }
+
+    void writeKeyTabVal(Writable key, Writable val, FSDataOutputStream out) throws IOException {
+      out.write(key.toString().getBytes("UTF-8"));
+      out.writeByte('\t');
+      out.write(val.toString().getBytes("UTF-8"));
+      out.writeByte('\n');
+    }
+
+    public void write(WritableComparable key, Writable value) throws IOException {
+      // convention: Application code must put a channel spec in first column
+      // iff there is more than one (output) channel
+      if (numChannels_ == 1) {
+        writeKeyTabVal(key, value, out_[0]);
+      } else {
+        // StreamInputFormat does not know about channels 
+        // Now reinterpret key as channel and split value as new key-value
+        // A more general mechanism would still require Reader classes to know about channels. 
+        // (and encode it as part of key or value)
+        int channel = parseOutputChannel(key.toString(), numChannels_);
+        FSDataOutputStream oi = out_[channel - ONE_BASED];
+        splitFirstTab(value.toString(), key2, val2);
+        writeKeyTabVal(key2, val2, oi);
+      }
+    }
+
+    public void close(Reporter reporter) throws IOException {
+      IOException firstErr = null;
+
+      for (int i = 0; i < writers_.length; i++) {
+        FSDataOutputStream oi = out_[i];
+        RecordWriter r = writers_[i];
+        try {
+          oi.close();
+          r.close(reporter);
+        } catch (IOException io) {
+          System.err.println("paths_[" + i + "]: " + paths_[i]);
+          io.printStackTrace();
+          if (firstErr == null) {
+            firstErr = io;
+          }
+        }
+      }
+      if (firstErr != null) {
+        throw firstErr;
+      }
+    }
+
+    UTF8 key2 = new UTF8();
+    UTF8 val2 = new UTF8();
+
+    RecordWriter[] writers_;
+    Path[] paths_;
+    int numChannels_;
+    FSDataOutputStream[] out_;
+  }
+
+  public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
+    // allow existing data (for app-level restartability)
+  }
+
+  FileSystem fs_;
+  JobConf job_;
+}
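
For context on the convention the writer above enforces: with a single output channel, records go out as key TAB value; with several channels, the application must put a channel spec in the first column, a 1-based index followed by '>' and a TAB. A minimal, hypothetical sketch of what a two-channel streaming program could print to stdout (the class name and the values are illustrative only, not part of this patch):

    // Illustrative only: output a two-channel streaming application could emit.
    public class TwoChannelEmitter {
      public static void main(String[] args) {
        // With more than one output channel, the first TAB-separated column is the
        // channel spec: a 1-based index followed by '>' (see parseOutputChannel()).
        System.out.println("1>\tkeyA\tvalueA"); // routed to out_[0]
        System.out.println("2>\tkeyB\tvalueB"); // routed to out_[1]
        // "0>" or "3>" would be rejected with "Invalid output channel spec: ...".
      }
    }

With the usual single-column key split, the "1>" column reaches write() as the key, so parseOutputChannel() sees only the spec; malformed or out-of-range specs raise an IOException.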

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java Mon Sep 18 16:08:06 2006
@@ -40,11 +40,9 @@
  *  such a multipass aggregation.
  *  @author Michel Tourn
  */
-public class PipeCombiner extends PipeReducer
-{
+public class PipeCombiner extends PipeReducer {
 
-  String getPipeCommand(JobConf job)
-  {
+  String getPipeCommand(JobConf job) {
     return job.get("stream.combine.streamprocessor");
   }
 

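PipeCombiner now differs from PipeReducer only in the property it reads. A small, hypothetical sketch of wiring a combiner command into a job configuration; the property name comes from the code above, while the command string and the standalone setup are example values (the streaming driver normally does this wiring when a combiner command is supplied):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.streaming.PipeCombiner;

    public class CombinerConfSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Property consulted by PipeCombiner.getPipeCommand(); the command is an example.
        job.set("stream.combine.streamprocessor", "/usr/bin/uniq -c");
        job.setCombinerClass(PipeCombiner.class);
      }
    }
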
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Mon Sep 18 16:08:06 2006
@@ -17,6 +17,9 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.channels.*;
 import java.nio.charset.CharacterCodingException;
 import java.io.IOException;
 import java.util.Date;
@@ -29,6 +32,7 @@
 
 import org.apache.commons.logging.*;
 
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -38,6 +42,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -53,17 +58,19 @@
    * Mapper/Reducer operations will delegate to it
    */
   abstract String getPipeCommand(JobConf job);
+
   /*
-  */
+   */
   abstract String getKeyColPropName();
 
   /** Write output as side-effect files rather than as map outputs.
-      This is useful to do "Map" tasks rather than "MapReduce" tasks. */
-  boolean getUseSideEffect()
-  {
+   This is useful to do "Map" tasks rather than "MapReduce" tasks. */
+  boolean getUseSideEffect() {
     return false;
   }
 
+  abstract boolean getDoPipe();
+
   /**
    * @returns how many TABS before the end of the key part
    * usually: 1 or "ALL"
@@ -71,29 +78,28 @@
    * configured via tool's argv: splitKeyVal=ALL or 1..
    * although it is interpreted here, not by tool
    */
-  int getKeyColsFromPipeCommand(String cmd)
-  {
+  int getKeyColsFromPipeCommand(String cmd) {
     String key = getKeyColPropName();
     Pattern kcPat = Pattern.compile(".*" + key + "=([^\\s]*).*");
     Matcher match = kcPat.matcher(cmd);
     String kc;
-    if(!match.matches()) {
+    if (!match.matches()) {
       kc = null;
     } else {
       kc = match.group(1);
     }
 
     int cols;
-    if(kc== null) {
+    if (kc == null) {
       // default value is 1 and the Stream applications could instead
       // add/remove the \t separator on lines to get the same effect as value 0, 1, ALL
       cols = 1;
-    } else if(kc.equals("ALL")) {
+    } else if (kc.equals("ALL")) {
       cols = ALL_COLS;
     } else {
       try {
         cols = Integer.parseInt(kc);
-      } catch(NumberFormatException nf) {
+      } catch (NumberFormatException nf) {
         cols = Integer.MAX_VALUE;
       }
     }
@@ -107,87 +113,135 @@
   final static int SINGLEQ = 2;
   final static int DOUBLEQ = 3;
 
-  static String[] splitArgs(String args)
-  {
+  static String[] splitArgs(String args) {
     ArrayList argList = new ArrayList();
     char[] ch = args.toCharArray();
     int clen = ch.length;
     int state = OUTSIDE;
     int argstart = 0;
-    for(int c=0; c<=clen; c++) {
-        boolean last = (c==clen);
-        int lastState = state;
-        boolean endToken = false;
-        if(!last) {
-          if(ch[c]=='\'') {
-            if(state == OUTSIDE) {
-              state = SINGLEQ;
-            } else if(state == SINGLEQ) {
-              state = OUTSIDE;
-            }
-            endToken = (state != lastState);
-          } else if(ch[c]=='"') {
-            if(state == OUTSIDE) {
-              state = DOUBLEQ;
-            } else if(state == DOUBLEQ) {
-              state = OUTSIDE;
-            }
-            endToken = (state != lastState);
-          } else if(ch[c]==' ') {
-            if(state == OUTSIDE) {
-              endToken = true;
-            }
+    for (int c = 0; c <= clen; c++) {
+      boolean last = (c == clen);
+      int lastState = state;
+      boolean endToken = false;
+      if (!last) {
+        if (ch[c] == '\'') {
+          if (state == OUTSIDE) {
+            state = SINGLEQ;
+          } else if (state == SINGLEQ) {
+            state = OUTSIDE;
           }
-        }
-        if(last || endToken) {
-          if(c == argstart) {
-            // unquoted space
-          } else {
-            String a;
-            a = args.substring(argstart, c);
-            argList.add(a);
+          endToken = (state != lastState);
+        } else if (ch[c] == '"') {
+          if (state == OUTSIDE) {
+            state = DOUBLEQ;
+          } else if (state == DOUBLEQ) {
+            state = OUTSIDE;
+          }
+          endToken = (state != lastState);
+        } else if (ch[c] == ' ') {
+          if (state == OUTSIDE) {
+            endToken = true;
           }
-          argstart = c+1;
-          lastState = state;
         }
+      }
+      if (last || endToken) {
+        if (c == argstart) {
+          // unquoted space
+        } else {
+          String a;
+          a = args.substring(argstart, c);
+          argList.add(a);
+        }
+        argstart = c + 1;
+        lastState = state;
+      }
     }
-    return (String[])argList.toArray(new String[0]);
+    return (String[]) argList.toArray(new String[0]);
   }
 
-  public void configure(JobConf job)
-  {
+  OutputStream getURIOutputStream(URI uri, boolean allowSocket) throws IOException {
+    final String SOCKET = "socket";
+    if (uri.getScheme().equals(SOCKET)) {
+      if (!allowSocket) {
+        throw new IOException(SOCKET + " not allowed on outputstream " + uri);
+      }
+      final Socket sock = new Socket(uri.getHost(), uri.getPort());
+      OutputStream out = new FilterOutputStream(sock.getOutputStream()) {
+        public void close() throws IOException {
+          sock.close();
+          super.close();
+        }
+      };
+      return out;
+    } else {
+      // an FSDataOutputStream, localFS or HDFS.
+      // localFS file may be set up as a FIFO.
+      return sideFs_.create(new Path(uri.getSchemeSpecificPart()));
+    }
+  }
+
+  String getSideEffectFileName() {
+    FileSplit split = StreamUtil.getCurrentSplit(job_);
+    String leaf = split.getPath().getName();
+    if (split.getStart() == 0) {
+      return leaf;
+    } else {
+      return new FileSplit(new Path(leaf), split.getStart(), split.getLength()).toString();
+    }
+  }
 
+  String makeUniqueFileSuffix() {
+    return "." + System.currentTimeMillis() + "." + job_.get("mapred.task.id");
+  }
+
+  public void configure(JobConf job) {
     try {
       String argv = getPipeCommand(job);
+
       keyCols_ = getKeyColsFromPipeCommand(argv);
 
       debug_ = (job.get("stream.debug") != null);
-      if(debug_) {
+      if (debug_) {
         System.out.println("PipeMapRed: stream.debug=true");
       }
-      
+
       joinDelay_ = job.getLong("stream.joindelay.milli", 0);
-      
+
       job_ = job;
+      fs_ = FileSystem.get(job_);
+      if (job_.getBoolean("stream.sideoutput.localfs", false)) {
+        //sideFs_ = new LocalFileSystem(job_);
+        sideFs_ = FileSystem.getNamed("local", job_);
+      } else {
+        sideFs_ = fs_;
+      }
+
+      if (debug_) {
+        System.out.println("kind   :" + this.getClass());
+        System.out.println("split  :" + StreamUtil.getCurrentSplit(job_));
+        System.out.println("fs     :" + fs_.toString());
+        System.out.println("sideFs :" + sideFs_.toString());
+      }
 
-      // Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
-      doPipe_ = (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
-      if(!doPipe_) return;
+      doPipe_ = getDoPipe();
+      if (!doPipe_) return;
 
       setStreamJobDetails(job);
       setStreamProperties();
 
+      if (debugFailEarly_) {
+        throw new RuntimeException("debugFailEarly_");
+      }
       String[] argvSplit = splitArgs(argv);
       String prog = argvSplit[0];
       String userdir = System.getProperty("user.dir");
-      if(new File(prog).isAbsolute()) {
+      if (new File(prog).isAbsolute()) {
         // we don't own it. Hope it is executable
       } else {
         new MustangFile(prog).setExecutable(true, true);
       }
 
-
-      if(job_.getInputValueClass().equals(BytesWritable.class)) {
+      if (job_.getInputValueClass().equals(BytesWritable.class)) {
         // TODO expose as separate config:
         // job or semistandard inputformat property
         optUseKey_ = false;
@@ -195,12 +249,28 @@
 
       optSideEffect_ = getUseSideEffect();
 
-      if(optSideEffect_) {
-        // in cluster local named: outnone/map_bw5nzv
-        String fileName = job_.get("mapred.task.id");
-        sideEffectPath_ = new Path(job_.getOutputPath(), fileName);
-        FileSystem fs = FileSystem.get(job_);
-        sideEffectOut_ = fs.create(sideEffectPath_);
+      if (optSideEffect_) {
+        // during work: use a completely unique filename to avoid HDFS namespace conflicts
+        // after work: rename to a filename that depends only on the workload (the FileSplit)
+        //   it's a friendly name and in case of reexecution it will clobber. 
+        // reexecution can be due to: other job, failed task and speculative task
+        // See StreamJob.setOutputSpec(): if reducerNone_ aka optSideEffect then: 
+        // client has renamed outputPath and saved the argv's original output path as:
+        if (useSingleSideOutputURI_) {
+          sideEffectURI_ = new URI(sideOutputURI_);
+          sideEffectPathFinal_ = null; // in-place, no renaming to final
+        } else {
+          String sideOutputPath = job_.get("stream.sideoutput.dir"); // was: job_.getOutputPath() 
+          String fileName = getSideEffectFileName(); // see HADOOP-444 for rationale
+          sideEffectPathFinal_ = new Path(sideOutputPath, fileName);
+          sideEffectURI_ = new URI(sideEffectPathFinal_ + makeUniqueFileSuffix()); // implicit dfs: 
+        }
+        // apply default scheme
+        if(sideEffectURI_.getScheme() == null) {
+          sideEffectURI_ = new URI("file", sideEffectURI_.getSchemeSpecificPart(), null);
+        }
+        boolean allowSocket = useSingleSideOutputURI_;
+        sideEffectOut_ = getURIOutputStream(sideEffectURI_, allowSocket);
       }
 
       // argvSplit[0]:
@@ -209,109 +279,131 @@
       // In this case, force an absolute path to make sure exec finds it.
       argvSplit[0] = new File(argvSplit[0]).getAbsolutePath();
       logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));
-      logprintln("sideEffectPath_=" + sideEffectPath_);
+      logprintln("sideEffectURI_=" + sideEffectURI_);
 
-      Environment childEnv = (Environment)StreamUtil.env().clone();
+      Environment childEnv = (Environment) StreamUtil.env().clone();
       addJobConfToEnvironment(job_, childEnv);
       addEnvironment(childEnv, job_.get("stream.addenvironment"));
       sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
 
       /* // This way required jdk1.5
-      ProcessBuilder processBuilder = new ProcessBuilder(argvSplit);
-      Map<String, String> env = processBuilder.environment();
-      addEnvironment(env, job_.get("stream.addenvironment"));
-      sim = processBuilder.start();
-      */
+       Builder processBuilder = new ProcessBuilder(argvSplit);
+       Map<String, String> env = processBuilder.environment();
+       addEnvironment(env, job_.get("stream.addenvironment"));
+       sim = processBuilder.start();
+       */
 
       clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
-      clientIn_  = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
+      clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
       clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
       startTime_ = System.currentTimeMillis();
 
-    } catch(Exception e) {
-        e.printStackTrace();
-        e.printStackTrace(log_);
+    } catch (Exception e) {
+      logStackTrace(e);
     }
   }
 
-  void setStreamJobDetails(JobConf job)
-  {
+  void setStreamJobDetails(JobConf job) {
     jobLog_ = job.get("stream.jobLog_");
     String s = job.get("stream.minRecWrittenToEnableSkip_");
-    if(s != null) {
+    if (s != null) {
       minRecWrittenToEnableSkip_ = Long.parseLong(s);
       logprintln("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
     }
+    taskId_ = StreamUtil.getTaskInfo(job_);
+    debugFailEarly_ = isDebugFail("early");
+    debugFailDuring_ = isDebugFail("during");
+    debugFailLate_ = isDebugFail("late");
+
+    sideOutputURI_ = job_.get("stream.sideoutput.uri");
+    useSingleSideOutputURI_ = (sideOutputURI_ != null);
+  }
+
+  boolean isDebugFail(String kind) {
+    String execidlist = job_.get("stream.debugfail.reexec." + kind);
+    if (execidlist == null) {
+      return false;
+    }
+    String[] e = execidlist.split(",");
+    for (int i = 0; i < e.length; i++) {
+      int ei = Integer.parseInt(e[i]);
+      if (taskId_.execid == ei) {
+        return true;
+      }
+    }
+    return false;
   }
 
-  void setStreamProperties()
-  {
-    taskid_ = System.getProperty("stream.taskid");
-    if(taskid_ == null) {
-      taskid_ = "noid" + System.currentTimeMillis();
-    }
+  void setStreamProperties() {
     String s = System.getProperty("stream.port");
-    if(s != null) {
+    if (s != null) {
       reportPortPlusOne_ = Integer.parseInt(s);
     }
+  }
 
+  void logStackTrace(Exception e) {
+    if (e == null) return;
+    e.printStackTrace();
+    if (log_ != null) {
+      e.printStackTrace(log_);
+    }
   }
 
-  void logprintln(String s)
-  {
-    if(log_ != null) {
+  void logprintln(String s) {
+    if (log_ != null) {
       log_.println(s);
     } else {
       LOG.info(s); // or LOG.info()
     }
   }
 
-  void logflush()
-  {
-    if(log_ != null) {
+  void logflush() {
+    if (log_ != null) {
       log_.flush();
     }
   }
 
-  void addJobConfToEnvironment(JobConf conf, Properties env)
-  {
-    logprintln("addJobConfToEnvironment: begin");
+  void addJobConfToEnvironment(JobConf conf, Properties env) {
+    if (debug_) {
+      logprintln("addJobConfToEnvironment: begin");
+    }
     Iterator it = conf.entries();
-    while(it.hasNext()) {
-        Map.Entry en = (Map.Entry)it.next();
-        String name = (String)en.getKey();
-        String value = (String)en.getValue();
-        name = safeEnvVarName(name);
-        envPut(env, name, value);
-    }
-    logprintln("addJobConfToEnvironment: end");
-  }
-  
-  String safeEnvVarName(String var)
-  {
+    while (it.hasNext()) {
+      Map.Entry en = (Map.Entry) it.next();
+      String name = (String) en.getKey();
+      //String value = (String)en.getValue(); // does not apply variable expansion
+      String value = conf.get(name); // does variable expansion 
+      name = safeEnvVarName(name);
+      envPut(env, name, value);
+    }
+    if (debug_) {
+      logprintln("addJobConfToEnvironment: end");
+    }
+  }
+
+  String safeEnvVarName(String var) {
     StringBuffer safe = new StringBuffer();
     int len = var.length();
-    for(int i=0; i<len; i++) {
-        char c = var.charAt(i);
-        char s;
-        if((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')) {
-          s = c;
-        } else {
-          s = '_';
-        }
-        safe.append(s);
+    for (int i = 0; i < len; i++) {
+      char c = var.charAt(i);
+      char s;
+      if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')) {
+        s = c;
+      } else {
+        s = '_';
+      }
+      safe.append(s);
     }
     return safe.toString();
   }
 
-  void addEnvironment(Properties env, String nameVals)
-  {
+  void addEnvironment(Properties env, String nameVals) {
     // encoding "a=b c=d" from StreamJob
-    if(nameVals == null) return;
+    if (nameVals == null) return;
     String[] nv = nameVals.split(" ");
-    for(int i=0; i<nv.length; i++) {
+    for (int i = 0; i < nv.length; i++) {
       String[] pair = nv[i].split("=", 2);
-      if(pair.length != 2) {
+      if (pair.length != 2) {
         logprintln("Skip ev entry:" + nv[i]);
       } else {
         envPut(env, pair[0], pair[1]);
@@ -319,43 +411,43 @@
     }
   }
 
-  void envPut(Properties env, String name, String value)
-  {
-    logprintln("Add  ev entry:" + name + "=" + value);
+  void envPut(Properties env, String name, String value) {
+    if (debug_) {
+      logprintln("Add  ev entry:" + name + "=" + value);
+    }
     env.put(name, value);
   }
-  
+
   /** .. and if successful: delete the task log */
-  void appendLogToJobLog(String status)
-  {
-    if(jobLog_ == null) {
+  void appendLogToJobLog(String status) {
+    if (jobLog_ == null) {
       return; // not using a common joblog
     }
-    StreamUtil.exec("/bin/rm " + LOGNAME, log_);
+    if (log_ != null) {
+      StreamUtil.exec("/bin/rm " + LOGNAME, log_);
+    }
     // TODO socket-based aggregator (in JobTrackerInfoServer)
   }
 
-
-  void startOutputThreads(OutputCollector output, Reporter reporter)
-  {
-      outThread_ = new MROutputThread(output, reporter);
-      outThread_.start();
-      errThread_ = new MRErrorThread(reporter);
-      errThread_.start();
+  void startOutputThreads(OutputCollector output, Reporter reporter) {
+    outThread_ = new MROutputThread(output, reporter);
+    outThread_.start();
+    errThread_ = new MRErrorThread(reporter);
+    errThread_.start();
   }
-  
+
   void waitOutputThreads() {
-      try {
-          sim.waitFor();
-          if(outThread_ != null) {
-              outThread_.join(joinDelay_);
-          }
-          if(errThread_ != null) {
-              errThread_.join(joinDelay_);
-          }
-      } catch(InterruptedException e) {
-          //ignore
+    try {
+      sim.waitFor();
+      if (outThread_ != null) {
+        outThread_.join(joinDelay_);
+      }
+      if (errThread_ != null) {
+        errThread_.join(joinDelay_);
       }
+    } catch (InterruptedException e) {
+      //ignore
+    }
   }
 
   /**
@@ -365,124 +457,137 @@
    * @param val: value of a record
    * @throws IOException
    */
-  void splitKeyVal(byte [] line, Text key, Text val) throws IOException
-  {
-    int pos=-1;
-    if(keyCols_ != ALL_COLS) {
-        pos = UTF8ByteArrayUtils.findTab(line);
+  void splitKeyVal(byte[] line, Text key, Text val) throws IOException {
+    int pos = -1;
+    if (keyCols_ != ALL_COLS) {
+      pos = UTF8ByteArrayUtils.findTab(line);
     }
     try {
-        if(pos == -1) {
-            key.set(line);
-            val.set("");
-        } else {
-            UTF8ByteArrayUtils.splitKeyVal(line, key, val, pos);
-        }
+      if (pos == -1) {
+        key.set(line);
+        val.set("");
+      } else {
+        UTF8ByteArrayUtils.splitKeyVal(line, key, val, pos);
+      }
     } catch (CharacterCodingException e) {
-        LOG.warn(e);
-        StringUtils.stringifyException(e);
+      LOG.warn(StringUtils.stringifyException(e));
     }
   }
-  
-  class MROutputThread extends Thread
-  {
-    MROutputThread(OutputCollector output, Reporter reporter)
-    {
+
+  class MROutputThread extends Thread {
+
+    MROutputThread(OutputCollector output, Reporter reporter) {
       setDaemon(true);
       this.output = output;
       this.reporter = reporter;
     }
+
     public void run() {
-        try {
-            Text key = new Text();
-            Text val = new Text();
-            // 3/4 Tool to Hadoop
-            while((answer=UTF8ByteArrayUtils.readLine((InputStream)clientIn_))!= null) {
-                // 4/4 Hadoop out
-                if(optSideEffect_) {
-                    sideEffectOut_.write(answer);
-                    sideEffectOut_.write('\n');
-                } else {
-                    splitKeyVal(answer, key, val);
-                    output.collect(key, val);
-                }
-                numRecWritten_++;
-                if(numRecWritten_ % 100 == 0) {
-                    logprintln(numRecRead_+"/"+numRecWritten_);
-                    logflush();
-                }
-            }
-        } catch(IOException io) {
-            io.printStackTrace(log_);
+      try {
+        Text key = new Text();
+        Text val = new Text();
+        // 3/4 Tool to Hadoop
+        while ((answer = UTF8ByteArrayUtils.readLine((InputStream) clientIn_)) != null) {
+          // 4/4 Hadoop out
+          if (optSideEffect_) {
+            sideEffectOut_.write(answer);
+            sideEffectOut_.write('\n');
+          } else {
+            splitKeyVal(answer, key, val);
+            output.collect(key, val);
+          }
+          numRecWritten_++;
+          if (numRecWritten_ % 100 == 0) {
+            logprintln(numRecRead_ + "/" + numRecWritten_);
+            logflush();
+          }
         }
-        logprintln("MROutputThread done");
+      } catch (IOException io) {
+        io.printStackTrace(log_);
+      }
+      logprintln("MROutputThread done");
     }
+
     OutputCollector output;
     Reporter reporter;
-    byte [] answer;
+    byte[] answer;
   }
 
-  class MRErrorThread extends Thread
-  {
-    public MRErrorThread(Reporter reporter)
-    {
+  class MRErrorThread extends Thread {
+
+    public MRErrorThread(Reporter reporter) {
       this.reporter = reporter;
       setDaemon(true);
     }
-    public void run()
-    {
-      byte [] line;
+
+    public void run() {
+      byte[] line;
       try {
         long num = 0;
-        while((line=UTF8ByteArrayUtils.readLine((InputStream)clientErr_)) != null) {
+        while ((line = UTF8ByteArrayUtils.readLine((InputStream) clientErr_)) != null) {
           num++;
-          String lineStr = new String(line, "UTF-8"); 
+          String lineStr = new String(line, "UTF-8");
           logprintln(lineStr);
-          if(num < 10) {
+          long now = System.currentTimeMillis(); 
+          if (num < 10 || (now-lastStderrReport > 10*1000)) {
             String hline = "MRErr: " + lineStr;
             System.err.println(hline);
             reporter.setStatus(hline);
+            lastStderrReport = now;
           }
         }
-      } catch(IOException io) {
-        io.printStackTrace(log_);
+      } catch (IOException io) {
+        logStackTrace(io);
       }
     }
+    long lastStderrReport = 0;
     Reporter reporter;
   }
 
-  public void mapRedFinished()
-  {
-      logprintln("mapRedFinished");
-      if(!doPipe_) return;
-
+  public void mapRedFinished() {
+    logprintln("mapRedFinished");
+    try {
+      if (!doPipe_) return;
+      try {
+        if (clientOut_ != null) {
+          clientOut_.close();
+        }
+      } catch (IOException io) {
+      }
+      waitOutputThreads();
       try {
-          try {
-              if(clientOut_ != null) {
-                  clientOut_.close();
-              }
-          } catch(IOException io) {
+        if (optSideEffect_) {
+          logprintln("closing " + sideEffectURI_);
+          if (sideEffectOut_ != null) sideEffectOut_.close();
+          logprintln("closed  " + sideEffectURI_);
+          if (sideEffectURI_.getScheme().equals("file")) {
+            logprintln("size  " + new File(sideEffectURI_).length());
           }
-          waitOutputThreads();
-          try {
-              if(optSideEffect_) {
-                  logprintln("closing " + sideEffectPath_);
-                  sideEffectOut_.close();
-                  logprintln("closed  " + sideEffectPath_);
-              }
-          } catch(IOException io) {
-              io.printStackTrace();
+          if (useSingleSideOutputURI_) {
+            // With sideEffectPath_ we wrote in-place. 
+            // Possibly a named pipe set up by user or a socket.
+          } else {
+            boolean del = sideFs_.delete(sideEffectPathFinal_);
+            logprintln("deleted  (" + del + ") " + sideEffectPathFinal_);
+            sideFs_.rename(new Path(sideEffectURI_.getSchemeSpecificPart()), sideEffectPathFinal_);
+            logprintln("renamed  " + sideEffectPathFinal_);
           }
-          sim.destroy();
-      } catch(RuntimeException e) {
-          e.printStackTrace(log_);
-          throw e;
+        }
+      } catch (IOException io) {
+        io.printStackTrace();
       }
+      if (sim != null) sim.destroy();
+    } catch (RuntimeException e) {
+      logStackTrace(e);
+      throw e;
+    }
+    if (debugFailLate_) {
+      throw new RuntimeException("debugFailLate_");
+    }
   }
 
-  void maybeLogRecord()
-  {
-    if(numRecRead_ >= nextRecReadLog_) {
+  void maybeLogRecord() {
+    if (numRecRead_ >= nextRecReadLog_) {
       String info = numRecInfo();
       logprintln(info);
       logflush();
@@ -492,8 +597,7 @@
     }
   }
 
-  public String getContext()
-  {
+  public String getContext() {
 
     String s = numRecInfo() + "\n";
     s += "minRecWrittenToEnableSkip_=" + minRecWrittenToEnableSkip_ + " ";
@@ -503,38 +607,38 @@
     s += envline("HADOOP_USER");
     //s += envline("PWD"); // =/home/crawler/hadoop/trunk
     s += "last Hadoop input: |" + mapredKey_ + "|\n";
-    s += "last tool output: |" + outThread_.answer + "|\n";
+    if (outThread_ != null) {
+      s += "last tool output: |" + outThread_.answer + "|\n";
+    }
     s += "Date: " + new Date() + "\n";
     // s += envline("HADOOP_HOME");
     // s += envline("REMOTE_HOST");
     return s;
   }
 
-  String envline(String var)
-  {
+  String envline(String var) {
     return var + "=" + StreamUtil.env().get(var) + "\n";
   }
 
-  String numRecInfo()
-  {
-    long elapsed = (System.currentTimeMillis() - startTime_)/1000;
-    long total = numRecRead_+numRecWritten_+numRecSkipped_;
-    return "R/W/S=" + numRecRead_+"/"+numRecWritten_+"/"+numRecSkipped_
-     + " in:"  + safeDiv(numRecRead_, elapsed) + " [rec/s]"
-     + " out:" + safeDiv(numRecWritten_, elapsed) + " [rec/s]";
-  }
-  String safeDiv(long n, long d)
-  {
-    return (d==0) ? "NA" : ""+n/d + "=" + n + "/" + d;
-  }
-  String logFailure(Exception e)
-  {
-      StringWriter sw = new StringWriter();
-      PrintWriter pw = new PrintWriter(sw);
-      e.printStackTrace(pw);
-      String msg = "log:" + jobLog_ + "\n" + getContext() + sw + "\n";
-      logprintln(msg);
-      return msg;
+  String numRecInfo() {
+    long elapsed = (System.currentTimeMillis() - startTime_) / 1000;
+    long total = numRecRead_ + numRecWritten_ + numRecSkipped_;
+    return "R/W/S=" + numRecRead_ + "/" + numRecWritten_ + "/" + numRecSkipped_ + " in:"
+        + safeDiv(numRecRead_, elapsed) + " [rec/s]" + " out:" + safeDiv(numRecWritten_, elapsed)
+        + " [rec/s]";
+  }
+
+  String safeDiv(long n, long d) {
+    return (d == 0) ? "NA" : "" + n / d + "=" + n + "/" + d;
+  }
+
+  String logFailure(Exception e) {
+    StringWriter sw = new StringWriter();
+    PrintWriter pw = new PrintWriter(sw);
+    e.printStackTrace(pw);
+    String msg = "log:" + jobLog_ + "\n" + getContext() + sw + "\n";
+    logprintln(msg);
+    return msg;
   }
 
   /**
@@ -543,24 +647,24 @@
    * @throws IOException
    */
   void write(Writable value) throws IOException {
-      byte[] bval;
-      int valSize;
-      if(value instanceof BytesWritable) {
-          BytesWritable val = (BytesWritable)value;
-          bval = val.get();
-          valSize = val.getSize();
-      } else if(value instanceof Text){
-          Text val = (Text)value;
-          bval = val.getBytes();
-          valSize = val.getLength();
-      } else  {
-          String sval = value.toString();
-          bval = sval.getBytes("UTF-8");
-          valSize = bval.length;
-      }
-      clientOut_.write(bval, 0, valSize);
+    byte[] bval;
+    int valSize;
+    if (value instanceof BytesWritable) {
+      BytesWritable val = (BytesWritable) value;
+      bval = val.get();
+      valSize = val.getSize();
+    } else if (value instanceof Text) {
+      Text val = (Text) value;
+      bval = val.getBytes();
+      valSize = val.getLength();
+    } else {
+      String sval = value.toString();
+      bval = sval.getBytes("UTF-8");
+      valSize = bval.length;
+    }
+    clientOut_.write(bval, 0, valSize);
   }
-  
+
   long startTime_;
   long numRecRead_ = 0;
   long numRecWritten_ = 0;
@@ -574,52 +678,43 @@
 
   long joinDelay_;
   JobConf job_;
+  FileSystem fs_;
+  FileSystem sideFs_;
 
   // generic MapRed parameters passed on by hadoopStreaming
-  String taskid_;
   int reportPortPlusOne_;
 
   boolean doPipe_;
   boolean debug_;
+  boolean debugFailEarly_;
+  boolean debugFailDuring_;
+  boolean debugFailLate_;
 
   Process sim;
   MROutputThread outThread_;
   String jobLog_;
   MRErrorThread errThread_;
   DataOutputStream clientOut_;
-  DataInputStream  clientErr_;
-  DataInputStream   clientIn_;
+  DataInputStream clientErr_;
+  DataInputStream clientIn_;
 
   // set in PipeMapper/PipeReducer subclasses
   String mapredKey_;
   int numExceptions_;
+  StreamUtil.TaskId taskId_;
 
   boolean optUseKey_ = true;
 
-  boolean optSideEffect_;
-  Path sideEffectPath_;
-  FSDataOutputStream sideEffectOut_;
+  private boolean optSideEffect_;
+  private URI sideEffectURI_;
+  private Path sideEffectPathFinal_;
+
+  private boolean useSingleSideOutputURI_;
+  private String sideOutputURI_;
+
+  private OutputStream sideEffectOut_;
 
   String LOGNAME;
   PrintStream log_;
 
-  /* curr. going to stderr so that it is preserved
-  { // instance initializer
-    try {
-      int id = (int)((System.currentTimeMillis()/2000) % 10);
-      String sid = id+ "." + StreamUtil.env().get("USER");
-      LOGNAME = "/tmp/PipeMapRed." + sid + ".log";
-      log_ = new PrintStream(new FileOutputStream(LOGNAME));
-      logprintln(new java.util.Date());
-      logflush();
-    } catch(IOException io) {
-      System.err.println("LOGNAME=" + LOGNAME);
-      io.printStackTrace();
-    } finally {
-      if(log_ == null) {
-        log_ = System.err;
-      }
-    }
-  }
-  */
 }

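The environment handling above exports every JobConf entry (with variable expansion) into the child process environment after safeEnvVarName() rewrites any character outside [0-9A-Za-z] to '_'. A standalone sketch that mirrors the patched method, so a script author can predict the variable names it will see; the demo class itself is hypothetical:

    // Hypothetical demo mirroring PipeMapRed.safeEnvVarName().
    public class SafeEnvVarNameDemo {
      static String safeEnvVarName(String var) {
        StringBuffer safe = new StringBuffer();
        for (int i = 0; i < var.length(); i++) {
          char c = var.charAt(i);
          boolean keep = (c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z');
          safe.append(keep ? c : '_'); // everything else becomes '_'
        }
        return safe.toString();
      }
      public static void main(String[] args) {
        System.out.println(safeEnvVarName("mapred.task.id"));        // mapred_task_id
        System.out.println(safeEnvVarName("stream.sideoutput.dir")); // stream_sideoutput_dir
      }
    }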

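splitArgs() tokenizes the pipe command on unquoted spaces; single- or double-quoted runs keep their embedded spaces and drop the surrounding quotes. A hypothetical demo, placed in the same package because splitArgs() is package-private:

    package org.apache.hadoop.streaming; // splitArgs() is package-private

    // Hypothetical demo of PipeMapRed.splitArgs() tokenization.
    public class SplitArgsDemo {
      public static void main(String[] args) {
        String[] argv = PipeMapRed.splitArgs("perl -ne 'print \"$_\"' extra");
        // Expected tokens: [perl] [-ne] [print "$_"] [extra]
        for (int i = 0; i < argv.length; i++) {
          System.out.println("[" + argv[i] + "]");
        }
      }
    }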

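The reworked side-effect output writes to a uniquely suffixed temporary name and only renames it to the split-derived final name in mapRedFinished(), so failed or speculative attempts cannot collide and re-execution clobbers the same final file; when stream.sideoutput.uri is set, output goes in place (possibly to a socket or named pipe) and no rename happens. A sketch of the name construction for a split starting at offset 0, with all concrete values as placeholders:

    // Example values only; the real directory, split name and task id come from the job.
    public class SideEffectNamingSketch {
      public static void main(String[] args) {
        String sideOutputDir = "/user/demo/outnone";   // stream.sideoutput.dir (placeholder)
        String splitLeaf = "part-00000";               // leaf of the FileSplit, start offset 0
        String taskId = "task_0001_m_000000_0";        // mapred.task.id (placeholder)
        String finalName = sideOutputDir + "/" + splitLeaf;
        String workName = finalName + "." + System.currentTimeMillis() + "." + taskId;
        System.out.println("write to  : " + workName);
        System.out.println("rename to : " + finalName + "  (after deleting any earlier copy)");
      }
    }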