hadoop-pig-dev mailing list archives

From "David Ciemiewicz (JIRA)" <j...@apache.org>
Subject [jira] Commented: (PIG-697) Proposed improvements to pig's optimizer
Date Wed, 08 Apr 2009 18:43:13 GMT

    [ https://issues.apache.org/jira/browse/PIG-697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12697146#action_12697146
] 

David Ciemiewicz commented on PIG-697:
--------------------------------------

Some thoughts on optimization problems and patterns from SQL and coding Pig and my desire
for a higher level version of Pig than we have today.

I know this may come off as "distraction" but hopefully you'll have some time to hear me out.

* after a conversation with Santhosh about the SQL to Pig translation work 
* multiple issues I have encountered with nested foreach statements including redundant function
execution 
* nested FOREACH statement "assignment" computation bugs 
* hand coding chains of foreach statements so I can get the Algebraic combiner to kick in
* hand coding chains of foreach statements and grouping statements rather than using a single
statement

I think I might have stumbled on a potentially improved model for Pig to Pig execution plan
generation:

{code}
            High Level Pig to Low Level Pig translation
{code}

I think this would potentially benefit the SQL to Pig efforts and provide for programmer coding
efficiency in Pig as well.

This will be a bit protracted, but I hope you have some time to consider it.

Take the following SQL idiom that the SQL to Pig translator will need to support:

{code}
            select
                        EXP(AVG(LN(time+0.1))) as geomean_time
            from
                        events
            where
                        time is not null and
                        time >= 0;
{code}

In "high level pig", I have wanted to code this as:
 
{code}
            A = load 'events' using PigStorage() as ( time: int );
            B = filter A by time is not null and time >= 0;
            C = group B all;
            D = foreach C generate EXP(AVG(LN(B.time+0.1))) as geomean_time;
{code}

In fact, this would seem to provide a nice translation path from SQL to "low level pig" via
"high level pig".

Unfortunately, this won't work.  We developers must write Pig scripts at a lower level and
break all of this apart into various steps.

An additional issue is that, because of some, um, workarounds, in the execution plan optimizations,
the combiner won't kick in if we don't do further steps.

So the most "performant" version of the desired pig script is the following really "low level
pig" where D is broken into 3 steps, merging one with B and the remaining 2 steps as separate
D steps:

 
{code}
            A = load 'events' using PigStorage() as ( time: int );
            B = filter A by time is not null and time >= 0;
            B = foreach B generate LOG(time+0.1) as log_time;
            C = group B all;
            -- note that the group alias is required for the Algebraic combiner to kick in
            D = foreach C generate group, AVG(B.log_time) as mean_log_time;
            D = foreach D generate EXP(mean_log_time) as geomean_time;
{code}

If we can figure out how to translate SQL into this last "low-level" set of statements, why
couldn't we or shouldn't we have "high level pig" as well and permit more efficient code writing
and optimization?
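Why the three-step "low level" form lets the combiner help can be sketched outside Pig. The following is a minimal Java illustration, not Pig code: the class and method names (GeoMeanSketch, Partial, mapChunk, geoMean) are hypothetical, and the chunks stand in for whatever splits the map phase would see. The point is that the average reduces to (sum, count) pairs that can be merged in any grouping, while the log and exp steps sit before and after the aggregation.

```java
import java.util.List;

class GeoMeanSketch {
    /** Hypothetical partial aggregate: running sum and count of log values. */
    static final class Partial {
        final double sum;
        final long count;

        Partial(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        /** Merging partials is associative, so it can run in a combiner. */
        Partial merge(Partial other) {
            return new Partial(sum + other.sum, count + other.count);
        }
    }

    /** Filter (not null, >= 0), take the log, and pre-aggregate one chunk. */
    static Partial mapChunk(List<Integer> times) {
        double sum = 0.0;
        long count = 0;
        for (Integer t : times) {
            if (t != null && t >= 0) {
                sum += Math.log(t + 0.1);
                count++;
            }
        }
        return new Partial(sum, count);
    }

    /** Merge the per-chunk partials, then apply exp to the mean of the logs. */
    static double geoMean(List<List<Integer>> chunks) {
        Partial total = new Partial(0.0, 0);
        for (List<Integer> chunk : chunks) {
            total = total.merge(mapChunk(chunk));
        }
        // With no qualifying rows this is 0.0 / 0 and yields NaN.
        return Math.exp(total.sum / total.count);
    }
}
```

The result does not depend on how the rows are split into chunks, which is the property a "high level" to "low level" translator would rely on when it pulls the LOG step in front of the group and the EXP step behind it.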


Next example

I do a bunch of nested intermediate computations in a nested FOREACH statement:

{code}
C = foreach C {
        curr_mean_log_timetonextevent = curr_sum_log_timetonextevent / (double)count;
        curr_meansq_log_timetonextevent = curr_sumsq_log_timetonextevent / (double)count;
        curr_var_log_timetonextevent = curr_meansq_log_timetonextevent - 
                        (curr_mean_log_timetonextevent * curr_mean_log_timetonextevent);
        curr_sterr_log_timetonextevent = math.SQRT(curr_var_log_timetonextevent / (double)count);
 

        curr_geomean_timetonextevent = math.EXP(curr_mean_log_timetonextevent);
        curr_geosterr_timetonextevent = math.EXP(curr_sterr_log_timetonextevent);
        curr_mean_timetonextevent = curr_sum_log_timetonextevent / (double)count;
        curr_meansq_timetonextevent = curr_sumsq_log_timetonextevent / (double)count;
        curr_var_timetonextevent = curr_meansq_timetonextevent - 
                        (curr_mean_timetonextevent * curr_mean_timetonextevent);

        curr_sterr_timetonextevent = math.SQRT(curr_var_timetonextevent / count);

        generate
            ...
{code}

The code for nested statements in Pig has been particularly problematic and buggy including
problems such as:

* redundant execution of functions such as SUM, AVG
* nested function problems
* mathematical operator problems (illustrated in this bug)
* no type propagation
* the need to use AS clauses to name nested alias assignments projected in the GENERATE clauses

What if, instead of trying to do all of these operations in some specialized execution code,
this were treated as "high level" pig that translated all of these intermediate statements
into two or more "low level" foreach expansions?

This isn't as wild as it seems because 9 times out of 10, the "workaround" that I have had
to do is exactly that: I had to stop using nested foreach and instead break the code into
two separate foreach statements chained together.

In other words I went from the above nested foreach statement that generated errors and didn't
work to two hand coded foreach statements (or more) that did:

{code}
C = foreach C generate
            *,
            curr_sum_log_timetonextevent / (double)count as curr_mean_log_timetonextevent,
            curr_sumsq_log_timetonextevent / (double)count as curr_meansq_log_timetonextevent;

C = foreach C generate
            *,
            curr_meansq_log_timetonextevent - 
                        (curr_mean_log_timetonextevent * curr_mean_log_timetonextevent)
                                                                                    as curr_var_log_timetonextevent;

C = foreach C generate
            *,
            math.SQRT(curr_var_log_timetonextevent / (double)count) as curr_sterr_log_timetonextevent;
{code}

This was the only way I could avoid the redundant computations and get the code to actually work.
Well, actually if I added casts at appropriate places, it also worked, but what a pain.

This would also have the advantage that alias names used in the nested "assignments" would
actually propagate without an "as" clause in the subsequent generate statement.
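One way such a translation could decide where to cut a nested foreach into chained foreach statements is to group the nested assignments by dependency depth. The following is a minimal Java sketch under that assumption; the class name ForeachStagingSketch, the stage method, and the representation of assignments as a map from alias to the aliases it reads are all hypothetical and not part of Pig.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ForeachStagingSketch {
    /**
     * Groups nested assignments into stages. An alias lands in the first
     * stage in which everything it reads is either an input field or was
     * produced by an earlier stage. Each stage would become one chained
     * "low level" foreach statement.
     */
    static List<List<String>> stage(Map<String, List<String>> deps,
                                    Set<String> inputFields) {
        Set<String> available = new HashSet<>(inputFields);
        Map<String, List<String>> pending = new LinkedHashMap<>(deps);
        List<List<String>> stages = new ArrayList<>();

        while (!pending.isEmpty()) {
            List<String> ready = new ArrayList<>();
            for (Map.Entry<String, List<String>> e : pending.entrySet()) {
                if (available.containsAll(e.getValue())) {
                    ready.add(e.getKey());
                }
            }
            if (ready.isEmpty()) {
                // Either a cycle or a reference to an undefined alias.
                throw new IllegalArgumentException(
                        "unresolvable aliases: " + pending.keySet());
            }
            // Aliases become visible only to later stages.
            pending.keySet().removeAll(ready);
            available.addAll(ready);
            stages.add(ready);
        }
        return stages;
    }
}
```

Applied to the mean, mean-square, variance, and standard-error assignments from the example, this grouping yields three stages, matching the three hand-coded foreach statements.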

I know this is a "brain fart" but it does have a time honored tradition in languages like
C, C++, Lisp of using the language to "bootstrap" the language by translating from more "high
level" idioms to less feature rich "low level" idioms.

It just seemed like a plausible way of speeding up both development of a SQL to Pig translator
as well as allowing a more rapid transition of Pig to higher level idioms while correcting
whole swaths of execution bugs and performance optimization issues as well.




> Proposed improvements to pig's optimizer
> ----------------------------------------
>
>                 Key: PIG-697
>                 URL: https://issues.apache.org/jira/browse/PIG-697
>             Project: Pig
>          Issue Type: Bug
>          Components: impl
>            Reporter: Alan Gates
>            Assignee: Alan Gates
>
> I propose the following changes to pig optimizer, plan, and operator functionality to
support more robust optimization:
> 1) Remove the required array from Rule.  This will change rules so that they only match
exact patterns instead of allowing missing elements in the pattern.
> This has the downside that if a given rule applies to two patterns (say Load->Filter->Group,
Load->Group) you have to write two rules.  But it has the upside that
> the resulting rules know exactly what they are getting.  The original intent of this
was to reduce the number of rules that needed to be written.  But the
> resulting rules have to do a lot of work to understand the operators they are working with.
 With exact matches only, each rule will know exactly the operators it
> is working on and can apply the logic of shifting the operators around.  All four of
the existing rules set all entries of required to true, so removing this
> will have no effect on them.
> 2) Change PlanOptimizer.optimize to iterate over the rules until there are no conversions
or a certain number of iterations has been reached.  Currently the
> function is:
> {code}
>     public final void optimize() throws OptimizerException {
>         RuleMatcher matcher = new RuleMatcher();
>         for (Rule rule : mRules) {
>             if (matcher.match(rule)) {
>                 // It matches the pattern.  Now check if the transformer
>                 // approves as well.
>                 List<List<O>> matches = matcher.getAllMatches();
>                 for (List<O> match:matches)
>                 {
> 	                if (rule.transformer.check(match)) {
> 	                    // The transformer approves.
> 	                    rule.transformer.transform(match);
> 	                }
>                 }
>             }
>         }
>     }
> {code}
> It would change to be:
> {code}
>     public final void optimize() throws OptimizerException {
>         RuleMatcher matcher = new RuleMatcher();
>         boolean sawMatch;
>         int numIterations = 0;
>         do {
>             sawMatch = false;
>             for (Rule rule : mRules) {
>                 List<List<O>> matches = matcher.getAllMatches();
>                 for (List<O> match:matches) {
>                     // It matches the pattern.  Now check if the transformer
>                     // approves as well.
>                     if (rule.transformer.check(match)) {
>                         // The transformer approves.
>                         sawMatch = true;
>                         rule.transformer.transform(match);
>                     }
>                 }
>             }
>             // Not sure if 1000 is the right number of iterations, maybe it
>             // should be configurable so that large scripts don't stop too 
>             // early.
>         } while (sawMatch && numIterations++ < 1000);
>     }
> {code}
> The reason for limiting the number of iterations is to avoid infinite loops.  The reason
for iterating over the rules is so that each rule can be applied multiple
> times as necessary.  This allows us to write simple rules, mostly swaps between neighboring
operators, without worrying that we get the plan right in one pass.
> For example, we might have a plan that looks like:  Load->Join->Filter->Foreach,
and we want to optimize it to Load->Foreach->Filter->Join.  With two simple
> rules (swap filter and join and swap foreach and filter), applied iteratively, we can
get from the initial to final plan, without needing to understand the
> big picture of the entire plan.
> 3) Add three calls to OperatorPlan:
> {code}
> /**
>  * Swap two operators in a plan.  Both of the operators must have single
>  * inputs and single outputs.
>  * @param first operator
>  * @param second operator
>  * @throws PlanException if either operator is not single input and output.
>  */
> public void swap(E first, E second) throws PlanException {
>     ...
> }
> /**
>  * Push one operator in front of another.  This function is for use when
>  * the first operator has multiple inputs.  The caller can specify
>  * which input of the first operator the second operator should be pushed to.
>  * @param first operator, assumed to have multiple inputs.
>  * @param second operator, will be pushed in front of first
>  * @param inputNum, indicates which input of the first operator the second
>  * operator will be pushed onto.  Numbered from 0.
>  * @throws PlanException if inputNum does not exist for first operator
>  */
> public void pushBefore(E first, E second, int inputNum) throws PlanException {
>     ...
> }
> /**
>  * Push one operator after another.  This function is for use when the second
>  * operator has multiple outputs.  The caller can specify which output of the
>  * second operator the first operator should be pushed to.
>  * @param first operator, will be pushed after the second operator
>  * @param second operator, assumed to have multiple outputs
>  * @param outputNum indicates which output of the second operator the first 
>  * operator will be pushed onto.  Numbered from 0.
>  * @throws PlanException if outputNum does not exist for second operator
>  */
> public void pushAfter(E first, E second, int outputNum) throws PlanException {
>     ...
> }
> {code}
> The rules in the optimizer can use these three functions, along with the existing insertBetween(),
replace(), and removeAndReconnect() calls to operate on the
> plan.
> 4) Add a new call to Operator:
> {code}
> /**
>  * Make any necessary changes to a node based on a change of position in the
>  * plan.  This allows operators to rewire their projections, etc. when they
>  * are relocated in a plan.
>  * @param oldPred Operator that was previously the predecessor.
>  * @param newPred Operator that will now be the predecessor.
>  * @throws PlanException
>  */
> public abstract void rewire(Operator oldPred, Operator newPred) throws PlanException;
> {code}
> This method will be called by the swap, pushBefore, pushAfter, insertBetween, replace,
and removeAndReconnect in OperatorPlan whenever an operator is moved
> around so that the operator has a chance to make any necessary changes.  
> 5) Add new calls to LogicalOperator and PhysicalOperator
> {code}
> /**
>  * A struct detailing how a projection is altered by an operator.
>  */
> public class ProjectionMap {
>     /**
>      * Quick way for an operator to note that its input and output are the
>      * same.
>      */
>     public boolean noChange;
>     /**
>      * Map of field changes, with keys being the output fields of the 
>      * operator and values being the input fields.  Fields are numbered from
>      * 0.  So for a foreach operator derived from
>      * 'B = foreach A generate $0, $2, $3, udf($1)' 
>      * would produce a mapping of 0->0, 1->2, 2->3
>      */
>     public Map<Integer, Integer> mappedFields;
>     /**
>      * List of fields removed from the input.  This includes fields that were
>      * transformed, and thus are no longer the same fields.  Using the
>      * example foreach given under mappedFields, this list would contain '1'.
>      */
>     public List<Integer> removedFields;
>     /**
>      * List of fields in the output of this operator that were created by this
>      * operator.  Using the example foreach given under mappedFields, this list
>      * would contain '3'.
>      */
>     public List<Integer> addedFields;
> }
> /**
>  * Produce a map describing how this operator modifies its projection.
>  * @return the ProjectionMap; null indicates it does not know how the projection
>  * changes, for example a join of two inputs where one input does not have
>  * a schema.
>  */
> public abstract ProjectionMap getProjectionMap();
> /**
>  * Get a list of fields that this operator requires.  This is not necessarily
>  * equivalent to the list of fields the operator projects.  For example,
>  * a filter will project anything passed to it, but requires only the fields
>  * explicitly referenced in its filter expression.
>  * @return list of fields, numbered from 0.
>  */
> public abstract List<Integer> getRequiredFields();
> {code}
> These calls will be called by optimizer rules to determine whether or not a swap can
be done (for example, you can't swap two operators if the second one uses a
> field added by the first), and once the swap is done they will be used by rewire to understand
how to map projections in the operators.
> 6)  It's not clear that the RuleMatcher, in its current form, will work with rules that
are not linear.  That is, it matches rules that look like:
> Operators {Foreach, Filter}
> Edges {0->1}
> But I don't know if it will match rules that look like:
> Operators {Scan, Scan, Join}
> Edges {0->2, 1->2}
> For the optimizer to be able to determine join types and operations with splits, it will
have to be able to do that.
> Examples of types of rules that this optimizer could support:
> 1) Pushing filters in front of joins.
> 2) Pushing foreachs with flattens (which thus greatly expand the data) down the tree
past filters, joins, etc.
> 3) Pushing type casting used for schemas in loads down to the point where the field is
actually used.
> 4) Deciding when to do fragment/replicate join or sort/merge join instead of the standard
hash join.
> 5) The current optimizations:  pushing limit up the tree, making implicit splits explicit,
merge load and stream where possible, using the combiner.
> 6) Merge filters or foreachs where possible
> In particular the combiner optimizer hopefully can be completely rewritten to use the
optimizer framework to make decisions about how to rework physical plans
> to push work into the combiner.
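
The iterate-until-no-change loop proposed in item 2 of the quoted description can be illustrated with a toy model. This is a minimal Java sketch, not the Pig optimizer: the plan is assumed to be a flat list of operator names, and FixedPointOptimizerSketch and SwapRule are hypothetical names. In this list model a third swap (join and foreach) is needed, in addition to the two swaps named in the description, to get from Load->Join->Filter->Foreach to Load->Foreach->Filter->Join.

```java
import java.util.Collections;
import java.util.List;

class FixedPointOptimizerSketch {
    /** Hypothetical rule: swap `upstream` and `downstream` where they are adjacent. */
    static final class SwapRule {
        final String upstream;
        final String downstream;

        SwapRule(String upstream, String downstream) {
            this.upstream = upstream;
            this.downstream = downstream;
        }

        /** Applies the swap to the first matching pair; returns true if the plan changed. */
        boolean apply(List<String> plan) {
            for (int i = 0; i + 1 < plan.size(); i++) {
                if (plan.get(i).equals(upstream) && plan.get(i + 1).equals(downstream)) {
                    Collections.swap(plan, i, i + 1);
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Runs all rules repeatedly until a full pass changes nothing or the
     * iteration cap is hit. Returns the number of passes in which at least
     * one rule fired.
     */
    static int optimize(List<String> plan, List<SwapRule> rules, int maxIterations) {
        int numIterations = 0;
        boolean sawMatch;
        do {
            sawMatch = false;
            for (SwapRule rule : rules) {
                if (rule.apply(plan)) {
                    sawMatch = true;
                }
            }
        } while (sawMatch && ++numIterations < maxIterations);
        return numIterations;
    }
}
```

No single rule sees the whole plan, yet repeated passes reach the final ordering. The cap matters when rules undo each other: a pair of rules that swap A,B and B,A would otherwise loop forever.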

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

