Return-Path: X-Original-To: apmail-hama-commits-archive@www.apache.org Delivered-To: apmail-hama-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 68081DBC7 for ; Fri, 21 Sep 2012 04:59:30 +0000 (UTC) Received: (qmail 9613 invoked by uid 500); 21 Sep 2012 04:59:29 -0000 Delivered-To: apmail-hama-commits-archive@hama.apache.org Received: (qmail 9580 invoked by uid 500); 21 Sep 2012 04:59:29 -0000 Mailing-List: contact commits-help@hama.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hama.apache.org Delivered-To: mailing list commits@hama.apache.org Received: (qmail 9562 invoked by uid 99); 21 Sep 2012 04:59:29 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Sep 2012 04:59:29 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Sep 2012 04:59:20 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 8B0FC23888E3 for ; Fri, 21 Sep 2012 04:58:35 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1388325 - in /hama/trunk: ./ bin/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/pipes/ Date: Fri, 21 Sep 2012 04:58:35 -0000 To: commits@hama.apache.org From: edwardyoon@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120921045835.8B0FC23888E3@eris.apache.org> Author: edwardyoon Date: Fri Sep 21 04:58:34 2012 New Revision: 1388325 URL: http://svn.apache.org/viewvc?rev=1388325&view=rev Log: Hama Streaming Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/ hama/trunk/core/src/main/java/org/apache/hama/pipes/Application.java 
(with props) hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java (with props) hama/trunk/core/src/main/java/org/apache/hama/pipes/DownwardProtocol.java (with props) hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java (with props) hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java (with props) hama/trunk/core/src/main/java/org/apache/hama/pipes/StreamingProtocol.java (with props) hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java (with props) Modified: hama/trunk/CHANGES.txt hama/trunk/bin/hama hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Modified: hama/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1388325&r1=1388324&r2=1388325&view=diff ============================================================================== --- hama/trunk/CHANGES.txt (original) +++ hama/trunk/CHANGES.txt Fri Sep 21 04:58:34 2012 @@ -3,6 +3,8 @@ Hama Change Log Release 0.6 (unreleased changes) NEW FEATURES + + HAMA-601: Hama Streaming (tjungblut) BUG FIXES Modified: hama/trunk/bin/hama URL: http://svn.apache.org/viewvc/hama/trunk/bin/hama?rev=1388325&r1=1388324&r2=1388325&view=diff ============================================================================== --- hama/trunk/bin/hama (original) +++ hama/trunk/bin/hama Fri Sep 21 04:58:34 2012 @@ -60,6 +60,7 @@ if [ $# = 0 ]; then echo " zookeeper run a Zookeeper server" echo " job manipulate BSP jobs" echo " jar run a jar file" + echo " pipes run a pipe job" echo " or" echo " CLASSNAME run the class named CLASSNAME" echo "Most commands print help when invoked w/o parameters." 
@@ -160,6 +161,8 @@ elif [ "$COMMAND" = "zookeeper" ] ; then CLASS='org.apache.hama.ZooKeeperRunner' elif [ "$COMMAND" = "job" ] ; then CLASS='org.apache.hama.bsp.BSPJobClient' +elif [ "$COMMAND" = "pipes" ] ; then + CLASS='org.apache.hama.pipes.Submitter' elif [ "$COMMAND" = "jar" ] ; then CLASS=org.apache.hama.util.RunJar BSP_OPTS="$BSP_OPTS" Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1388325&r1=1388324&r2=1388325&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Fri Sep 21 04:58:34 2012 @@ -20,6 +20,7 @@ package org.apache.hama.bsp; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; +import java.util.Arrays; import java.util.Iterator; import java.util.Map.Entry; @@ -276,6 +277,7 @@ public final class BSPPeerImpl 0) { DistributedCache.addLocalFiles(conf, files.toString()); } Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/Application.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/Application.java?rev=1388325&view=auto ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/pipes/Application.java (added) +++ hama/trunk/core/src/main/java/org/apache/hama/pipes/Application.java Fri Sep 21 04:58:34 2012 @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** MODIFIED FOR GPGPU Usage! **/ + +package org.apache.hama.pipes; + +import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.StringUtils; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.TaskAttemptID; +import org.apache.hama.bsp.TaskLog; + +/** + * This class is responsible for launching and communicating with the child + * process. + * + * Adapted from Hadoop Pipes. + * + */ +public class Application { + + private static final Log LOG = LogFactory.getLog(Application.class.getName()); + private ServerSocket serverSocket; + private Process process; + private Socket clientSocket; + + private DownwardProtocol downlink; + + static final boolean WINDOWS = System.getProperty("os.name").startsWith( + "Windows"); + + /** + * Start the child process to handle the task for us. 
+ * + * @param peer the current peer including the task's configuration + * @throws InterruptedException + * @throws IOException + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + Application(BSPPeer peer) throws IOException, + InterruptedException { + + Map env = new HashMap(); + boolean streamingEnabled = peer.getConfiguration().getBoolean( + "hama.streaming.enabled", false); + + if (!streamingEnabled) { + serverSocket = new ServerSocket(0); + env.put("hama.pipes.command.port", + Integer.toString(serverSocket.getLocalPort())); + } + // add TMPDIR environment variable with the value of java.io.tmpdir + env.put("TMPDIR", System.getProperty("java.io.tmpdir")); + + /* Set Logging Environment from Configuration */ + env.put("hama.pipes.logging", + peer.getConfiguration().getBoolean("hama.pipes.logging", false) ? "1" + : "0"); + LOG.debug("DEBUG hama.pipes.logging: " + + peer.getConfiguration().getBoolean("hama.pipes.logging", false)); + + List cmd = new ArrayList(); + String interpretor = peer.getConfiguration().get( + "hama.pipes.executable.interpretor"); + if (interpretor != null) { + cmd.add(interpretor); + } + + String executable = null; + try { + LOG.debug("DEBUG LocalCacheFilesCount: " + + DistributedCache.getLocalCacheFiles(peer.getConfiguration()).length); + for (Path u : DistributedCache + .getLocalCacheFiles(peer.getConfiguration())) + LOG.debug("DEBUG LocalCacheFiles: " + u); + + executable = DistributedCache.getLocalCacheFiles(peer.getConfiguration())[0] + .toString(); + + LOG.info("executable: " + executable); + + } catch (Exception e) { + LOG.error("Executable: " + executable + " fs.default.name: " + + peer.getConfiguration().get("fs.default.name")); + + throw new IOException("Executable is missing!"); + } + + if (!new File(executable).canExecute()) { + // LinuxTaskController sets +x permissions on all distcache files already. + // In case of DefaultTaskController, set permissions here. 
+ FileUtil.chmod(executable, "u+x"); + } + cmd.add(executable); + + String additionalArgs = peer.getConfiguration().get( + "hama.pipes.executable.args"); + // if true, we are resolving filenames with the linked paths in + // DistributedCache + boolean resolveArguments = peer.getConfiguration().getBoolean( + "hama.pipes.resolve.executable.args", false); + if (additionalArgs != null && !additionalArgs.isEmpty()) { + String[] split = additionalArgs.split(" "); + for (String s : split) { + if (resolveArguments) { + for (Path u : DistributedCache.getLocalCacheFiles(peer + .getConfiguration())) { + if (u.getName().equals(s)) { + LOG.info("Resolved argument \"" + s + + "\" with fully qualified path \"" + u.toString() + "\"!"); + cmd.add(u.toString()); + break; + } + } + } else { + cmd.add(s); + } + } + } + + // wrap the command in a stdout/stderr capture + TaskAttemptID taskid = peer.getTaskId(); + File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT); + File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR); + // Get the desired maximum length of task's logs. 
+ long logLength = TaskLog.getTaskLogLength(peer.getConfiguration()); + if (!streamingEnabled) { + cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength); + } else { + // use tee in streaming to get the output to file + cmd = TaskLog.captureOutAndErrorTee(null, cmd, stdout, stderr, logLength); + } + + if (!stdout.getParentFile().exists()) { + stdout.getParentFile().mkdirs(); + LOG.info("STDOUT: " + stdout.getParentFile().getAbsolutePath() + + " created!"); + } + LOG.info("STDOUT: " + stdout.getAbsolutePath()); + + if (!stderr.getParentFile().exists()) { + stderr.getParentFile().mkdirs(); + LOG.info("STDERR: " + stderr.getParentFile().getAbsolutePath() + + " created!"); + } + LOG.info("STDERR: " + stderr.getAbsolutePath()); + + LOG.info("DEBUG: cmd: " + cmd); + + process = runClient(cmd, env); // fork c++ binary + + try { + if (streamingEnabled) { + downlink = new StreamingProtocol(peer, process.getOutputStream(), + process.getInputStream()); + } else { + LOG.info("DEBUG: waiting for Client at " + + serverSocket.getLocalSocketAddress()); + serverSocket.setSoTimeout(2000); + clientSocket = serverSocket.accept(); + downlink = new BinaryProtocol(peer, + clientSocket.getOutputStream(), clientSocket.getInputStream()); + } + downlink.start(); + + } catch (SocketException e) { + throw new SocketException( + "Timout: Client pipes application was not connecting!"); + } + } + + /** + * Get the downward protocol object that can send commands down to the + * application. + * + * @return the downlink proxy + */ + DownwardProtocol getDownlink() { + return downlink; + } + + /** + * Wait for the application to finish + * + * @return did the application finish correctly? + * @throws IOException + * @throws Throwable + */ + boolean waitForFinish() throws InterruptedException, IOException { + downlink.flush(); + return downlink.waitForFinish(); + } + + /** + * Abort the application and wait for it to finish. 
+ * + * @param t the exception that signalled the problem + * @throws IOException A wrapper around the exception that was passed in + */ + void abort(Throwable t) throws IOException { + LOG.info("Aborting because of " + StringUtils.stringifyException(t)); + try { + downlink.abort(); + downlink.flush(); + } catch (IOException e) { + // IGNORE cleanup problems + } + try { + downlink.waitForFinish(); + } catch (Throwable ignored) { + process.destroy(); + } + IOException wrapper = new IOException("pipe child exception"); + wrapper.initCause(t); + throw wrapper; + } + + /** + * Clean up the child process and socket if exist. + */ + void cleanup() throws IOException { + if (serverSocket != null) { + serverSocket.close(); + } + try { + if (downlink != null) { + downlink.close(); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + + /** + * Run a given command in a subprocess, including threads to copy its stdout + * and stderr to our stdout and stderr. + * + * @param command the command and its arguments + * @param env the environment to run the process in + * @return a handle on the process + * @throws IOException + */ + static Process runClient(List command, Map env) + throws IOException { + ProcessBuilder builder = new ProcessBuilder(command); + if (env != null) { + builder.environment().putAll(env); + } + Process result = builder.start(); + return result; + } + +} Propchange: hama/trunk/core/src/main/java/org/apache/hama/pipes/Application.java ------------------------------------------------------------------------------ svn:eol-style = native Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java?rev=1388325&view=auto ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java (added) +++ 
hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java Fri Sep 21 04:58:34 2012 @@ -0,0 +1,593 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.pipes; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileOutputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.sync.SyncException; +import org.apache.hama.util.KeyValuePair; + +/** + * This protocol is a binary 
implementation of the Hama Pipes protocol. + * + * Adapted from Hadoop Pipes. + * + */ +public class BinaryProtocol + implements DownwardProtocol { + + protected static final Log LOG = LogFactory.getLog(BinaryProtocol.class + .getName()); + public static final int CURRENT_PROTOCOL_VERSION = 0; + /** + * The buffer size for the command socket + */ + private static final int BUFFER_SIZE = 128 * 1024; + + protected final DataOutputStream stream; + protected final DataOutputBuffer buffer = new DataOutputBuffer(); + + private UplinkReaderThread uplink; + + private boolean hasTask = false; + protected final BSPPeer peer; + + /** + * The integer codes to represent the different messages. These must match the + * C++ codes or massive confusion will result. + */ + protected static enum MessageType { + START(0), SET_BSPJOB_CONF(1), SET_INPUT_TYPES(2), RUN_SETUP(3), RUN_BSP(4), + RUN_CLEANUP(5), READ_KEYVALUE(6), WRITE_KEYVALUE(7), GET_MSG(8), + GET_MSG_COUNT(9), SEND_MSG(10), SYNC(11), GET_ALL_PEERNAME(12), + GET_PEERNAME(13), GET_PEER_INDEX(14), GET_PEER_COUNT(15), + GET_SUPERSTEP_COUNT(16), REOPEN_INPUT(17), CLEAR(18), CLOSE(19), ABORT(20), + DONE(21), TASK_DONE(22), REGISTER_COUNTER(23), INCREMENT_COUNTER(24), LOG( + 25); + + final int code; + + MessageType(int code) { + this.code = code; + } + } + + protected class UplinkReaderThread extends Thread { + + protected DataInputStream inStream; + protected K2 key; + protected V2 value; + protected BSPPeer peer; + + @SuppressWarnings("unchecked") + public UplinkReaderThread(BSPPeer peer, + InputStream stream) throws IOException { + + inStream = new DataInputStream(new BufferedInputStream(stream, + BUFFER_SIZE)); + + this.peer = peer; + this.key = ReflectionUtils.newInstance((Class) peer + .getConfiguration().getClass("bsp.output.key.class", Object.class), + peer.getConfiguration()); + + this.value = ReflectionUtils.newInstance((Class) peer + .getConfiguration().getClass("bsp.output.value.class", Object.class), + 
peer.getConfiguration()); + } + + public void closeConnection() throws IOException { + inStream.close(); + } + + @Override + public void run() { + while (true) { + try { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + + int cmd = readCommand(); + if (cmd == -1) + continue; + LOG.debug("Handling uplink command " + cmd); + + if (cmd == MessageType.WRITE_KEYVALUE.code) { // INCOMING + writeKeyValue(); + } else if (cmd == MessageType.READ_KEYVALUE.code) { // OUTGOING + readKeyValue(); + } else if (cmd == MessageType.INCREMENT_COUNTER.code) { // INCOMING + incrementCounter(); + } else if (cmd == MessageType.REGISTER_COUNTER.code) { // INCOMING + /* + * Is not used in HAMA -> Hadoop Pipes - maybe for performance, skip + * transferring group and name each INCREMENT + */ + } else if (cmd == MessageType.TASK_DONE.code) { // INCOMING + LOG.debug("Got MessageType.TASK_DONE"); + hasTask = false; + } else if (cmd == MessageType.DONE.code) { // INCOMING + LOG.debug("Pipe child done"); + return; + } else if (cmd == MessageType.SEND_MSG.code) { // INCOMING + sendMessage(); + } else if (cmd == MessageType.GET_MSG_COUNT.code) { // OUTGOING + getMessageCount(); + } else if (cmd == MessageType.GET_MSG.code) { // OUTGOING + getMessage(); + } else if (cmd == MessageType.SYNC.code) { // INCOMING + sync(); + } else if (cmd == MessageType.GET_ALL_PEERNAME.code) { // OUTGOING + getAllPeerNames(); + } else if (cmd == MessageType.GET_PEERNAME.code) { // OUTGOING + getPeerName(); + } else if (cmd == MessageType.GET_PEER_INDEX.code) { // OUTGOING + getPeerIndex(); + } else if (cmd == MessageType.GET_PEER_COUNT.code) { // OUTGOING + getPeerCount(); + } else if (cmd == MessageType.GET_SUPERSTEP_COUNT.code) { // OUTGOING + getSuperstepCount(); + } else if (cmd == MessageType.REOPEN_INPUT.code) { // INCOMING + reopenInput(); + } else if (cmd == MessageType.CLEAR.code) { // INCOMING + LOG.debug("Got MessageType.CLEAR"); + peer.clear(); + } else { + throw new 
IOException("Bad command code: " + cmd); + } + } catch (InterruptedException e) { + return; + } catch (Throwable e) { + onError(e); + throw new RuntimeException(e); + } + } + } + + protected void onError(Throwable e) { + LOG.error(StringUtils.stringifyException(e)); + } + + public int readCommand() throws IOException { + return WritableUtils.readVInt(inStream); + } + + public void reopenInput() throws IOException { + LOG.debug("Got MessageType.REOPEN_INPUT"); + peer.reopenInput(); + } + + public void getSuperstepCount() throws IOException { + WritableUtils.writeVInt(stream, MessageType.GET_SUPERSTEP_COUNT.code); + WritableUtils.writeVLong(stream, peer.getSuperstepCount()); + flush(); + LOG.debug("Responded MessageType.GET_SUPERSTEP_COUNT - SuperstepCount: " + + peer.getSuperstepCount()); + } + + public void getPeerCount() throws IOException { + WritableUtils.writeVInt(stream, MessageType.GET_PEER_COUNT.code); + WritableUtils.writeVInt(stream, peer.getNumPeers()); + flush(); + LOG.debug("Responded MessageType.GET_PEER_COUNT - NumPeers: " + + peer.getNumPeers()); + } + + public void getPeerIndex() throws IOException { + WritableUtils.writeVInt(stream, MessageType.GET_PEER_INDEX.code); + WritableUtils.writeVInt(stream, peer.getPeerIndex()); + flush(); + LOG.debug("Responded MessageType.GET_PEER_INDEX - PeerIndex: " + + peer.getPeerIndex()); + } + + public void getPeerName() throws IOException { + int id = readCommand(); + LOG.debug("Got MessageType.GET_PEERNAME id: " + id); + + WritableUtils.writeVInt(stream, MessageType.GET_PEERNAME.code); + if (id == -1) { // -1 indicates get own PeerName + Text.writeString(stream, peer.getPeerName()); + LOG.debug("Responded MessageType.GET_PEERNAME - Get Own PeerName: " + + peer.getPeerName()); + + } else if ((id < -1) || (id >= peer.getNumPeers())) { + // if no PeerName for this index is found write emptyString + Text.writeString(stream, ""); + LOG.debug("Responded MessageType.GET_PEERNAME - Empty PeerName!"); + + } else { + 
Text.writeString(stream, peer.getPeerName(id)); + LOG.debug("Responded MessageType.GET_PEERNAME - PeerName: " + + peer.getPeerName(id)); + } + flush(); + } + + public void getAllPeerNames() throws IOException { + LOG.debug("Got MessageType.GET_ALL_PEERNAME"); + WritableUtils.writeVInt(stream, MessageType.GET_ALL_PEERNAME.code); + WritableUtils.writeVInt(stream, peer.getAllPeerNames().length); + for (String s : peer.getAllPeerNames()) + Text.writeString(stream, s); + + flush(); + LOG.debug("Responded MessageType.GET_ALL_PEERNAME - peerNamesCount: " + + peer.getAllPeerNames().length); + } + + public void sync() throws IOException, SyncException, InterruptedException { + LOG.debug("Got MessageType.SYNC"); + peer.sync(); // this call blocks + } + + public void getMessage() throws IOException { + LOG.debug("Got MessageType.GET_MSG"); + WritableUtils.writeVInt(stream, MessageType.GET_MSG.code); + BytesWritable msg = peer.getCurrentMessage(); + if (msg != null) + writeObject(msg); + + flush(); + LOG.debug("Responded MessageType.GET_MSG - Message(BytesWritable) ");// +msg); + } + + public void getMessageCount() throws IOException { + WritableUtils.writeVInt(stream, MessageType.GET_MSG_COUNT.code); + WritableUtils.writeVInt(stream, peer.getNumCurrentMessages()); + flush(); + LOG.debug("Responded MessageType.GET_MSG_COUNT - Count: " + + peer.getNumCurrentMessages()); + } + + public void sendMessage() throws IOException { + String peerName = Text.readString(inStream); + BytesWritable msg = new BytesWritable(); + readObject(msg); + LOG.debug("Got MessageType.SEND_MSG to peerName: " + peerName); + peer.send(peerName, msg); + } + + public void incrementCounter() throws IOException { + // int id = WritableUtils.readVInt(inStream); + String group = Text.readString(inStream); + String name = Text.readString(inStream); + long amount = WritableUtils.readVLong(inStream); + peer.incrementCounter(name, group, amount); + } + + public void readKeyValue() throws IOException { + boolean 
nullinput = peer.getConfiguration().get("bsp.input.format.class") == null + || peer.getConfiguration().get("bsp.input.format.class") + .equals("org.apache.hama.bsp.NullInputFormat"); + + if (!nullinput) { + + KeyValuePair pair = peer.readNext(); + + WritableUtils.writeVInt(stream, MessageType.READ_KEYVALUE.code); + if (pair != null) { + writeObject(pair.getKey()); + writeObject(pair.getValue()); + + LOG.debug("Responded MessageType.READ_KEYVALUE - Key: " + + pair.getKey() + " Value: " + pair.getValue()); + + } else { + Text.writeString(stream, ""); + Text.writeString(stream, ""); + LOG.debug("Responded MessageType.READ_KEYVALUE - EMPTY KeyValue Pair"); + } + flush(); + + } else { + /* TODO */ + /* Send empty Strings to show no KeyValue pair is available */ + WritableUtils.writeVInt(stream, MessageType.READ_KEYVALUE.code); + Text.writeString(stream, ""); + Text.writeString(stream, ""); + flush(); + LOG.debug("Responded MessageType.READ_KEYVALUE - EMPTY KeyValue Pair"); + } + } + + public void writeKeyValue() throws IOException { + readObject(key); // string or binary only + readObject(value); // string or binary only + if (LOG.isDebugEnabled()) + LOG.debug("Got MessageType.WRITE_KEYVALUE - Key: " + key + " Value: " + + value); + peer.write(key, value); + } + + protected void readObject(Writable obj) throws IOException { + int numBytes = readCommand(); + byte[] buffer; + // For BytesWritable and Text, use the specified length to set the length + // this causes the "obvious" translations to work. So that if you emit + // a string "abc" from C++, it shows up as "abc". + if (obj instanceof BytesWritable) { + buffer = new byte[numBytes]; + inStream.readFully(buffer); + ((BytesWritable) obj).set(buffer, 0, numBytes); + } else if (obj instanceof Text) { + buffer = new byte[numBytes]; + inStream.readFully(buffer); + ((Text) obj).set(buffer); + } else if (obj instanceof NullWritable) { + throw new IOException( + "Cannot read data into NullWritable! 
Check OutputClasses!"); + } else { + /* TODO */ + /* IntWritable, DoubleWritable */ + throw new IOException( + "Hama Pipes does only support Text as Key/Value output!"); + // obj.readFields(inStream); + } + } + } + + /** + * An output stream that will save a copy of the data into a file. + */ + private static class TeeOutputStream extends FilterOutputStream { + private OutputStream file; + + TeeOutputStream(String filename, OutputStream base) throws IOException { + super(base); + file = new FileOutputStream(filename); + } + + @Override + public void write(byte b[], int off, int len) throws IOException { + file.write(b, off, len); + out.write(b, off, len); + } + + @Override + public void write(int b) throws IOException { + file.write(b); + out.write(b); + } + + @Override + public void flush() throws IOException { + file.flush(); + out.flush(); + } + + @Override + public void close() throws IOException { + flush(); + file.close(); + out.close(); + } + } + + /** + * Create a proxy object that will speak the binary protocol on a socket. + * Upward messages are passed on the specified handler and downward downward + * messages are public methods on this object. + * + * @param sock The socket to communicate on. + * @param handler The handler for the received messages. + * @param key The object to read keys into. + * @param value The object to read values into. 
+ * @param jobConfig The job's configuration + * @throws IOException + */ + public BinaryProtocol(BSPPeer peer, + OutputStream out, InputStream in) throws IOException { + this.peer = peer; + OutputStream raw = out; + + // If we are debugging, save a copy of the downlink commands to a file + if (Submitter.getKeepCommandFile(peer.getConfiguration())) { + raw = new TeeOutputStream("downlink.data", raw); + } + stream = new DataOutputStream(new BufferedOutputStream(raw, BUFFER_SIZE)); + uplink = getUplinkReader(peer, in); + + uplink.setName("pipe-uplink-handler"); + uplink.start(); + } + + public UplinkReaderThread getUplinkReader( + BSPPeer peer, InputStream sock) + throws IOException { + return new UplinkReaderThread(peer, sock); + } + + @Override + public boolean waitForFinish() throws IOException, InterruptedException { + // LOG.debug("waitForFinish... "+hasTask); + while (hasTask) { + try { + Thread.sleep(100); + // LOG.debug("waitForFinish... "+hasTask); + } catch (Exception e) { + LOG.error(e); + } + } + return hasTask; + } + + /** + * Close the connection and shutdown the handler thread. 
+ * + * @throws IOException + * @throws InterruptedException + */ + @Override + public void close() throws IOException, InterruptedException { + // runCleanup(pipedInput,pipedOutput); + LOG.debug("closing connection"); + endOfInput(); + + uplink.interrupt(); + uplink.join(); + + uplink.closeConnection(); + stream.close(); + } + + @Override + public void start() throws IOException { + LOG.debug("starting downlink"); + WritableUtils.writeVInt(stream, MessageType.START.code); + WritableUtils.writeVInt(stream, CURRENT_PROTOCOL_VERSION); + flush(); + LOG.debug("Sent MessageType.START"); + setBSPJob(peer.getConfiguration()); + } + + public void setBSPJob(Configuration conf) throws IOException { + WritableUtils.writeVInt(stream, MessageType.SET_BSPJOB_CONF.code); + List list = new ArrayList(); + for (Map.Entry itm : conf) { + list.add(itm.getKey()); + list.add(itm.getValue()); + } + WritableUtils.writeVInt(stream, list.size()); + for (String entry : list) { + Text.writeString(stream, entry); + } + flush(); + LOG.debug("Sent MessageType.SET_BSPJOB_CONF"); + } + + @Override + public void setInputTypes(String keyType, String valueType) + throws IOException { + WritableUtils.writeVInt(stream, MessageType.SET_INPUT_TYPES.code); + Text.writeString(stream, keyType); + Text.writeString(stream, valueType); + flush(); + LOG.debug("Sent MessageType.SET_INPUT_TYPES"); + } + + @Override + public void runSetup(boolean pipedInput, boolean pipedOutput) + throws IOException { + + WritableUtils.writeVInt(stream, MessageType.RUN_SETUP.code); + WritableUtils.writeVInt(stream, pipedInput ? 1 : 0); + WritableUtils.writeVInt(stream, pipedOutput ? 1 : 0); + flush(); + hasTask = true; + LOG.debug("Sent MessageType.RUN_SETUP"); + } + + @Override + public void runBsp(boolean pipedInput, boolean pipedOutput) + throws IOException { + + WritableUtils.writeVInt(stream, MessageType.RUN_BSP.code); + WritableUtils.writeVInt(stream, pipedInput ? 1 : 0); + WritableUtils.writeVInt(stream, pipedOutput ? 
1 : 0); + flush(); + hasTask = true; + LOG.debug("Sent MessageType.RUN_BSP"); + } + + @Override + public void runCleanup(boolean pipedInput, boolean pipedOutput) + throws IOException { + + WritableUtils.writeVInt(stream, MessageType.RUN_CLEANUP.code); + WritableUtils.writeVInt(stream, pipedInput ? 1 : 0); + WritableUtils.writeVInt(stream, pipedOutput ? 1 : 0); + flush(); + hasTask = true; + LOG.debug("Sent MessageType.RUN_CLEANUP"); + } + + public void endOfInput() throws IOException { + WritableUtils.writeVInt(stream, MessageType.CLOSE.code); + flush(); + LOG.debug("Sent close command"); + LOG.debug("Sent MessageType.CLOSE"); + } + + @Override + public void abort() throws IOException { + WritableUtils.writeVInt(stream, MessageType.ABORT.code); + flush(); + LOG.debug("Sent MessageType.ABORT"); + } + + @Override + public void flush() throws IOException { + stream.flush(); + } + + /** + * Write the given object to the stream. If it is a Text or BytesWritable, + * write it directly. Otherwise, write it to a buffer and then write the + * length and data to the stream. + * + * @param obj the object to write + * @throws IOException + */ + protected void writeObject(Writable obj) throws IOException { + // For Text and BytesWritable, encode them directly, so that they end up + // in C++ as the natural translations. 
+ if (obj instanceof Text) { + Text t = (Text) obj; + int len = t.getLength(); + WritableUtils.writeVInt(stream, len); + stream.write(t.getBytes(), 0, len); + } else if (obj instanceof BytesWritable) { + BytesWritable b = (BytesWritable) obj; + int len = b.getLength(); + WritableUtils.writeVInt(stream, len); + stream.write(b.getBytes(), 0, len); + } else { + buffer.reset(); + obj.write(buffer); + int length = buffer.getLength(); + WritableUtils.writeVInt(stream, length); + stream.write(buffer.getData(), 0, length); + } + } +} Propchange: hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java ------------------------------------------------------------------------------ svn:eol-style = native Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/DownwardProtocol.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/DownwardProtocol.java?rev=1388325&view=auto ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/pipes/DownwardProtocol.java (added) +++ hama/trunk/core/src/main/java/org/apache/hama/pipes/DownwardProtocol.java Fri Sep 21 04:58:34 2012 @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.pipes;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The abstract description of the downward (from Java to C++) Pipes protocol.
+ * All of these calls are asynchronous and return before the message has been
+ * processed.
+ *
+ * Adapted from Hadoop Pipes.
+ *
+ * NOTE(review): {@code Writable} is imported but not referenced in this copy
+ * of the interface; generic type parameters may have been lost when the
+ * commit mail was archived -- confirm against the repository.
+ */
+public interface DownwardProtocol {
+
+  /**
+   * Start communication.
+   *
+   * @throws IOException
+   */
+  void start() throws IOException;
+
+  /**
+   * Announce the names of the input key and value types to the other side of
+   * the link.
+   *
+   * @param keyType the name of the key's type
+   * @param valueType the name of the value's type
+   * @throws IOException
+   */
+  void setInputTypes(String keyType, String valueType) throws IOException;
+
+  /**
+   * Request that the bsp phase be run.
+   *
+   * @param pipedInput NOTE(review): meaning not stated in this interface;
+   *          presumably whether input records are piped through Java --
+   *          confirm against the implementations
+   * @param pipedOutput NOTE(review): presumably the same for output records
+   *          -- confirm against the implementations
+   * @throws IOException
+   */
+  void runBsp(boolean pipedInput, boolean pipedOutput) throws IOException;
+
+  /**
+   * Request that the cleanup phase be run. The parameters carry the same
+   * (unconfirmed) meaning as in {@link #runBsp(boolean, boolean)}.
+   *
+   * @throws IOException
+   */
+  void runCleanup(boolean pipedInput, boolean pipedOutput) throws IOException;
+
+  /**
+   * Request that the setup phase be run. The parameters carry the same
+   * (unconfirmed) meaning as in {@link #runBsp(boolean, boolean)}.
+   *
+   * @throws IOException
+   */
+  void runSetup(boolean pipedInput, boolean pipedOutput) throws IOException;
+
+  /**
+   * The task should stop as soon as possible, because something has gone wrong.
+   *
+   * @throws IOException
+   */
+  void abort() throws IOException;
+
+  /**
+   * Flush the data through any buffers.
+   */
+  void flush() throws IOException;
+
+  /**
+   * Close the connection.
+   */
+  void close() throws IOException, InterruptedException;
+
+  /**
+   * NOTE(review): no implementation of this method is visible next to the
+   * interface; presumably it blocks until the other side has finished and
+   * reports whether it finished successfully -- confirm before relying on
+   * the return value.
+   */
+  boolean waitForFinish() throws IOException, InterruptedException;
+}

Propchange: hama/trunk/core/src/main/java/org/apache/hama/pipes/DownwardProtocol.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java?rev=1388325&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java Fri Sep 21 04:58:34 2012
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hama.pipes; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hama.bsp.BSP; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.sync.SyncException; + +/** + * A BSP that can communicate via pipes with other programming languages and + * runtimes. + */ +public class PipesBSP + extends BSP { + + private static final Log LOG = LogFactory.getLog(PipesBSP.class); + private Application application; + + @Override + public void setup(BSPPeer peer) + throws IOException, SyncException, InterruptedException { + + this.application = new Application(peer); + application.getDownlink().runSetup(false, false); + } + + @Override + public void bsp(BSPPeer peer) + throws IOException, SyncException, InterruptedException { + + application.getDownlink().runBsp(false, false); + } + + @Override + public void cleanup(BSPPeer peer) + throws IOException { + + application.getDownlink().runCleanup(false, false); + + try { + application.waitForFinish(); + } catch (IOException e) { + LOG.error(e); + throw e; + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + application.cleanup(); + } + } + +} Propchange: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java ------------------------------------------------------------------------------ svn:eol-style = native Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java?rev=1388325&view=auto ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java (added) +++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java Fri Sep 21 04:58:34 
2012 @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.pipes; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hama.bsp.BSPJob; +import org.apache.hama.bsp.InputFormat; +import org.apache.hama.bsp.InputSplit; +import org.apache.hama.bsp.RecordReader; +import org.apache.hama.bsp.TextInputFormat; + +/** + * Dummy input format used when non-Java a {@link RecordReader} is used by the + * Pipes' application. + * + * The only useful thing this does is set up the Map-Reduce job to get the + * {@link PipesDummyRecordReader}, everything else left for the 'actual' + * InputFormat specified by the user which is given by + * mapred.pipes.user.inputformat. + * + * Adapted from Hadoop Pipes. 
+ * + */ +public class PipesNonJavaInputFormat implements + InputFormat { + + @Override + public RecordReader getRecordReader( + InputSplit genericSplit, BSPJob job) throws IOException { + return new PipesDummyRecordReader(job.getConf(), genericSplit); + } + + @Override + public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException { + // Delegate the generation of input splits to the 'original' InputFormat + return ReflectionUtils.newInstance( + job.getConf().getClass("hama.pipes.user.inputformat", + TextInputFormat.class, InputFormat.class), job.getConf()) + .getSplits(job, numSplits); + } + + /** + * A dummy {@link org.apache.hadoop.mapred.RecordReader} to help track the + * progress of Hama Pipes' applications when they are using a non-Java + * RecordReader. + * + * The PipesDummyRecordReader is informed of the 'progress' of + * the task by the {@link OutputHandler#progress(float)} which calls the + * {@link #next(FloatWritable, NullWritable)} with the progress as the + * key. 
+ */ + static class PipesDummyRecordReader implements + RecordReader { + float progress = 0.0f; + + public PipesDummyRecordReader(Configuration job, InputSplit split) + throws IOException { + } + + @Override + public FloatWritable createKey() { + return null; + } + + @Override + public NullWritable createValue() { + return null; + } + + @Override + public synchronized void close() throws IOException { + } + + @Override + public synchronized long getPos() throws IOException { + return 0; + } + + @Override + public float getProgress() { + return progress; + } + + @Override + public synchronized boolean next(FloatWritable key, NullWritable value) + throws IOException { + progress = key.get(); + return true; + } + } +} Propchange: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java ------------------------------------------------------------------------------ svn:eol-style = native Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/StreamingProtocol.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/StreamingProtocol.java?rev=1388325&view=auto ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/pipes/StreamingProtocol.java (added) +++ hama/trunk/core/src/main/java/org/apache/hama/pipes/StreamingProtocol.java Fri Sep 21 04:58:34 2012 @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.pipes; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.sync.SyncException; +import org.apache.hama.util.KeyValuePair; + +/** + * Streaming protocol that inherits from the binary protocol. Basically it + * writes everything as text to the peer, each command is separated by newlines. + * To distinguish op-codes (like SET_BSPJOB_CONF) from normal output, we use the + * surrounds %OP_CODE%=_possible_value. + * + * @param input key. + * @param input value. + * @param output key. + * @param output value. 
+ */ +public class StreamingProtocol + extends BinaryProtocol { + + private static final Pattern PROTOCOL_STRING_PATTERN = Pattern.compile("="); + + private final CyclicBarrier ackBarrier = new CyclicBarrier(2); + private volatile boolean brokenBarrier = false; + + public StreamingProtocol(BSPPeer peer, + OutputStream out, InputStream in) throws IOException { + super(peer, out, in); + } + + public class StreamingUplinkReaderThread extends UplinkReaderThread { + + private BufferedReader reader; + + public StreamingUplinkReaderThread( + BSPPeer peer, InputStream stream) + throws IOException { + super(peer, stream); + reader = new BufferedReader(new InputStreamReader(inStream)); + } + + @Override + public void sendMessage() throws IOException { + String peerLine = reader.readLine(); + String msgLine = reader.readLine(); + peer.send(peerLine, new BytesWritable(msgLine.getBytes())); + } + + @Override + public void getMessage() throws IOException { + BytesWritable currentMessage = peer.getCurrentMessage(); + if (currentMessage != null) + writeLine(new String(currentMessage.getBytes())); + else + writeLine("%%-1%%"); + } + + @Override + public void getMessageCount() throws IOException { + writeLine("" + peer.getNumCurrentMessages()); + } + + @Override + public void getSuperstepCount() throws IOException { + writeLine("" + peer.getSuperstepCount()); + } + + @Override + public void getPeerName() throws IOException { + int id = Integer.parseInt(reader.readLine()); + if (id == -1) + writeLine(peer.getPeerName()); + else + writeLine(peer.getPeerName(id)); + } + + @Override + public void getPeerIndex() throws IOException { + writeLine("" + peer.getPeerIndex()); + } + + @Override + public void getAllPeerNames() throws IOException { + writeLine("" + peer.getAllPeerNames().length); + for (String s : peer.getAllPeerNames()) { + writeLine(s); + } + } + + @Override + public void getPeerCount() throws IOException { + writeLine("" + peer.getAllPeerNames().length); + } + + @Override + 
public void sync() throws IOException, SyncException, InterruptedException { + peer.sync(); + writeLine(getProtocolString(MessageType.SYNC) + "_SUCCESS"); + } + + @Override + public void writeKeyValue() throws IOException { + String key = reader.readLine(); + String value = reader.readLine(); + peer.write(new Text(key), new Text(value)); + } + + @Override + public void readKeyValue() throws IOException { + KeyValuePair readNext = peer.readNext(); + if (readNext == null) { + writeLine("%%-1%%"); + writeLine("%%-1%%"); + } else { + writeLine(readNext.getKey() + ""); + writeLine(readNext.getValue() + ""); + } + } + + @Override + public void reopenInput() throws IOException { + peer.reopenInput(); + } + + @Override + public int readCommand() throws IOException { + String readLine = reader.readLine(); + if (readLine != null && !readLine.isEmpty()) { + String[] split = PROTOCOL_STRING_PATTERN.split(readLine, 2); + split[0] = split[0].replace("%", ""); + if (checkAcks(split)) + return -1; + try { + int parseInt = Integer.parseInt(split[0]); + if (parseInt == BinaryProtocol.MessageType.LOG.code) { + LOG.info(split[1]); + return -1; + } + return parseInt; + } catch (NumberFormatException e) { + e.printStackTrace(); + } + } else { + return -1; + } + return -2; + } + + @Override + protected void onError(Throwable e) { + super.onError(e); + // break the barrier if we had an error + ackBarrier.reset(); + brokenBarrier = true; + } + + private boolean checkAcks(String[] readLine) { + if (readLine[0].startsWith("ACK_")) { + try { + ackBarrier.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + return true; + } + return false; + } + + } + + @Override + public void start() throws IOException { + writeLine(MessageType.START, null); + writeLine("" + CURRENT_PROTOCOL_VERSION); + setBSPJob(peer.getConfiguration()); + try { + ackBarrier.await(); + } catch (InterruptedException e) { + e.printStackTrace(); 
+ } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + } + + @Override + public void setBSPJob(Configuration conf) throws IOException { + writeLine(MessageType.SET_BSPJOB_CONF, null); + List list = new ArrayList(); + for (Map.Entry itm : conf) { + list.add(itm.getKey()); + list.add(itm.getValue()); + } + writeLine(list.size()); + for (String entry : list) { + writeLine(entry); + } + flush(); + } + + @Override + public void runSetup(boolean pipedInput, boolean pipedOutput) + throws IOException { + writeLine(MessageType.RUN_SETUP, null); + waitOnAck(); + } + + @Override + public void runBsp(boolean pipedInput, boolean pipedOutput) + throws IOException { + writeLine(MessageType.RUN_BSP, null); + waitOnAck(); + } + + public void waitOnAck() { + try { + if (!brokenBarrier) + ackBarrier.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + } + + @Override + public void runCleanup(boolean pipedInput, boolean pipedOutput) + throws IOException { + writeLine(MessageType.RUN_CLEANUP, null); + waitOnAck(); + } + + @Override + public UplinkReaderThread getUplinkReader( + BSPPeer peer, InputStream in) + throws IOException { + return new StreamingUplinkReaderThread(peer, in); + } + + public void writeLine(int msg) throws IOException { + writeLine("" + msg); + } + + public void writeLine(String msg) throws IOException { + stream.write((msg + "\n").getBytes()); + stream.flush(); + } + + public void writeLine(MessageType type, String msg) throws IOException { + stream.write((getProtocolString(type) + (msg == null ? 
"" : msg) + "\n") + .getBytes()); + stream.flush(); + } + + public String getProtocolString(MessageType type) { + return "%" + type.code + "%="; + } + +} Propchange: hama/trunk/core/src/main/java/org/apache/hama/pipes/StreamingProtocol.java ------------------------------------------------------------------------------ svn:eol-style = native Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java?rev=1388325&view=auto ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java (added) +++ hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java Fri Sep 21 04:58:34 2012 @@ -0,0 +1,522 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.pipes; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLClassLoader; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.StringTokenizer; + +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.Parser; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.BSPJob; +import org.apache.hama.bsp.BSPJobClient; +import org.apache.hama.bsp.FileInputFormat; +import org.apache.hama.bsp.FileOutputFormat; +import org.apache.hama.bsp.HashPartitioner; +import org.apache.hama.bsp.InputFormat; +import org.apache.hama.bsp.OutputFormat; +import org.apache.hama.bsp.Partitioner; + +import com.google.common.base.Joiner; + +/** + * The main entry point and job submitter. It may either be used as a command + * line-based or API-based method to launch Pipes jobs. + * + * Adapted from Hadoop Pipes. 
+ * + */ +public class Submitter implements Tool { + + protected static final Log LOG = LogFactory.getLog(Submitter.class); + private HamaConfiguration conf; + + public Submitter() { + this.conf = new HamaConfiguration(); + } + + public Submitter(HamaConfiguration conf) { + setConf(conf); + } + + @Override + public HamaConfiguration getConf() { + return this.conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = (HamaConfiguration) conf; + } + + /** + * Get the URI of the CPU application's executable. + * + * @param conf + * @return the URI where the application's executable is located + */ + public static String getExecutable(Configuration conf) { + return conf.get("hama.pipes.executable"); + } + + /** + * Set the URI for the CPU application's executable. Normally this is a hdfs: + * location. + * + * @param conf + * @param executable The URI of the application's executable. + */ + public static void setExecutable(Configuration conf, String executable) { + conf.set("hama.pipes.executable", executable); + } + + /** + * Set whether the job is using a Java RecordReader. + * + * @param conf the configuration to modify + * @param value the new value + */ + public static void setIsJavaRecordReader(Configuration conf, boolean value) { + conf.setBoolean("hama.pipes.java.recordreader", value); + } + + /** + * Check whether the job is using a Java RecordReader + * + * @param conf the configuration to check + * @return is it a Java RecordReader? + */ + public static boolean getIsJavaRecordReader(Configuration conf) { + return conf.getBoolean("hama.pipes.java.recordreader", false); + } + + /** + * Set whether the job will use a Java RecordWriter. + * + * @param conf the configuration to modify + * @param value the new value to set + */ + public static void setIsJavaRecordWriter(Configuration conf, boolean value) { + conf.setBoolean("hama.pipes.java.recordwriter", value); + } + + /** + * Will the job use a Java RecordWriter? 
+ * + * @param conf the configuration to check + * @return true, if the output of the job will be written by Java + */ + public static boolean getIsJavaRecordWriter(Configuration conf) { + return conf.getBoolean("hama.pipes.java.recordwriter", false); + } + + /** + * Set the configuration, if it doesn't already have a value for the given + * key. + * + * @param conf the configuration to modify + * @param key the key to set + * @param value the new "default" value to set + */ + private static void setIfUnset(Configuration conf, String key, String value) { + if (conf.get(key) == null) { + conf.set(key, value); + } + } + + /** + * Save away the user's original partitioner before we override it. + * + * @param conf the configuration to modify + * @param cls the user's partitioner class + */ + static void setJavaPartitioner(Configuration conf, Class cls) { + conf.set("hama.pipes.partitioner", cls.getName()); + } + + /** + * Get the user's original partitioner. + * + * @param conf the configuration to look in + * @return the class that the user submitted + */ + @SuppressWarnings("rawtypes") + static Class getJavaPartitioner(Configuration conf) { + return conf.getClass("hama.pipes.partitioner", HashPartitioner.class, + Partitioner.class); + } + + /** + * Does the user want to keep the command file for debugging? If this is true, + * pipes will write a copy of the command data to a file in the task directory + * named "downlink.data", which may be used to run the C++ program under the + * debugger. You probably also want to set + * JobConf.setKeepFailedTaskFiles(true) to keep the entire directory from + * being deleted. To run using the data file, set the environment variable + * "hadoop.pipes.command.file" to point to the file. + * + * @param conf the configuration to check + * @return will the framework save the command file? 
+ */ + public static boolean getKeepCommandFile(Configuration conf) { + return conf.getBoolean("hama.pipes.command-file.keep", false); + } + + /** + * Set whether to keep the command file for debugging + * + * @param conf the configuration to modify + * @param keep the new value + */ + public static void setKeepCommandFile(Configuration conf, boolean keep) { + conf.setBoolean("hama.pipes.command-file.keep", keep); + } + + /** + * Submit a job to the cluster. All of the necessary modifications to the job + * to run under pipes are made to the configuration. + * + * @param conf the job to submit to the cluster (MODIFIED) + * @throws IOException + */ + public static void runJob(BSPJob job) throws IOException { + setupPipesJob(job); + BSPJobClient.runJob(job); + } + + private static void setupPipesJob(BSPJob job) throws IOException { + job.setBspClass(PipesBSP.class); + job.setJarByClass(PipesBSP.class); + + String textClassname = Text.class.getName(); + setIfUnset(job.getConf(), "bsp.input.key.class", textClassname); + setIfUnset(job.getConf(), "bsp.input.value.class", textClassname); + setIfUnset(job.getConf(), "bsp.output.key.class", textClassname); + setIfUnset(job.getConf(), "bsp.output.value.class", textClassname); + + setIfUnset(job.getConf(), "bsp.job.name", "Hama Pipes Job"); + + LOG.debug("isJavaRecordReader: " + getIsJavaRecordReader(job.getConf())); + LOG.debug("BspClass: " + job.getBspClass().getName()); + // conf.setInputFormat(NLineInputFormat.class); + LOG.debug("InputFormat: " + job.getInputFormat()); + LOG.debug("InputKeyClass: " + job.getInputKeyClass().getName()); + LOG.debug("InputValueClass: " + job.getInputValueClass().getName()); + LOG.debug("OutputKeyClass: " + job.getOutputKeyClass().getName()); + LOG.debug("OutputValueClass: " + job.getOutputValueClass().getName()); + + if ((!job.getOutputKeyClass().getName().equals(textClassname)) + || (!job.getOutputValueClass().getName().equals(textClassname))) + throw new IllegalArgumentException( + "Hama 
Pipes does only support Text as Key/Value output!"); + + LOG.debug("bsp.master.address: " + job.getConf().get("bsp.master.address")); + LOG.debug("bsp.local.tasks.maximum: " + + job.getConf().get("bsp.local.tasks.maximum")); + LOG.debug("NumBspTask: " + job.getNumBspTask()); + LOG.debug("fs.default.name: " + job.getConf().get("fs.default.name")); + + String exec = getExecutable(job.getConf()); + if (exec == null) { + throw new IllegalArgumentException("No application defined."); + } + + URI[] fileCache = DistributedCache.getCacheFiles(job.getConf()); + if (fileCache == null) { + fileCache = new URI[1]; + } else { + URI[] tmp = new URI[fileCache.length + 1]; + System.arraycopy(fileCache, 0, tmp, 1, fileCache.length); + fileCache = tmp; + } + + try { + fileCache[0] = new URI(exec); + } catch (URISyntaxException e) { + IOException ie = new IOException("Problem parsing execable URI " + exec); + ie.initCause(e); + throw ie; + } + DistributedCache.setCacheFiles(fileCache, job.getConf()); + } + + /** + * A command line parser for the CLI-based Pipes job submitter. 
+ */ + static class CommandLineParser { + private Options options = new Options(); + + void addOption(String longName, boolean required, String description, + String paramName) { + OptionBuilder.withArgName(paramName); + OptionBuilder.hasArgs(1); + OptionBuilder.withDescription(description); + OptionBuilder.isRequired(required); + Option option = OptionBuilder.create(longName); + options.addOption(option); + } + + void addArgument(String name, boolean required, String description) { + OptionBuilder.withArgName(name); + OptionBuilder.hasArgs(1); + OptionBuilder.withDescription(description); + OptionBuilder.isRequired(required); + Option option = OptionBuilder.create(); + options.addOption(option); + + } + + Parser createParser() { + Parser result = new BasicParser(); + return result; + } + + void printUsage() { + // The CLI package should do this for us, but I can't figure out how + // to make it print something reasonable. + System.out.println("bin/hama pipes"); + System.out.println(" [-input ] // Input directory"); + System.out.println(" [-output ] // Output directory"); + System.out.println(" [-jar // jar filename"); + System.out.println(" [-inputformat ] // InputFormat class"); + System.out + .println(" [-bspTasks ] // Number of bsp tasks to launch"); + System.out.println(" [-partitioner ] // Java Partitioner"); + System.out.println(" [-combiner ] // Java Combiner class"); + System.out.println(" [-outputformat ] // Java RecordWriter"); + System.out + .println(" [-cachefiles ] // Additional cache files like libs, can be globbed with wildcards"); + System.out.println(" [-program ] // executable URI"); + System.out + .println(" [-programArgs ] // arguments for the program"); + System.out + .println(" [-interpreter ] // interpreter, like python or bash"); + System.out + .println(" [-streaming ] // if supplied, streaming is used instead of pipes"); + System.out.println(" [-jobname ] // sets the name of this job"); + + System.out.println(); + 
GenericOptionsParser.printGenericCommandUsage(System.out); + } + } + + private static Class getClass( + CommandLine cl, String key, HamaConfiguration conf, + Class cls) throws ClassNotFoundException { + + return conf.getClassByName(cl.getOptionValue(key)).asSubclass(cls); + } + + @Override + public int run(String[] args) throws Exception { + CommandLineParser cli = new CommandLineParser(); + if (args.length == 0) { + cli.printUsage(); + return 1; + } + + cli.addOption("input", false, "input path for bsp", "path"); + cli.addOption("output", false, "output path from bsp", "path"); + + cli.addOption("jar", false, "job jar file", "path"); + cli.addOption("inputformat", false, "java classname of InputFormat", + "class"); + // cli.addArgument("javareader", false, "is the RecordReader in Java"); + + cli.addOption("partitioner", false, "java classname of Partitioner", + "class"); + cli.addOption("outputformat", false, "java classname of OutputFormat", + "class"); + + cli.addOption("cachefiles", false, "additional cache files to add", + "space delimited paths"); + + cli.addOption("interpreter", false, "interpreter, like python or bash", + "executable"); + + cli.addOption("jobname", false, "the jobname", "name"); + + cli.addOption("programArgs", false, "program arguments", "arguments"); + cli.addOption("bspTasks", false, "how many bsp tasks to launch", "number"); + cli.addOption("streaming", false, + "if supplied, streaming is used instead of pipes", ""); + + cli.addOption( + "jobconf", + false, + "\"n1=v1,n2=v2,..\" (Deprecated) Optional. 
Add or override a JobConf property.", + "key=val"); + + cli.addOption("program", false, "URI to application executable", "class"); + Parser parser = cli.createParser(); + try { + + // check generic arguments -conf + GenericOptionsParser genericParser = new GenericOptionsParser(getConf(), + args); + // get other arguments + CommandLine results = parser.parse(cli.options, + genericParser.getRemainingArgs()); + + BSPJob job = new BSPJob(getConf()); + + if (results.hasOption("input")) { + FileInputFormat.setInputPaths(job, results.getOptionValue("input")); + } + if (results.hasOption("output")) { + FileOutputFormat.setOutputPath(job, + new Path(results.getOptionValue("output"))); + } + if (results.hasOption("jar")) { + job.setJar(results.getOptionValue("jar")); + } + + if (results.hasOption("jobname")) { + job.setJobName(results.getOptionValue("jobname")); + } + + if (results.hasOption("inputformat")) { + setIsJavaRecordReader(job.getConf(), true); + job.setInputFormat(getClass(results, "inputformat", conf, + InputFormat.class)); + } + + if (results.hasOption("partitioner")) { + job.setPartitioner(getClass(results, "partitioner", conf, + Partitioner.class)); + } + + if (results.hasOption("outputformat")) { + setIsJavaRecordWriter(job.getConf(), true); + job.setOutputFormat(getClass(results, "outputformat", conf, + OutputFormat.class)); + } + + if (results.hasOption("streaming")) { + LOG.info("Streaming enabled!"); + job.set("hama.streaming.enabled", "true"); + } + + if (results.hasOption("jobconf")) { + LOG.warn("-jobconf option is deprecated, please use -D instead."); + String options = results.getOptionValue("jobconf"); + StringTokenizer tokenizer = new StringTokenizer(options, ","); + while (tokenizer.hasMoreTokens()) { + String keyVal = tokenizer.nextToken().trim(); + String[] keyValSplit = keyVal.split("=", 2); + job.set(keyValSplit[0], keyValSplit[1]); + } + } + + if (results.hasOption("bspTasks")) { + int optionValue = 
Integer.parseInt(results.getOptionValue("bspTasks")); + conf.setInt("bsp.local.tasks.maximum", optionValue); + conf.setInt("bsp.peers.num", optionValue); + } + + if (results.hasOption("program")) { + String executablePath = results.getOptionValue("program"); + setExecutable(job.getConf(), executablePath); + DistributedCache.addCacheFile(new Path(executablePath).toUri(), conf); + } + + if (results.hasOption("interpreter")) { + job.getConf().set("hama.pipes.executable.interpretor", + results.getOptionValue("interpreter")); + } + + if (results.hasOption("programArgs")) { + job.getConf().set("hama.pipes.executable.args", + Joiner.on(" ").join(results.getOptionValues("programArgs"))); + // job.getConf().set("hama.pipes.resolve.executable.args", "true"); + } + + if (results.hasOption("cachefiles")) { + FileSystem fs = FileSystem.get(getConf()); + String[] optionValues = results.getOptionValues("cachefiles"); + for (String s : optionValues) { + Path path = new Path(s); + FileStatus[] globStatus = fs.globStatus(path); + for (FileStatus f : globStatus) { + if (!f.isDir()) { + DistributedCache.addCacheFile(f.getPath().toUri(), job.getConf()); + } else { + LOG.info("Ignoring directory " + f.getPath() + " while globbing."); + } + } + } + } + + // if they gave us a jar file, include it into the class path + String jarFile = job.getJar(); + if (jarFile != null) { + @SuppressWarnings("deprecation") + final URL[] urls = new URL[] { FileSystem.getLocal(conf) + .pathToFile(new Path(jarFile)).toURL() }; + // FindBugs complains that creating a URLClassLoader should be + // in a doPrivileged() block. 
+ ClassLoader loader = AccessController + .doPrivileged(new PrivilegedAction() { + @Override + public ClassLoader run() { + return new URLClassLoader(urls); + } + }); + conf.setClassLoader(loader); + } + + runJob(job); + return 0; + } catch (ParseException pe) { + LOG.info("Error : " + pe); + cli.printUsage(); + return 1; + } + + } + + /** + * Submit a pipes job based on the command line arguments. + */ + public static void main(String[] args) throws Exception { + int exitCode = new Submitter().run(args); + System.exit(exitCode); + } + +} Propchange: hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java ------------------------------------------------------------------------------ svn:eol-style = native