hadoop-common-commits mailing list archives

From cutt...@apache.org
Subject svn commit: r538693 [20/20] - in /lucene/hadoop/trunk: ./ bin/ src/c++/pipes/ src/c++/pipes/api/ src/c++/pipes/api/hadoop/ src/c++/pipes/impl/ src/c++/utils/ src/c++/utils/api/ src/c++/utils/api/hadoop/ src/c++/utils/impl/ src/c++/utils/m4/ src/example...
Date Wed, 16 May 2007 19:23:53 GMT
Added: lucene/hadoop/trunk/src/examples/pipes/missing
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/pipes/missing?view=auto&rev=538693
==============================================================================
--- lucene/hadoop/trunk/src/examples/pipes/missing (added)
+++ lucene/hadoop/trunk/src/examples/pipes/missing Wed May 16 12:23:48 2007
@@ -0,0 +1,360 @@
+#! /bin/sh
+# Common stub for a few missing GNU programs while installing.
+
+scriptversion=2003-09-02.23
+
+# Copyright (C) 1996, 1997, 1999, 2000, 2002, 2003 
+#   Free Software Foundation, Inc.
+# Originally by François Pinard <pinard@iro.umontreal.ca>, 1996.
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
+# 02111-1307, USA.
+
+# As a special exception to the GNU General Public License, if you
+# distribute this file as part of a program that contains a
+# configuration script generated by Autoconf, you may include it under
+# the same distribution terms that you use for the rest of that program.
+
+if test $# -eq 0; then
+  echo 1>&2 "Try \`$0 --help' for more information"
+  exit 1
+fi
+
+run=:
+
+# In the cases where this matters, `missing' is being run in the
+# srcdir already.
+if test -f configure.ac; then
+  configure_ac=configure.ac
+else
+  configure_ac=configure.in
+fi
+
+msg="missing on your system"
+
+case "$1" in
+--run)
+  # Try to run requested program, and just exit if it succeeds.
+  run=
+  shift
+  "$@" && exit 0
+  # Exit code 63 means version mismatch.  This often happens
+  # when the user tries to use an ancient version of a tool on
+  # a file that requires a minimum version.  In this case we
+  # should proceed as if the program had been absent, or
+  # if --run hadn't been passed.
+  if test $? = 63; then
+    run=:
+    msg="probably too old"
+  fi
+  ;;
+esac
+
+# If it does not exist, or fails to run (possibly an outdated version),
+# try to emulate it.
+case "$1" in
+
+  -h|--h|--he|--hel|--help)
+    echo "\
+$0 [OPTION]... PROGRAM [ARGUMENT]...
+
+Handle \`PROGRAM [ARGUMENT]...' for when PROGRAM is missing, or return an
+error status if there is no known handling for PROGRAM.
+
+Options:
+  -h, --help      display this help and exit
+  -v, --version   output version information and exit
+  --run           try to run the given command, and emulate it if it fails
+
+Supported PROGRAM values:
+  aclocal      touch file \`aclocal.m4'
+  autoconf     touch file \`configure'
+  autoheader   touch file \`config.h.in'
+  automake     touch all \`Makefile.in' files
+  bison        create \`y.tab.[ch]', if possible, from existing .[ch]
+  flex         create \`lex.yy.c', if possible, from existing .c
+  help2man     touch the output file
+  lex          create \`lex.yy.c', if possible, from existing .c
+  makeinfo     touch the output file
+  tar          try tar, gnutar, gtar, then tar without non-portable flags
+  yacc         create \`y.tab.[ch]', if possible, from existing .[ch]
+
+Send bug reports to <bug-automake@gnu.org>."
+    ;;
+
+  -v|--v|--ve|--ver|--vers|--versi|--versio|--version)
+    echo "missing $scriptversion (GNU Automake)"
+    ;;
+
+  -*)
+    echo 1>&2 "$0: Unknown \`$1' option"
+    echo 1>&2 "Try \`$0 --help' for more information"
+    exit 1
+    ;;
+
+  aclocal*)
+    if test -z "$run" && ($1 --version) > /dev/null 2>&1; then
+       # We have it, but it failed.
+       exit 1
+    fi
+
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified \`acinclude.m4' or \`${configure_ac}'.  You might want
+         to install the \`Automake' and \`Perl' packages.  Grab them from
+         any GNU archive site."
+    touch aclocal.m4
+    ;;
+
+  autoconf)
+    if test -z "$run" && ($1 --version) > /dev/null 2>&1; then
+       # We have it, but it failed.
+       exit 1
+    fi
+
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified \`${configure_ac}'.  You might want to install the
+         \`Autoconf' and \`GNU m4' packages.  Grab them from any GNU
+         archive site."
+    touch configure
+    ;;
+
+  autoheader)
+    if test -z "$run" && ($1 --version) > /dev/null 2>&1; then
+       # We have it, but it failed.
+       exit 1
+    fi
+
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified \`acconfig.h' or \`${configure_ac}'.  You might want
+         to install the \`Autoconf' and \`GNU m4' packages.  Grab them
+         from any GNU archive site."
+    files=`sed -n 's/^[ ]*A[CM]_CONFIG_HEADER(\([^)]*\)).*/\1/p' ${configure_ac}`
+    test -z "$files" && files="config.h"
+    touch_files=
+    for f in $files; do
+      case "$f" in
+      *:*) touch_files="$touch_files "`echo "$f" |
+				       sed -e 's/^[^:]*://' -e 's/:.*//'`;;
+      *) touch_files="$touch_files $f.in";;
+      esac
+    done
+    touch $touch_files
+    ;;
+
+  automake*)
+    if test -z "$run" && ($1 --version) > /dev/null 2>&1; then
+       # We have it, but it failed.
+       exit 1
+    fi
+
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified \`Makefile.am', \`acinclude.m4' or \`${configure_ac}'.
+         You might want to install the \`Automake' and \`Perl' packages.
+         Grab them from any GNU archive site."
+    find . -type f -name Makefile.am -print |
+	   sed 's/\.am$/.in/' |
+	   while read f; do touch "$f"; done
+    ;;
+
+  autom4te)
+    if test -z "$run" && ($1 --version) > /dev/null 2>&1; then
+       # We have it, but it failed.
+       exit 1
+    fi
+
+    echo 1>&2 "\
+WARNING: \`$1' is needed, but is $msg.
+         You might have modified some files without having the
+         proper tools for further handling them.
+         You can get \`$1' as part of \`Autoconf' from any GNU
+         archive site."
+
+    file=`echo "$*" | sed -n 's/.*--output[ =]*\([^ ]*\).*/\1/p'`
+    test -z "$file" && file=`echo "$*" | sed -n 's/.*-o[ ]*\([^ ]*\).*/\1/p'`
+    if test -f "$file"; then
+	touch $file
+    else
+	test -z "$file" || exec >$file
+	echo "#! /bin/sh"
+	echo "# Created by GNU Automake missing as a replacement of"
+	echo "#  $ $@"
+	echo "exit 0"
+	chmod +x $file
+	exit 1
+    fi
+    ;;
+
+  bison|yacc)
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified a \`.y' file.  You may need the \`Bison' package
+         in order for those modifications to take effect.  You can get
+         \`Bison' from any GNU archive site."
+    rm -f y.tab.c y.tab.h
+    if [ $# -ne 1 ]; then
+        eval LASTARG="\${$#}"
+	case "$LASTARG" in
+	*.y)
+	    SRCFILE=`echo "$LASTARG" | sed 's/y$/c/'`
+	    if [ -f "$SRCFILE" ]; then
+	         cp "$SRCFILE" y.tab.c
+	    fi
+	    SRCFILE=`echo "$LASTARG" | sed 's/y$/h/'`
+	    if [ -f "$SRCFILE" ]; then
+	         cp "$SRCFILE" y.tab.h
+	    fi
+	  ;;
+	esac
+    fi
+    if [ ! -f y.tab.h ]; then
+	echo >y.tab.h
+    fi
+    if [ ! -f y.tab.c ]; then
+	echo 'main() { return 0; }' >y.tab.c
+    fi
+    ;;
+
+  lex|flex)
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified a \`.l' file.  You may need the \`Flex' package
+         in order for those modifications to take effect.  You can get
+         \`Flex' from any GNU archive site."
+    rm -f lex.yy.c
+    if [ $# -ne 1 ]; then
+        eval LASTARG="\${$#}"
+	case "$LASTARG" in
+	*.l)
+	    SRCFILE=`echo "$LASTARG" | sed 's/l$/c/'`
+	    if [ -f "$SRCFILE" ]; then
+	         cp "$SRCFILE" lex.yy.c
+	    fi
+	  ;;
+	esac
+    fi
+    if [ ! -f lex.yy.c ]; then
+	echo 'main() { return 0; }' >lex.yy.c
+    fi
+    ;;
+
+  help2man)
+    if test -z "$run" && ($1 --version) > /dev/null 2>&1; then
+       # We have it, but it failed.
+       exit 1
+    fi
+
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+	 you modified a dependency of a manual page.  You may need the
+	 \`Help2man' package in order for those modifications to take
+	 effect.  You can get \`Help2man' from any GNU archive site."
+
+    file=`echo "$*" | sed -n 's/.*-o \([^ ]*\).*/\1/p'`
+    if test -z "$file"; then
+	file=`echo "$*" | sed -n 's/.*--output=\([^ ]*\).*/\1/p'`
+    fi
+    if [ -f "$file" ]; then
+	touch $file
+    else
+	test -z "$file" || exec >$file
+	echo ".ab help2man is required to generate this page"
+	exit 1
+    fi
+    ;;
+
+  makeinfo)
+    if test -z "$run" && (makeinfo --version) > /dev/null 2>&1; then
+       # We have makeinfo, but it failed.
+       exit 1
+    fi
+
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified a \`.texi' or \`.texinfo' file, or any other file
+         indirectly affecting the aspect of the manual.  The spurious
+         call might also be the consequence of using a buggy \`make' (AIX,
+         DU, IRIX).  You might want to install the \`Texinfo' package or
+         the \`GNU make' package.  Grab either from any GNU archive site."
+    file=`echo "$*" | sed -n 's/.*-o \([^ ]*\).*/\1/p'`
+    if test -z "$file"; then
+      file=`echo "$*" | sed 's/.* \([^ ]*\) *$/\1/'`
+      file=`sed -n '/^@setfilename/ { s/.* \([^ ]*\) *$/\1/; p; q; }' $file`
+    fi
+    touch $file
+    ;;
+
+  tar)
+    shift
+    if test -n "$run"; then
+      echo 1>&2 "ERROR: \`tar' requires --run"
+      exit 1
+    fi
+
+    # We have already tried tar in the generic part.
+    # Look for gnutar/gtar before invocation to avoid ugly error
+    # messages.
+    if (gnutar --version > /dev/null 2>&1); then
+       gnutar "$@" && exit 0
+    fi
+    if (gtar --version > /dev/null 2>&1); then
+       gtar "$@" && exit 0
+    fi
+    firstarg="$1"
+    if shift; then
+	case "$firstarg" in
+	*o*)
+	    firstarg=`echo "$firstarg" | sed s/o//`
+	    tar "$firstarg" "$@" && exit 0
+	    ;;
+	esac
+	case "$firstarg" in
+	*h*)
+	    firstarg=`echo "$firstarg" | sed s/h//`
+	    tar "$firstarg" "$@" && exit 0
+	    ;;
+	esac
+    fi
+
+    echo 1>&2 "\
+WARNING: I can't seem to run \`tar' with the given arguments.
+         You may want to install GNU tar or Free paxutils, or check the
+         command line arguments."
+    exit 1
+    ;;
+
+  *)
+    echo 1>&2 "\
+WARNING: \`$1' is needed, and is $msg.
+         You might have modified some files without having the
+         proper tools for further handling them.  Check the \`README' file,
+         it often tells you about the needed prerequisites for installing
+         this package.  You may also peek at any GNU archive site, in case
+         some other package would contain this missing \`$1' program."
+    exit 1
+    ;;
+esac
+
+exit 0
+
+# Local variables:
+# eval: (add-hook 'write-file-hooks 'time-stamp)
+# time-stamp-start: "scriptversion="
+# time-stamp-format: "%:y-%02m-%02d.%02H"
+# time-stamp-end: "$"
+# End:

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java?view=auto&rev=538693
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java Wed May 16 12:23:48 2007
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This class is responsible for launching and communicating with the child 
+ * process.
+ */
+class Application {
+  private static final Log LOG = LogFactory.getLog(Application.class.getName());
+  private ServerSocket serverSocket;
+  private Process process;
+  private Socket clientSocket;
+  private OutputHandler handler;
+  private BinaryProtocol downlink;
+
+  /**
+   * Start the child process to handle the task for us.
+   * @param conf the task's configuration
+   * @param output the collector to send output to
+   * @param reporter the reporter for the task
+   * @param outputKeyClass the class of the output keys
+   * @param outputValueClass the class of the output values
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  Application(JobConf conf, OutputCollector output, Reporter reporter,
+              Class outputKeyClass, Class outputValueClass
+              ) throws IOException, InterruptedException {
+    serverSocket = new ServerSocket(0);
+    Map<String, String> env = new HashMap<String,String>();
+    env.put("hadoop.pipes.command.port", 
+            Integer.toString(serverSocket.getLocalPort()));
+    List<String> cmd = new ArrayList<String>();
+    String executable = DistributedCache.getLocalCacheFiles(conf)[0].toString();
+    FileUtil.chmod(executable, "a+x");
+    cmd.add(executable);
+    process = runClient(cmd, env);
+    clientSocket = serverSocket.accept();
+    handler = new OutputHandler(output, reporter);
+    WritableComparable outputKey = (WritableComparable)
+      ReflectionUtils.newInstance(outputKeyClass, conf);
+    Writable outputValue = (Writable) 
+      ReflectionUtils.newInstance(outputValueClass, conf);
+    downlink = new BinaryProtocol(clientSocket, handler, 
+                                  outputKey, outputValue, conf);
+    downlink.start();
+    downlink.setJobConf(conf);
+  }
+
+  /**
+   * Get the downward protocol object that can send commands down to the
+   * application.
+   * @return the downlink proxy
+   */
+  DownwardProtocol getDownlink() {
+    return downlink;
+  }
+
+  /**
+   * Wait for the application to finish
+   * @return did the application finish correctly?
+   * @throws Throwable
+   */
+  boolean waitForFinish() throws Throwable {
+    return handler.waitForFinish();
+  }
+
+  /**
+   * Abort the application and wait for it to finish.
+   * @param t the exception that signalled the problem
+   * @throws IOException A wrapper around the exception that was passed in
+   */
+  void abort(Throwable t) throws IOException {
+    LOG.info("Aborting because of " + StringUtils.stringifyException(t));
+    try {
+      downlink.abort();
+    } catch (IOException e) {
+      // IGNORE cleanup problems
+    }
+    try {
+      handler.waitForFinish();
+    } catch (Throwable ignored) {
+      process.destroy();
+    }
+    IOException wrapper = new IOException("pipe child exception");
+    wrapper.initCause(t);
+    throw wrapper;      
+  }
+  
+  /**
+   * Clean up the child process and socket.
+   * @throws IOException
+   */
+  void cleanup() throws IOException {
+    serverSocket.close();
+    try {
+      downlink.closeConnection();
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    }      
+  }
+
+  /**
+   * A thread to copy an input stream to an output stream.
+   * Errors cause the copy to stop and are not reported back.
+   * The input stream is closed when the thread exits. The output stream
+   * is not closed.
+   */
+  private static class OutputCopier extends Thread {
+    InputStream in;
+    OutputStream out;
+    OutputCopier(String name, InputStream in, OutputStream out) {
+      super(name);
+      this.in = in;
+      this.out = out;
+    }
+    public void run() {
+      byte[] buffer = new byte[65536];
+      try {
+        while (true) {
+          int size = in.read(buffer);
+          if (size == -1) {
+            break;
+          }
+          out.write(buffer, 0, size);
+        }
+      } catch (IOException ie) {
+      } finally {
+        try {
+          in.close();
+        } catch (IOException ie) { }
+      }
+    }
+  }
+
+  /**
+   * Run a given command in a subprocess, including threads to copy its stdout
+   * and stderr to our stdout and stderr.
+   * @param command the command and its arguments
+   * @param env the environment to run the process in
+   * @return a handle on the process
+   * @throws IOException
+   */
+  static Process runClient(List<String> command, 
+                           Map<String, String> env) throws IOException {
+    ProcessBuilder builder = new ProcessBuilder(command);
+    if (env != null) {
+      builder.environment().putAll(env);
+    }
+    Process result = builder.start();
+    result.getOutputStream().close();
+    new OutputCopier("pipes-stdout", result.getInputStream(), 
+                     System.out).start();
+    new OutputCopier("pipes-stderr", result.getErrorStream(), 
+                     System.err).start();
+    return result;
+  }
+
+}
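
The constructor above publishes the server socket's port to the child through the
hadoop.pipes.command.port environment variable and then blocks in serverSocket.accept()
until the child connects back. A minimal sketch of the child's side of that handshake,
assuming a Java stand-in for the real C++ binary (class name, host, and stream handling
here are illustrative, not part of the committed code):

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.net.Socket;

    // Illustrative stand-in for the pipes child process: read the port that
    // Application exported, connect back, and hold the two protocol streams.
    public class MockPipesChild {
      public static void main(String[] args) throws Exception {
        int port = Integer.parseInt(System.getenv("hadoop.pipes.command.port"));
        Socket sock = new Socket("localhost", port);  // Application.accept() returns here
        DataInputStream downlink = new DataInputStream(sock.getInputStream());
        DataOutputStream uplink = new DataOutputStream(sock.getOutputStream());
        // ... read downlink commands, write uplink messages (see BinaryProtocol) ...
        uplink.flush();
        sock.close();
      }
    }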

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java?view=auto&rev=538693
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java Wed May 16 12:23:48 2007
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+import java.io.*;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This protocol is a binary implementation of the Pipes protocol.
+ */
+class BinaryProtocol implements DownwardProtocol {
+  public static final int CURRENT_PROTOCOL_VERSION = 0;
+  private DataOutputStream stream;
+  private DataOutputBuffer buffer = new DataOutputBuffer();
+  private static final Log LOG = 
+    LogFactory.getLog(BinaryProtocol.class.getName());
+  private UplinkReaderThread uplink;
+
+  /**
+   * The integer codes to represent the different messages. These must match
+   * the C++ codes or massive confusion will result.
+   */
+  private static enum MessageType { START(0),
+                                    SET_JOB_CONF(1),
+                                    SET_INPUT_TYPES(2),
+                                    RUN_MAP(3),
+                                    MAP_ITEM(4),
+                                    RUN_REDUCE(5),
+                                    REDUCE_KEY(6),
+                                    REDUCE_VALUE(7),
+                                    CLOSE(8),
+                                    ABORT(9),
+                                    OUTPUT(50),
+                                    PARTITIONED_OUTPUT(51),
+                                    STATUS(52),
+                                    PROGRESS(53),
+                                    DONE(54);
+    final int code;
+    MessageType(int code) {
+      this.code = code;
+    }
+  }
+
+  private static class UplinkReaderThread extends Thread {
+    private DataInputStream inStream;
+    private UpwardProtocol handler;
+    private WritableComparable key;
+    private Writable value;
+    
+    public UplinkReaderThread(InputStream stream, UpwardProtocol handler, 
+                              WritableComparable key, Writable value
+                              ) throws IOException{
+      inStream = new DataInputStream(stream);
+      this.handler = handler;
+      this.key = key;
+      this.value = value;
+    }
+
+    public void closeConnection() throws IOException {
+      inStream.close();
+    }
+
+    public void run() {
+      while (true) {
+        try {
+          if (Thread.currentThread().isInterrupted()) {
+            throw new InterruptedException();
+          }
+          int cmd = WritableUtils.readVInt(inStream);
+          LOG.debug("Handling uplink command " + cmd);
+          if (cmd == MessageType.OUTPUT.code) {
+            readObject(key);
+            readObject(value);
+            handler.output(key, value);
+          } else if (cmd == MessageType.PARTITIONED_OUTPUT.code) {
+            int part = WritableUtils.readVInt(inStream);
+            readObject(key);
+            readObject(value);
+            handler.partitionedOutput(part, key, value);
+          } else if (cmd == MessageType.STATUS.code) {
+            handler.status(Text.readString(inStream));
+          } else if (cmd == MessageType.PROGRESS.code) {
+            handler.progress(inStream.readFloat());
+          } else if (cmd == MessageType.DONE.code) {
+            LOG.debug("Pipe child done");
+            handler.done();
+            return;
+          } else {
+            throw new IOException("Bad command code: " + cmd);
+          }
+        } catch (InterruptedException e) {
+          return;
+        } catch (Throwable e) {
+          LOG.error(StringUtils.stringifyException(e));
+          handler.failed(e);
+          return;
+        }
+      }
+    }
+    
+    private void readObject(Writable obj) throws IOException {
+      int numBytes = WritableUtils.readVInt(inStream);
+      byte[] buffer;
+      // For BytesWritable and Text, use the specified length to set the
+      // object's length; this makes the "obvious" translations work, so that
+      // if you emit a string "abc" from C++, it shows up as "abc".
+      if (obj instanceof BytesWritable) {
+        buffer = new byte[numBytes];
+        inStream.readFully(buffer);
+        ((BytesWritable) obj).set(buffer, 0, numBytes);
+      } else if (obj instanceof Text) {
+        buffer = new byte[numBytes];
+        inStream.readFully(buffer);
+        ((Text) obj).set(buffer);
+      } else {
+        obj.readFields(inStream);
+      }
+    }
+  }
+
+  /**
+   * An output stream that will save a copy of the data into a file.
+   */
+  private static class TeeOutputStream extends FilterOutputStream {
+    private OutputStream file;
+    TeeOutputStream(String filename, OutputStream base) throws IOException {
+      super(base);
+      file = new FileOutputStream(filename);
+    }
+    public void write(byte b[], int off, int len) throws IOException {
+      file.write(b,off,len);
+      out.write(b,off,len);
+    }
+
+    public void write(int b) throws IOException {
+      file.write(b);
+      out.write(b);
+    }
+
+    public void flush() throws IOException {
+      file.flush();
+      out.flush();
+    }
+
+    public void close() throws IOException {
+      flush();
+      file.close();
+      out.close();
+    }
+  }
+
+  /**
+   * Create a proxy object that will speak the binary protocol on a socket.
+   * Upward messages are passed on to the specified handler and downward
+   * messages are public methods on this object.
+   * @param sock The socket to communicate on.
+   * @param handler The handler for the received messages.
+   * @param key The object to read keys into.
+   * @param value The object to read values into.
+   * @param config The job's configuration
+   * @throws IOException
+   */
+  public BinaryProtocol(Socket sock, 
+                        UpwardProtocol handler,
+                        WritableComparable key,
+                        Writable value,
+                        JobConf config) throws IOException {
+    OutputStream raw = sock.getOutputStream();
+    // If we are debugging, save a copy of the downlink commands to a file
+    if (Submitter.getKeepCommandFile(config)) {
+      raw = new TeeOutputStream("downlink.data", raw);
+    }
+    stream = new DataOutputStream(raw);
+    uplink = new UplinkReaderThread(sock.getInputStream(), handler, key, value);
+    uplink.setName("pipe-uplink-handler");
+    uplink.start();
+  }
+
+  /**
+   * Close the connection and shutdown the handler thread.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void closeConnection() throws IOException, InterruptedException {
+    LOG.debug("closing connection");
+    stream.close();
+    uplink.closeConnection();
+    uplink.interrupt();
+    uplink.join();
+  }
+
+  public void start() throws IOException {
+    LOG.debug("starting downlink");
+    WritableUtils.writeVInt(stream, MessageType.START.code);
+    WritableUtils.writeVInt(stream, CURRENT_PROTOCOL_VERSION);
+  }
+
+  public void setJobConf(JobConf job) throws IOException {
+    WritableUtils.writeVInt(stream, MessageType.SET_JOB_CONF.code);
+    List<String> list = new ArrayList<String>();
+    for(Map.Entry<String, String> itm: job) {
+      list.add(itm.getKey());
+      list.add(itm.getValue());
+    }
+    WritableUtils.writeVInt(stream, list.size());
+    for(String entry: list){
+      Text.writeString(stream, entry);
+    }
+  }
+
+  public void setInputTypes(String keyType, 
+                            String valueType) throws IOException {
+    WritableUtils.writeVInt(stream, MessageType.SET_INPUT_TYPES.code);
+    Text.writeString(stream, keyType);
+    Text.writeString(stream, valueType);
+  }
+
+  public void runMap(InputSplit split, int numReduces, 
+                     boolean pipedInput) throws IOException {
+    WritableUtils.writeVInt(stream, MessageType.RUN_MAP.code);
+    writeObject(split);
+    WritableUtils.writeVInt(stream, numReduces);
+    WritableUtils.writeVInt(stream, pipedInput ? 1 : 0);
+  }
+
+  public void mapItem(WritableComparable key, 
+                      Writable value) throws IOException {
+    WritableUtils.writeVInt(stream, MessageType.MAP_ITEM.code);
+    writeObject(key);
+    writeObject(value);
+  }
+
+  public void runReduce(int reduce, boolean pipedOutput) throws IOException {
+    WritableUtils.writeVInt(stream, MessageType.RUN_REDUCE.code);
+    WritableUtils.writeVInt(stream, reduce);
+    WritableUtils.writeVInt(stream, pipedOutput ? 1 : 0);
+  }
+
+  public void reduceKey(WritableComparable key) throws IOException {
+    WritableUtils.writeVInt(stream, MessageType.REDUCE_KEY.code);
+    writeObject(key);
+  }
+
+  public void reduceValue(Writable value) throws IOException {
+    WritableUtils.writeVInt(stream, MessageType.REDUCE_VALUE.code);
+    writeObject(value);
+  }
+
+  public void close() throws IOException {
+    WritableUtils.writeVInt(stream, MessageType.CLOSE.code);
+    LOG.debug("Sent close command");
+  }
+  
+  public void abort() throws IOException {
+    WritableUtils.writeVInt(stream, MessageType.ABORT.code);
+    LOG.debug("Sent abort command");
+  }
+
+  /**
+   * Write the given object to the stream. If it is a Text or BytesWritable,
+   * write it directly. Otherwise, write it to a buffer and then write the
+   * length and data to the stream.
+   * @param obj the object to write
+   * @throws IOException
+   */
+  private void writeObject(Writable obj) throws IOException {
+    // For Text and BytesWritable, encode them directly, so that they end up
+    // in C++ as the natural translations.
+    if (obj instanceof Text) {
+      Text t = (Text) obj;
+      int len = t.getLength();
+      WritableUtils.writeVInt(stream, len);
+      stream.write(t.getBytes(), 0, len);
+    } else if (obj instanceof BytesWritable) {
+      BytesWritable b = (BytesWritable) obj;
+      int len = b.getSize();
+      WritableUtils.writeVInt(stream, len);
+      stream.write(b.get(), 0, len);
+    } else {
+      buffer.reset();
+      obj.write(buffer);
+      int length = buffer.getLength();
+      WritableUtils.writeVInt(stream, length);
+      stream.write(buffer.getData(), 0, length);
+    }
+  }
+  
+}
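
Every message on the wire, in either direction, is a VInt command code followed by that
command's payload; per writeObject/readObject above, Text and BytesWritable payloads
travel as a VInt length followed by the raw bytes, while other Writables use their own
serialization. For illustration only, a sketch of framing a single uplink OUTPUT message
(code 50) with a Text key and value, in the layout UplinkReaderThread expects:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableUtils;

    // Illustrative framing of one OUTPUT message; mirrors the decoding done by
    // UplinkReaderThread.run() and readObject() above.
    public class UplinkFrameSketch {
      public static byte[] frameOutput(Text key, Text value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        WritableUtils.writeVInt(out, 50);                   // MessageType.OUTPUT.code
        WritableUtils.writeVInt(out, key.getLength());      // key length in bytes
        out.write(key.getBytes(), 0, key.getLength());      // raw UTF-8 key bytes
        WritableUtils.writeVInt(out, value.getLength());    // value length in bytes
        out.write(value.getBytes(), 0, value.getLength());  // raw UTF-8 value bytes
        out.flush();
        return bytes.toByteArray();
      }
    }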

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java?view=auto&rev=538693
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java Wed May 16 12:23:48 2007
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * The abstract description of the downward (from Java to C++) Pipes protocol.
+ * All of these calls are asynchronous and return before the message has been 
+ * processed.
+ */
+interface DownwardProtocol {
+  /**
+   * Start communication
+   * @throws IOException
+   */
+  void start() throws IOException;
+  
+  /**
+   * Set the JobConf for the task.
+   * @param conf
+   * @throws IOException
+   */
+  void setJobConf(JobConf conf) throws IOException;
+  
+  /**
+   * Set the input types for Maps.
+   * @param keyType the name of the key's type
+   * @param valueType the name of the value's type
+   * @throws IOException
+   */
+  void setInputTypes(String keyType, String valueType) throws IOException;
+  
+  /**
+   * Run a map task in the child.
+   * @param split The input split for this map.
+   * @param numReduces The number of reduces for this job.
+   * @param pipedInput Is the input coming from Java?
+   * @throws IOException
+   */
+  void runMap(InputSplit split, int numReduces, 
+              boolean pipedInput) throws IOException;
+  
+  /**
+   * For maps with pipedInput, the key/value pairs are sent via this message.
+   * @param key The record's key
+   * @param value The record's value
+   * @throws IOException
+   */
+  void mapItem(WritableComparable key, Writable value) throws IOException;
+  
+  /**
+   * Run a reduce task in the child
+   * @param reduce the index of the reduce (0 .. numReduces - 1)
+   * @param pipedOutput is the output being sent to Java?
+   * @throws IOException
+   */
+  void runReduce(int reduce, boolean pipedOutput) throws IOException;
+  
+  /**
+   * The reduce should be given a new key
+   * @param key the new key
+   * @throws IOException
+   */
+  void reduceKey(WritableComparable key) throws IOException;
+  
+  /**
+   * The reduce should be given a new value
+   * @param value the new value
+   * @throws IOException
+   */
+  void reduceValue(Writable value) throws IOException;
+  
+  /**
+   * The task has no more input coming, but it should finish processing its
+   * input.
+   * @throws IOException
+   */
+  void close() throws IOException;
+  
+  /**
+   * The task should stop as soon as possible, because something has gone wrong.
+   * @throws IOException
+   */
+  void abort() throws IOException;
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java?view=auto&rev=538693
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java Wed May 16 12:23:48 2007
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Handles the upward (C++ to Java) messages from the application.
+ */
+class OutputHandler implements UpwardProtocol {
+  private Reporter reporter;
+  private OutputCollector collector;
+  private float progressValue = 0.0f;
+  private boolean done = false;
+  private Throwable exception = null;
+  
+  /**
+   * Create a handler that will handle any records output from the application.
+   * @param collector the "real" collector that takes the output
+   * @param reporter the reporter for reporting progress
+   */
+  public OutputHandler(OutputCollector collector, Reporter reporter) {
+    this.reporter = reporter;
+    this.collector = collector;
+  }
+
+  /**
+   * The task output a normal record.
+   */
+  public void output(WritableComparable key, 
+                     Writable value) throws IOException {
+    collector.collect(key, value);
+  }
+
+  /**
+   * The task output a record with a partition number attached.
+   */
+  public void partitionedOutput(int reduce, WritableComparable key, 
+                                Writable value) throws IOException {
+    PipesPartitioner.setNextPartition(reduce);
+    collector.collect(key, value);
+  }
+
+  /**
+   * Update the status message for the task.
+   */
+  public void status(String msg) throws IOException {
+    reporter.setStatus(msg);
+  }
+
+  /**
+   * Update the amount done and call progress on the reporter.
+   */
+  public void progress(float progress) throws IOException {
+    progressValue = progress;
+    reporter.progress();
+  }
+
+  /**
+   * The task finished successfully.
+   */
+  public void done() throws IOException {
+    synchronized (this) {
+      done = true;
+      notify();
+    }
+  }
+
+  /**
+   * Get the current amount done.
+   * @return a float between 0.0 and 1.0
+   */
+  public float getProgress() {
+    return progressValue;
+  }
+
+  /**
+   * The task failed with an exception.
+   */
+  public void failed(Throwable e) {
+    synchronized (this) {
+      exception = e;
+      notify();
+    }
+  }
+
+  /**
+   * Wait for the task to finish or abort.
+   * @return did the task finish correctly?
+   * @throws Throwable
+   */
+  public synchronized boolean waitForFinish() throws Throwable {
+    while (!done && exception == null) {
+      wait();
+    }
+    if (exception != null) {
+      throw exception;
+    }
+    return done;
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java?view=auto&rev=538693
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java Wed May 16 12:23:48 2007
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapRunner;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * An adaptor to run a C++ mapper.
+ */
+class PipesMapRunner extends MapRunner {
+  private JobConf job;
+
+  /**
+   * Save the job's configuration.
+   * @param job the job's configuration
+   */
+  public void configure(JobConf job) {
+    this.job = job;
+  }
+
+  /**
+   * Run the map task.
+   * @param input the set of inputs
+   * @param output the object to collect the outputs of the map
+   * @param reporter the object to update with status
+   */
+  public void run(RecordReader input, OutputCollector output,
+                  Reporter reporter
+                  ) throws IOException {
+    Application application = null;
+    try {
+      application = new Application(job, output, reporter,
+                                    job.getMapOutputKeyClass(),
+                                    job.getMapOutputValueClass());
+    } catch (InterruptedException ie) {
+      throw new RuntimeException("interrupted", ie);
+    }
+    DownwardProtocol downlink = application.getDownlink();
+    boolean isJavaInput = Submitter.getIsJavaRecordReader(job);
+    downlink.runMap(reporter.getInputSplit(), 
+                    job.getNumReduceTasks(), isJavaInput);
+    try {
+      if (isJavaInput) {
+        // allocate key & value instances that are re-used for all entries
+        WritableComparable key = input.createKey();
+        Writable value = input.createValue();
+        downlink.setInputTypes(key.getClass().getName(),
+                               value.getClass().getName());
+        
+        while (input.next(key, value)) {
+          // map pair to output
+          downlink.mapItem(key, value);
+        }
+        downlink.close();
+      }
+      application.waitForFinish();
+    } catch (Throwable t) {
+      application.abort(t);
+    } finally {
+      application.cleanup();
+    }
+  }
+  
+}
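
Putting the Application constructor and the run method above together, the downlink
traffic for a map task with a Java RecordReader follows a fixed order: start and
setJobConf are sent when the Application is constructed, then runMap, setInputTypes,
one mapItem per input record, and finally close. A compressed sketch of that sequence
(the driver class and record source are illustrative, not part of the committed code):

    package org.apache.hadoop.mapred.pipes;

    import java.io.IOException;
    import java.util.Map;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;

    // Illustrative replay of the downlink call order for a map task with
    // piped (Java-side) input; see Application and PipesMapRunner above.
    class DownlinkCallOrderSketch {
      static void runMapTask(DownwardProtocol downlink, JobConf conf, InputSplit split,
                             Iterable<Map.Entry<Text, Text>> records) throws IOException {
        downlink.start();                                        // sent by Application
        downlink.setJobConf(conf);                               // sent by Application
        downlink.runMap(split, conf.getNumReduceTasks(), true);  // pipedInput == true
        downlink.setInputTypes(Text.class.getName(), Text.class.getName());
        for (Map.Entry<Text, Text> record : records) {
          downlink.mapItem(record.getKey(), record.getValue());  // one record per call
        }
        downlink.close();                                        // no more input
      }
    }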

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesPartitioner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesPartitioner.java?view=auto&rev=538693
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesPartitioner.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesPartitioner.java Wed May 16 12:23:48 2007
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This partitioner can either be set manually per record or fall back to a
+ * Java partitioner that was set by the user.
+ */
+class PipesPartitioner implements Partitioner {
+  private static ThreadLocal<Integer> cache = new ThreadLocal<Integer>();
+  private Partitioner part = null;
+  
+  public void configure(JobConf conf) {
+    part = (Partitioner) 
+      ReflectionUtils.newInstance(Submitter.getJavaPartitioner(conf), conf);
+  }
+
+  /**
+   * Set the next key to have the given partition.
+   * @param newValue the next partition value
+   */
+  static void setNextPartition(int newValue) {
+    cache.set(newValue);
+  }
+
+  /**
+   * If a partition result was set manually, return it. Otherwise, we call
+   * the Java partitioner.
+   * @param key the key to partition
+   * @param value the value to partition
+   * @param numPartitions the number of reduces
+   */
+  public int getPartition(WritableComparable key, Writable value, 
+                          int numPartitions) {
+    Integer result = cache.get();
+    if (result == null) {
+      return part.getPartition(key, value, numPartitions);
+    } else {
+      return result;
+    }
+  }
+
+}
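
The override path is a ThreadLocal handoff: OutputHandler.partitionedOutput stores the
partition chosen by the child with setNextPartition, and the collect call that follows
on the same thread consults this partitioner, which returns the cached value instead of
asking the user's Java partitioner. A small sketch of that interplay (class and method
names other than the pipes classes are illustrative):

    package org.apache.hadoop.mapred.pipes;

    import org.apache.hadoop.io.Text;

    // Illustrative: mirrors what OutputHandler.partitionedOutput() does just
    // before it hands the record to the collector.
    class PartitionOverrideSketch {
      static int partitionFor(PipesPartitioner partitioner, int childChoice,
                              Text key, Text value, int numReduces) {
        PipesPartitioner.setNextPartition(childChoice);           // value sent by the C++ child
        return partitioner.getPartition(key, value, numReduces);  // returns childChoice
      }
    }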

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java?view=auto&rev=538693
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java Wed May 16 12:23:48 2007
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * This class is used to talk to a C++ reduce task.
+ */
+class PipesReducer implements Reducer {
+  private static final Log LOG= LogFactory.getLog(PipesReducer.class.getName());
+  private JobConf job;
+  private Application application = null;
+  private DownwardProtocol downlink = null;
+  private boolean isOk = true;
+
+  public void configure(JobConf job) {
+    this.job = job;
+  }
+
+  /**
+   * Process all of the keys and values. Start up the application if we haven't
+   * started it yet.
+   */
+  public void reduce(WritableComparable key, Iterator values, 
+                     OutputCollector output, Reporter reporter
+                     ) throws IOException {
+    isOk = false;
+    startApplication(output, reporter);
+    downlink.reduceKey(key);
+    while (values.hasNext()) {
+      downlink.reduceValue((Writable) values.next());
+    }
+    isOk = true;
+  }
+
+  private void startApplication(OutputCollector output, Reporter reporter) throws IOException {
+    if (application == null) {
+      try {
+        LOG.info("starting application");
+        application = new Application(job, output, reporter, 
+                                      job.getOutputKeyClass(), 
+                                      job.getOutputValueClass());
+        downlink = application.getDownlink();
+      } catch (InterruptedException ie) {
+        throw new RuntimeException("interrupted", ie);
+      }
+      int reduce=0;
+      downlink.runReduce(reduce, Submitter.getIsJavaRecordWriter(job));
+    }
+  }
+
+  /**
+   * Handle the end of the input by closing down the application.
+   */
+  public void close() throws IOException {
+    // if we haven't started the application, we have nothing to do
+    if (isOk) {
+      OutputCollector nullCollector = new OutputCollector() {
+        public void collect(WritableComparable key, 
+                            Writable value) throws IOException {
+          // NULL
+        }
+      };
+      startApplication(nullCollector, Reporter.NULL);
+    }
+    try {
+      if (isOk) {
+        application.getDownlink().close();
+      } else {
+        application.getDownlink().abort();
+      }
+      LOG.info("waiting for finish");
+      application.waitForFinish();
+      LOG.info("got done");
+    } catch (Throwable t) {
+      application.abort(t);
+    } finally {
+      application.cleanup();
+    }
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Submitter.java?view=auto&rev=538693
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Submitter.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Submitter.java Wed May 16 12:23:48 2007
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+
+/**
+ * The main entry point and job submitter. It may be used either from the
+ * command line or through the API to launch Pipes jobs.
+ */
+public class Submitter {
+
+  /**
+   * Get the URI of the application's executable.
+   * @param conf
+   * @return the URI where the application's executable is located
+   */
+  public static String getExecutable(JobConf conf) {
+    return conf.get("hadoop.pipes.executable");
+  }
+  
+  /**
+   * Set the URI for the application's executable. Normally this is an hdfs:
+   * location.
+   * @param conf
+   * @param executable The URI of the application's executable.
+   */
+  public static void setExecutable(JobConf conf, String executable) {
+    conf.set("hadoop.pipes.executable", executable);
+  }
+
+  /**
+   * Set whether the job is using a Java RecordReader.
+   * @param conf the configuration to modify
+   * @param value the new value
+   */
+  public static void setIsJavaRecordReader(JobConf conf, boolean value) {
+    conf.setBoolean("hadoop.pipes.java.recordreader", value);
+  }
+
+  /**
+   * Check whether the job is using a Java RecordReader
+   * @param conf the configuration to check
+   * @return is it a Java RecordReader?
+   */
+  public static boolean getIsJavaRecordReader(JobConf conf) {
+    return conf.getBoolean("hadoop.pipes.java.recordreader", false);
+  }
+
+  /**
+   * Set whether the Mapper is written in Java.
+   * @param conf the configuration to modify
+   * @param value the new value
+   */
+  public static void setIsJavaMapper(JobConf conf, boolean value) {
+    conf.setBoolean("hadoop.pipes.java.mapper", value);
+  }
+
+  /**
+   * Check whether the job is using a Java Mapper.
+   * @param conf the configuration to check
+   * @return is it a Java Mapper?
+   */
+  public static boolean getIsJavaMapper(JobConf conf) {
+    return conf.getBoolean("hadoop.pipes.java.mapper", false);
+  }
+
+  /**
+   * Set whether the Reducer is written in Java.
+   * @param conf the configuration to modify
+   * @param value the new value
+   */
+  public static void setIsJavaReducer(JobConf conf, boolean value) {
+    conf.setBoolean("hadoop.pipes.java.reducer", value);
+  }
+
+  /**
+   * Check whether the job is using a Java Reducer.
+   * @param conf the configuration to check
+   * @return is it a Java Reducer?
+   */
+  public static boolean getIsJavaReducer(JobConf conf) {
+    return conf.getBoolean("hadoop.pipes.java.reducer", false);
+  }
+
+  /**
+   * Set whether the job will use a Java RecordWriter.
+   * @param conf the configuration to modify
+   * @param value the new value to set
+   */
+  public static void setIsJavaRecordWriter(JobConf conf, boolean value) {
+    conf.setBoolean("hadoop.pipes.java.recordwriter", value);
+  }
+
+  /**
+   * Will the reduce use a Java RecordWriter?
+   * @param conf the configuration to check
+   * @return true, if the output of the job will be written by Java
+   */
+  public static boolean getIsJavaRecordWriter(JobConf conf) {
+    return conf.getBoolean("hadoop.pipes.java.recordwriter", false);
+  }
+
+  /**
+   * Set the configuration, if it doesn't already have a value for the given
+   * key.
+   * @param conf the configuration to modify
+   * @param key the key to set
+   * @param value the new "default" value to set
+   */
+  private static void setIfUnset(JobConf conf, String key, String value) {
+    if (conf.get(key) == null) {
+      conf.set(key, value);
+    }
+  }
+
+  /**
+   * Save away the user's original partitioner before we override it.
+   * @param conf the configuration to modify
+   * @param cls the user's partitioner class
+   */
+  static void setJavaPartitioner(JobConf conf, Class cls) {
+    conf.set("hadoop.pipes.partitioner", cls.getName());
+  }
+  
+  /**
+   * Get the user's original partitioner.
+   * @param conf the configuration to look in
+   * @return the class that the user submitted
+   */
+  static Class getJavaPartitioner(JobConf conf) {
+    return conf.getClass("hadoop.pipes.partitioner", 
+                         HashPartitioner.class,
+                         Partitioner.class);
+  }
+
+  /**
+   * Does the user want to keep the command file for debugging? If this is
+   * true, pipes will write a copy of the command data to a file in the
+   * task directory named "downlink.data", which may be used to run the C++
+   * program under the debugger. You probably also want to set 
+   * JobConf.setKeepFailedTaskFiles(true) to keep the entire directory from
+   * being deleted.
+   * To run using the data file, set the environment variable 
+   * "hadoop.pipes.command.file" to point to the file.
+   * @param conf the configuration to check
+   * @return will the framework save the command file?
+   */
+  public static boolean getKeepCommandFile(JobConf conf) {
+    return conf.getBoolean("hadoop.pipes.command-file.keep", false);
+  }
+
+  /**
+   * Set whether to keep the command file for debugging
+   * @param conf the configuration to modify
+   * @param keep the new value
+   */
+  public static void setKeepCommandFile(JobConf conf, boolean keep) {
+    conf.setBoolean("hadoop.pipes.command-file.keep", keep);
+  }
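+
+  // Illustrative sketch (comments only, not part of the committed API): how a
+  // client might enable the debugging flow described in the javadoc of
+  // getKeepCommandFile(). It uses only methods referenced above; the debugger
+  // step itself is an assumption about typical usage.
+  //
+  //   JobConf conf = new JobConf();
+  //   Submitter.setKeepCommandFile(conf, true); // keep downlink.data in the task dir
+  //   conf.setKeepFailedTaskFiles(true);        // keep the whole task directory
+  //   // afterwards, point "hadoop.pipes.command.file" at the saved downlink.data
+  //   // and run the C++ program under a debugger to replay the task's input.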
+
+  /**
+   * Submit a job to the map/reduce cluster. All of the modifications needed
+   * to run the job under pipes are made to the configuration.
+   * @param conf the job to submit to the cluster (MODIFIED)
+   * @return a handle to the running job
+   * @throws IOException
+   */
+  public static RunningJob submitJob(JobConf conf) throws IOException {
+    // if the mapper is not written in Java, use the pipes map runner
+    if (!getIsJavaMapper(conf)) {
+      conf.setMapRunnerClass(PipesMapRunner.class);
+      // Save the user's partitioner and hook in ours.
+      setJavaPartitioner(conf, conf.getPartitionerClass());
+      conf.setPartitionerClass(PipesPartitioner.class);
+    }
+    if (!getIsJavaReducer(conf)) {
+      conf.setReducerClass(PipesReducer.class);
+      if (!getIsJavaRecordWriter(conf)) {
+        conf.setOutputFormat(NullOutputFormat.class);
+      }
+    }
+    String textClassname = Text.class.getName();
+    setIfUnset(conf, "mapred.output.key.class", textClassname);
+    setIfUnset(conf, "mapred.output.value.class", textClassname);
+    setIfUnset(conf, "mapred.output.key.class", textClassname);
+    setIfUnset(conf, "mapred.output.value.class", textClassname);
+    String exec = getExecutable(conf);
+    if (exec == null) {
+      throw new IllegalArgumentException("No application program defined.");
+    }
+    URI[] fileCache = DistributedCache.getCacheFiles(conf);
+    if (fileCache == null) {
+      fileCache = new URI[1];
+    } else {
+      URI[] tmp = new URI[fileCache.length+1];
+      System.arraycopy(fileCache, 0, tmp, 1, fileCache.length);
+      fileCache = tmp;
+    }
+    try {
+      fileCache[0] = new URI(exec);
+    } catch (URISyntaxException e) {
+      IOException ie = new IOException("Problem parsing executable URI " + exec);
+      ie.initCause(e);
+      throw ie;
+    }
+    DistributedCache.setCacheFiles(fileCache, conf);
+    return JobClient.runJob(conf);
+  }
+
+  /**
+   * A command line parser for the CLI-based Pipes job submitter.
+   */
+  static class CommandLineParser {
+    private DefaultOptionBuilder option = 
+      new DefaultOptionBuilder("-","-", false);
+    private ArgumentBuilder arg = new ArgumentBuilder();
+    private GroupBuilder optionList = new GroupBuilder();
+    
+    void addOption(String longName, boolean required, String description, 
+                   String paramName) {
+      arg.withName(paramName).withMinimum(1).withMaximum(1);
+      optionList.withOption(option.withLongName(longName).
+                                   withArgument(arg.create()).
+                                   withDescription(description).
+                                   withRequired(required).create());
+    }
+    
+    void addArgument(String name, boolean required, String description) {
+      arg.withName(name).withMinimum(1).withMaximum(1);
+      optionList.withOption(arg.create());
+    }
+
+    Parser createParser() {
+      Parser result = new Parser();
+      result.setGroup(optionList.create());
+      return result;
+    }
+    
+    void printUsage() {
+      // The CLI package should do this for us, but I can't figure out how
+      // to make it print something reasonable.
+      System.out.println("bin/hadoop pipes");
+      System.out.println("  [-conf <path>]  // Configuration for job");
+      System.out.println("  [-input <path>] // Input directory");
+      System.out.println("  [-output <path>] // Output directory");
+      System.out.println("  [-jar <jar file> // jar filename");
+      System.out.println("  [-inputformat <class>] // InputFormat class");
+      System.out.println("  [-map <class>] // Java Map class");
+      System.out.println("  [-partitioner <class>] // Java Partitioner");
+      System.out.println("  [-reduce <class>] // Java Reduce class");
+      System.out.println("  [-writer <class>] // Java RecordWriter");
+      System.out.println("  [-program <executable>] // executable URI");
+      System.out.println("  [-reduces <num>] // number of reduces");
+    }
+  }
+  
+  private static <InterfaceType> 
+  Class<? extends InterfaceType> getClass(CommandLine cl, String key, 
+                                          JobConf conf, 
+                                          Class<InterfaceType> cls
+                                         ) throws ClassNotFoundException {
+    return conf.getClassByName((String) cl.getValue(key)).asSubclass(cls);
+  }
+
+  /**
+   * Submit a pipes job based on the command line arguments.
+   * @param args the command line arguments describing the pipes job
+   */
+  public static void main(String[] args) throws Exception {
+    CommandLineParser cli = new CommandLineParser();
+    if (args.length == 0) {
+      cli.printUsage();
+      return;
+    }
+    cli.addOption("input", false, "input path to the maps", "path");
+    cli.addOption("output", false, "output path from the reduces", "path");
+    cli.addOption("conf", false, "job xml configuration file", "path");
+    cli.addOption("jar", false, "job jar file", "path");
+    cli.addOption("inputformat", false, "java classname of InputFormat", 
+                  "class");
+    //cli.addArgument("javareader", false, "is the RecordReader in Java");
+    cli.addOption("map", false, "java classname of Mapper", "class");
+    cli.addOption("partitioner", false, "java classname of Partitioner", 
+                  "class");
+    cli.addOption("reduce", false, "java classname of Reducer", "class");
+    cli.addOption("writer", false, "java classname of OutputFormat", "class");
+    cli.addOption("program", false, "URI to application executable", "class");
+    cli.addOption("reduces", false, "number of reduces", "num");
+    Parser parser = cli.createParser();
+    try {
+      CommandLine results = parser.parse(args);
+      JobConf conf = new JobConf();
+      if (results.hasOption("-conf")) {
+        conf.addFinalResource(new Path((String) results.getValue("-conf")));
+      }
+      if (results.hasOption("-input")) {
+        conf.setInputPath(new Path((String) results.getValue("-input")));
+      }
+      if (results.hasOption("-output")) {
+        conf.setOutputPath(new Path((String) results.getValue("-output")));
+      }
+      if (results.hasOption("-jar")) {
+        conf.setJar((String) results.getValue("-jar"));
+      }
+      if (results.hasOption("-inputformat")) {
+        setIsJavaRecordReader(conf, true);
+        conf.setInputFormat(getClass(results, "-inputformat", conf,
+                                     InputFormat.class));
+      }
+      if (results.hasOption("-javareader")) {
+        setIsJavaRecordReader(conf, true);
+      }
+      if (results.hasOption("-map")) {
+        setIsJavaMapper(conf, true);
+        conf.setMapperClass(getClass(results, "-map", conf, Mapper.class));
+      }
+      if (results.hasOption("-partitioner")) {
+        conf.setPartitionerClass(getClass(results, "-partitioner", conf,
+                                          Partitioner.class));
+      }
+      if (results.hasOption("-reduce")) {
+        setIsJavaReducer(conf, true);
+        conf.setReducerClass(getClass(results, "-reduce", conf, Reducer.class));
+      }
+      if (results.hasOption("-reduces")) {
+        conf.setNumReduceTasks(Integer.parseInt((String) 
+                                                results.getValue("-reduces")));
+      }
+      if (results.hasOption("-writer")) {
+        setIsJavaRecordWriter(conf, true);
+        conf.setOutputFormat(getClass(results, "-writer", conf, 
+                                      OutputFormat.class));
+      }
+      if (results.hasOption("-program")) {
+        setExecutable(conf, (String) results.getValue("-program"));
+      }
+      // if they gave us a jar file, include it into the class path
+      String jarFile = conf.getJar();
+      if (jarFile != null) {
+        ClassLoader loader =
+          new URLClassLoader(new URL[]{ FileSystem.getLocal(conf).
+                                        pathToFile(new Path(jarFile)).toURL()});
+        conf.setClassLoader(loader);
+      }
+      submitJob(conf);
+    } catch (OptionException oe) {
+      cli.printUsage();
+    }
+  }
+
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java?view=auto&rev=538693
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java Wed May 16 12:23:48 2007
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * The interface for the messages that can come up from the child. All of these
+ * calls are asynchronous and return before the message has been processed.
+ */
+interface UpwardProtocol {
+  /**
+   * Output a record from the child.
+   * @param key the record's key
+   * @param value the record's value
+   * @throws IOException
+   */
+  void output(WritableComparable key, Writable value) throws IOException;
+  
+  /**
+   * Output a record along with its partition. Used by map functions when the
+   * application has defined its own partition function.
+   * @param reduce the reduce to send this record to
+   * @param key the record's key
+   * @param value the record's value
+   * @throws IOException
+   */
+  void partitionedOutput(int reduce, WritableComparable key, 
+                         Writable value) throws IOException;
+  
+  /**
+   * Update the task's status message
+   * @param msg the string to display to the user
+   * @throws IOException
+   */
+  void status(String msg) throws IOException;
+  
+  /**
+   * Report making progress (and the current progress)
+   * @param progress the current progress (0.0 to 1.0)
+   * @throws IOException
+   */
+  void progress(float progress) throws IOException;
+  
+  /**
+   * Report that the application has finished processing all inputs 
+   * successfully.
+   * @throws IOException
+   */
+  void done() throws IOException;
+  
+  /**
+   * Report that the application, or more likely the communication with it,
+   * has failed.
+   * @param e the cause of the failure
+   */
+  void failed(Throwable e);
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/package.html
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/package.html?view=auto&rev=538693
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/package.html (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/package.html Wed May 16 12:23:48 2007
@@ -0,0 +1,97 @@
+<html>
+<body>
+
+We need a way to convert a large body of C++ code to use Hadoop DFS
+and map/reduce. The primary approach will be to split the C++ code
+into a separate process that runs the application-specific code. In
+many ways, the approach will be similar to Hadoop streaming, but it
+uses Writable serialization to convert the types into bytes that are
+sent to the process via a socket.
+
+<p>
+
+A new class org.apache.hadoop.mapred.pipes.Submitter will have a
+public static method to submit a job as a JobConf, and a main method
+that takes an application, an optional configuration file, input
+directories, and an output directory. The CLI for the new main will
+look like:
+
+<pre>
+bin/hadoop pipes \
+  [-conf <i>path</i>] \
+  [-input <i>inputDir</i>] \
+  [-output <i>outputDir</i>] \
+  [-jar <i>applicationJarFile</i>] \
+  [-inputformat <i>class</i>] \
+  [-map <i>class</i>] \
+  [-partitioner <i>class</i>] \
+  [-reduce <i>class</i>] \
+  [-writer <i>class</i>] \
+  [-program <i>program url</i>]
+</pre>
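+
+<p>
+
+For callers that would rather drive the same Submitter API from Java code than
+from the command line, a minimal sketch follows. It only uses methods that
+appear elsewhere in this commit; the input, output, and executable paths are
+hypothetical and stand in for whatever the application actually uses.
+
+<pre>
+JobConf conf = new JobConf();
+conf.setInputPath(new Path("/user/me/pipes-in"));             // hypothetical input dir
+conf.setOutputPath(new Path("/user/me/pipes-out"));           // hypothetical output dir
+Submitter.setExecutable(conf, "hdfs://host:9000/bin/wordcount"); // C++ binary in DFS
+Submitter.setIsJavaRecordReader(conf, true);  // read the input with a Java RecordReader
+Submitter.setIsJavaRecordWriter(conf, true);  // write the output with a Java RecordWriter
+RunningJob job = Submitter.submitJob(conf);   // blocks until the job finishes
+</pre>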
+
+<p>
+
+The application program will link against a thin C++ wrapper library that
+handles the communication with the rest of the Hadoop
+system. A goal of the interface is to be "swigable" so that
+interfaces can be generated for Python and other scripting
+languages. All of the C++ functions and classes are in the HadoopPipes
+namespace. The job may consist of any combination of Java and C++
+RecordReaders, Mappers, Partitioners, Combiners, Reducers, and
+RecordWriters.
+
+<p>
+
+Hadoop will be given a generic Java class for handling the mapper and reducer
+(PipesMapRunner and PipesReducer). They will fork off the application
+program and communicate with it over a socket. The communication will
+be handled by the C++ wrapper library and the PipesMapRunner and
+PipesReducer.
+
+<p>
+
+The application program will pass a factory object, which can create
+the various objects needed by the framework, to the runTask
+function. The framework will create the Mapper or Reducer as
+appropriate and call the map or reduce method to invoke the
+application's code. The JobConf will be available to the application.
+
+<p>
+
+The Mapper and Reducer objects get all of their inputs, outputs, and
+context via context objects. The advantage of using the context
+objects is that their interface can be extended with additional
+methods without breaking clients. Although this interface is different
+from the current Java interface, the plan is to migrate the Java
+interface in this direction.
+
+<p>
+
+Although the Java implementation is typed, the C++ interface treats keys
+and values as plain byte buffers. Since STL strings provide precisely
+the right functionality and are standard, they will be used. The
+decision not to use stronger types was made to simplify the interface.
+
+<p>
+
+The application can also define combiner functions. The combiner will
+be run locally by the framework in the application process to avoid
+the round trip to the Java process and back. Because the compare
+function is not available in C++, the combiner will use memcmp to
+sort its inputs. This is not as general as the Java
+equivalent, which uses the user's comparator, but should cover the
+majority of the use cases. As the map function outputs key/value
+pairs, they will be buffered. When the buffer is full, it will be
+sorted and passed to the combiner. The output of the combiner will be
+sent to the Java process.
+
+<p>
+
+The application can also set a partition function to control which
+reduce each key is sent to. If a partition function is not
+defined, the Java one will be used. The partition function will be
+called by the C++ framework before the key/value pair is sent back to
+Java.
+
+</body>
+</html>

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java?view=auto&rev=538693
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java Wed May 16 12:23:48 2007
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TestMiniMRWithDFS;
+import org.apache.hadoop.util.StringUtils;
+
+import junit.framework.TestCase;
+
+public class TestPipes extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestPipes.class.getName());
+
+  public void testPipes() throws IOException {
+    if (System.getProperty("compile.c++") == null) {
+      LOG.info("compile.c++ is not defined, so skipping TestPipes");
+      return;
+    }
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fs = null;
+    Path cppExamples = new Path(System.getProperty("install.c++.examples"));
+    Path inputPath = new Path("/testing/in");
+    Path outputPath = new Path("/testing/out");
+    try {
+      final int numSlaves = 2;
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(conf, numSlaves, true, null);
+      fs = dfs.getFileSystem();
+      mr = new MiniMRCluster(numSlaves, fs.getName(), 1);
+      writeInputFile(fs, inputPath);
+      runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-simple"), 
+                 inputPath, outputPath, twoSplitOutput);
+      FileUtil.fullyDelete(fs, outputPath);
+      assertFalse("output not cleaned up", fs.exists(outputPath));
+      runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-part"),
+                 inputPath, outputPath, fixedPartitionOutput);
+      runNonPipedProgram(mr, fs, new Path(cppExamples, "bin/wordcount-nopipe"));
+      mr.waitUntilIdle();
+    } finally {
+      if (mr != null) { mr.shutdown(); }
+      if (dfs != null) { dfs.shutdown(); }
+    }
+  }
+
+  final static String[] twoSplitOutput = new String[] {
+    "`and\t1\na\t1\nand\t1\nbeginning\t1\nbook\t1\nbut\t1\nby\t1\n" +
+    "conversation?'\t1\ndo:\t1\nhad\t2\nhaving\t1\nher\t2\nin\t1\nit\t1\n"+
+    "it,\t1\nno\t1\nnothing\t1\nof\t3\non\t1\nonce\t1\nor\t3\npeeped\t1\n"+
+    "pictures\t2\nthe\t3\nthought\t1\nto\t2\nuse\t1\nwas\t2\n",
+
+    "Alice\t2\n`without\t1\nbank,\t1\nbook,'\t1\nconversations\t1\nget\t1\n" +
+    "into\t1\nis\t1\nreading,\t1\nshe\t1\nsister\t2\nsitting\t1\ntired\t1\n" +
+    "twice\t1\nvery\t1\nwhat\t1\n"
+  };
+  
+  final static String[] fixedPartitionOutput = new String[] {
+    "Alice\t2\n`and\t1\n`without\t1\na\t1\nand\t1\nbank,\t1\nbeginning\t1\n" +
+    "book\t1\nbook,'\t1\nbut\t1\nby\t1\nconversation?'\t1\nconversations\t1\n"+
+    "do:\t1\nget\t1\nhad\t2\nhaving\t1\nher\t2\nin\t1\ninto\t1\nis\t1\n" +
+    "it\t1\nit,\t1\nno\t1\nnothing\t1\nof\t3\non\t1\nonce\t1\nor\t3\n" +
+    "peeped\t1\npictures\t2\nreading,\t1\nshe\t1\nsister\t2\nsitting\t1\n" +
+    "the\t3\nthought\t1\ntired\t1\nto\t2\ntwice\t1\nuse\t1\n" +
+    "very\t1\nwas\t2\nwhat\t1\n",
+    
+    ""                                                   
+  };
+  
+  private void writeInputFile(FileSystem fs, Path dir) throws IOException {
+    DataOutputStream out = fs.create(new Path(dir, "part0"));
+    out.writeBytes("Alice was beginning to get very tired of sitting by her\n");
+    out.writeBytes("sister on the bank, and of having nothing to do: once\n");
+    out.writeBytes("or twice she had peeped into the book her sister was\n");
+    out.writeBytes("reading, but it had no pictures or conversations in\n");
+    out.writeBytes("it, `and what is the use of a book,' thought Alice\n");
+    out.writeBytes("`without pictures or conversation?'\n");
+    out.close();
+  }
+
+  private void runProgram(MiniMRCluster mr, FileSystem fs, 
+                          Path program, Path inputPath, Path outputPath,
+                          String[] expectedResults
+                         ) throws IOException {
+    Path wordExec = new Path("/testing/bin/application");
+    FileUtil.fullyDelete(fs, wordExec.getParent());
+    fs.copyFromLocalFile(program, wordExec);                                         
+    JobConf job = mr.createJobConf();
+    job.setNumMapTasks(3);
+    job.setNumReduceTasks(expectedResults.length);
+    Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
+    Submitter.setIsJavaRecordReader(job, true);
+    Submitter.setIsJavaRecordWriter(job, true);
+    job.setInputPath(inputPath);
+    job.setOutputPath(outputPath);
+    RunningJob result = Submitter.submitJob(job);
+    assertTrue("pipes job failed", result.isSuccessful());
+    List<String> results = new ArrayList<String>();
+    for (Path p:fs.listPaths(outputPath)) {
+      results.add(TestMiniMRWithDFS.readOutput(p, job));
+    }
+    assertEquals("number of reduces is wrong", 
+                 expectedResults.length, results.size());
+    for(int i=0; i < results.size(); i++) {
+      assertEquals("pipes program " + program + " output " + i + " wrong",
+                   expectedResults[i], results.get(i));
+    }
+  }
+  
+  /**
+   * Run a map/reduce word count that does all of the map input and reduce
+   * output directly rather than sending it back up to Java.
+   * @param mr The mini mr cluster
+   * @param dfs the dfs cluster
+   * @param program the program to run
+   * @throws IOException
+   */
+  private void runNonPipedProgram(MiniMRCluster mr, FileSystem dfs,
+                                  Path program) throws IOException {
+    JobConf job = mr.createJobConf();
+    job.setInputFormat(WordCountInputFormat.class);
+    FileSystem local = FileSystem.getLocal(job);
+    Path testDir = new Path(System.getProperty("test.build.data"), "pipes");
+    Path inDir = new Path(testDir, "input");
+    Path outDir = new Path(testDir, "output");
+    Path wordExec = new Path("/testing/bin/application");
+    Path jobXml = new Path(testDir, "job.xml");
+    FileUtil.fullyDelete(dfs, wordExec.getParent());
+    dfs.copyFromLocalFile(program, wordExec);
+    DataOutputStream out = local.create(new Path(inDir, "part0"));
+    out.writeBytes("i am a silly test\n");
+    out.writeBytes("you are silly\n");
+    out.close();
+    out = local.create(new Path(inDir, "part1"));
+    out.writeBytes("all silly things drink java\n");
+    out.close();
+    FileUtil.fullyDelete(local, outDir);
+    local.mkdirs(outDir);
+    out = local.create(jobXml);
+    job.write(out);
+    out.close();
+    try {
+      Submitter.main(new String[]{"-conf", jobXml.toString(),
+                                  "-input", inDir.toString(),
+                                  "-output", outDir.toString(),
+                                  "-program", 
+                                  dfs.makeQualified(wordExec).toString(),
+                                  "-reduces", "2"});
+    } catch (Exception e) {
+      assertTrue("got exception: " + StringUtils.stringifyException(e), false);
+    }
+  }
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/pipes/WordCountInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/pipes/WordCountInputFormat.java?view=auto&rev=538693
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/pipes/WordCountInputFormat.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/pipes/WordCountInputFormat.java Wed May 16 12:23:48 2007
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+
+/**
+ * This is a support class to test Hadoop Pipes when using C++ RecordReaders.
+ * It defines an InputFormat with InputSplits that are just strings. The
+ * RecordReaders are not implemented in Java, naturally...
+ */
+public class WordCountInputFormat implements InputFormat {
+  static class WordCountInputSplit implements InputSplit  {
+    private String filename;
+    WordCountInputSplit() { }
+    WordCountInputSplit(Path filename) {
+      this.filename = filename.toString();
+    }
+    public void write(DataOutput out) throws IOException { 
+      Text.writeString(out, filename); 
+    }
+    public void readFields(DataInput in) throws IOException { 
+      filename = Text.readString(in); 
+    }
+    public long getLength() { return 0L; }
+    public String[] getLocations() { return new String[0]; }
+  }
+
+  public InputSplit[] getSplits(JobConf conf, 
+                                int numSplits) throws IOException {
+    ArrayList<InputSplit> result = new ArrayList<InputSplit>();
+    FileSystem local = FileSystem.getLocal(conf);
+    for(Path dir: conf.getInputPaths()) {
+      for(Path file: local.listPaths(dir)) {
+        result.add(new WordCountInputSplit(file));
+      }
+    }
+    return result.toArray(new InputSplit[result.size()]);
+  }
+  public void validateInput(JobConf conf) { }
+  public RecordReader getRecordReader(InputSplit split, JobConf conf, 
+                                      Reporter reporter) {
+    return new RecordReader(){
+      public boolean next(Writable key, Writable value) throws IOException {
+        return false;
+      }
+      public WritableComparable createKey() {
+        return new IntWritable();
+      }
+      public Writable createValue() {
+        return new Text();
+      }
+      public long getPos() {
+        return 0;
+      }
+      public void close() { }
+      public float getProgress() { 
+        return 0.0f;
+      }
+    };
+  }
+}


