pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Apache Wiki <wikidi...@apache.org>
Subject [Pig Wiki] Update of "PigLogicalPlanOptimizerRewrite" by AlanGates
Date Tue, 05 Jan 2010 23:40:40 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.

The "PigLogicalPlanOptimizerRewrite" page has been changed by AlanGates.
http://wiki.apache.org/pig/PigLogicalPlanOptimizerRewrite

--------------------------------------------------

New page:
== Problem Statement ==
The current implementation of the logical plan and the logical optimizer in Pig has proven
to not be easily extensible.  Developer feedback has indicated that adding new
rules to the optimizer is quite burdensome.  In addition, the logical plan has been an area
of numerous bugs, many of which have been difficult to fix.  Developers also feel
that the logical plan is difficult to understand and maintain.  The root cause for these issues
is that a number of design decisions that were made as part of the 0.2 rewrite of the front
end have now proven to be
sub-optimal.  The heart of this proposal is to revisit a number of those proposals and rebuild
the logical plan with a simpler design that will make it much easier to
maintain the logical plan as well as extend the logical optimizer.

=== Issues that Need to be Addressed in this Rework ===
'''One:'''  !OperatorPlan has far too many operations.  It has 29 public methods.  This needs
to be paired down to a minimal set of operators that are well defined.

'''Two:''' Currently, relational operators (Join, Sort, etc.) and expression operators (add,
equals, etc.) are both !LogicalOperators.  Operators such as Cogroup that contain expressions
have !OperatorPlans that contain these expressions.  This was done for two reasons:
 1. To make it easier for visitors to visit both types of operators (that is, visitors didn't
have to have separate logic to handle expressions).
 1. To better handle the ambiguous nature of inner plans in Foreach.
However, it has led to visitors and graphs that are hard to understand.  Both of the above
concerns can be handled while breaking this binding so that relational and expression operators
are seaprate types.

'''Three:'''  Related to the issue of relational and expression operators sharing a type is
that inner plans have connections to outer plans.  Take for example a script like

{{{
A = load 'file1' as (x, y);
B = load 'file2' as (u, v);
C = cogroup A by x, B by u
D = filter C by A.x > 0;
}}}
In this case the cogroup will have two inner plans, one of which will be a project of A.x
and the other a project B.u.  The !LOProject objects representing these projections
will hold actual references to the !LOLoad operators for A and B.  This makes disconecting
and rearranging nodes in the plan much more difficult.  Consider if the optimizer wants to
move the filter in D above C.  Now it has to not only change connections in the outer plan
between load, cogroup, and filter; it also has to change connections in the
first inner plan of C, because this now needs to point to the !LOFilter for D rather than
the !LOLoad for A.

'''Four:'''  The work done on Operator and !OperatorPlan to support the original rules for
the optimizer had two main problems:
   1.. The set of primitives chosen were not the correct ones.
   1.. The operations chosen were put on the generic super classes (Operator) rather than
further down on the specific classes that would know how to implement them.

'''Five:'''  At a number of points efforts were made to keep the logical plan close to the
physical plan.  For example, !LOProject represents all of the same operations that
!POProject does.  While this is convenient in translation, it is not convenient when trying
to optimize the plan.  The !LogicalPlan needs to focus on reprenting the logic of
the script in a way that is easy for semantic checkers (such as !TypeChecker) and the optimizer
to work with.

'''Six:'''  The rule of one operation per operator was violated.  !LOProject handles three
separate roles (converting from a relational to an expression operator, actually
projecting, and converting from an expression to a relational operator).  This makes coding
much more complex for the optimizer because when it encounters an !LOProject it
must first determine which of these three roles it is playing before it can understand how
to work with it.

The following proposal will address all of these issues.

