Return-Path: Delivered-To: apmail-incubator-pig-commits-archive@locus.apache.org Received: (qmail 55919 invoked from network); 4 Sep 2008 21:26:08 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 4 Sep 2008 21:26:08 -0000 Received: (qmail 56028 invoked by uid 500); 4 Sep 2008 21:26:06 -0000 Delivered-To: apmail-incubator-pig-commits-archive@incubator.apache.org Received: (qmail 55993 invoked by uid 500); 4 Sep 2008 21:26:06 -0000 Mailing-List: contact pig-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: pig-dev@incubator.apache.org Delivered-To: mailing list pig-commits@incubator.apache.org Received: (qmail 55974 invoked by uid 99); 4 Sep 2008 21:26:05 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 Sep 2008 14:26:05 -0700 X-ASF-Spam-Status: No, hits=-1999.3 required=10.0 tests=ALL_TRUSTED,FRT_LEVITRA X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 Sep 2008 21:25:14 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id DDFBD238899E; Thu, 4 Sep 2008 14:25:44 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r692253 [2/3] - in /incubator/pig/branches/types: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/ex... 
Date: Thu, 04 Sep 2008 21:25:43 -0000 To: pig-commits@incubator.apache.org From: olga@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080904212544.DDFBD238899E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java?rev=692253&r1=692252&r2=692253&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java Thu Sep 4 14:25:41 2008 @@ -414,6 +414,26 @@ return elem.exists() || globMatchesFiles(elem, store); } + public static boolean isFile(String filename, PigContext context) + throws IOException { + return !isDirectory(filename, context.getDfs()); + } + + public static boolean isFile(String filename, DataStorage store) + throws IOException { + return !isDirectory(filename, store); + } + + public static boolean isDirectory(String filename, PigContext context) + throws IOException { + return isDirectory(filename, context.getDfs()); + } + + public static boolean isDirectory(String filename, DataStorage store) + throws IOException { + ElementDescriptor elem = store.asElement(filename); + return (elem instanceof ContainerDescriptor); + } private static boolean globMatchesFiles(ElementDescriptor elem, DataStorage fs) Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=692253&r1=692252&r2=692253&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original) +++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java Thu Sep 4 14:25:41 2008 @@ -32,6 +32,7 @@ public class LOLoad extends LogicalOperator { private static final long serialVersionUID = 2L; + protected boolean splittable = true; private FileSpec mInputFileSpec; private LoadFunc mLoadFunc; @@ -51,11 +52,12 @@ * */ public LOLoad(LogicalPlan plan, OperatorKey key, FileSpec inputFileSpec, - URL schemaFile) throws IOException { + URL schemaFile, boolean splittable) throws IOException { super(plan, key); mInputFileSpec = inputFileSpec; mSchemaFile = schemaFile; + this.splittable = splittable; try { mLoadFunc = (LoadFunc) @@ -139,6 +141,10 @@ this.mEnforcedSchema = enforcedSchema; } + public boolean isSplittable() { + return splittable; + } + @Override public byte getType() { return DataType.BAG ; Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java?rev=692253&r1=692252&r2=692253&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java Thu Sep 4 14:25:41 2008 @@ -173,6 +173,10 @@ protected void visit(LOLimit limOp) throws VisitorException { return; } + + protected void visit(LOStream stream) throws VisitorException { + return; + } /** * Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=692253&r1=692252&r2=692253&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Sep 4 14:25:41 2008 @@ -47,6 +47,8 @@ import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.PlanException; +import org.apache.pig.impl.streaming.StreamingCommand; +import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec; import org.apache.pig.data.TupleFactory; import org.apache.pig.data.Tuple; import org.apache.pig.data.BagFactory; @@ -139,7 +141,7 @@ storePlan.add(store); storePlan.add(input); storePlan.connect(input, store); - attachPlan(storePlan, input, readFrom); + attachPlan(storePlan, input, readFrom, new HashMap()); } catch (ParseException pe) { throw new FrontendException(pe.getMessage()); } @@ -316,6 +318,130 @@ return mapAliasOp.get(alias); } + // Check and set files to be automatically shipped for the given StreamingCommand + // Auto-shipping rules: + // 1. If the command begins with either perl or python assume that the + // binary is the first non-quoted string it encounters that does not + // start with dash - subject to restrictions in (2). + // 2. Otherwise, attempt to ship the first string from the command line as + // long as it does not come from /bin, /usr/bin, /usr/local/bin. + // It will determine that by scanning the path if an absolute path is + // provided or by executing "which". The paths can be made configurable + // via "set stream.skippath " option. + private static final String PERL = "perl"; + private static final String PYTHON = "python"; + private void checkAutoShipSpecs(StreamingCommand command, String[] argv) + throws ParseException { + // Candidate for auto-ship + String arg0 = argv[0]; + + // Check if command is perl or python ... 
if so use the first non-option + // and non-quoted string as the candidate + if (arg0.equalsIgnoreCase(PERL) || arg0.equalsIgnoreCase(PYTHON)) { + for (int i=1; i < argv.length; ++i) { + if (!argv[i].startsWith("-") && !isQuotedString(argv[i])) { + checkAndShip(command, argv[i]); + break; + } + } + } else { + // Ship the first argument if it can be ... + checkAndShip(command, arg0); + } + } + + private void checkAndShip(StreamingCommand command, String arg) + throws ParseException { + // Don't auto-ship if it is an absolute path... + if (arg.startsWith("/")) { + return; + } + + // $ which arg + String argPath = which(arg); + if (argPath != null && !inSkipPaths(argPath)) { + try { + command.addPathToShip(argPath); + } catch(IOException e) { + throw new ParseException(e.getMessage()); + } + } + + } + + private boolean isQuotedString(String s) { + return (s.charAt(0) == '\'' && s.charAt(s.length()-1) == '\''); + } + + // Check if file is in the list paths to be skipped + private boolean inSkipPaths(String file) { + for (String skipPath : pigContext.getPathsToSkip()) { + if (file.startsWith(skipPath)) { + return true; + } + } + return false; + } + + + private static String which(String file) { + try { + ProcessBuilder processBuilder = + new ProcessBuilder(new String[] {"which", file}); + Process process = processBuilder.start(); + + BufferedReader stdout = + new BufferedReader(new InputStreamReader(process.getInputStream())); + String fullPath = stdout.readLine(); + + return (process.waitFor() == 0) ? 
fullPath : null; + } catch (Exception e) {} + return null; + } + + private static final char SINGLE_QUOTE = '\''; + private static final char DOUBLE_QUOTE = '"'; + private static String[] splitArgs(String command) throws ParseException { + List argv = new ArrayList(); + + int beginIndex = 0; + + while (beginIndex < command.length()) { + // Skip spaces + while (Character.isWhitespace(command.charAt(beginIndex))) { + ++beginIndex; + } + + char delim = ' '; + char charAtIndex = command.charAt(beginIndex); + if (charAtIndex == SINGLE_QUOTE || charAtIndex == DOUBLE_QUOTE) { + delim = charAtIndex; + } + + int endIndex = command.indexOf(delim, beginIndex+1); + if (endIndex == -1) { + if (Character.isWhitespace(delim)) { + // Reached end of command-line + argv.add(command.substring(beginIndex)); + break; + } else { + // Didn't find the ending quote/double-quote + throw new ParseException("Illegal command: " + command); + } + } + + if (Character.isWhitespace(delim)) { + // Do not consume the space + argv.add(command.substring(beginIndex, endIndex)); + } else { + argv.add(command.substring(beginIndex, endIndex+1)); + } + + beginIndex = endIndex + 1; + } + + return argv.toArray(new String[argv.size()]); + } //BEGIN //I am maintaining state about the operators that should @@ -371,8 +497,13 @@ return aliases.get(op); } - public static void attachPlan(LogicalPlan lp, LogicalOperator root, LogicalPlan rootPlan) throws ParseException { + public static void attachPlan(LogicalPlan lp, LogicalOperator root, LogicalPlan rootPlan, Map rootProcessed) throws ParseException { log.trace("Entering attachPlan"); + if((rootProcessed.get(root) != null) && (rootProcessed.get(root))) { + log.trace("Root has been processed"); + log.trace("Exiting attachPlan"); + return; + } lp.add(root); log.debug("Added operator " + root + " to the logical plan " + lp); if(null == rootPlan.getPredecessors(root)) { @@ -380,7 +511,8 @@ return; } for(LogicalOperator rootPred: rootPlan.getPredecessors(root)) { - 
attachPlan(lp, rootPred, rootPlan); + attachPlan(lp, rootPred, rootPlan, rootProcessed); + rootProcessed.put(rootPred, true); try { lp.connect(rootPred, root); log.debug("Connected operator " + rootPred + " to " + root + " in the logical plan " + lp); @@ -558,8 +690,17 @@ TOKEN : { } TOKEN : { } TOKEN : { } +TOKEN : { } +TOKEN : { } TOKEN : { } -TOKEN : { } +TOKEN : { } +TOKEN : { } +TOKEN : { } +TOKEN : { } +TOKEN : { } +TOKEN : { } +TOKEN : { } +TOKEN : { } TOKEN: { @@ -594,6 +735,7 @@ )* "'"> } +TOKEN : { } // Pig has special variables starting with $ TOKEN : { > } @@ -628,6 +770,7 @@ roots.add(op); } + Map rootProcessed = new HashMap(); for(LogicalOperator op: roots) { //At this point we have a logical plan for the pig statement //In order to construct the entire logical plan we need to traverse @@ -638,7 +781,8 @@ LogicalPlan rootPlan = aliases.get(op); if(null != rootPlan) { - attachPlan(lp, op, rootPlan); + attachPlan(lp, op, rootPlan, rootProcessed); + rootProcessed.put(op, true); } } @@ -791,6 +935,25 @@ | ( op = JoinClause(lp)) | ( op = UnionClause(lp)) | ( op = ForEachClause(lp)) +| ( op = StreamClause(lp) + [ + ( + LOOKAHEAD(2) "(" schema = TupleSchema() ")" + { + SchemaUtils.setSchemaDefaultType(schema, DataType.BYTEARRAY); + op.setSchema(schema); + op.setCanonicalNames(); + log.info("Stream as schema()"+ schema); + } + | fs = AtomSchema() + { + schema = new Schema(fs); + op.setSchema(schema); + log.info("Stream as atomschema()" + schema); + } + ) + ] + ) | ( op = StoreClause(lp)) ) [ t2= { op.setRequestedParallelism(Integer.parseInt(t2.image));} ] @@ -800,12 +963,14 @@ LogicalOperator LoadClause(LogicalPlan lp) : { - Token t1, t2; + Token t1, t2, t3; String filename; String funcName,funcArgs =null; FuncSpec funcSpec = null; String funcSpecAsString = null; LOLoad lo=null; + String splitBy; + boolean splittable = true; log.trace("Entering LoadClause"); } { @@ -818,13 +983,24 @@ log.debug("LoadClause: funcSpec = " + funcSpec); } )? 
+ ( + t3 = + { + splitBy = unquote(t3.image); + if (splitBy.equalsIgnoreCase("file")) { + splittable = false; + } + } + )? ) { - if (funcSpec == null){ - funcSpec = new FuncSpec(PigStorage.class.getName()); + if (funcSpecAsString == null){ + funcSpecAsString = PigStorage.class.getName(); + funcSpec = new FuncSpec(funcSpecAsString); + log.debug("LoadClause: funcSpec = " + funcSpec); } - lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(massageFilename(filename, pigContext), funcSpec), null); + lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(massageFilename(filename, pigContext), funcSpec), null, splittable); lp.add(lo); log.debug("Added operator " + lo.getClass().getName() + " to the logical plan"); @@ -1488,11 +1664,13 @@ the list of foreach plans */ + Map rootProcessed = new HashMap(); for(LogicalOperator project: mapProjectInputs.keySet()) { for(LogicalOperator projectInput: mapProjectInputs.get(project)) { generatePlan.add(projectInput); generatePlan.connect(projectInput, project); - attachPlan(generatePlan, projectInput, foreachPlan); + attachPlan(generatePlan, projectInput, foreachPlan, rootProcessed); + rootProcessed.put(projectInput, true); } } } @@ -1517,15 +1695,104 @@ } } -LogicalOperator DefineClause(LogicalPlan lp) : {Token t; Token t1; String functionName, functionArgs;} +LogicalOperator StreamClause(LogicalPlan lp): +{ + LogicalOperator input; + StreamingCommand command; +} +{ + input = NestedExpr(lp) + + command = Command() + { + LOStream loStream = new LOStream(lp, new OperatorKey(scope, getNextId()), input, + pigContext.createExecutableManager(), command); + //addAlias(input.getAlias(), input); + lp.add(loStream); + lp.connect(input, loStream); + return loStream; + } +} + +StreamingCommand Command(): {Token t; StreamingCommand command;} +{ + t = + { + String[] argv = splitArgs(unquote(t.image)); + command = new StreamingCommand(pigContext, argv); + checkAutoShipSpecs(command, argv); + return command; + } + 
| + t = + { + command = pigContext.getCommandForAlias(t.image); + if (command == null) { + throw new ParseException("Undefined command-alias: " + t.image + + " used as stream operator"); + } + + return command; + } +} + +LogicalOperator DefineClause(LogicalPlan lp) : {Token t; Token cmd; String functionName, functionArgs;} { t = ( + ( + cmd = + { + StreamingCommand command = + new StreamingCommand(pigContext, splitArgs(unquote(cmd.image))); + String[] paths; + StreamingCommand.HandleSpec[] handleSpecs; + } + ( + "(" paths = PathList() ")" + { + if (paths.length == 0) { + command.setShipFiles(false); + } else { + for (String path : paths) { + try { + command.addPathToShip(path); + } catch(IOException e) { + throw new ParseException(e.getMessage()); + } + } + } + } + | + "(" paths = PathList() ")" + { + for (String path : paths) { + try { + command.addPathToCache(path); + } catch(IOException e) { + throw new ParseException(e.getMessage()); + } + } + } + | + "(" InputOutputSpec(command, StreamingCommand.Handle.INPUT) ")" + | + "(" InputOutputSpec(command, StreamingCommand.Handle.OUTPUT) ")" + | + "(" ErrorSpec(command, t.image) ")" + )* + { + pigContext.registerStreamCmd(t.image, command); + } + ) + | + ( functionName = QualifiedFunction() "(" functionArgs = StringList() ")" { pigContext.registerFunction(t.image, new FuncSpec(functionName + "(" + functionArgs + ")")); } ) + ) { // Return the dummy LODefine LogicalOperator lo = new LODefine(lp, new OperatorKey(scope, getNextId())); @@ -1534,6 +1801,83 @@ } } +String[] PathList() : {Token t; List pathList = new ArrayList();} +{ + ( + ( + t = {pathList.add(unquote(t.image));} + ( "," t = {pathList.add(unquote(t.image));} )* + ) + | {} + ) + {return pathList.toArray(new String[pathList.size()]);} +} + +void InputOutputSpec(StreamingCommand command, StreamingCommand.Handle handle): +{ + String stream, deserializer; + StreamingCommand.HandleSpec[] handleSpecs; + String functionName = "PigStorage", functionArgs=""; +} +{ + 
stream = CommandStream() + [ + functionName = QualifiedFunction() + [ + "(" functionArgs = StringList() ")" + ] + ] + { + deserializer = functionName + "(" + functionArgs + ")"; + command.addHandleSpec(handle, + new HandleSpec(stream, deserializer) + ); + } + ( + "," + stream = CommandStream() + [ + functionName = QualifiedFunction() + [ + "(" functionArgs = StringList() ")" + ] + ] + { + deserializer = functionName + "(" + functionArgs + ")"; + command.addHandleSpec(handle, + new HandleSpec(stream, deserializer) + ); + } + )* +} + +String CommandStream(): {Token t;} +{ + t = + {return "stdin";} + | + t = + {return "stdout";} + | + t = + {return unquote(t.image);} +} + +void ErrorSpec(StreamingCommand command, String alias): {Token t1, t2; int limit = StreamingCommand.MAX_TASKS;} +{ + ( + t1 = + ( t2 = {limit = Integer.parseInt(t2.image);})? + { + command.setLogDir(unquote(t1.image)); + command.setLogFilesLimit(limit); + } + ) + | + { + command.setLogDir(alias); + } +} LogicalOperator StoreClause(LogicalPlan lp) : {LogicalOperator lo; Token t; String fileName; String functionSpec = null; String functionName, functionArgs;} { Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultInputHandler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultInputHandler.java?rev=692253&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultInputHandler.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultInputHandler.java Thu Sep 4 14:25:41 2008 @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.streaming; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.pig.StoreFunc; +import org.apache.pig.builtin.PigStorage; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec; + +/** + * {@link DefaultInputHandler} handles the input for the Pig-Streaming + * executable in a {@link InputType#SYNCHRONOUS} manner by feeding it input + * via its stdin. 
+ */ +public class DefaultInputHandler extends InputHandler { + + OutputStream stdin; + + public DefaultInputHandler() { + serializer = new PigStorage(); + } + + public DefaultInputHandler(HandleSpec spec) { + serializer = (StoreFunc)PigContext.instantiateFuncFromSpec(spec.spec); + } + + public InputType getInputType() { + return InputType.SYNCHRONOUS; + } + + public void bindTo(OutputStream os) throws IOException { + stdin = os; + super.bindTo(stdin); + } + + @Override + public synchronized void close(Process process) throws IOException { + if(!alreadyClosed) { + alreadyClosed = true; + super.close(process); + try { + stdin.flush(); + stdin.close(); + stdin = null; + } catch(IOException e) { + // check if we got an exception because + // the process actually completed and we were + // trying to flush and close its stdin + if(process == null || process.exitValue() != 0) { + // the process had not terminated normally + // throw the exception we got + throw e; + } + } + + } + } +} Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java?rev=692253&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java Thu Sep 4 14:25:41 2008 @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.streaming; + +import java.io.IOException; + +import org.apache.pig.LoadFunc; +import org.apache.pig.builtin.PigStorage; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.BufferedPositionedInputStream; +import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec; + +/** + * {@link DefaultOutputHandler} handles the output from the Pig-Streaming + * executable in an {@link OutputType#SYNCHRONOUS} manner by reading its output + * via its stdout. + */ +public class DefaultOutputHandler extends OutputHandler { + BufferedPositionedInputStream stdout; + + public DefaultOutputHandler() { + deserializer = new PigStorage(); + } + + public DefaultOutputHandler(HandleSpec spec) { + deserializer = (LoadFunc)PigContext.instantiateFuncFromSpec(spec.spec); + } + + public OutputType getOutputType() { + return OutputType.SYNCHRONOUS; + } + + public void bindTo(String fileName, BufferedPositionedInputStream is, + long offset, long end) throws IOException { + stdout = is; + super.bindTo(fileName, stdout, offset, end); + } + + public synchronized void close() throws IOException { + if(!alreadyClosed) { + super.close(); + stdout.close(); + stdout = null; + alreadyClosed = true; + } + } +} Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/ExecutableManager.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=692253&view=auto ============================================================================== --- 
incubator/pig/branches/types/src/org/apache/pig/impl/streaming/ExecutableManager.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/ExecutableManager.java Thu Sep 4 14:25:41 2008 @@ -0,0 +1,624 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.impl.streaming; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.BufferedPositionedInputStream; +import org.apache.pig.impl.streaming.InputHandler.InputType; +import org.apache.pig.impl.streaming.OutputHandler.OutputType; + +/** + * {@link ExecutableManager} manages an external executable which processes data + * in a Pig query. + * + * The ExecutableManager is responsible for startup/teardown of + * the external process and also for managing it. It feeds input records to the + * executable via its stdin, collects the output records from + * the stdout and also diagnostic information from + * the stderr. 
+ */ +public class ExecutableManager { + private static final Log LOG = LogFactory.getLog(ExecutableManager.class + .getName()); + private static final int SUCCESS = 0; + private static final String PATH = "PATH"; + private static final String BASH = "bash"; + private static final Result EOS_RESULT = new Result(POStatus.STATUS_EOS, null); + + protected StreamingCommand command; // Streaming command to be run + String argvAsString; // Parsed commands + + Process process; // Handle to the process + protected int exitCode = -127; // Exit code of the process + + protected DataOutputStream stdin; // stdin of the process + ProcessInputThread stdinThread; // thread to send input to process + + ProcessOutputThread stdoutThread; // thread to get process stdout + InputStream stdout; // stdout of the process + + ProcessErrorThread stderrThread; // thread to get process stderr + InputStream stderr; // stderr of the process + + // Input/Output handlers + InputHandler inputHandler; + OutputHandler outputHandler; + + // Statistics + protected long inputRecords = 0; + protected long inputBytes = 0; + protected long outputRecords = 0; + protected long outputBytes = 0; + + protected volatile Throwable outerrThreadsError; + private POStream poStream; + private ProcessInputThread fileInputThread; + + /** + * Create a new {@link ExecutableManager}. + */ + public ExecutableManager() { + } + + /** + * Configure and initialize the {@link ExecutableManager}. 
+ * + * @param properties + * {@link Properties} for the ExecutableManager + * @param command + * {@link StreamingCommand} to be run by the + * ExecutableManager + * @param endOfPipe + * {@link DataCollector} to be used to push results of the + * StreamingCommand down + * @throws IOException + * @throws ExecException + */ + public void configure(POStream stream) throws IOException, ExecException { + this.poStream = stream; + this.command = stream.getCommand(); + String[] argv = this.command.getCommandArgs(); + argvAsString = ""; + for (String arg : argv) { + argvAsString += arg; + argvAsString += " "; + } + + // Create the input/output handlers + this.inputHandler = HandlerFactory.createInputHandler(command); + this.outputHandler = HandlerFactory.createOutputHandler(command); + } + + /** + * Close and cleanup the {@link ExecutableManager}. + * @throws IOException + */ + public void close() throws IOException { + // Close the InputHandler, which in some cases lets the process + // terminate + inputHandler.close(process); + + // Check if we need to start the process now ... 
+ if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) { + exec(); + } + + // Wait for the process to exit + try { + exitCode = process.waitFor(); + } catch (InterruptedException ie) { + LOG.error("Unexpected exception while waiting for streaming binary to complete", ie); + killProcess(process); + } + + // Wait for stdout thread to complete + try { + if (stdoutThread != null) { + stdoutThread.join(0); + } + stdoutThread = null; + } catch (InterruptedException ie) { + LOG.error("Unexpected exception while waiting for output thread for streaming binary to complete", ie); + killProcess(process); + } + + // Wait for stderr thread to complete + try { + if (stderrThread != null) { + stderrThread.join(0); + } + stderrThread = null; + } catch (InterruptedException ie) { + LOG.error("Unexpected exception while waiting for input thread for streaming binary to complete", ie); + killProcess(process); + } + + LOG.debug("Process exited with: " + exitCode); + if (exitCode != SUCCESS) { + LOG.error(command + " failed with exit status: " + + exitCode); + } + + if (outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) { + + // Trigger the outputHandler + outputHandler.bindTo("", null, 0, -1); + + // start thread to process output from executable's stdout + stdoutThread = new ProcessOutputThread(outputHandler, poStream); + stdoutThread.start(); + } + + // Check if there was a problem with the managed process + if (outerrThreadsError != null) { + LOG.error("Output/Error thread failed with: " + + outerrThreadsError); + } + + } + + /** + * Helper function to close input and output streams + * to the process and kill it + * @param process the process to be killed + * @throws IOException + */ + private void killProcess(Process process) throws IOException { + inputHandler.close(process); + outputHandler.close(); + process.destroy(); + } + + /** + * Convert path from Windows convention to Unix convention. Invoked under + * cygwin. 
+ * + * @param path + * path in Windows convention + * @return path in Unix convention, null if fail + */ + private String parseCygPath(String path) { + String[] command = new String[] { "cygpath", "-u", path }; + Process p = null; + try { + p = Runtime.getRuntime().exec(command); + } catch (IOException e) { + return null; + } + int exitVal = 0; + try { + exitVal = p.waitFor(); + } catch (InterruptedException e) { + return null; + } + if (exitVal != 0) + return null; + String line = null; + try { + InputStreamReader isr = new InputStreamReader(p.getInputStream()); + BufferedReader br = new BufferedReader(isr); + line = br.readLine(); + } catch (IOException e) { + return null; + } + return line; + } + + /** + * Set up the run-time environment of the managed process. + * + * @param pb + * {@link ProcessBuilder} used to exec the process + */ + protected void setupEnvironment(ProcessBuilder pb) { + String separator = ":"; + Map env = pb.environment(); + + // Add the current-working-directory to the $PATH + File dir = pb.directory(); + String cwd = (dir != null) ? dir.getAbsolutePath() : System + .getProperty("user.dir"); + + if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) { + String unixCwd = parseCygPath(cwd); + if (unixCwd == null) + throw new RuntimeException( + "Can not convert Windows path to Unix path under cygwin"); + cwd = unixCwd; + } + + String envPath = env.get(PATH); + if (envPath == null) { + envPath = cwd; + } else { + envPath = envPath + separator + cwd; + } + env.put(PATH, envPath); + } + + /** + * Start execution of the external process. + * + * This takes care of setting up the environment of the process and also + * starts {@link ProcessErrorThread} to process the stderr of + * the managed process. + * + * @throws IOException + */ + protected void exec() throws IOException { + // Set the actual command to run with 'bash -c exec ...' 
+ List cmdArgs = new ArrayList(); + cmdArgs.add(BASH); + cmdArgs.add("-c"); + StringBuffer sb = new StringBuffer(); + sb.append("exec "); + sb.append(argvAsString); + cmdArgs.add(sb.toString()); + + // Start the external process + ProcessBuilder processBuilder = new ProcessBuilder(cmdArgs + .toArray(new String[cmdArgs.size()])); + setupEnvironment(processBuilder); + process = processBuilder.start(); + LOG.debug("Started the process for command: " + command); + + // Pick up the process' stderr stream and start the thread to + // process the stderr stream + stderr = new DataInputStream(new BufferedInputStream(process + .getErrorStream())); + stderrThread = new ProcessErrorThread(); + stderrThread.start(); + + // Check if we need to handle the process' stdout directly + if (outputHandler.getOutputType() == OutputType.SYNCHRONOUS) { + // Get hold of the stdout of the process + stdout = new DataInputStream(new BufferedInputStream(process + .getInputStream())); + + // Bind the stdout to the OutputHandler + outputHandler.bindTo("", new BufferedPositionedInputStream(stdout), + 0, Long.MAX_VALUE); + + // start thread to process output from executable's stdout + stdoutThread = new ProcessOutputThread(outputHandler, poStream); + stdoutThread.start(); + } + } + + /** + * Start execution of the {@link ExecutableManager}. + * + * @throws IOException + */ + public void run() throws IOException { + // Check if we need to exec the process NOW ... + if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) { + // start the thread to handle input + fileInputThread = new ProcessInputThread(inputHandler, poStream); + fileInputThread.start(); + + // If Input type is ASYNCHRONOUS that means input to the + // streaming binary is from a file - that means we cannot exec + // the process till the input file is completely written. This + // will be done in close() - so now we return + return; + } + + // Start the executable ... 
+ exec(); + // set up input to the executable + stdin = new DataOutputStream(new BufferedOutputStream(process + .getOutputStream())); + inputHandler.bindTo(stdin); + + // Start the thread to send input to the executable's stdin + stdinThread = new ProcessInputThread(inputHandler, poStream); + stdinThread.start(); + } + + /** + * The thread which consumes input from POStream's binaryInput queue + * and feeds it to the the Process + */ + class ProcessInputThread extends Thread { + + InputHandler inputHandler; + private POStream poStream; + private BlockingQueue binaryInputQueue; + + ProcessInputThread(InputHandler inputHandler, POStream poStream) { + setDaemon(true); + this.inputHandler = inputHandler; + this.poStream = poStream; + // the input queue from where this thread will read + // input tuples + this.binaryInputQueue = poStream.getBinaryInputQueue(); + } + + public void run() { + try { + // Read tuples from the previous operator in the pipeline + // and pass it to the executable + while (true) { + Result inp = null; + inp = binaryInputQueue.take(); + synchronized (poStream) { + // notify waiting producer + // the if check is to keep "findbugs" + // happy + if(inp != null) + poStream.notifyAll(); + } + // We should receive an EOP only when *ALL* input + // for this process has already been sent and no + // more input is expected + if (inp.returnStatus == POStatus.STATUS_EOP) { + // signal cleanup in ExecutableManager + close(); + return; + } + if (inp.returnStatus == POStatus.STATUS_OK) { + // Check if there was a problem with the managed process + if (outerrThreadsError != null) { + throw new IOException( + "Output/Error thread failed with: " + + outerrThreadsError); + } + + // Pass the serialized tuple to the executable via the + // InputHandler + Tuple t = null; + try { + t = (Tuple) inp.result; + inputHandler.putNext(t); + } catch (IOException e) { + // if input type is synchronous then it could + // be related to the process terminating + 
if(inputHandler.getInputType() == InputType.SYNCHRONOUS) { + LOG.warn("Exception while trying to write to stream binary's input", e); + // could be because the process + // died OR closed the input stream + // we will only call close() here and not + // worry about deducing whether the process died + // normally or abnormally - if there was any real + // issue the ProcessOutputThread should see + // a non zero exit code from the process and send + // a POStatus.STATUS_ERR back - what if we got + // an IOException because there was only an issue with + // writing to input of the binary - hmm..hope that means + // the process died abnormally!! + close(); + return; + } else { + // asynchronous case - then this is a real exception + LOG.error("Exception while trying to write to stream binary's input", e); + // send POStatus.STATUS_ERR to POStream to signal the error + // Generally the ProcessOutputThread would do this but now + // we should do it here since neither the process nor the + // ProcessOutputThread will ever be spawned + Result res = new Result(POStatus.STATUS_ERR, + "Exception while trying to write to stream binary's input" + e.getMessage()); + sendOutput(poStream.getBinaryOutputQueue(), res); + throw e; + } + } + inputBytes += t.getMemorySize(); + inputRecords++; + } + } + } catch (Throwable t) { + + + // Note that an error occurred + outerrThreadsError = t; + LOG.error(t); + try { + killProcess(process); + } catch (IOException ioe) { + LOG.warn(ioe); + } + } + } + } + + private void sendOutput(BlockingQueue binaryOutputQueue, Result res) { + try { + binaryOutputQueue.put(res); + } catch (InterruptedException e) { + LOG.error("Error while sending binary output to POStream", e); + } + synchronized (poStream) { + // notify waiting consumer + // the if is to satisfy "findbugs" + if(res != null) { + poStream.notifyAll(); + } + } + } + + /** + * The thread which gets output from the streaming binary and puts it onto + * the binary output Queue of POStream + */ 
+ class ProcessOutputThread extends Thread { + + OutputHandler outputHandler; + private BlockingQueue binaryOutputQueue; + + ProcessOutputThread(OutputHandler outputHandler, POStream poStream) { + setDaemon(true); + this.outputHandler = outputHandler; + // the output queue where this thread will put + // output tuples for POStream + this.binaryOutputQueue = poStream.getBinaryOutputQueue(); + } + + public void run() { + try { + // Read tuples from the executable and send it to + // Queue of POStream + Tuple tuple = null; + while ((tuple = outputHandler.getNext()) != null) { + processOutput(tuple); + outputBytes += tuple.getMemorySize(); + } + // output from binary is done + processOutput(null); + outputHandler.close(); + } catch (Throwable t) { + // Note that an error occurred + outerrThreadsError = t; + LOG.error("Caught Exception in OutputHandler of Streaming binary, " + + "sending error signal to pipeline", t); + // send ERROR to POStream + try { + Result res = new Result(); + res.result = "Error reading output from Streaming binary:" + + "'" + argvAsString + "':" + t.getMessage(); + res.returnStatus = POStatus.STATUS_ERR; + sendOutput(binaryOutputQueue, res); + killProcess(process); + } catch (Exception e) { + LOG.error("Error while trying to signal Error status to pipeline", e); + } + } + } + + void processOutput(Tuple t) { + Result res = new Result(); + + if (t != null) { + // we have a valid tuple to pass back + res.result = t; + res.returnStatus = POStatus.STATUS_OK; + outputRecords++; + } else { + // t == null means end of output from + // binary - wait for the process to exit + // and harvest exit code + try { + exitCode = process.waitFor(); + } catch (InterruptedException ie) { + try { + killProcess(process); + } catch (IOException e) { + LOG.warn("Exception trying to kill process while processing null output " + + "from binary", e); + + } + // signal error + String errMsg = "Failure while waiting for process (" + argvAsString + ")" + + ie.getMessage(); + 
LOG.error(errMsg, ie); + res.result = errMsg; + res.returnStatus = POStatus.STATUS_ERR; + sendOutput(binaryOutputQueue, res); + return; + } + if(exitCode == 0) { + // signal EOS (End Of Stream output) + res = EOS_RESULT; + } else { + // signal Error + + String errMsg = "'" + argvAsString + "'" + " failed with exit status: " + + exitCode; + LOG.error(errMsg); + res.result = errMsg; + res.returnStatus = POStatus.STATUS_ERR; + } + } + sendOutput(binaryOutputQueue, res); + + } + } + + + + /** + * Workhorse to process the stderr stream of the managed process. + * + * By default ExecuatbleManager just sends out the received + * error message to the stderr of itself. + * + * @param error + * error message from the managed process. + */ + protected void processError(String error) { + // Just send it out to our stderr + System.err.print(error); + } + + class ProcessErrorThread extends Thread { + + public ProcessErrorThread() { + setDaemon(true); + } + + public void run() { + try { + String error; + BufferedReader reader = new BufferedReader( + new InputStreamReader(stderr)); + while ((error = reader.readLine()) != null) { + processError(error + "\n"); + } + + if (stderr != null) { + stderr.close(); + LOG.debug("ProcessErrorThread done"); + } + } catch (Throwable t) { + // Note that an error occurred + outerrThreadsError = t; + + LOG.error(t); + try { + if (stderr != null) { + stderr.close(); + } + } catch (IOException ioe) { + LOG.warn(ioe); + } + throw new RuntimeException(t); + } + } + } + +} Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileInputHandler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileInputHandler.java?rev=692253&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileInputHandler.java (added) +++ 
incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileInputHandler.java Thu Sep 4 14:25:41 2008 @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.streaming; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.pig.StoreFunc; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec; + +/** + * {@link FileInputHandler} handles the input for the Pig-Streaming + * executable in an {@link InputType#ASYNCHRONOUS} manner by feeding it input + * via an external file specified by the user. 
+ */ +public class FileInputHandler extends InputHandler { + + String fileName; + OutputStream fileOutStream; + + public FileInputHandler(HandleSpec handleSpec) throws ExecException { + fileName = handleSpec.name; + serializer = + (StoreFunc) PigContext.instantiateFuncFromSpec(handleSpec.spec); + + try { + fileOutStream = new FileOutputStream(new File(fileName)); + super.bindTo(fileOutStream); + } catch (IOException fnfe) { + throw new ExecException(fnfe); + } + } + + public InputType getInputType() { + return InputType.ASYNCHRONOUS; + } + + public void bindTo(OutputStream os) throws IOException { + throw new UnsupportedOperationException("Cannot call bindTo on " + + "FileInputHandler"); + } + + public synchronized void close(Process process) throws IOException { + if(!alreadyClosed) { + super.close(process); + fileOutStream.flush(); + fileOutStream.close(); + fileOutStream = null; + alreadyClosed = true; + } + } + +} Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileOutputHandler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileOutputHandler.java?rev=692253&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileOutputHandler.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileOutputHandler.java Thu Sep 4 14:25:41 2008 @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.streaming; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; + +import org.apache.pig.LoadFunc; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.BufferedPositionedInputStream; +import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec; + +/** + * {@link FileOutputHandler} handles the output from the Pig-Streaming + * executable in an {@link OutputType#ASYNCHRONOUS} manner by reading it from + * an external file specified by the user. + */ +public class FileOutputHandler extends OutputHandler { + + String fileName; + BufferedPositionedInputStream fileInStream; + + public FileOutputHandler(HandleSpec handleSpec) throws ExecException { + fileName = handleSpec.name; + deserializer = + (LoadFunc) PigContext.instantiateFuncFromSpec(handleSpec.spec); + } + + public OutputType getOutputType() { + return OutputType.ASYNCHRONOUS; + } + + public void bindTo(String fileName, BufferedPositionedInputStream is, + long offset, long end) throws IOException { + // This is a trigger to start processing the output from the file ... + // ... 
however, we must ignore the input parameters and use ones + // provided during initialization + File file = new File(this.fileName); + this.fileInStream = + new BufferedPositionedInputStream(new FileInputStream(file)); + super.bindTo(this.fileName, this.fileInStream, 0, file.length()); + } + + public synchronized void close() throws IOException { + if(!alreadyClosed) { + super.close(); + fileInStream.close(); + fileInStream = null; + alreadyClosed = true; + } + } + +} Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/HandlerFactory.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/HandlerFactory.java?rev=692253&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/HandlerFactory.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/HandlerFactory.java Thu Sep 4 14:25:41 2008 @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.impl.streaming; + +import java.util.List; + +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.impl.streaming.StreamingCommand.Handle; +import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec; + +/** + * Factory to create an {@link InputHandler} or {@link OutputHandler} + * depending on the specification of the {@link StreamingCommand}. + */ +public class HandlerFactory { + + /** + * Create an InputHandler for the given input specification + * of the StreamingCommand. + * + * @param command StreamingCommand + * @return InputHandler for the given input specification + * @throws ExecException + */ + public static InputHandler createInputHandler(StreamingCommand command) + throws ExecException { + List inputSpecs = command.getHandleSpecs(Handle.INPUT); + + HandleSpec in = null; + if (inputSpecs == null || (in = inputSpecs.get(0)) == null) { + return new DefaultInputHandler(); + } + + return (in.name.equals("stdin")) ? new DefaultInputHandler(in) : + new FileInputHandler(in); + } + + /** + * Create an OutputHandler for the given output specification + * of the StreamingCommand. + * + * @param command StreamingCommand + * @return OutputHandler for the given output specification + * @throws ExecException + */ + public static OutputHandler createOutputHandler(StreamingCommand command) + throws ExecException { + List outputSpecs = command.getHandleSpecs(Handle.OUTPUT); + + HandleSpec out = null; + if (outputSpecs == null || (out = outputSpecs.get(0)) == null) { + return new DefaultOutputHandler(); + } + + return (out.name.equals("stdout")) ? 
new DefaultOutputHandler(out) : + new FileOutputHandler(out); + } +} Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/InputHandler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/InputHandler.java?rev=692253&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/InputHandler.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/InputHandler.java Thu Sep 4 14:25:41 2008 @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.streaming; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.pig.StoreFunc; +import org.apache.pig.data.Tuple; + +/** + * {@link InputHandler} is responsible for handling the input to the + * Pig-Streaming external command. + * + * The managed executable could be fed input in a {@link InputType#SYNCHRONOUS} + * manner via its stdin or in an {@link InputType#ASYNCHRONOUS} + * manner via an external file which is subsequently read by the executable. 
+ */ +public abstract class InputHandler { + /** + * + */ + public enum InputType {SYNCHRONOUS, ASYNCHRONOUS} + /* + * The serializer to be used to send data to the managed process. + * + * It is the responsibility of the concrete sub-classes to setup and + * manage the serializer. + */ + protected StoreFunc serializer; + + // flag to mark if close() has already been called + protected boolean alreadyClosed = false; + + /** + * Get the handled InputType + * @return the handled InputType + */ + public abstract InputType getInputType(); + + /** + * Send the given input Tuple to the managed executable. + * + * @param t input Tuple + * @throws IOException + */ + public void putNext(Tuple t) throws IOException { + serializer.putNext(t); + } + + /** + * Close the InputHandler since there is no more input + * to be sent to the managed process. + * @param process the managed process - this could be null in some cases + * like when input is through files. In that case, the process would not + * have been exec'ed yet - if this method if overridden it is the responsibility + * of the implementer to check that the process is usable. The managed process + * object is supplied by the ExecutableManager to this call so that this method + * can check if the process is alive if it needs to know. + * + * @throws IOException + */ + public synchronized void close(Process process) throws IOException { + if(!alreadyClosed) { + serializer.finish(); + alreadyClosed = true; + } + } + + /** + * Bind the InputHandler to the OutputStream + * from which it reads input and sends it to the managed process. 
+ * + * @param os OutputStream from which to read input data for the + * managed process + * @throws IOException + */ + public void bindTo(OutputStream os) throws IOException { + serializer.bindTo(os); + } +} Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/OutputHandler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=692253&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/OutputHandler.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/OutputHandler.java Thu Sep 4 14:25:41 2008 @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.streaming; + +import java.io.IOException; + +import org.apache.pig.LoadFunc; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.BufferedPositionedInputStream; + +/** + * {@link OutputHandler} is responsible for handling the output of the + * Pig-Streaming external command. 
+ * + * The output of the managed executable could be fetched in a + * {@link OutputType#SYNCHRONOUS} manner via its stdout or in an + * {@link OutputType#ASYNCHRONOUS} manner via an external file to which the + * process wrote its output. + */ +public abstract class OutputHandler { + public enum OutputType {SYNCHRONOUS, ASYNCHRONOUS} + + /* + * The deserializer to be used to send data to the managed process. + * + * It is the responsibility of the concrete sub-classes to setup and + * manage the deserializer. + */ + protected LoadFunc deserializer; + + /** + * Get the handled OutputType. + * @return the handled OutputType + */ + public abstract OutputType getOutputType(); + + // flag to mark if close() has already been called + protected boolean alreadyClosed = false; + + /** + * Bind the OutputHandler to the InputStream + * from which to read the output data of the managed process. + * + * @param is InputStream from which to read the output data + * of the managed process + * @throws IOException + */ + public void bindTo(String fileName, BufferedPositionedInputStream is, + long offset, long end) throws IOException { + deserializer.bindTo(fileName, new BufferedPositionedInputStream(is), + offset, end); + } + + /** + * Get the next output Tuple of the managed process. + * + * @return the next output Tuple of the managed process + * @throws IOException + */ + public Tuple getNext() throws IOException { + return deserializer.getNext(); + } + + /** + * Close the OutputHandler. 
+ * @throws IOException + */ + public synchronized void close() throws IOException {} +} Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/StreamingCommand.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/StreamingCommand.java?rev=692253&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/StreamingCommand.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/StreamingCommand.java Thu Sep 4 14:25:41 2008 @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.impl.streaming; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.pig.builtin.PigStorage; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.FileLocalizer; + + +/** + * {@link StreamingCommand} represents the specification of an external + * command to be executed in a Pig Query. + * + * StreamingCommand encapsulates all relevant details of the + * command specified by the user either directly via the STREAM + * operator or indirectly via a DEFINE operator. It includes + * details such as input/output/error specifications and also files to be + * shipped to the cluster and files to be cached. + */ +public class StreamingCommand implements Serializable, Cloneable { + private static final long serialVersionUID = 1L; + + // External command to be executed and it's parsed components + String executable; + String[] argv; + + // Files to be shipped to the cluster in-order to be executed + List shipSpec = new LinkedList(); + + // Files to be shipped to the cluster in-order to be executed + List cacheSpec = new LinkedList(); + + /** + * Handle to communicate with the external process. + */ + public enum Handle {INPUT, OUTPUT} + + /** + * Map from the the stdin/stdout/stderr handles to their specifications + */ + Map> handleSpecs = + new TreeMap>(); + + // Should the stderr of the process be persisted? + boolean persistStderr = false; + + // Directory where the process's stderr logs should be persisted. 
+ String logDir; + + // Limit on the number of persisted log-files + int logFilesLimit = 100; + public static final int MAX_TASKS = 100; + + boolean shipFiles = true; + + private PigContext pigContext; + + /** + * Create a new StreamingCommand with the given command. + * + * @param command streaming command to be executed + * @param argv parsed arguments of the command + */ + public StreamingCommand(PigContext pigContext, String[] argv) { + this.pigContext = pigContext; + this.argv = argv; + + // Assume that argv[0] is the executable + this.executable = this.argv[0]; + } + + /** + * Get the command to be executed. + * + * @return the command to be executed + */ + public String getExecutable() { + return executable; + } + + /** + * Set the executable for the StreamingCommand. + * + * @param executable the executable for the StreamingCommand + */ + public void setExecutable(String executable) { + this.executable = executable; + } + + /** + * Set the command line arguments for the StreamingCommand. + * + * @param argv the command line arguments for the + * StreamingCommand + */ + public void setCommandArgs(String[] argv) { + this.argv = argv; + } + + /** + * Get the parsed command arguments. + * + * @return the parsed command arguments as String[] + */ + public String[] getCommandArgs() { + return argv; + } + + /** + * Get the list of files which need to be shipped to the cluster. + * + * @return the list of files which need to be shipped to the cluster + */ + public List getShipSpecs() { + return shipSpec; + } + + /** + * Get the list of files which need to be cached on the execute nodes. + * + * @return the list of files which need to be cached on the execute nodes + */ + public List getCacheSpecs() { + return cacheSpec; + } + + /** + * Add a file to be shipped to the cluster. + * + * Users can use this to distribute executables and other necessary files + * to the clusters. 
+ * + * @param path path of the file to be shipped to the cluster + */ + public void addPathToShip(String path) throws IOException { + // Validate + File file = new File(path); + if (!file.exists()) { + throw new IOException("Invalid ship specification: '" + path + + "' does not exist!"); + } else if (file.isDirectory()) { + throw new IOException("Invalid ship specification: '" + path + + "' is a directory and can't be shipped!"); + } + shipSpec.add(path); + } + + /** + * Add a file to be cached on execute nodes on the cluster. The file is + * assumed to be available at the shared filesystem. + * + * @param path path of the file to be cached on the execute nodes + */ + public void addPathToCache(String path) throws IOException { + // Validate + URI pathUri = null; + URI dfsPath = null; + try { + pathUri = new URI(path); + + // Strip away the URI's _fragment_ and _query_ + dfsPath = new URI(pathUri.getScheme(), pathUri.getAuthority(), + pathUri.getPath(), null, null); + } catch (URISyntaxException urise) { + throw new IOException("Invalid cache specification: " + path); + } + + boolean exists = false; + try { + exists = FileLocalizer.fileExists(dfsPath.toString(), pigContext); + } catch (IOException ioe) { + // Throw a better error message... + throw new IOException("Invalid cache specification: '" + dfsPath + + "' does not exist!"); + } + + if (!exists) { + throw new IOException("Invalid cache specification: '" + dfsPath + + "' does not exist!"); + } else if (FileLocalizer.isDirectory(dfsPath.toString(), pigContext)) { + throw new IOException("Invalid cache specification: '" + dfsPath + + "' is a directory and can't be cached!"); + } + + cacheSpec.add(path); + } + + /** + * Attach a {@link HandleSpec} to a given {@link Handle} + * @param handle Handle to which the specification is to + * be attached. + * @param handleSpec HandleSpec for the given handle. 
+ */ + public void addHandleSpec(Handle handle, HandleSpec handleSpec) { + List handleSpecList = handleSpecs.get(handle); + + if (handleSpecList == null) { + handleSpecList = new LinkedList(); + handleSpecs.put(handle, handleSpecList); + } + + handleSpecList.add(handleSpec); + } + + /** + * Set the input specification for the StreamingCommand. + * + * @param spec input specification + */ + public void setInputSpec(HandleSpec spec) { + List inputSpecs = getHandleSpecs(Handle.INPUT); + if (inputSpecs == null || inputSpecs.size() == 0) { + addHandleSpec(Handle.INPUT, spec); + } else { + inputSpecs.set(0, spec); + } + } + + /** + * Get the input specification of the StreamingCommand. + * + * @return input specification of the StreamingCommand + */ + public HandleSpec getInputSpec() { + List inputSpecs = getHandleSpecs(Handle.INPUT); + if (inputSpecs == null || inputSpecs.size() == 0) { + addHandleSpec(Handle.INPUT, new HandleSpec("stdin", PigStorage.class.getName())); + } + return getHandleSpecs(Handle.INPUT).get(0); + } + + /** + * Set the specification for the primary output of the + * StreamingCommand. + * + * @param spec specification for the primary output of the + * StreamingCommand + */ + public void setOutputSpec(HandleSpec spec) { + List outputSpecs = getHandleSpecs(Handle.OUTPUT); + if (outputSpecs == null || outputSpecs.size() == 0) { + addHandleSpec(Handle.OUTPUT, spec); + } else { + outputSpecs.set(0, spec); + } + } + + /** + * Get the specification of the primary output of the + * StreamingCommand. + * + * @return specification of the primary output of the + * StreamingCommand + */ + public HandleSpec getOutputSpec() { + List outputSpecs = getHandleSpecs(Handle.OUTPUT); + if (outputSpecs == null || outputSpecs.size() == 0) { + addHandleSpec(Handle.OUTPUT, new HandleSpec("stdout", PigStorage.class.getName())); + } + return getHandleSpecs(Handle.OUTPUT).get(0); + } + + /** + * Get specifications for the given Handle. 
+ * + * @param handle Handle of the stream + * @return specification for the given Handle + */ + public List getHandleSpecs(Handle handle) { + return handleSpecs.get(handle); + } + + /** + * Should the stderr of the managed process be persisted? + * + * @return true if the stderr of the managed process should be + * persisted, false otherwise. + */ + public boolean getPersistStderr() { + return persistStderr; + } + + /** + * Specify if the stderr of the managed process should be persisted. + * + * @param persistStderr true if the stderr of the managed + * process should be persisted, else false + */ + public void setPersistStderr(boolean persistStderr) { + this.persistStderr = persistStderr; + } + + /** + * Get the directory where the log-files of the command are persisted. + * + * @return the directory where the log-files of the command are persisted + */ + public String getLogDir() { + return logDir; + } + + /** + * Set the directory where the log-files of the command are persisted. + * + * @param logDir the directory where the log-files of the command are persisted + */ + public void setLogDir(String logDir) { + this.logDir = logDir; + if (this.logDir.startsWith("/")) { + this.logDir = this.logDir.substring(1); + } + setPersistStderr(true); + } + + /** + * Get the maximum number of tasks whose stderr logs files are persisted. + * + * @return the maximum number of tasks whose stderr logs files are persisted + */ + public int getLogFilesLimit() { + return logFilesLimit; + } + + /** + * Set the maximum number of tasks whose stderr logs files are persisted. + * @param logFilesLimit the maximum number of tasks whose stderr logs files + * are persisted + */ + public void setLogFilesLimit(int logFilesLimit) { + this.logFilesLimit = Math.min(MAX_TASKS, logFilesLimit); + } + + /** + * Set whether files should be shipped or not. 
+ * + * @param shipFiles true if files of this command should be + * shipped, false otherwise + */ + public void setShipFiles(boolean shipFiles) { + this.shipFiles = shipFiles; + } + + /** + * Get whether files for this command should be shipped or not. + * + * @return true if files of this command should be shipped, + * false otherwise + */ + public boolean getShipFiles() { + return shipFiles; + } + + public String toString() { + StringBuffer sb = new StringBuffer(); + for (String arg : getCommandArgs()) { + sb.append(arg); + sb.append(" "); + } + sb.append("(" + getInputSpec().toString() + "/"+getOutputSpec() + ")"); + + return sb.toString(); + } + + public Object clone() { + try { + StreamingCommand clone = (StreamingCommand)super.clone(); + + clone.shipSpec = new ArrayList(shipSpec); + clone.cacheSpec = new ArrayList(cacheSpec); + + clone.handleSpecs = new HashMap>(); + for (Map.Entry> e : handleSpecs.entrySet()) { + List values = new ArrayList(); + for (HandleSpec spec : e.getValue()) { + values.add((HandleSpec)spec.clone()); + } + clone.handleSpecs.put(e.getKey(), values); + } + + return clone; + } catch (CloneNotSupportedException cnse) { + // Shouldn't happen since we do implement Clonable + throw new InternalError(cnse.toString()); + } + } + + + /** + * Specification about the usage of the {@link Handle} to communicate + * with the external process. + * + * It specifies the stream-handle which can be one of stdin/ + * stdout/stderr or a named file and also the + * serializer/deserializer specification to be used to read/write data + * to/from the stream. + */ + public static class HandleSpec + implements Comparable, Serializable, Cloneable { + private static final long serialVersionUID = 1L; + + String name; + String spec; + + /** + * Create a new {@link HandleSpec} with a given name using the default + * {@link PigStorage} serializer/deserializer. 
+ * + * @param handleName name of the handle (one of stdin, + * stdout or a file-path) + */ + public HandleSpec(String handleName) { + this(handleName, PigStorage.class.getName()); + } + + /** + * Create a new {@link HandleSpec} with a given name using the default + * {@link PigStorage} serializer/deserializer. + * + * @param handleName name of the handle (one of stdin, + * stdout or a file-path) + * @param spec serializer/deserializer spec + */ + public HandleSpec(String handleName, String spec) { + this.name = handleName; + this.spec = spec; + } + + public int compareTo(HandleSpec o) { + return this.name.compareTo(o.name); + } + + public String toString() { + return name + "-" + spec; + } + + /** + * Get the name of the HandleSpec. + * + * @return the name of the HandleSpec (one of + * stdin, stdout or a file-path) + */ + public String getName() { + return name; + } + + /** + * Set the name of the HandleSpec. + * + * @param name name of the HandleSpec (one of + * stdin, stdout or a file-path) + */ + public void setName(String name) { + this.name = name; + } + + /** + * Get the serializer/deserializer spec of the HandleSpec. + * + * @return the serializer/deserializer spec of the + * HandleSpec + */ + public String getSpec() { + return spec; + } + + /** + * Set the serializer/deserializer spec of the HandleSpec. 
+ * + * @param spec the serializer/deserializer spec of the + * HandleSpec + */ + public void setSpec(String spec) { + this.spec = spec; + } + + public boolean equals(Object obj) { + HandleSpec other = (HandleSpec)obj; + return (name.equals(other.name) && spec.equals(other.spec)); + } + + + public Object clone() { + try { + return super.clone(); + } catch (CloneNotSupportedException cnse) { + // Shouldn't happen since we do implement Clonable + throw new InternalError(cnse.toString()); + } + } + } +} \ No newline at end of file Modified: incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java?rev=692253&r1=692252&r2=692253&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java Thu Sep 4 14:25:41 2008 @@ -17,6 +17,7 @@ */ package org.apache.pig.tools.grunt; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.Reader; @@ -172,6 +173,15 @@ //mPigServer.setJobName(unquote(value)); mPigServer.setJobName(value); } + else if (key.equals("stream.skippath")) { + // Validate + File file = new File(value); + if (!file.exists() || file.isDirectory()) { + throw new IOException("Invalid value for stream.skippath:" + + value); + } + mPigServer.addPathToSkip(value); + } else { // other key-value pairs can go there