hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1388325 - in /hama/trunk: ./ bin/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/pipes/
Date Fri, 21 Sep 2012 04:58:35 GMT
Author: edwardyoon
Date: Fri Sep 21 04:58:34 2012
New Revision: 1388325

URL: http://svn.apache.org/viewvc?rev=1388325&view=rev
Log:
Hama Streaming

Added:
    hama/trunk/core/src/main/java/org/apache/hama/pipes/
    hama/trunk/core/src/main/java/org/apache/hama/pipes/Application.java   (with props)
    hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java   (with props)
    hama/trunk/core/src/main/java/org/apache/hama/pipes/DownwardProtocol.java   (with props)
    hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java   (with props)
    hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java   (with props)
    hama/trunk/core/src/main/java/org/apache/hama/pipes/StreamingProtocol.java   (with props)
    hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java   (with props)
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/bin/hama
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1388325&r1=1388324&r2=1388325&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Sep 21 04:58:34 2012
@@ -3,6 +3,8 @@ Hama Change Log
 Release 0.6 (unreleased changes)
 
   NEW FEATURES
+ 
+   HAMA-601: Hama Streaming (tjungblut)
 
   BUG FIXES
 

Modified: hama/trunk/bin/hama
URL: http://svn.apache.org/viewvc/hama/trunk/bin/hama?rev=1388325&r1=1388324&r2=1388325&view=diff
==============================================================================
--- hama/trunk/bin/hama (original)
+++ hama/trunk/bin/hama Fri Sep 21 04:58:34 2012
@@ -60,6 +60,7 @@ if [ $# = 0 ]; then
   echo "  zookeeper            run a Zookeeper server"
   echo "  job                  manipulate BSP jobs"
   echo "  jar <jar>            run a jar file"
+  echo "  pipes	               run a pipe job"
   echo " or"
   echo "  CLASSNAME            run the class named CLASSNAME"
   echo "Most commands print help when invoked w/o parameters."
@@ -160,6 +161,8 @@ elif [ "$COMMAND" = "zookeeper" ] ; then
   CLASS='org.apache.hama.ZooKeeperRunner'
 elif [ "$COMMAND" = "job" ] ; then
   CLASS='org.apache.hama.bsp.BSPJobClient'
+elif [ "$COMMAND" = "pipes" ] ; then
+  CLASS='org.apache.hama.pipes.Submitter'
 elif [ "$COMMAND" = "jar" ] ; then
   CLASS=org.apache.hama.util.RunJar
   BSP_OPTS="$BSP_OPTS"

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1388325&r1=1388324&r2=1388325&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Fri Sep 21 04:58:34 2012
@@ -20,6 +20,7 @@ package org.apache.hama.bsp;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
@@ -276,6 +277,7 @@ public final class BSPPeerImpl<K1, V1, K
         }
       }
     }