== Proposed Methodology ==
Fixing these issues will require extensive changes, including a complete rewrite of Operator,
!OperatorPlan, !PlanVisitor, !LogicalOperator, !LogicalPlan,
!LogicalPlanVisitor, every current subclass of !LogicalOperator, and all existing optimizer
rules.  It will also require extensive changes, though not complete rewrites, in
existing subclasses of !LogicalTransformer.  To avoid destablizing the entire codebase during
this operation, this will be done in a new set of packages as a totally separate
set of classes.  Linkage code will be written to translate the current !LogicalPlan to the
new experimental !LogicalPlan class.  A new !LogicalToPhysicalTranslator will also
be written to translate this new !LogicalPlan to a !PhysicalPlan.  This code path will only
be taken if some type of command line switch or property is set, thus insulating
current developers and users from this work.

This has the added advantage that it is easy to build a prototype first.  Given that our first
implementation now needs rewriting, prototyping first will help us explore
whether we solved the problem correctly this time.

== The Actual Proposal ==

=== Changes to Plans ===
In general, the generic plan classes will be changing in a couple of important ways:

One, they will be made much simpler.  The goal will be to find a minimal set of operations
that will enable all desired plan features.

Two, they will no longer be generics.  While nice in theory, this led to several observed
issues.  One, each class had so many parameters that in practice developers have
worked around rather than used the features of the generics.  That is, parameterizing the
classes seemed to get in developers way, rather than help them.  Two, since we
propose to break relational and expression operators into different types, it will no longer
be possible for a single visitor to span both types.  But we do not wish to
prohibit this in all cases.


New Operator class.  Note that the funciton which was previously called `visit` has been renamed
`accept` to avoid confusion with the `visit` method in !PlanVisitor.
{{{

package org.apache.pig.experimental.plan;

public abstract class Operator {
    
    protected String name;
    protected OperatorPlan plan; // plan that contains this operator

    public Operator(String n, OperatorPlan p) {...}

    /**
     * Accept a visitor at this node in the graph.
     * @param v Visitor to accept.
     */
    public abstract void accept(PlanVisitor v);

    public String getName() { ... }
    
    /**
     * Get the plan associated with this operator.
     * @return plan
     */
    public OperatorPlan getPlan() { ... }

}


}}}

New !OperatorPlan class.  Note the severe paring down of the number of operations.  Only simple
add, remove, connect, disconnect.  All all operations are left to subclasses to implement

what makes sense for their plans.

{{{

package org.apache.pig.experimental.plan;

public abstract class OperatorPlan {

    protected Set<Operator> ops;
    protected PlanEdge fromEdges;
    protected PlanEdge toEdges;

    public OperatorPlan() { ... }
    
    /**
	 * Get number of nodes in the plan.
     */
    public int size() { ... }

    /**
     * Get all operators in the plan that have no predecessors.
     * @return all operators in the plan that have no predecessors, or
     * an empty list if the plan is empty.
     */
    public List<Operator> getRoots() { ... }

    /**
     * Get all operators in the plan that have no successors.
     * @return all operators in the plan that have no successors, or
     * an empty list if the plan is empty.
     */
    public List<Operator> getLeaves() { ... }

    /**
     * For a given operator, get all operators immediately before it in the
     * plan.
     * @param op operator to fetch predecessors of
     * @return list of all operators imeediately before op, or an empty list
     * if op is a root.
     * @throws IOException if op is not in the plan.
     */
    public List<Operator> getPredecessors(Operator op) throws IOException { ... }
    
    /**
     * For a given operator, get all operators immediately after it.
     * @param op operator to fetch successors of
     * @return list of all operators imeediately after op, or an empty list
     * if op is a leaf.
     * @throws IOException if op is not in the plan.
     */
    public List<Operator> getSuccessors(Operator op) throws IOException { ... }

    /**
     * Add a new operator to the plan.  It will not be connected to any
     * existing operators.
     * @param op operator to add
     */
    public void add(Operator op) { ... }

    /**
     * Remove an operator from the plan.
     * @param op Operator to be removed
     * @throws IOException if the remove operation attempts to 
     * remove an operator that is still connected to other operators.
     */
    public void remove(Operator op) throws IOException { ... }
        
    /**
	 * Connect two operators in the plan, controlling which position in the
	 * edge lists that the from and to edges are placed.
	 * @param from Operator edge will come from
	 * @param fromPos Position in the array for the from edge
	 * @param to Operator edge will go to
	 * @param toPos Position in the array for the to edge
     */
    public void connect(Operator from,
                        int fromPos,
                        Operator to,
                        int toPos) { ... }
    
    /**
	 * Connect two operators in the plan.
	 * @param from Operator edge will come from
	 * @param to Operator edge will go to
     */
    public void connect(Operator from, Operator to) { ... }
    
    /**
     * Disconnect two operators in the plan.
     * @param from Operator edge is coming from
     * @param to Operator edge is going to
     * @return pair of positions, indicating the position in the from and
     * to arrays.
     * @throws IOException if the two operators aren't connected.
     */
    public Pair<Integer, Integer> disconnect(Operator from,
                                             Operator to) throws IOException { ... }

}

}}}

