hadoop-pig-dev mailing list archives

From "Arun C Murthy (JIRA)" <j...@apache.org>
Subject [jira] Commented: (PIG-94) Pig Streaming functional spec proposal
Date Fri, 15 Feb 2008 22:06:08 GMT

https://issues.apache.org/jira/browse/PIG-94?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12569435#action_12569435

Arun C Murthy commented on PIG-94:

I've put up a first cut here: http://wiki.apache.org/pig/PigStreamingDesign.



Pig Streaming 1.0 - Design

The main goal of Pig-Streaming 1.0 is to support a form of processing in which the entire
portion of the dataset that corresponds to a task is sent to the task, and its output streams out.
There is no temporal or causal correspondence between an input record and any specific output record.

This document specs out the high-level design of how Pig will support the Streaming concept.
It builds off the functional spec documented at: http://wiki.apache.org/pig/PigStreamingFunctionalSpec.

Main Components: 
1. User-facing changes (e.g. Pig Latin) 
2. Logical Layer 
3. Physical Layer 
4. Streaming Implementation

1. User-facing changes

The main changes include the addition of the new STREAM operator and the enhancement of the
DEFINE operator to allow alias-ing the actual command to which data is streamed. (See the
wiki for details.)
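To make the user-facing change concrete, here is a sketch of a script using both constructs. The command name, script name, and clause spellings are illustrative only; the functional spec on the wiki is authoritative.

```pig
-- Illustrative only; see PigStreamingFunctionalSpec for the exact syntax.
-- DEFINE aliases the external command and names files to ship to the cluster.
DEFINE mycmd `my_filter.pl -n 5` SHIP('/local/my_filter.pl');

A = LOAD 'data';
B = STREAM A THROUGH mycmd;   -- the new STREAM operator
STORE B INTO 'filtered';
```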

There are two affected components:

a) QueryParser

The changes to QueryParser are straightforward: parse the STREAM operator and save the
relevant details in a StreamEvalSpec. StreamEvalSpec is a sub-class of org.apache.pig.impl.eval.EvalSpec;
it works similarly to other Eval operators (FILTER|FOREACH) in the sense that it takes
a bag of tuples and performs one operation on each tuple. It also ensures that the STREAM operator
can be _chained_ with other Evals in exactly the same manner as in Pig today (by constructing
a CompositeEvalSpec).

StreamEvalSpec also contains necessary details such as: 
i. The actual _command_ and its arguments, if any. 
ii. Information about the _ship-spec_ and _cache-spec_, which will go through Hadoop's DistributedCache. 
iii. Serializer/deserializer information.
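The details above can be pictured as a simple holder class. This is a minimal sketch; the class name, field names, and types are assumptions for illustration, not Pig's actual StreamEvalSpec API.

```java
import java.util.List;

// Hypothetical sketch of the state a StreamEvalSpec-like object would carry;
// all names here are illustrative, not the real Pig API.
class StreamingCommandSpec {
    private final String command;          // the executable to stream data through
    private final List<String> arguments;  // its arguments, if any
    private final List<String> shipSpec;   // files shipped via DistributedCache
    private final List<String> cacheSpec;  // files cached via DistributedCache
    private final String serializer;       // tuple -> stdin encoding
    private final String deserializer;     // stdout -> tuple decoding

    StreamingCommandSpec(String command, List<String> arguments,
                         List<String> shipSpec, List<String> cacheSpec,
                         String serializer, String deserializer) {
        this.command = command;
        this.arguments = arguments;
        this.shipSpec = shipSpec;
        this.cacheSpec = cacheSpec;
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    String getCommand() { return command; }
    List<String> getShipSpec() { return shipSpec; }
}
```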

b) PigScriptParser

The PigScriptParser also needs to be enhanced to process the new constructs
supported by the DEFINE operator. The one change needed in PigContext is a
PigContext.registerStreamingCommand API, which lets the PigScriptParser store the streaming
command and related information to be passed along to QueryParser and other components.
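The registration path can be sketched as a small alias-to-command registry. The class and method names below (other than registerStreamingCommand itself, which the text proposes) are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of a registerStreamingCommand-style registry;
// in Pig this state would live on PigContext, not a standalone class.
class StreamingCommandRegistry {
    private final Map<String, String> commands = new HashMap<>();

    // Called by PigScriptParser when it sees DEFINE alias `cmd ...`
    void registerStreamingCommand(String alias, String command) {
        commands.put(alias, command);
    }

    // Called by QueryParser when it sees STREAM ... THROUGH alias
    String lookup(String alias) {
        return commands.get(alias);
    }
}
```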


Affected files:

      StreamEvalSpec.java (extends EvalSpec)

      PigContext.java (add registerStreamingCommand)

2. Logical Layer

Since 'streaming' is an eval on each record in the dataset, it should still be a logical Eval
operator; i.e., LOEval should suffice for streaming operations too.

3. Physical Layer

Pig's MapReduce physical layer shouldn't be affected at all, since the StreamEvalSpec neatly
fits into the map/reduce pipeline as another CompositeEvalSpec. (StreamEvalSpec.setupDefaultPipe
is the critical knob.)
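The way a streaming stage slots into a chain of evals can be pictured as function composition over bags of tuples. This is a toy sketch of the chaining idea only; EvalPipeline and its tuple representation are invented here and are not Pig's CompositeEvalSpec implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy sketch of chaining eval stages the way CompositeEvalSpec chains
// EvalSpecs; tuples are modeled as plain strings for brevity.
class EvalPipeline {
    private final List<Function<List<String>, List<String>>> stages = new ArrayList<>();

    EvalPipeline addStage(Function<List<String>, List<String>> stage) {
        stages.add(stage);
        return this;
    }

    // Feed the whole bag through each stage in order, as in a map/reduce pipe.
    List<String> run(List<String> tuples) {
        List<String> current = tuples;
        for (Function<List<String>, List<String>> stage : stages) {
            current = stage.apply(current);
        }
        return current;
    }
}
```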

4. Streaming Implementation

The main infrastructure to support the notion of data processing by sending a dataset to a task's
input and collecting its output is a generic manager that takes care of setup/teardown of the
streaming task, manages its stdin/stderr/stdout streams, and also does post-processing. The
plan is to implement an org.apache.hadoop.mapred.lib.external.ExecutableManager to take over
the aforementioned responsibilities. Keeping this separate from Hadoop's Streaming
component (in contrib/streaming) ensures that Pig has no extraneous dependency on Hadoop
Streaming, and putting it under org.apache.hadoop.mapred.lib ensures that Pig depends on Hadoop
Core only.

The ExecutableManager also is responsible for dealing with multiple outputs of the streaming
tasks (refer to the functional spec in the wiki).


  class org.apache.hadoop.mapred.lib.external.ExecutableManager {
    void setup() throws Exception;
    void teardown() throws Exception;
    void setInput(Writable w);
    Writable getOutput();
  }

The important deviation from the current Pig infrastructure is that there is no longer a one-to-one
mapping between input and output records, since the user's script could (potentially) consume
all the input before it emits any output records. Hence, StreamEvalSpec.add will call ExecutableManager.setInput((Writable)(d)),
while collecting output from the task (via ExecutableManager.getOutput()) and passing it along to
the next eval in the pipeline. 
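The decoupling of inputs and outputs can be demonstrated without launching a real process. This toy stand-in (a hypothetical class, not the proposed ExecutableManager) behaves like `wc -l`: it consumes every input record before producing any output, so getOutput legitimately returns nothing until teardown.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for an ExecutableManager-like component that, like `wc -l`,
// consumes all of its input before emitting any output, illustrating why
// there is no one-to-one mapping between input and output records.
class CountingManager {
    private final List<String> pending = new ArrayList<>();
    private boolean finished = false;

    void setInput(String record) { pending.add(record); }  // StreamEvalSpec.add side

    void teardown() { finished = true; }  // signals no more input is coming

    // Returns null until all input has been seen, then one count record, once.
    String getOutput() {
        if (!finished) return null;
        finished = false;  // emit the single output record exactly once
        return Integer.toString(pending.size());
    }
}
```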

> Pig Streaming functional spec proposal
> --------------------------------------
>                 Key: PIG-94
>                 URL: https://issues.apache.org/jira/browse/PIG-94
>             Project: Pig
>          Issue Type: New Feature
>            Reporter: Olga Natkovich
> This issue is for discussion about Pig streaming functional spec.
> http://wiki.apache.org/pig/PigStreamingFunctionalSpec

This message is automatically generated by JIRA.
You can reply to this email to add a comment to the issue online.
