hadoop-pig-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Alan Gates (JIRA)" <j...@apache.org>
Subject [jira] Commented: (PIG-161) Rework physical plan
Date Mon, 16 Jun 2008 22:15:48 GMT

    [ https://issues.apache.org/jira/browse/PIG-161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12605422#action_12605422
] 

Alan Gates commented on PIG-161:
--------------------------------

As part of the pipeline rework, we have discovered some issues in the original design regarding
how we handle foreach/generate.  In particular this has
been an issue for how nested plans will work.  The original design looked like this:

{noformat}
Foreach {
    Plan nestedPlan; // one plan for the whole nested logic.  All the operations in this plan
would be relational operators.
}

Generate {
    Plan[] nestedPlans; // one plan for each element that would be generated.  All operators
in this plan would be expression operators.
}
{noformat}

The runtime logic of foreach.getNext() was:
{noformat}
input = predecessor.getNext();
nestedPlan.attach(input);
rc = nestedPlan.getNext();
return rc;
{noformat}

and the runtime logic of generate.getNext() was:
{noformat}
rc = new tuple
for (np : nestedPlans) {
    tuple[i] = np.getNext();
}
handle any flattening;
return rc;
{noformat}

This led to a couple of issues.
# Nested plans which are DAGs and not trees (which are quite common) are hard to handle.
# Generate was not properly running all tuples through the nested plan before passing on the
input to its attached plans.  This led to aggregate functions getting only the first tuple
in a bag instead of all the tuples.  Generate could not really be changed to address this
as it is not clear to it when it does and doesn't need to keep pulling for tuples.

To address this issue we propose the following design.

{noformat}
Foreach {
    Plan[] nestedPlans; // one plan for each element in the projection.  May be relational
or expression operators.
}

Accumulate {
    Plan nestedPlan; // Consists entirely of expression operators.
}
{noformat}

In the case where the nested plan contains a realtional operator (which means there was an
actual nested section in the script) then a new relational
operator, accumulate, will be added to the end of the plan.  It's task will be to accumulate
tuples from the nested pipeline, construct a bag, and
attach that bag as the input to any expression operators given as part of the generate projection.

So the runtime logic of foreach.getNext() will now be:
{noformat}
input = predecessor.getNext();
rc = new tuple;
for (np : nestPlans) {
    np.attach(input);
    tuple[i] = np.getNext();
}
handle any flattening;
return rc;
{noformat}

and the rutime logic of accumulate.getNext() will be:
{noformat}
input = predecessor.getNext();
nestedPlan.attach(input);
return nestedPlan.getNext();
{noformat}

For clarity, let us consider a couple of use cases.

Case 1:  no nested section in the script.
{noformat}
A = load 'myfile';
B = group A by $0;
C = foreach B generate group, COUNT($1), SUM($1.$1);
{noformat}

The plans for this will be:

Top level plan:

load -> group -> foreach

The foreach will have three nested plans:

plan 1: project(0)

plan 2: project(1) -> COUNT()

plan 3: project(1) -> project(1) -> SUM()

So for each tuple it gets, foreach will attach it to each of the above three plans, and then
call getNext() on the leaves of those plans.  It will take
the resulting three values and construct a tuple and return that as the result of its getNext().

One thing to note here is in plan 3, project is being used in different ways.  The first project
is being handed a tuple, and asked to output
a single field (which happens to be a bag).  The second project is being handed a bag and
asked to output a bag.  The schema of its input bag is
{(int, int)} and the schema of its output bag is {(int)}.  So project needs to know when it
is being used to output a bag vs when it is being asked to
output a field from a tuple, which may be of any type.

Case 2:  nested section in the script
{noformat}
A = load 'myfile';
B = group A by $0;
C = foreach B {
    C1 = distinct $1;
    C2 = filter $1 by $1 > 0;
    generate group, COUNT(C1), SUM(C2.$1);
};
{noformat}

The plans for this will be:

Top level plan:

load -> group -> foreach

The foreach will have three nested plans:

plan 1:  project(0)

plan 2: project(1) -> distinct -> accumulate

The accumulate will have a nested plan of: project( * ) -> COUNT()

plan 3: project(1) -> filter -> accumulate

The accumulate will have a nested plan of : project(1) -> SUM()

So effectively, we're proposing several changes:

# Removal of generate as a relational operator.  Its functionality will be absorbed into foreach.
# Splitting of nested plans, one for each generate.  This sacrifices some optimizations. 
Consider for example if in the script above it had been C2 = filter C1 by $1 > 0.  Ideally
we would only evaluate distinct once.  Under this proposal we would evaluate it twice.  For
now that is ok because evaluating it once and then splitting output is much more difficult.
# Creation of "bookend" operators.  The project will facilitate transition from expression
operators to relational operators at the top of these nested plans by taking a bag attached
as input and streaming out the tuples one at a time (this functionality is already in place,
built for the original generate implementation).  The accumulate will facilitate transitions
from realtions to an expression operator.  One oddity here will be that it will be the only
relational operator that can return a type other than bag.  It may return any type.

This proposal will entail the following changes:

On the physical side:

# Change project.getNext(Bag) to handle the case where it's given a bag.  In this case it
should return a bag, stripping the bag to contain only the field(s) being projected.
# Change foreach to handle functionality previously in generate, including the flattening
logic.
# Create an accumulate operator.

On the logical side:
# Changes to project.getSchema() to figure out when project needs to return a bag vs when
it needs to return any type
# Changes to parsing of foreach to decide when an accumulate is necessary in the plan, and
when it isn't.
# Changes to foreach.getSchema() to take on much of the previous functionality of generate.getSchema().
 
# These changes will most likely force changes in the type checker as well.



> Rework physical plan
> --------------------
>
>                 Key: PIG-161
>                 URL: https://issues.apache.org/jira/browse/PIG-161
>             Project: Pig
>          Issue Type: Sub-task
>            Reporter: Alan Gates
>            Assignee: Alan Gates
>         Attachments: arithmeticOperators.patch, BinCondAndNegative.patch, CastAndMapLookUp.patch,
incr2.patch, incr3.patch, incr4.patch, incr5.patch, logToPhyTranslator.patch, missingOps.patch,
MRCompilerTests_PlansAndOutputs.txt, Phy_AbsClass.patch, physicalOps.patch, physicalOps.patch,
physicalOps.patch, physicalOps.patch, physicalOps_latest.patch, POCast.patch, POCast.patch,
podistinct.patch, pogenerate.patch, pogenerate.patch, pogenerate.patch, posort.patch, POUserFuncCorrection.patch,
TEST-org.apache.pig.test.TestLocalJobSubmission.txt, TEST-org.apache.pig.test.TestLogToPhyCompiler.txt,
TEST-org.apache.pig.test.TestLogToPhyCompiler.txt, TEST-org.apache.pig.test.TestMapReduce.txt,
TEST-org.apache.pig.test.TestTypeCheckingValidator.txt, TEST-org.apache.pig.test.TestUnion.txt,
translator.patch, translator.patch, translator.patch, translator.patch
>
>
> This bug tracks work to rework all of the physical operators as described in http://wiki.apache.org/pig/PigTypesFunctionalSpec

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


Mime
View raw message