There are no significant changes to !PlanVisitor and !PlanWalker other than the removal of
generics.  With the change that only LOForeach now has inner plans, it isn't clear whether
the
`pushWalker` and `popWalker` methods in !PlanVisitor are really useful anymore or not.  These
may be removed.

== Changes to Logical Operators ==
There will be a number of important changes to !LogicalOperators.

First, as mentioned above relational opertors will be split into two disparate groups, relational
and expression.

Second, inner plans and expression plans will no longer hold any explicit references to outer
plans.  At most, they will reference the operator which contains the inner plan.

Third, operators will represent exactly one operation.

Fourth, only LOForeach will have inner plans.  All other relational operators will
only have expressions.

Fifth, a new operator, called for now Identity but desparately seeking a better name,
will be introduced.  The sole purpose of this operator will be to act as a root
in foreach's inner plans.  So, given a script like:

{{{
A = load 'input';
B = group A by $0;
C = foreach B {
    C1 = B.$1;
    C2 = distinct C1;
    generate group, COUNT(C2);
}
}}}

the foreach's inner plan will have two Identity operators, once for $0 and one for $1.
This will allow the C1 !LOGenerate operator to connect to another logical operator.

Sixth, in the past expression operators were used at places in inner plans, such as
the `C1 = B1.$0;` above.  Relational operators will always be used in these places
now.  LOGenerate (or perhaps a new operator if necessary) will be used in the place of
these assignment operations instead.

Seventh, in the past !LOForeach had multiple inner plans, one for each of its outputs.
That will no longer be the case.  !LOForeach will always have exactly one inner plan,
which must terminate with a !LOGenerate.  That !LOGenerate will have expressions for
each of its outputs.

All logical operators will have a schema.  This schema represents the format of the
output for that operator.  The schema can be null, which indicates that the format of
the output for that operator is unknown.  In general the notion of unknownness in a
schema will be contagious.  Take for example:

{{{
A = load 'file1' as (x: int, y: float);
B = load 'file2';
C = cogroup A by x, B by $0;
D = foreach C generate flatten(A), flatten(B);
}}}

A will have a schema, since one is specified for it.  B will not have a schema, since
one is not specified.  C will have a schema, because the schema of (co)group is always
known.  Note however that in C's schema, the bag A will have a schema, and the bag B
will not.  This means that D will not have schema, because the output of flatten(B) is
not known.  If D is changed to be `D = foreach C generate flatten(A);` then D will
have a schema, since the format of flatten(A) is known.

!LogicalPlan will contain `add` and `removeLogical` operations specificly designed for manipulating
logical plans.  These will be the only operations supported on the plan.