+    LOG.info("Moving to local cache files: " + files.toString() +" INITIALLY IT WAS: " + Arrays.toString(DistributedCache.getCacheFiles(conf)));
     if (files.length() > 0) {
       DistributedCache.addLocalFiles(conf, files.toString());
     }

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/Application.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/Application.java?rev=1388325&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/Application.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/Application.java Fri Sep 21 04:58:34 2012
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** MODIFIED FOR GPGPU Usage! **/
+
+package org.apache.hama.pipes;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskLog;
+
+/**
+ * This class is responsible for launching and communicating with the child
+ * process.
+ * 
+ * Adapted from Hadoop Pipes.
+ * 
+ */
+public class Application<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable, M extends Writable> {
+
+  private static final Log LOG = LogFactory.getLog(Application.class.getName());
+  private ServerSocket serverSocket;
+  private Process process;
+  private Socket clientSocket;
+
+  private DownwardProtocol<K1, V1> downlink;
+
+  static final boolean WINDOWS = System.getProperty("os.name").startsWith(
+      "Windows");
+
+  /**
+   * Start the child process to handle the task for us.
+   * 
+   * @param peer the current peer including the task's configuration
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  Application(BSPPeer<K1, V1, K2, V2, BytesWritable> peer) throws IOException,
+      InterruptedException {
+
+    Map<String, String> env = new HashMap<String, String>();
+    boolean streamingEnabled = peer.getConfiguration().getBoolean(
+        "hama.streaming.enabled", false);
+
+    if (!streamingEnabled) {
+      serverSocket = new ServerSocket(0);
+      env.put("hama.pipes.command.port",
+          Integer.toString(serverSocket.getLocalPort()));
+    }
+    // add TMPDIR environment variable with the value of java.io.tmpdir
+    env.put("TMPDIR", System.getProperty("java.io.tmpdir"));
+
+    /* Set Logging Environment from Configuration */
+    env.put("hama.pipes.logging",
+        peer.getConfiguration().getBoolean("hama.pipes.logging", false) ? "1"
+            : "0");
+    LOG.debug("DEBUG hama.pipes.logging: "
+        + peer.getConfiguration().getBoolean("hama.pipes.logging", false));
+
+    List<String> cmd = new ArrayList<String>();
+    String interpretor = peer.getConfiguration().get(
+        "hama.pipes.executable.interpretor");
+    if (interpretor != null) {
+      cmd.add(interpretor);
+    }
+
+    String executable = null;
+    try {
+      LOG.debug("DEBUG LocalCacheFilesCount: "
+          + DistributedCache.getLocalCacheFiles(peer.getConfiguration()).length);
+      for (Path u : DistributedCache
+          .getLocalCacheFiles(peer.getConfiguration()))
+        LOG.debug("DEBUG LocalCacheFiles: " + u);
+
+      executable = DistributedCache.getLocalCacheFiles(peer.getConfiguration())[0]
+          .toString();
+
+      LOG.info("executable: " + executable);
+
+    } catch (Exception e) {
+      LOG.error("Executable: " + executable + " fs.default.name: "
+          + peer.getConfiguration().get("fs.default.name"));
+
+      throw new IOException("Executable is missing!");
+    }
+
+    if (!new File(executable).canExecute()) {
+      // LinuxTaskController sets +x permissions on all distcache files already.
+      // In case of DefaultTaskController, set permissions here.
+      FileUtil.chmod(executable, "u+x");
+    }
+    cmd.add(executable);
+
+    String additionalArgs = peer.getConfiguration().get(
+        "hama.pipes.executable.args");
+    // if true, we are resolving filenames with the linked paths in
+    // DistributedCache
+    boolean resolveArguments = peer.getConfiguration().getBoolean(
+        "hama.pipes.resolve.executable.args", false);
+    if (additionalArgs != null && !additionalArgs.isEmpty()) {
+      String[] split = additionalArgs.split(" ");
+      for (String s : split) {
+        if (resolveArguments) {
+          for (Path u : DistributedCache.getLocalCacheFiles(peer
+              .getConfiguration())) {
+            if (u.getName().equals(s)) {
+              LOG.info("Resolved argument \"" + s
+                  + "\" with fully qualified path \"" + u.toString() + "\"!");
+              cmd.add(u.toString());
+              break;
+            }
+          }
+        } else {
+          cmd.add(s);
+        }
+      }
+    }
+
+    // wrap the command in a stdout/stderr capture
+    TaskAttemptID taskid = peer.getTaskId();
+    File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
+    File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+    // Get the desired maximum length of task's logs.
+    long logLength = TaskLog.getTaskLogLength(peer.getConfiguration());
+    if (!streamingEnabled) {
+      cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength);
+    } else {
+      // use tee in streaming to get the output to file
+      cmd = TaskLog.captureOutAndErrorTee(null, cmd, stdout, stderr, logLength);
+    }
+
+    if (!stdout.getParentFile().exists()) {
+      stdout.getParentFile().mkdirs();
+      LOG.info("STDOUT: " + stdout.getParentFile().getAbsolutePath()
+          + " created!");
+    }
+    LOG.info("STDOUT: " + stdout.getAbsolutePath());
+
+    if (!stderr.getParentFile().exists()) {
+      stderr.getParentFile().mkdirs();
+      LOG.info("STDERR: " + stderr.getParentFile().getAbsolutePath()
+          + " created!");
+    }
+    LOG.info("STDERR: " + stderr.getAbsolutePath());
+
+    LOG.info("DEBUG: cmd: " + cmd);
+
+    process = runClient(cmd, env); // fork c++ binary
+
+    try {
+      if (streamingEnabled) {
+        downlink = new StreamingProtocol(peer, process.getOutputStream(),
+            process.getInputStream());
+      } else {
+        LOG.info("DEBUG: waiting for Client at "
+            + serverSocket.getLocalSocketAddress());
+        serverSocket.setSoTimeout(2000);
+        clientSocket = serverSocket.accept();
+        downlink = new BinaryProtocol<K1, V1, K2, V2>(peer,
+            clientSocket.getOutputStream(), clientSocket.getInputStream());
+      }
+      downlink.start();
+
+    } catch (SocketException e) {
+      throw new SocketException(
+          "Timout: Client pipes application was not connecting!");
+    }
+  }
+
+  /**
+   * Get the downward protocol object that can send commands down to the
+   * application.
+   * 
+   * @return the downlink proxy
+   */
+  DownwardProtocol<K1, V1> getDownlink() {
+    return downlink;
+  }
+
+  /**
+   * Wait for the application to finish
+   * 
+   * @return did the application finish correctly?
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  boolean waitForFinish() throws InterruptedException, IOException {
+    downlink.flush();
+    return downlink.waitForFinish();
+  }
+
+  /**
+   * Abort the application and wait for it to finish.
+   * 
+   * @param t the exception that signalled the problem
+   * @throws IOException A wrapper around the exception that was passed in
+   */
+  void abort(Throwable t) throws IOException {
+    LOG.info("Aborting because of " + StringUtils.stringifyException(t));
+    try {
+      downlink.abort();
+      downlink.flush();
+    } catch (IOException e) {
+      // IGNORE cleanup problems
+    }
+    try {
+      downlink.waitForFinish();
+    } catch (Throwable ignored) {
+      process.destroy();
+    }
+    IOException wrapper = new IOException("pipe child exception");
+    wrapper.initCause(t);
+    throw wrapper;
+  }
+
+  /**
+   * Close the server socket and the downlink, if they exist.
+   */
+  void cleanup() throws IOException {
+    if (serverSocket != null) {
+      serverSocket.close();
+    }
+    try {
+      if (downlink != null) {
+        downlink.close();
+      }
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Run a given command in a subprocess, with the given variables added to
+   * the environment that the subprocess is started with.
+   * 
+   * @param command the command and its arguments
+   * @param env the environment to run the process in
+   * @return a handle on the process
+   * @throws IOException
+   */
+  static Process runClient(List<String> command, Map<String, String> env)
+      throws IOException {
+    ProcessBuilder builder = new ProcessBuilder(command);
+    if (env != null) {
+      builder.environment().putAll(env);
+    }
+    Process result = builder.start();
+    return result;
+  }
+
+}

Propchange: hama/trunk/core/src/main/java/org/apache/hama/pipes/Application.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java?rev=1388325&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java Fri Sep 21 04:58:34 2012
@@ -0,0 +1,593 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.pipes;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+
+/**
+ * This protocol is a binary implementation of the Hama Pipes protocol.
+ * 
+ * Adapted from Hadoop Pipes.
+ * 
+ */
+public class BinaryProtocol<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable>
+    implements DownwardProtocol<K1, V1> {
+
+  protected static final Log LOG = LogFactory.getLog(BinaryProtocol.class
+      .getName());
+  public static final int CURRENT_PROTOCOL_VERSION = 0;
+  /**
+   * The buffer size for the command socket
+   */
+  private static final int BUFFER_SIZE = 128 * 1024;
+
+  protected final DataOutputStream stream;
+  protected final DataOutputBuffer buffer = new DataOutputBuffer();
+
+  private UplinkReaderThread uplink;
+
+  private boolean hasTask = false;
+  protected final BSPPeer<K1, V1, K2, V2, BytesWritable> peer;
+
+  /**
+   * The integer codes to represent the different messages. These must match the
+   * C++ codes or massive confusion will result.
+   */
+  protected static enum MessageType {
+    START(0), SET_BSPJOB_CONF(1), SET_INPUT_TYPES(2), RUN_SETUP(3), RUN_BSP(4),
+    RUN_CLEANUP(5), READ_KEYVALUE(6), WRITE_KEYVALUE(7), GET_MSG(8),
+    GET_MSG_COUNT(9), SEND_MSG(10), SYNC(11), GET_ALL_PEERNAME(12),
+    GET_PEERNAME(13), GET_PEER_INDEX(14), GET_PEER_COUNT(15),
+    GET_SUPERSTEP_COUNT(16), REOPEN_INPUT(17), CLEAR(18), CLOSE(19), ABORT(20),
+    DONE(21), TASK_DONE(22), REGISTER_COUNTER(23), INCREMENT_COUNTER(24), LOG(
+        25);
+
+    final int code;
+
+    MessageType(int code) {
+      this.code = code;
+    }
+  }
+
+  protected class UplinkReaderThread extends Thread {
+
+    protected DataInputStream inStream;
+    protected K2 key;
+    protected V2 value;
+    protected BSPPeer<K1, V1, K2, V2, BytesWritable> peer;
+
+    @SuppressWarnings("unchecked")
+    public UplinkReaderThread(BSPPeer<K1, V1, K2, V2, BytesWritable> peer,
+        InputStream stream) throws IOException {
+
+      inStream = new DataInputStream(new BufferedInputStream(stream,
+          BUFFER_SIZE));
+
+      this.peer = peer;
+      this.key = ReflectionUtils.newInstance((Class<? extends K2>) peer
+          .getConfiguration().getClass("bsp.output.key.class", Object.class),
+          peer.getConfiguration());
+
+      this.value = ReflectionUtils.newInstance((Class<? extends V2>) peer
+          .getConfiguration().getClass("bsp.output.value.class", Object.class),
+          peer.getConfiguration());
+    }
+
+    public void closeConnection() throws IOException {
+      inStream.close();
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          if (Thread.currentThread().isInterrupted()) {
+            throw new InterruptedException();
+          }
+
+          int cmd = readCommand();
+          if (cmd == -1)
+            continue;
+          LOG.debug("Handling uplink command " + cmd);
+
+          if (cmd == MessageType.WRITE_KEYVALUE.code) { // INCOMING
+            writeKeyValue();
+          } else if (cmd == MessageType.READ_KEYVALUE.code) { // OUTGOING
+            readKeyValue();
+          } else if (cmd == MessageType.INCREMENT_COUNTER.code) { // INCOMING
+            incrementCounter();
+          } else if (cmd == MessageType.REGISTER_COUNTER.code) { // INCOMING
+            /*
+             * Is not used in HAMA -> Hadoop Pipes - maybe for performance, skip
+             * transferring group and name each INCREMENT
+             */
+          } else if (cmd == MessageType.TASK_DONE.code) { // INCOMING
+            LOG.debug("Got MessageType.TASK_DONE");
+            hasTask = false;
+          } else if (cmd == MessageType.DONE.code) { // INCOMING
+            LOG.debug("Pipe child done");
+            return;
+          } else if (cmd == MessageType.SEND_MSG.code) { // INCOMING
+            sendMessage();
+          } else if (cmd == MessageType.GET_MSG_COUNT.code) { // OUTGOING
+            getMessageCount();
+          } else if (cmd == MessageType.GET_MSG.code) { // OUTGOING
+            getMessage();
+          } else if (cmd == MessageType.SYNC.code) { // INCOMING
+            sync();
+          } else if (cmd == MessageType.GET_ALL_PEERNAME.code) { // OUTGOING
+            getAllPeerNames();
+          } else if (cmd == MessageType.GET_PEERNAME.code) { // OUTGOING
+            getPeerName();
+          } else if (cmd == MessageType.GET_PEER_INDEX.code) { // OUTGOING
+            getPeerIndex();
+          } else if (cmd == MessageType.GET_PEER_COUNT.code) { // OUTGOING
+            getPeerCount();
+          } else if (cmd == MessageType.GET_SUPERSTEP_COUNT.code) { // OUTGOING
+            getSuperstepCount();
+          } else if (cmd == MessageType.REOPEN_INPUT.code) { // INCOMING
+            reopenInput();
+          } else if (cmd == MessageType.CLEAR.code) { // INCOMING
+            LOG.debug("Got MessageType.CLEAR");
+            peer.clear();
+          } else {
+            throw new IOException("Bad command code: " + cmd);
+          }
+        } catch (InterruptedException e) {
+          return;
+        } catch (Throwable e) {
+          onError(e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    protected void onError(Throwable e) {
+      LOG.error(StringUtils.stringifyException(e));
+    }
+
+    public int readCommand() throws IOException {
+      return WritableUtils.readVInt(inStream);
+    }
+
+    public void reopenInput() throws IOException {
+      LOG.debug("Got MessageType.REOPEN_INPUT");
+      peer.reopenInput();
+    }
+
+    public void getSuperstepCount() throws IOException {
+      WritableUtils.writeVInt(stream, MessageType.GET_SUPERSTEP_COUNT.code);
+      WritableUtils.writeVLong(stream, peer.getSuperstepCount());
+      flush();
+      LOG.debug("Responded MessageType.GET_SUPERSTEP_COUNT - SuperstepCount: "
+          + peer.getSuperstepCount());
+    }
+
+    public void getPeerCount() throws IOException {
+      WritableUtils.writeVInt(stream, MessageType.GET_PEER_COUNT.code);
+      WritableUtils.writeVInt(stream, peer.getNumPeers());
+      flush();
+      LOG.debug("Responded MessageType.GET_PEER_COUNT - NumPeers: "
+          + peer.getNumPeers());
+    }
+
+    public void getPeerIndex() throws IOException {
+      WritableUtils.writeVInt(stream, MessageType.GET_PEER_INDEX.code);
+      WritableUtils.writeVInt(stream, peer.getPeerIndex());
+      flush();
+      LOG.debug("Responded MessageType.GET_PEER_INDEX - PeerIndex: "
+          + peer.getPeerIndex());
+    }
+
+    public void getPeerName() throws IOException {
+      int id = readCommand();
+      LOG.debug("Got MessageType.GET_PEERNAME id: " + id);
+
+      WritableUtils.writeVInt(stream, MessageType.GET_PEERNAME.code);
+      if (id == -1) { // -1 indicates get own PeerName
+        Text.writeString(stream, peer.getPeerName());
+        LOG.debug("Responded MessageType.GET_PEERNAME - Get Own PeerName: "
+            + peer.getPeerName());
+
+      } else if ((id < -1) || (id >= peer.getNumPeers())) {
+        // if no PeerName for this index is found write emptyString
+        Text.writeString(stream, "");
+        LOG.debug("Responded MessageType.GET_PEERNAME - Empty PeerName!");
+
+      } else {
+        Text.writeString(stream, peer.getPeerName(id));
+        LOG.debug("Responded MessageType.GET_PEERNAME - PeerName: "
+            + peer.getPeerName(id));
+      }
+      flush();
+    }
+
+    public void getAllPeerNames() throws IOException {
+      LOG.debug("Got MessageType.GET_ALL_PEERNAME");
+      WritableUtils.writeVInt(stream, MessageType.GET_ALL_PEERNAME.code);
+      WritableUtils.writeVInt(stream, peer.getAllPeerNames().length);
+      for (String s : peer.getAllPeerNames())
+        Text.writeString(stream, s);
+
+      flush();
+      LOG.debug("Responded MessageType.GET_ALL_PEERNAME - peerNamesCount: "
+          + peer.getAllPeerNames().length);
+    }
+
+    public void sync() throws IOException, SyncException, InterruptedException {
+      LOG.debug("Got MessageType.SYNC");
+      peer.sync(); // this call blocks
+    }
+
+    public void getMessage() throws IOException {
+      LOG.debug("Got MessageType.GET_MSG");
+      WritableUtils.writeVInt(stream, MessageType.GET_MSG.code);
+      BytesWritable msg = peer.getCurrentMessage();
+      if (msg != null)
+        writeObject(msg);
+
+      flush();
+      LOG.debug("Responded MessageType.GET_MSG - Message(BytesWritable) ");// +msg);
+    }
+
+    public void getMessageCount() throws IOException {
+      WritableUtils.writeVInt(stream, MessageType.GET_MSG_COUNT.code);
+      WritableUtils.writeVInt(stream, peer.getNumCurrentMessages());
+      flush();
+      LOG.debug("Responded MessageType.GET_MSG_COUNT - Count: "
+          + peer.getNumCurrentMessages());
+    }
+
+    public void sendMessage() throws IOException {
+      String peerName = Text.readString(inStream);
+      BytesWritable msg = new BytesWritable();
+      readObject(msg);
+      LOG.debug("Got MessageType.SEND_MSG to peerName: " + peerName);
+      peer.send(peerName, msg);
+    }
+
+    public void incrementCounter() throws IOException {
+      // int id = WritableUtils.readVInt(inStream);
+      String group = Text.readString(inStream);
+      String name = Text.readString(inStream);
+      long amount = WritableUtils.readVLong(inStream);
+      peer.incrementCounter(name, group, amount);
+    }
+
+    public void readKeyValue() throws IOException {
+      boolean nullinput = peer.getConfiguration().get("bsp.input.format.class") == null
+          || peer.getConfiguration().get("bsp.input.format.class")
+              .equals("org.apache.hama.bsp.NullInputFormat");
+
+      if (!nullinput) {
+
+        KeyValuePair<K1, V1> pair = peer.readNext();
+
+        WritableUtils.writeVInt(stream, MessageType.READ_KEYVALUE.code);
+        if (pair != null) {
+          writeObject(pair.getKey());
+          writeObject(pair.getValue());
+
+          LOG.debug("Responded MessageType.READ_KEYVALUE - Key: "
+              + pair.getKey() + " Value: " + pair.getValue());
+
+        } else {
+          Text.writeString(stream, "");
+          Text.writeString(stream, "");
+          LOG.debug("Responded MessageType.READ_KEYVALUE - EMPTY KeyValue Pair");
+        }
+        flush();
+
+      } else {
+        /* TODO */
+        /* Send empty Strings to show no KeyValue pair is available */
+        WritableUtils.writeVInt(stream, MessageType.READ_KEYVALUE.code);
+        Text.writeString(stream, "");
+        Text.writeString(stream, "");
+        flush();
+        LOG.debug("Responded MessageType.READ_KEYVALUE - EMPTY KeyValue Pair");
+      }
+    }
+
+    public void writeKeyValue() throws IOException {
+      readObject(key); // string or binary only
+      readObject(value); // string or binary only
+      if (LOG.isDebugEnabled())
+        LOG.debug("Got MessageType.WRITE_KEYVALUE - Key: " + key + " Value: "
+            + value);
+      peer.write(key, value);
+    }
+
+    protected void readObject(Writable obj) throws IOException {
+      int numBytes = readCommand();
+      byte[] buffer;
+      // For BytesWritable and Text, use the specified length to set the length
+      // this causes the "obvious" translations to work. So that if you emit
+      // a string "abc" from C++, it shows up as "abc".
+      if (obj instanceof BytesWritable) {
+        buffer = new byte[numBytes];
+        inStream.readFully(buffer);
+        ((BytesWritable) obj).set(buffer, 0, numBytes);
+      } else if (obj instanceof Text) {
+        buffer = new byte[numBytes];
+        inStream.readFully(buffer);
+        ((Text) obj).set(buffer);
+      } else if (obj instanceof NullWritable) {
+        throw new IOException(
+            "Cannot read data into NullWritable! Check OutputClasses!");
+      } else {
+        /* TODO */
+        /* IntWritable, DoubleWritable */
+        throw new IOException(
+            "Hama Pipes does only support Text as Key/Value output!");
+        // obj.readFields(inStream);
+      }
+    }
+  }
+
+  /**
+   * An output stream that will save a copy of the data into a file.
+   */
+  private static class TeeOutputStream extends FilterOutputStream {
+    private OutputStream file;
+
+    TeeOutputStream(String filename, OutputStream base) throws IOException {
+      super(base);
+      file = new FileOutputStream(filename);
+    }
+
+    @Override
+    public void write(byte b[], int off, int len) throws IOException {
+      file.write(b, off, len);
+      out.write(b, off, len);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      file.write(b);
+      out.write(b);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      file.flush();
+      out.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+      flush();
+      file.close();
+      out.close();
+    }
+  }
+
+  /**
+   * Create a proxy object that will speak the binary protocol on the given
+   * streams. Upward messages are handled by the uplink reader thread and
+   * downward messages are public methods on this object.
+   * 
+   * @param peer The BSP peer; its configuration supplies the output key and
+   *          value classes and the keep-command-file debug setting.
+   * @param out The stream that downward commands are written to.
+   * @param in The stream that upward messages are read from.
+   * @throws IOException if the uplink reader or the debug copy file cannot be
+   *           created
+   */
+  public BinaryProtocol(BSPPeer<K1, V1, K2, V2, BytesWritable> peer,
+      OutputStream out, InputStream in) throws IOException {
+    this.peer = peer;
+    OutputStream raw = out;
+
+    // If we are debugging, save a copy of the downlink commands to a file
+    if (Submitter.getKeepCommandFile(peer.getConfiguration())) {
+      raw = new TeeOutputStream("downlink.data", raw);
+    }
+    stream = new DataOutputStream(new BufferedOutputStream(raw, BUFFER_SIZE));
+    uplink = getUplinkReader(peer, in);
+
+    uplink.setName("pipe-uplink-handler");
+    uplink.start();
+  }
+
+  public UplinkReaderThread getUplinkReader(
+      BSPPeer<K1, V1, K2, V2, BytesWritable> peer, InputStream sock)
+      throws IOException {
+    return new UplinkReaderThread(peer, sock);
+  }
+
+  @Override
+  public boolean waitForFinish() throws IOException, InterruptedException {
+    // LOG.debug("waitForFinish... "+hasTask);
+    while (hasTask) {
+      try {
+        Thread.sleep(100);
+        // LOG.debug("waitForFinish... "+hasTask);
+      } catch (Exception e) {
+        LOG.error(e);
+      }
+    }
+    return hasTask;
+  }
+
+  /**
+   * Close the connection and shutdown the handler thread.
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  public void close() throws IOException, InterruptedException {
+    // runCleanup(pipedInput,pipedOutput);
+    LOG.debug("closing connection");
+    endOfInput();
+
+    uplink.interrupt();
+    uplink.join();
+
+    uplink.closeConnection();
+    stream.close();
+  }
+
+  @Override
+  public void start() throws IOException {
+    LOG.debug("starting downlink");
+    WritableUtils.writeVInt(stream, MessageType.START.code);
+    WritableUtils.writeVInt(stream, CURRENT_PROTOCOL_VERSION);
+    flush();
+    LOG.debug("Sent MessageType.START");
+    setBSPJob(peer.getConfiguration());
+  }
+
+  public void setBSPJob(Configuration conf) throws IOException {
+    WritableUtils.writeVInt(stream, MessageType.SET_BSPJOB_CONF.code);
+    List<String> list = new ArrayList<String>();
+    for (Map.Entry<String, String> itm : conf) {
+      list.add(itm.getKey());
+      list.add(itm.getValue());
+    }
+    WritableUtils.writeVInt(stream, list.size());
+    for (String entry : list) {
+      Text.writeString(stream, entry);
+    }
+    flush();
+    LOG.debug("Sent MessageType.SET_BSPJOB_CONF");
+  }
+
+  @Override
+  public void setInputTypes(String keyType, String valueType)
+      throws IOException {
+    WritableUtils.writeVInt(stream, MessageType.SET_INPUT_TYPES.code);
+    Text.writeString(stream, keyType);
+    Text.writeString(stream, valueType);
+    flush();
+    LOG.debug("Sent MessageType.SET_INPUT_TYPES");
+  }
+
+  @Override
+  public void runSetup(boolean pipedInput, boolean pipedOutput)
+      throws IOException {
+
+    WritableUtils.writeVInt(stream, MessageType.RUN_SETUP.code);
+    WritableUtils.writeVInt(stream, pipedInput ? 1 : 0);
+    WritableUtils.writeVInt(stream, pipedOutput ? 1 : 0);
+    flush();
+    hasTask = true;
+    LOG.debug("Sent MessageType.RUN_SETUP");
+  }
+
+  @Override
+  public void runBsp(boolean pipedInput, boolean pipedOutput)
+      throws IOException {
+
+    WritableUtils.writeVInt(stream, MessageType.RUN_BSP.code);
+    WritableUtils.writeVInt(stream, pipedInput ? 1 : 0);
+    WritableUtils.writeVInt(stream, pipedOutput ? 1 : 0);
+    flush();
+    hasTask = true;
+    LOG.debug("Sent MessageType.RUN_BSP");
+  }
+
+  @Override
+  public void runCleanup(boolean pipedInput, boolean pipedOutput)
+      throws IOException {
+
+    WritableUtils.writeVInt(stream, MessageType.RUN_CLEANUP.code);
+    WritableUtils.writeVInt(stream, pipedInput ? 1 : 0);
+    WritableUtils.writeVInt(stream, pipedOutput ? 1 : 0);
+    flush();
+    hasTask = true;
+    LOG.debug("Sent MessageType.RUN_CLEANUP");
+  }
+
+  public void endOfInput() throws IOException {
+    WritableUtils.writeVInt(stream, MessageType.CLOSE.code);
+    flush();
+    LOG.debug("Sent close command");
+    LOG.debug("Sent MessageType.CLOSE");
+  }
+
+  @Override
+  public void abort() throws IOException {
+    WritableUtils.writeVInt(stream, MessageType.ABORT.code);
+    flush();
+    LOG.debug("Sent MessageType.ABORT");
+  }
+
+  @Override
+  public void flush() throws IOException {
+    stream.flush();
+  }
+
+  /**
+   * Write the given object to the stream. If it is a Text or BytesWritable,
+   * write it directly. Otherwise, write it to a buffer and then write the
+   * length and data to the stream.
+   * 
+   * @param obj the object to write
+   * @throws IOException
+   */
+  protected void writeObject(Writable obj) throws IOException {
+    // For Text and BytesWritable, encode them directly, so that they end up
+    // in C++ as the natural translations.
+    if (obj instanceof Text) {
+      Text t = (Text) obj;
+      int len = t.getLength();
+      WritableUtils.writeVInt(stream, len);
+      stream.write(t.getBytes(), 0, len);
+    } else if (obj instanceof BytesWritable) {
+      BytesWritable b = (BytesWritable) obj;
+      int len = b.getLength();
+      WritableUtils.writeVInt(stream, len);
+      stream.write(b.getBytes(), 0, len);
+    } else {
+      buffer.reset();
+      obj.write(buffer);
+      int length = buffer.getLength();
+      WritableUtils.writeVInt(stream, length);
+      stream.write(buffer.getData(), 0, length);
+    }
+  }
+}

Propchange: hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/DownwardProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/DownwardProtocol.java?rev=1388325&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/DownwardProtocol.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/DownwardProtocol.java Fri Sep 21 04:58:34 2012
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.pipes;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The abstract description of the downward (from Java to C++) Pipes protocol.
+ * All of these calls are asynchronous and return before the message has been
+ * processed.
+ * 
+ * Adapted from Hadoop Pipes.
+ * 
+ */
+public interface DownwardProtocol<K extends Writable, V extends Writable> {
+
+  /**
+   * Start communication
+   * 
+   * @throws IOException
+   */
+  void start() throws IOException;
+
+  /**
+   * Set the names of the input key and value types.
+   * 
+   * @param keyType the name of the key's type
+   * @param valueType the name of the value's type
+   * @throws IOException
+   */
+  void setInputTypes(String keyType, String valueType) throws IOException;
+
+  /**
+   * Tell the other side of the protocol to run its bsp step. PipesBSP calls
+   * this from bsp().
+   * 
+   * NOTE(review): the meaning of pipedInput/pipedOutput is not visible in this
+   * file; PipesBSP always passes false and StreamingProtocol ignores both.
+   * 
+   * @throws IOException
+   */
+  void runBsp(boolean pipedInput, boolean pipedOutput) throws IOException;
+
+  /**
+   * Tell the other side of the protocol to run its cleanup step. PipesBSP
+   * calls this from cleanup().
+   * 
+   * NOTE(review): see runBsp regarding pipedInput/pipedOutput.
+   * 
+   * @throws IOException
+   */
+  void runCleanup(boolean pipedInput, boolean pipedOutput) throws IOException;
+
+  /**
+   * Tell the other side of the protocol to run its setup step. PipesBSP calls
+   * this from setup().
+   * 
+   * NOTE(review): see runBsp regarding pipedInput/pipedOutput.
+   * 
+   * @throws IOException
+   */
+  void runSetup(boolean pipedInput, boolean pipedOutput) throws IOException;
+
+  /**
+   * The task should stop as soon as possible, because something has gone wrong.
+   * 
+   * @throws IOException
+   */
+  void abort() throws IOException;
+
+  /**
+   * Flush the data through any buffers.
+   */
+  void flush() throws IOException;
+
+  /**
+   * Close the connection.
+   */
+  void close() throws IOException, InterruptedException;
+
+  /**
+   * Wait until the other side has finished.
+   * 
+   * NOTE(review): what the boolean result stands for is not visible here;
+   * confirm against the implementations.
+   */
+  boolean waitForFinish() throws IOException, InterruptedException;
+}

Propchange: hama/trunk/core/src/main/java/org/apache/hama/pipes/DownwardProtocol.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java?rev=1388325&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java Fri Sep 21 04:58:34 2012
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.pipes;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+
+/**
+ * A BSP that can communicate via pipes with other programming languages and
+ * runtimes.
+ */
+public class PipesBSP<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable, M extends Writable>
+    extends BSP<K1, V1, K2, V2, BytesWritable> {
+
+  private static final Log LOG = LogFactory.getLog(PipesBSP.class);
+
+  /** Bridge to the external program; assigned in setup(). */
+  private Application<K1, V1, K2, V2, BytesWritable> application;
+
+  /**
+   * Creates the {@link Application} for this peer and asks its downlink to run
+   * the setup step.
+   */
+  @Override
+  public void setup(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
+      throws IOException, SyncException, InterruptedException {
+
+    this.application = new Application<K1, V1, K2, V2, BytesWritable>(peer);
+    application.getDownlink().runSetup(false, false);
+  }
+
+  /**
+   * Asks the downlink of the application created in setup() to run the bsp
+   * step.
+   */
+  @Override
+  public void bsp(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
+      throws IOException, SyncException, InterruptedException {
+
+    application.getDownlink().runBsp(false, false);
+  }
+
+  /**
+   * Asks the downlink to run the cleanup step, waits for the application to
+   * finish and then always releases the application, even when one of the
+   * earlier steps failed.
+   * 
+   * @throws IOException if the cleanup command or waiting for the application
+   *           fails; the error is logged before it is rethrown
+   */
+  @Override
+  public void cleanup(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
+      throws IOException {
+
+    if (application == null) {
+      // setup() never assigned the application, so there is nothing to tear
+      // down; avoid masking the earlier failure with a NullPointerException.
+      LOG.warn("cleanup() called without an initialized pipes application");
+      return;
+    }
+
+    try {
+      // Inside the try so that application.cleanup() below also runs when
+      // sending the cleanup command fails.
+      application.getDownlink().runCleanup(false, false);
+      application.waitForFinish();
+    } catch (IOException e) {
+      LOG.error(e);
+      throw e;
+    } catch (InterruptedException e) {
+      // cleanup() cannot rethrow this, so restore the interrupt status for
+      // the caller instead of silently dropping it.
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while waiting for the pipes application to finish",
+          e);
+    } finally {
+      application.cleanup();
+    }
+  }
+
+}

Propchange: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java?rev=1388325&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java Fri Sep 21 04:58:34 2012
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.pipes;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.InputFormat;
+import org.apache.hama.bsp.InputSplit;
+import org.apache.hama.bsp.RecordReader;
+import org.apache.hama.bsp.TextInputFormat;
+
+/**
+ * Dummy input format used when a non-Java {@link RecordReader} is used by the
+ * Pipes' application.
+ * 
+ * The only useful thing this does is set up the job to get the
+ * {@link PipesDummyRecordReader}; everything else is left to the 'actual'
+ * InputFormat specified by the user, which is given by
+ * <i>hama.pipes.user.inputformat</i>.
+ * 
+ * Adapted from Hadoop Pipes.
+ * 
+ */
+public class PipesNonJavaInputFormat implements
+    InputFormat<FloatWritable, NullWritable> {
+
+  /**
+   * Returns a {@link PipesDummyRecordReader}; the split itself is not read by
+   * the Java side.
+   */
+  @Override
+  public RecordReader<FloatWritable, NullWritable> getRecordReader(
+      InputSplit genericSplit, BSPJob job) throws IOException {
+    return new PipesDummyRecordReader(job.getConf(), genericSplit);
+  }
+
+  /**
+   * Computes the splits with the input format configured under
+   * <i>hama.pipes.user.inputformat</i>, falling back to
+   * {@link TextInputFormat} when the key is not set.
+   */
+  @Override
+  public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
+    // Delegate the generation of input splits to the 'original' InputFormat
+    return ReflectionUtils.newInstance(
+        job.getConf().getClass("hama.pipes.user.inputformat",
+            TextInputFormat.class, InputFormat.class), job.getConf())
+        .getSplits(job, numSplits);
+  }
+
+  /**
+   * A dummy {@link RecordReader} to help track the progress of Hama Pipes'
+   * applications when they are using a non-Java <code>RecordReader</code>.
+   * 
+   * The <code>PipesDummyRecordReader</code> is informed of the 'progress' of
+   * the task through {@link #next(FloatWritable, NullWritable)}, which is
+   * called with the progress as the <code>key</code>.
+   */
+  static class PipesDummyRecordReader implements
+      RecordReader<FloatWritable, NullWritable> {
+    // Written under the monitor in next() but read without it in
+    // getProgress(); volatile makes the latest value visible to that reader.
+    volatile float progress = 0.0f;
+
+    /** Both arguments are accepted for signature compatibility and ignored. */
+    public PipesDummyRecordReader(Configuration job, InputSplit split)
+        throws IOException {
+    }
+
+    /** @return always null; this reader does not produce keys */
+    @Override
+    public FloatWritable createKey() {
+      return null;
+    }
+
+    /** @return always null; this reader does not produce values */
+    @Override
+    public NullWritable createValue() {
+      return null;
+    }
+
+    /** Nothing to release. */
+    @Override
+    public synchronized void close() throws IOException {
+    }
+
+    /** @return always 0 */
+    @Override
+    public synchronized long getPos() throws IOException {
+      return 0;
+    }
+
+    /** @return the progress most recently stored by next() */
+    @Override
+    public float getProgress() {
+      return progress;
+    }
+
+    /**
+     * Stores the value of <code>key</code> as the current progress.
+     * 
+     * @return always true
+     */
+    @Override
+    public synchronized boolean next(FloatWritable key, NullWritable value)
+        throws IOException {
+      progress = key.get();
+      return true;
+    }
+  }
+}

Propchange: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/StreamingProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/StreamingProtocol.java?rev=1388325&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/StreamingProtocol.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/StreamingProtocol.java Fri Sep 21 04:58:34 2012
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.pipes;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+
+/**
+ * Streaming protocol that inherits from the binary protocol. Basically it
+ * writes everything as text to the peer, each command is separated by newlines.
+ * To distinguish op-codes (like SET_BSPJOB_CONF) from normal output, op-codes
+ * are surrounded like %OP_CODE%=_possible_value.
+ * 
+ * The output key and value types are fixed to {@link Text}.
+ * 
+ * @param <K1> input key.
+ * @param <V1> input value.
+ */
+public class StreamingProtocol<K1 extends Writable, V1 extends Writable>
+    extends BinaryProtocol<K1, V1, Text, Text> {
+
+  private static final Pattern PROTOCOL_STRING_PATTERN = Pattern.compile("=");
+
+  // Two parties: the thread issuing a command and the uplink reader thread
+  // that sees the matching ACK_ line.
+  private final CyclicBarrier ackBarrier = new CyclicBarrier(2);
+  // Set by the reader thread's onError(); waitOnAck() skips waiting once set.
+  private volatile boolean brokenBarrier = false;
+
+  public StreamingProtocol(BSPPeer<K1, V1, Text, Text, BytesWritable> peer,
+      OutputStream out, InputStream in) throws IOException {
+    super(peer, out, in);
+  }
+
+  /**
+   * Uplink reader that parses the line-based text sent by the other side and
+   * answers requests by writing lines back.
+   */
+  public class StreamingUplinkReaderThread extends UplinkReaderThread {
+
+    private BufferedReader reader;
+
+    public StreamingUplinkReaderThread(
+        BSPPeer<K1, V1, Text, Text, BytesWritable> peer, InputStream stream)
+        throws IOException {
+      super(peer, stream);
+      reader = new BufferedReader(new InputStreamReader(inStream));
+    }
+
+    /**
+     * Reads one line that the current command requires.
+     * 
+     * @throws IOException if the stream ended instead of delivering the line
+     */
+    private String readRequiredLine() throws IOException {
+      String line = reader.readLine();
+      if (line == null) {
+        throw new IOException(
+            "Unexpected end of stream while reading from the streaming process");
+      }
+      return line;
+    }
+
+    /** Reads a peer name line and a message line and sends the message. */
+    @Override
+    public void sendMessage() throws IOException {
+      String peerLine = readRequiredLine();
+      String msgLine = readRequiredLine();
+      peer.send(peerLine, new BytesWritable(msgLine.getBytes()));
+    }
+
+    /** Writes the current message as one line, or %%-1%% if there is none. */
+    @Override
+    public void getMessage() throws IOException {
+      BytesWritable currentMessage = peer.getCurrentMessage();
+      if (currentMessage != null) {
+        // getBytes() exposes the whole backing array, which may be larger
+        // than the message; only the first getLength() bytes are valid.
+        writeLine(new String(currentMessage.getBytes(), 0,
+            currentMessage.getLength()));
+      } else {
+        writeLine("%%-1%%");
+      }
+    }
+
+    @Override
+    public void getMessageCount() throws IOException {
+      writeLine("" + peer.getNumCurrentMessages());
+    }
+
+    @Override
+    public void getSuperstepCount() throws IOException {
+      writeLine("" + peer.getSuperstepCount());
+    }
+
+    /**
+     * Reads a peer index line; -1 selects this peer's own name, any other
+     * value the name of the peer with that index.
+     */
+    @Override
+    public void getPeerName() throws IOException {
+      int id = Integer.parseInt(readRequiredLine());
+      if (id == -1)
+        writeLine(peer.getPeerName());
+      else
+        writeLine(peer.getPeerName(id));
+    }
+
+    @Override
+    public void getPeerIndex() throws IOException {
+      writeLine("" + peer.getPeerIndex());
+    }
+
+    /** Writes the number of peers followed by one line per peer name. */
+    @Override
+    public void getAllPeerNames() throws IOException {
+      // Fetch once so the announced count and the listed names agree.
+      String[] peerNames = peer.getAllPeerNames();
+      writeLine("" + peerNames.length);
+      for (String s : peerNames) {
+        writeLine(s);
+      }
+    }
+
+    @Override
+    public void getPeerCount() throws IOException {
+      writeLine("" + peer.getAllPeerNames().length);
+    }
+
+    /** Synchronizes the peer and reports success to the other side. */
+    @Override
+    public void sync() throws IOException, SyncException, InterruptedException {
+      peer.sync();
+      writeLine(getProtocolString(MessageType.SYNC) + "_SUCCESS");
+    }
+
+    /** Reads a key line and a value line and writes them as Text output. */
+    @Override
+    public void writeKeyValue() throws IOException {
+      String key = readRequiredLine();
+      String value = readRequiredLine();
+      peer.write(new Text(key), new Text(value));
+    }
+
+    /**
+     * Writes the next input pair as two lines, or %%-1%% twice when the input
+     * is exhausted.
+     */
+    @Override
+    public void readKeyValue() throws IOException {
+      KeyValuePair<K1, V1> readNext = peer.readNext();
+      if (readNext == null) {
+        writeLine("%%-1%%");
+        writeLine("%%-1%%");
+      } else {
+        writeLine(readNext.getKey() + "");
+        writeLine(readNext.getValue() + "");
+      }
+    }
+
+    @Override
+    public void reopenInput() throws IOException {
+      peer.reopenInput();
+    }
+
+    /**
+     * Reads the next line and turns it into a command code.
+     * 
+     * @return the parsed op-code; -1 for an empty or missing line, an ACK_
+     *         line or a LOG line (which is logged here); -2 when the op-code
+     *         is not a number
+     */
+    @Override
+    public int readCommand() throws IOException {
+      String readLine = reader.readLine();
+      if (readLine == null || readLine.isEmpty()) {
+        return -1;
+      }
+      String[] split = PROTOCOL_STRING_PATTERN.split(readLine, 2);
+      split[0] = split[0].replace("%", "");
+      if (checkAcks(split))
+        return -1;
+      try {
+        int parseInt = Integer.parseInt(split[0]);
+        if (parseInt == BinaryProtocol.MessageType.LOG.code) {
+          // A LOG line without '=' has no payload part; log it as empty
+          // instead of failing on the missing array element.
+          LOG.info(split.length > 1 ? split[1] : "");
+          return -1;
+        }
+        return parseInt;
+      } catch (NumberFormatException e) {
+        LOG.error("Could not parse command line: " + readLine, e);
+        return -2;
+      }
+    }
+
+    @Override
+    protected void onError(Throwable e) {
+      super.onError(e);
+      // Raise the flag before resetting the barrier: reset() only releases
+      // threads that are already waiting, so a later waitOnAck() has to see
+      // the flag to avoid waiting for an ack that will never arrive.
+      brokenBarrier = true;
+      ackBarrier.reset();
+    }
+
+    /**
+     * Meets the waiting command thread at the barrier when the line is an
+     * acknowledgement.
+     * 
+     * @return true if the line was an ACK_ line
+     */
+    private boolean checkAcks(String[] readLine) {
+      if (readLine[0].startsWith("ACK_")) {
+        awaitAck();
+        return true;
+      }
+      return false;
+    }
+
+  }
+
+  /**
+   * Waits at the ack barrier. An interrupt is logged and the interrupt status
+   * is restored; a broken barrier is logged.
+   */
+  private void awaitAck() {
+    try {
+      ackBarrier.await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while waiting for an acknowledgement", e);
+    } catch (BrokenBarrierException e) {
+      LOG.warn("Acknowledgement barrier was broken", e);
+    }
+  }
+
+  /**
+   * Sends START, the protocol version and the job configuration, then waits
+   * for the acknowledgement.
+   */
+  @Override
+  public void start() throws IOException {
+    writeLine(MessageType.START, null);
+    writeLine("" + CURRENT_PROTOCOL_VERSION);
+    setBSPJob(peer.getConfiguration());
+    // Same guarded wait as the run* commands, so that an error reported by
+    // the reader thread cannot leave this call blocked.
+    waitOnAck();
+  }
+
+  /**
+   * Sends SET_BSPJOB_CONF, the number of following lines and then every
+   * configuration key and value on its own line.
+   */
+  @Override
+  public void setBSPJob(Configuration conf) throws IOException {
+    writeLine(MessageType.SET_BSPJOB_CONF, null);
+    List<String> list = new ArrayList<String>();
+    for (Map.Entry<String, String> itm : conf) {
+      list.add(itm.getKey());
+      list.add(itm.getValue());
+    }
+    writeLine(list.size());
+    for (String entry : list) {
+      writeLine(entry);
+    }
+    flush();
+  }
+
+  /** Sends RUN_SETUP and waits for the acknowledgement; arguments unused. */
+  @Override
+  public void runSetup(boolean pipedInput, boolean pipedOutput)
+      throws IOException {
+    writeLine(MessageType.RUN_SETUP, null);
+    waitOnAck();
+  }
+
+  /** Sends RUN_BSP and waits for the acknowledgement; arguments unused. */
+  @Override
+  public void runBsp(boolean pipedInput, boolean pipedOutput)
+      throws IOException {
+    writeLine(MessageType.RUN_BSP, null);
+    waitOnAck();
+  }
+
+  /**
+   * Waits for the reader thread to see an acknowledgement, unless the reader
+   * has already reported an error.
+   */
+  public void waitOnAck() {
+    if (!brokenBarrier) {
+      awaitAck();
+    }
+  }
+
+  /** Sends RUN_CLEANUP and waits for the acknowledgement; arguments unused. */
+  @Override
+  public void runCleanup(boolean pipedInput, boolean pipedOutput)
+      throws IOException {
+    writeLine(MessageType.RUN_CLEANUP, null);
+    waitOnAck();
+  }
+
+  @Override
+  public UplinkReaderThread getUplinkReader(
+      BSPPeer<K1, V1, Text, Text, BytesWritable> peer, InputStream in)
+      throws IOException {
+    return new StreamingUplinkReaderThread(peer, in);
+  }
+
+  /** Writes the number as one line. */
+  public void writeLine(int msg) throws IOException {
+    writeLine("" + msg);
+  }
+
+  /** Writes the text followed by a newline and flushes the stream. */
+  public void writeLine(String msg) throws IOException {
+    stream.write((msg + "\n").getBytes());
+    stream.flush();
+  }
+
+  /**
+   * Writes the op-code marker for the type, followed by the optional message
+   * and a newline, then flushes the stream.
+   * 
+   * @param msg text appended after the marker; null is written as nothing
+   */
+  public void writeLine(MessageType type, String msg) throws IOException {
+    stream.write((getProtocolString(type) + (msg == null ? "" : msg) + "\n")
+        .getBytes());
+    stream.flush();
+  }
+
+  /** @return the op-code marker for the type, of the form %code%= */
+  public String getProtocolString(MessageType type) {
+    return "%" + type.code + "%=";
+  }
+
+}

Propchange: hama/trunk/core/src/main/java/org/apache/hama/pipes/StreamingProtocol.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java?rev=1388325&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java Fri Sep 21 04:58:34 2012
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.pipes;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.StringTokenizer;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.Parser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobClient;
+import org.apache.hama.bsp.FileInputFormat;
+import org.apache.hama.bsp.FileOutputFormat;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.InputFormat;
+import org.apache.hama.bsp.OutputFormat;
+import org.apache.hama.bsp.Partitioner;
+
+import com.google.common.base.Joiner;
+
+/**
+ * The main entry point and job submitter. It may either be used as a command
+ * line-based or API-based method to launch Pipes jobs.
+ * 
+ * Adapted from Hadoop Pipes.
+ * 
+ */
+public class Submitter implements Tool {
+
+  protected static final Log LOG = LogFactory.getLog(Submitter.class);
+  private HamaConfiguration conf;
+
+  /** Creates a submitter backed by a new {@link HamaConfiguration}. */
+  public Submitter() {
+    conf = new HamaConfiguration();
+  }
+
+  /**
+   * Creates a submitter that uses the given configuration.
+   * 
+   * @param conf the configuration, installed through setConf
+   */
+  public Submitter(HamaConfiguration conf) {
+    this.setConf(conf);
+  }
+
+  /** @return the configuration currently held by this submitter */
+  @Override
+  public HamaConfiguration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = (HamaConfiguration) conf;
+  }
+
+  /**
+   * Get the URI of the CPU application's executable.
+   * 
+   * @param conf
+   * @return the URI where the application's executable is located
+   */
+  public static String getExecutable(Configuration conf) {
+    return conf.get("hama.pipes.executable");
+  }
+
+  /**
+   * Store the URI of the application's executable under
+   * <i>hama.pipes.executable</i>. Normally this is a hdfs: location.
+   * 
+   * @param conf the configuration to modify
+   * @param executable The URI of the application's executable.
+   */
+  public static void setExecutable(Configuration conf, String executable) {
+    final String key = "hama.pipes.executable";
+    conf.set(key, executable);
+  }
+
+  /**
+   * Record whether the job is using a Java RecordReader.
+   * 
+   * @param conf the configuration to modify
+   * @param value the new value
+   */
+  public static void setIsJavaRecordReader(Configuration conf, boolean value) {
+    final String key = "hama.pipes.java.recordreader";
+    conf.setBoolean(key, value);
+  }
+
+  /**
+   * Check whether the job is using a Java RecordReader
+   * 
+   * @param conf the configuration to check
+   * @return is it a Java RecordReader?
+   */
+  public static boolean getIsJavaRecordReader(Configuration conf) {
+    return conf.getBoolean("hama.pipes.java.recordreader", false);
+  }
+
+  /**
+   * Record whether the job will use a Java RecordWriter.
+   * 
+   * @param conf the configuration to modify
+   * @param value the new value to set
+   */
+  public static void setIsJavaRecordWriter(Configuration conf, boolean value) {
+    final String key = "hama.pipes.java.recordwriter";
+    conf.setBoolean(key, value);
+  }
+
+  /**
+   * Will the job use a Java RecordWriter?
+   * 
+   * @param conf the configuration to check
+   * @return true, if the output of the job will be written by Java
+   */
+  public static boolean getIsJavaRecordWriter(Configuration conf) {
+    return conf.getBoolean("hama.pipes.java.recordwriter", false);
+  }
+
+  /**
+   * Give the key a value only when the configuration has none for it yet; an
+   * existing value is left untouched.
+   * 
+   * @param conf the configuration to modify
+   * @param key the key to set
+   * @param value the new "default" value to set
+   */
+  private static void setIfUnset(Configuration conf, String key, String value) {
+    boolean alreadySet = conf.get(key) != null;
+    if (alreadySet) {
+      return;
+    }
+    conf.set(key, value);
+  }
+
+  /**
+   * Remember the user's original partitioner by storing its class name under
+   * <i>hama.pipes.partitioner</i>.
+   * 
+   * @param conf the configuration to modify
+   * @param cls the user's partitioner class
+   */
+  static void setJavaPartitioner(Configuration conf, Class<?> cls) {
+    String partitionerName = cls.getName();
+    conf.set("hama.pipes.partitioner", partitionerName);
+  }
+
+  /**
+   * Look up the user's original partitioner, falling back to
+   * {@link HashPartitioner} when none was stored.
+   * 
+   * @param conf the configuration to look in
+   * @return the class that the user submitted
+   */
+  @SuppressWarnings("rawtypes")
+  static Class<? extends Partitioner> getJavaPartitioner(Configuration conf) {
+    Class<? extends Partitioner> partitioner = conf.getClass(
+        "hama.pipes.partitioner", HashPartitioner.class, Partitioner.class);
+    return partitioner;
+  }
+
+  /**
+   * Does the user want to keep the command file for debugging? If this is true,
+   * pipes will write a copy of the command data to a file in the task directory
+   * named "downlink.data", which may be used to run the C++ program under the
+   * debugger. You probably also want to set
+   * JobConf.setKeepFailedTaskFiles(true) to keep the entire directory from
+   * being deleted. To run using the data file, set the environment variable
+   * "hadoop.pipes.command.file" to point to the file.
+   * 
+   * @param conf the configuration to check
+   * @return will the framework save the command file?
+   */
+  public static boolean getKeepCommandFile(Configuration conf) {
+    return conf.getBoolean("hama.pipes.command-file.keep", false);
+  }
+
+  /**
+   * Record whether to keep the command file for debugging.
+   * 
+   * @param conf the configuration to modify
+   * @param keep the new value
+   */
+  public static void setKeepCommandFile(Configuration conf, boolean keep) {
+    final String key = "hama.pipes.command-file.keep";
+    conf.setBoolean(key, keep);
+  }
+
+  /**
+   * Submit a job to the cluster. The job is first prepared for running under
+   * pipes, which changes its configuration, and then handed to
+   * {@link BSPJobClient}.
+   * 
+   * @param job the job to submit to the cluster (MODIFIED)
+   * @throws IOException
+   */
+  public static void runJob(BSPJob job) throws IOException {
+    Submitter.setupPipesJob(job);
+    BSPJobClient.runJob(job);
+  }
+
+  private static void setupPipesJob(BSPJob job) throws IOException {
+    job.setBspClass(PipesBSP.class);
+    job.setJarByClass(PipesBSP.class);
+
+    String textClassname = Text.class.getName();
+    setIfUnset(job.getConf(), "bsp.input.key.class", textClassname);
+    setIfUnset(job.getConf(), "bsp.input.value.class", textClassname);
+    setIfUnset(job.getConf(), "bsp.output.key.class", textClassname);
+    setIfUnset(job.getConf(), "bsp.output.value.class", textClassname);
+
+    setIfUnset(job.getConf(), "bsp.job.name", "Hama Pipes Job");
+
+    LOG.debug("isJavaRecordReader: " + getIsJavaRecordReader(job.getConf()));
+    LOG.debug("BspClass: " + job.getBspClass().getName());
+    // conf.setInputFormat(NLineInputFormat.class);
+    LOG.debug("InputFormat: " + job.getInputFormat());
+    LOG.debug("InputKeyClass: " + job.getInputKeyClass().getName());
+    LOG.debug("InputValueClass: " + job.getInputValueClass().getName());
+    LOG.debug("OutputKeyClass: " + job.getOutputKeyClass().getName());
+    LOG.debug("OutputValueClass: " + job.getOutputValueClass().getName());
+
+    if ((!job.getOutputKeyClass().getName().equals(textClassname))
+        || (!job.getOutputValueClass().getName().equals(textClassname)))
+      throw new IllegalArgumentException(
+          "Hama Pipes does only support Text as Key/Value output!");
+
+    LOG.debug("bsp.master.address: " + job.getConf().get("bsp.master.address"));
+    LOG.debug("bsp.local.tasks.maximum: "
+        + job.getConf().get("bsp.local.tasks.maximum"));
+    LOG.debug("NumBspTask: " + job.getNumBspTask());
+    LOG.debug("fs.default.name: " + job.getConf().get("fs.default.name"));
+
+    String exec = getExecutable(job.getConf());
+    if (exec == null) {
+      throw new IllegalArgumentException("No application defined.");
+    }
+
+    URI[] fileCache = DistributedCache.getCacheFiles(job.getConf());
+    if (fileCache == null) {
+      fileCache = new URI[1];
+    } else {
+      URI[] tmp = new URI[fileCache.length + 1];
+      System.arraycopy(fileCache, 0, tmp, 1, fileCache.length);
+      fileCache = tmp;
+    }
+
+    try {
+      fileCache[0] = new URI(exec);
+    } catch (URISyntaxException e) {
+      IOException ie = new IOException("Problem parsing execable URI " + exec);
+      ie.initCause(e);
+      throw ie;
+    }
+    DistributedCache.setCacheFiles(fileCache, job.getConf());
+  }
+
+  /**
+   * A command line parser for the CLI-based Pipes job submitter. It collects
+   * the supported options and prints the usage text.
+   */
+  static class CommandLineParser {
+    private Options options = new Options();
+
+    /**
+     * Registers a named option that takes one argument.
+     * 
+     * @param longName the option name as typed on the command line
+     * @param required whether the option must be present
+     * @param description help text for the option
+     * @param paramName display name of the option's argument
+     */
+    void addOption(String longName, boolean required, String description,
+        String paramName) {
+      // OptionBuilder collects these settings statically until create() is
+      // called, so the calls below belong together.
+      OptionBuilder.withArgName(paramName);
+      OptionBuilder.hasArgs(1);
+      OptionBuilder.withDescription(description);
+      OptionBuilder.isRequired(required);
+      Option option = OptionBuilder.create(longName);
+      options.addOption(option);
+    }
+
+    /**
+     * Registers an unnamed option that takes one argument.
+     * 
+     * @param name display name of the argument
+     * @param required whether the argument must be present
+     * @param description help text for the argument
+     */
+    void addArgument(String name, boolean required, String description) {
+      OptionBuilder.withArgName(name);
+      OptionBuilder.hasArgs(1);
+      OptionBuilder.withDescription(description);
+      OptionBuilder.isRequired(required);
+      Option option = OptionBuilder.create();
+      options.addOption(option);
+
+    }
+
+    /** @return a new {@link BasicParser} */
+    Parser createParser() {
+      return new BasicParser();
+    }
+
+    /** Prints the pipes usage text and the generic options to stdout. */
+    void printUsage() {
+      // The CLI package should do this for us, but I can't figure out how
+      // to make it print something reasonable.
+      System.out.println("bin/hama pipes");
+      System.out.println("  [-input <path>] // Input directory");
+      System.out.println("  [-output <path>] // Output directory");
+      // This entry was missing its closing bracket.
+      System.out.println("  [-jar <jar file>] // jar filename");
+      System.out.println("  [-inputformat <class>] // InputFormat class");
+      System.out
+          .println("  [-bspTasks <number>] // Number of bsp tasks to launch");
+      System.out.println("  [-partitioner <class>] // Java Partitioner");
+      System.out.println("  [-combiner <class>] // Java Combiner class");
+      System.out.println("  [-outputformat <class>] // Java RecordWriter");
+      System.out
+          .println("  [-cachefiles <space separated paths>] // Additional cache files like libs, can be globbed with wildcards");
+      System.out.println("  [-program <executable>] // executable URI");
+      System.out
+          .println("  [-programArgs <argument>] // arguments for the program");
+      System.out
+          .println("  [-interpreter <executable>] // interpreter, like python or bash");
+      System.out
+          .println("  [-streaming <true|false>] // if supplied, streaming is used instead of pipes");
+      System.out.println("  [-jobname <name>] // sets the name of this job");
+
+      System.out.println();
+      GenericOptionsParser.printGenericCommandUsage(System.out);
+    }
+  }
+
+  /**
+   * Looks up the class named by a command line option and narrows it to the
+   * requested type.
+   *
+   * @param cl parsed command line holding the option value
+   * @param key name of the option whose value is the class name
+   * @param conf configuration used to resolve the class by name
+   * @param cls type the resolved class must be a subclass of
+   * @return the resolved class, typed as a subclass of {@code cls}
+   * @throws ClassNotFoundException declared for the by-name lookup on
+   *           {@code conf}
+   */
+  private static <InterfaceType> Class<? extends InterfaceType> getClass(
+      CommandLine cl, String key, HamaConfiguration conf,
+      Class<InterfaceType> cls) throws ClassNotFoundException {
+    String className = cl.getOptionValue(key);
+    Class<?> resolved = conf.getClassByName(className);
+    return resolved.asSubclass(cls);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    CommandLineParser cli = new CommandLineParser();
+    if (args.length == 0) {
+      cli.printUsage();
+      return 1;
+    }
+
+    cli.addOption("input", false, "input path for bsp", "path");
+    cli.addOption("output", false, "output path from bsp", "path");
+
+    cli.addOption("jar", false, "job jar file", "path");
+    cli.addOption("inputformat", false, "java classname of InputFormat",
+        "class");
+    // cli.addArgument("javareader", false, "is the RecordReader in Java");
+
+    cli.addOption("partitioner", false, "java classname of Partitioner",
+        "class");
+    cli.addOption("outputformat", false, "java classname of OutputFormat",
+        "class");
+
+    cli.addOption("cachefiles", false, "additional cache files to add",
+        "space delimited paths");
+
+    cli.addOption("interpreter", false, "interpreter, like python or bash",
+        "executable");
+
+    cli.addOption("jobname", false, "the jobname", "name");
+
+    cli.addOption("programArgs", false, "program arguments", "arguments");
+    cli.addOption("bspTasks", false, "how many bsp tasks to launch", "number");
+    cli.addOption("streaming", false,
+        "if supplied, streaming is used instead of pipes", "");
+
+    cli.addOption(
+        "jobconf",
+        false,
+        "\"n1=v1,n2=v2,..\" (Deprecated) Optional. Add or override a JobConf property.",
+        "key=val");
+
+    cli.addOption("program", false, "URI to application executable", "class");
+    Parser parser = cli.createParser();
+    try {
+
+      // check generic arguments -conf
+      GenericOptionsParser genericParser = new GenericOptionsParser(getConf(),
+          args);
+      // get other arguments
+      CommandLine results = parser.parse(cli.options,
+          genericParser.getRemainingArgs());
+
+      BSPJob job = new BSPJob(getConf());
+
+      if (results.hasOption("input")) {
+        FileInputFormat.setInputPaths(job, results.getOptionValue("input"));
+      }
+      if (results.hasOption("output")) {
+        FileOutputFormat.setOutputPath(job,
+            new Path(results.getOptionValue("output")));
+      }
+      if (results.hasOption("jar")) {
+        job.setJar(results.getOptionValue("jar"));
+      }
+
+      if (results.hasOption("jobname")) {
+        job.setJobName(results.getOptionValue("jobname"));
+      }
+
+      if (results.hasOption("inputformat")) {
+        setIsJavaRecordReader(job.getConf(), true);
+        job.setInputFormat(getClass(results, "inputformat", conf,
+            InputFormat.class));
+      }
+
+      if (results.hasOption("partitioner")) {
+        job.setPartitioner(getClass(results, "partitioner", conf,
+            Partitioner.class));
+      }
+
+      if (results.hasOption("outputformat")) {
+        setIsJavaRecordWriter(job.getConf(), true);
+        job.setOutputFormat(getClass(results, "outputformat", conf,
+            OutputFormat.class));
+      }
+
+      if (results.hasOption("streaming")) {
+        LOG.info("Streaming enabled!");
+        job.set("hama.streaming.enabled", "true");
+      }
+
+      if (results.hasOption("jobconf")) {
+        LOG.warn("-jobconf option is deprecated, please use -D instead.");
+        String options = results.getOptionValue("jobconf");
+        StringTokenizer tokenizer = new StringTokenizer(options, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String keyVal = tokenizer.nextToken().trim();
+          String[] keyValSplit = keyVal.split("=", 2);
+          job.set(keyValSplit[0], keyValSplit[1]);
+        }
+      }
+
+      if (results.hasOption("bspTasks")) {
+        int optionValue = Integer.parseInt(results.getOptionValue("bspTasks"));
+        conf.setInt("bsp.local.tasks.maximum", optionValue);
+        conf.setInt("bsp.peers.num", optionValue);
+      }
+
+      if (results.hasOption("program")) {
+        String executablePath = results.getOptionValue("program");
+        setExecutable(job.getConf(), executablePath);
+        DistributedCache.addCacheFile(new Path(executablePath).toUri(), conf);
+      }
+
+      if (results.hasOption("interpreter")) {
+        job.getConf().set("hama.pipes.executable.interpretor",
+            results.getOptionValue("interpreter"));
+      }
+
+      if (results.hasOption("programArgs")) {
+        job.getConf().set("hama.pipes.executable.args",
+            Joiner.on(" ").join(results.getOptionValues("programArgs")));
+        // job.getConf().set("hama.pipes.resolve.executable.args", "true");
+      }
+
+      if (results.hasOption("cachefiles")) {
+        FileSystem fs = FileSystem.get(getConf());
+        String[] optionValues = results.getOptionValues("cachefiles");
+        for (String s : optionValues) {
+          Path path = new Path(s);
+          FileStatus[] globStatus = fs.globStatus(path);
+          for (FileStatus f : globStatus) {
+            if (!f.isDir()) {
+              DistributedCache.addCacheFile(f.getPath().toUri(), job.getConf());
+            } else {
+              LOG.info("Ignoring directory " + f.getPath() + " while globbing.");
+            }
+          }
+        }
+      }
+
+      // if they gave us a jar file, include it into the class path
+      String jarFile = job.getJar();
+      if (jarFile != null) {
+        @SuppressWarnings("deprecation")
+        final URL[] urls = new URL[] { FileSystem.getLocal(conf)
+            .pathToFile(new Path(jarFile)).toURL() };
+        // FindBugs complains that creating a URLClassLoader should be
+        // in a doPrivileged() block.
+        ClassLoader loader = AccessController
+            .doPrivileged(new PrivilegedAction<ClassLoader>() {
+              @Override
+              public ClassLoader run() {
+                return new URLClassLoader(urls);
+              }
+            });
+        conf.setClassLoader(loader);
+      }
+
+      runJob(job);
+      return 0;
+    } catch (ParseException pe) {
+      LOG.info("Error : " + pe);
+      cli.printUsage();
+      return 1;
+    }
+
+  }
+
+  /**
+   * Submit a pipes job based on the command line arguments.
+   */
+  public static void main(String[] args) throws Exception {
+    int exitCode = new Submitter().run(args);
+    System.exit(exitCode);
+  }
+
+}

Propchange: hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message