{{{

package org.apache.pig.experimental.logical.relational;

/**
 * LogicalPlan is the logical view of relational operations Pig will execute 
 * for a given script.  Note that it contains only realtional operations.
 * All expressions will be contained in LogicalExpressionPlans inside
 * each relational operator.  LogicalPlan provides operations for
 * removing and adding LogicalRelationalOperators.  These will handle doing
 * all of the necessary add, remove, connect, and disconnect calls in
 * OperatorPlan.  They will not handle patching up individual relational
 * operators.  That will be handle by the various Patchers.
 *
 */
public class LogicalPlan extends OperatorPlan {
    
    /**
     * Add a relational operation to the plan.
     * @param before operator that will be before the new operator.  This
     * operator should already be in the plan.  If before is null then
     * the new operator will be a root.
     * @param newOper new operator to add.  This operator should not already
     * be in the plan.
     * @param after operator  that will be after the new operator.  This
     * operator should already be in the plan.  If after is null, then the
     * new operator will be a root.
     * @throws IOException if add is already in the plan, or before or after
     * are not in the plan.
     */
    public void add(LogicalRelationalOperator before,
                    LogicalRelationalOperator newOper,
                    LogicalRelationalOperator after) throws IOException { ... }
   
    /**
     * Add a relational operation with multiple outputs to the plan.
     * @param before operators that will be before the new operator.  These
     * operator should already be in the plan.
     * @param newOper new operator to add.  This operator should not already
     * be in the plan.
     * @param after operator  that will be after the new operator.  This
     * operator should already be in the plan.  If after is null, then the
     * new operator will be a root.
     * @throws IOException if add is already in the plan, or before or after
     * are not in the plan.
     */
    public void add(List<LogicalRelationalOperator> before,
                    LogicalRelationalOperator newOper,
                    LogicalRelationalOperator after) throws IOException { ... }
    
    /**
     * Add a relational operation with multiple inputs to the plan.
     * @param before operator that will be before the new operator.  This
     * operator should already be in the plan.  If before is null then
     * the new operator will be a root.
     * @param newOper new operator to add.  This operator should not already
     * be in the plan.
     * @param after operators that will be after the new operator.  These
     * operator should already be in the plan.
     * @throws IOException if add is already in the plan, or before or after
     * are not in the plan.
     */
    public void add(LogicalRelationalOperator before,
                    LogicalRelationalOperator newOper,
                    List<LogicalRelationalOperator> after) throws IOException { ...
}
    
    /**
     * Add a relational operation to the plan when the caller wants to control
     * how the nodes are connected in the graph.
     * @param before operator that will be before the new operator.  This
     * operator should already be in the plan.  before should not be null.
     * the new operator will be a root.
     * @param beforeToPos Position in before's edges to connect newOper at.
     * @param beforeFromPos Position in newOps's edges to connect before at.
     * @param newOper new operator to add.  This operator should not already
     * be in the plan.
     * @param afterToPos Position in after's edges to connect newOper at.
     * @param afterFromPos Position in newOps's edges to connect after at.
     * @param after operator  that will be after the new operator.  This
     * operator should already be in the plan.  If after is null, then the
     * new operator will be a root.
     * @throws IOException if add is already in the plan, or before or after
     * are not in the plan.
     */
    public void add(LogicalRelationalOperator before,
                    int beforeToPos,
                    int beforeFromPos,
                    LogicalRelationalOperator newOper,
                    int afterToPos,
                    int afterFromPos,
                    LogicalRelationalOperator after) throws IOException { ... }
    
    /**
     * Remove an operator from the logical plan.  This call will take care
     * of disconnecting the operator, connecting the predecessor(s) and 
     * successor(s) and patching up the plan. 
     * @param op operator to be removed.
     * @throws IOException If the operator is not in the plan.
     */
    public void removeLogical(LogicalRelationalOperator op) throws IOException { ... }
        
}


}}}

A !LogicalRelationalOperator will be the logical representation of a relational operator (join,
sort, etc.).

{{{

package org.apache.pig.experimental.logical.relational;

/**
 * Logical representation of relational operators.  Relational operators have
 * a schema.
 */
abstract public class LogicalRelationalOperator extends Operator {
    
    protected LogicalSchema schema;
    protected int requestedParallelism;
    protected String alias;
    protected int lineNum;

    /**
     * 
     * @param name of this operator
     * @param plan this operator is in
     */
    public LogicalRelationalOperator(String name, OperatorPlan plan) { ... }
    
    /**
     * 
     * @param name of this operator
     * @param plan this operator is in
     * @param rp requested parallelism
     */
    public LogicalRelationalOperator(String name,
                                     OperatorPlan plan,
                                     int rp) { ... }
    
    /**
     * Get the schema for the output of this relational operator.  This does
     * not merely return the schema variable.  If schema is not yet set, this
     * will attempt to construct it.  Therefore it is abstract since each
     * operator will need to construct its schema differently.
     * @return the schema
     */
    abstract public LogicalSchema getSchema();
    
    /**
     * Get the requestedParallelism for this operator.
     * @return requestedParallelsim
     */
    public int getRequestedParallelism() { ... }
    
    /**
     * Get the alias of this operator.  That is, if the Pig Latin for this operator
     * was 'X = sort W by $0' then the alias will be X.  For store and split it will
     * be the alias being stored or split.  Note that because of this this alias
     * is not guaranteed to be unique to a single operator.
     * @return alias
     */
    public String getAlias() { ... }
    
    /**
     * Get the line number in the submitted Pig Latin script where this operator
     * occurred.
     * @return line number
     */
    public int getLineNumber() { ... }

}

}}}

!LogicalSchema will be based on the existing Schema class.  It is hoped that this class can
be greatly simplified.

!LogicalExpressionPlan will extend !OperatorPlan and contain !LogicalExpressionOperators.
 Often expression trees are built with the expressions themselves containing references to
the
next expression in the tree.  For example, a common implementation would be something like:

{{{
abstract class BinaryExpression {
    Expression leftHandSide;
    Expression rightHandSide;
}

class Plus extends BinaryExpression {
...
}

}}}

Since we already have plan structure we will have a !LogicalExpressionPlan.  This has the
advantage that !PlanVisitors will work with expression trees and we do not need to invent
a separate
visitor hierarchy. What operations !LogicalExpressionPlan will need is not yet clear.

!LogicalExpressionOperators will have a data type (the type they return) and a unique identifier
(uid).  The point of the uid is to allow the optimizer to track how expressions flow through
the tree.  So projection expressions will have the same uid as the expression they are projecting.
 All other expressions will create a new uid, since they are changing the value of the
expression.

Consider the following example:

{{{
A = load 'file1' as (x:int, y:int);
B = filter A by x > 0 and y > 0;
C = foreach A generate x + y;
}}}

In this case x and y will be assigned uids in load, where they first enter the script.
The output of filter will maintain these same uids since filter does not alter the
format of its input.  But the output of foreach will have a different uid, since this
creates a new value in the script.

Hopefully an example will make all of this somewhat clearer.  Consider the following
script:

{{{
A = load 'input1' as (x: int, y: chararray);
B = load 'input2' as (u: int, v: float);
C = filter A by x is not null;
D = filter B by u is not null and v > 0.0;
E = join C on x, D on u;
F = group E on x;
G = foreach F {
    H = E.y;
    I = distinct H;
    J = order E by v;
    generate group, COUNT(I), CUMULATIVE(J);
}
store G into 'output';
}}}

That script will produce a logical plan that looks like the following:

{{attachment:logicalplan.jpg}}

The !LOFilter for D will have an expression plan that looks like:

{{attachment:expressiontree.jpg}}

== Changes to the Optimizer ==
The following changes will be made to the optimizer:
 1. Currently all rules are handed to the optimizer at once, and it iterates over them until
none of the rules trigger or it reaches the maximum number of iterations.  This will be changed
so that rules are collected into sets.  The optimizer will then iterate over rules in each
set until none of the rules trigger or it reaches the maximum number of iterations.  The reason
for this change will be made clear below.
 1. Currently the plan itself has the knowledge of how to patch itself up after it is rearranged.
 (For example, how to reconstruct schemas after a plan is changed.)  This will be changed
so that instead the optimizer can register a number of listeners (aka observers) on the plan.
 These listeners will then be invoked after each rule that modifies the plan.  In these way
the plans themselves need not understand how to patch up changes made by an optimization rule.
 Also as we expand the plans and they record more information, it is easy to add new listeners
without having to interact with existing functionality.
 1. The Rule class will be merged with the existing !RuleMatcher class so that Rule takes
on the functionality of matching.  This match routine will be written once in Rule and extensions
of rule need not re-implement it.

The goal in the above changes is to radically simplify writing optimizer rules.  Consider
a rule to push a filter above a join.

{{{
A = load 'file1' as (x, y);
B = load 'file2' as (u, v);
C = join A by x, B by u;
D = filter C by (y > 0 or v > 0) and x > 0 and u > 0 and y > v;
}}}

In the current design, to push this filter, a rule must know how to split filters, how to
push the parts that pushable, and reconstruct the filters of the parts that are not.  In the
new
proposal we can instead create three rules.  Rule 1 will only know how split filters.  Rule
2 will only know how to push them.  And Rule 3 will only know how to reconstitute them.  These
rules
can then be placed in separate sets, so that they do not interfere with each other.  So in
this example, after Rule 1 has run, the script will conceptually look like

{{{
A = load 'file1' as (x, y);
B = load 'file2' as (u, v);
C = join A by x, B by u;
D1 = filter C by (y > 0 or v > 0);
D2 = filter D1 by  x > 0;
D3 = filter D2 by u > 0;
D = filter D3 by y > v;
}}}

Since Rule 1 will be run repeatedly it need not manage entirely splitting the filter.  It
can be written to simply split one and, allowing the next iteration to split any subsequents
ands.

After several iterations of Rule 2, the script will look like:

{{{
A = load 'file1' as (x, y);
D2 = filter A by  x > 0;
B = load 'file2' as (u, v);
D3 = filter B by u > 0;
C = join D2 by x, D3 by u;
D1 = filter C by (y > 0 or v > 0);
D = filter D1 by y > v;
}}}

Again, Rule 2 does need to do this in one pass.  It can concentrate on pushing filters past
the join one at a time.  (Most likely there will be a Rule 2.1 that will handle swapping filters
so that we can get D2 past D1 and then apply Rule 2 to push D2 before C.)

And finally, after Rule 3 has run, the script will look like:

{{{
A = load 'file1' as (x, y);
D2 = filter A by  x > 0;
B = load 'file2' as (u, v);
D3 = filter B by u > 0;
C = join D2 by x, D3 by u;
D = filter C by (y > 0 or v > 0) and y > v;
}}}

Writing each of these rules will be much simpler than writing one large rule that must handle
all three cases.

After each of these rules modify the tree, listeners will be notified that the tree has changed.
 The currently known listeners are one to reconstruct schemas based on the changes and one
to
assign uids to expressions in the tree.

As part of the prototype we need to identify a set of test case rules that we believe represent
operations that will need to be done on the plan.  Currently we have the following in this
list:
 * Pushing filter past join
 * Pushing filter above foreach with flatten
 * Pushing filter above group
 * Pushing filter above sort
 * Pushing filter above split
 * Pushing filter above union
 * Pushing filter above cross
 * Pushing foreach with flatten below cogroup, join, cross, union
 * Combine limit and sort
 * Combine load with stream
 * Combine stream with store
 * Combine two filters
 * Combine two foreachs
 * Combine two joins

Mime
View raw message