pig-commits mailing list archives

From Apache Wiki <wikidi...@apache.org>
Subject [Pig Wiki] Update of "PigMultiQueryPerformanceSpecification" by GuntherHagleitner
Date Sat, 07 Feb 2009 04:23:04 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.

The following page has been changed by GuntherHagleitner:

New page:
= Multi-query Performance =

Currently, scripts with multiple store commands can result in a lot of duplicated work. The
idea of how to avoid this duplication is described here: https://issues.apache.org/jira/browse/PIG-627

== External ==

=== Use cases: ===

==== Explicit/implicit split: ====

There are cases in which you want to do different processing on separate parts of the same
datastream. Like so:

A = load ...
split A into B if ..., C if ...
store B ...
store C ...


A = load ...
B = filter A ...
C = filter A ...
store B ...
store C ...

In the current system the first example will dump A to disk and then start jobs for B and
C. In the second example Pig will execute all the dependencies of B and store it, and then
execute all the dependencies of C and store it.

Both of the above are semantically equivalent, but the performance will be different.

Here's what we plan to do to increase the performance:

   * In the second case we will add an implicit split to transform the query into the first
case. That will eliminate processing A multiple times.
   * Make the split non-blocking and allow processing to continue. This will help reduce the
amount of data that has to be stored right at the split.
   * Allow multiple outputs from a job. This way we can store some results as a side-effect.
This is also necessary to make the previous item work.
   * Allow multiple split branches to be carried on to the combiner/reducer. This will reduce
the amount of IO again in the case where multiple branches in the split can benefit from a
combiner run.

==== Storing intermediate results ====

Sometimes people will store intermediate results:

A = load ...
A' = ... processing of A ...
store A' ...
A'' = ... further processing of A' ...
store A'' ...

If the script doesn't re-load A' for the processing of A'', the steps leading up to A' will be
duplicated. This is basically a special case of the second example above, so the same steps are
recommended. With the proposed changes the script will basically process A'' and store A' as a
side-effect, which is probably what the user wanted to begin with.

=== Why? ===

Pig's philosophy is: optimize it yourself, why don't you. So why should Pig do these
optimizations for the user?


   * Implicit splits: It's probably what you expect when you use the same handle in different
statements anyway.
   * Store/Load vs Split: When optimizing, it's a reasonable assumption that splits are faster
than load/store combinations.
   * Side-effects: There is currently no way for a user to make use of side-effect outputs.

=== Changes ===

==== Execution in batch mode ====

Batch mode is entered when Pig is given a script to execute. Interactive mode is the grunt
shell ("grunt>"). Right now there isn't much difference between the two. In order for us
to optimize the multi-query case, we'll need to distinguish them more.

Right now, whenever the parser sees a store (or dump, explain, illustrate or describe) it will
kick off the execution of that part of the script. Part of this proposal is that in batch mode
we parse the entire script first and see if we can combine things to reduce the overall amount
of work that needs to be done. Only after that will the execution start.

The following changes are proposed (in batch):

   * Store will not trigger an immediate execution. The entire script is considered before
the execution starts.
   * Explicit splits will be put in places where a handle has multiple children. If the user
wants to explicitly force re-computation of common ancestors she has to provide multiple scripts.
   * Multiple split branches/stores in the script will be combined into the same job, if possible.
Again, using multiple scripts is the way to go to avoid this (if that is desired).

For diagnostic operators there are some problems with this:

   * They work on handles, which gives you only a slice of the entire script execution at
a time. What's more, at the point where they occur in a script they might not give you
an accurate picture of the situation, since the execution plans might change once the entire
script is handled.
   * They change the logical tree. This means that we would need to clone the tree before we run
them - something that we want to avoid in batch execution.

The proposal therefore is:

   * Have Pig in batch mode ignore explain, dump, illustrate and describe.
   * Add a load command to the shell to execute a script in interactive mode.
   * Add scripts as a target (in addition to handles) to some diagnostic operators.
   * Add dot as an output type to explain (a graphical rendering of the plan will make
multi-query explains more understandable).

That means that while someone is developing a Pig script, they can put any diagnostic operator
into the script and then go to the grunt shell and load the script. The statements will be
executed and give some information about that part of the script. When a script is loaded,
the user will also be able to refer to any handles defined in the script from the shell.

Finally, when the script is ready the user can run the same script in batch and all the diagnostic
operators are ignored.

==== Load ====

(See https://issues.apache.org/jira/browse/PIG-574 - this is basically the same as requested there.)

The new command has the format:

load <script name>

This will run the script in interactive mode. For example, "load myscript.pig" (where
myscript.pig is any script file) runs that script as if its statements had been typed at the
grunt prompt.

==== Explain ====

Changes to the command:

explain <script>||<handle> [using text||dot] [into <path>]


   * Explain is not executed in batch mode.
   * If explain is given a script, it will output the entire execution graph (logical, physical,
MR + moving result files)


   * Text will give what we have today; dot will output a format that can be passed to dot
for graphical display.
   * In text mode, multiple outputs (splits) will be broken out into sections.
   * Default (no using clause): Text


   * Will generate logical.[txt||dot], physical.[txt||dot], mapred.[txt||dot] in the specified
path. For example, "explain myscript.pig using dot into /tmp/plans" would write logical.dot,
physical.dot and mapred.dot into /tmp/plans.
   * Default (no path given): Stdout

==== Illustrate ====

Changes to the command:

illustrate <script>||<handle> [into <file>]


   * Illustrate is not executed in batch mode.
   * If illustrate is given a script, it will run over the entire script (rather than a
single handle).


   * Will write the illustrate output into the specified file.
   * Default: Stdout

== Phases ==

These are the identified steps to get the proposal implemented.

=== Phase 1 ===
Phase one is about getting some infrastructural things in place. 

   * Batch execution (instead of single store execution)
   * Merge logical plans into a single plan
   * Updated explain/illustrate
   * Change local and hadoop engine to execute multiple store plans (with store-load per split).

At the end of phase one, we'll have implicit splits. But the MR plan will be the same as if
the user made the splits explicit.

=== Phase 2 ===
Phase two is about getting the MR Compiler to agree to this.

   * Allow multiple stores in a single job
   * Merge multiple plans into the split operator
   * Terminate all but one split branch with stores

=== Phase 3 ===
Phase three is about using the combiner/reducer on multiple split branches in the same job.

   * Merge Combiner/Reducer plans
   * Put in logic to decide when to multiplex pipelines

== Internal Changes ==

==== Grunt parser (Phase 1) ====
The parser currently uses a bottom up approach. When it sees a store (dump, explain), it goes
bottom up and generates the plan that needs to happen for this particular store. In order
to optimize the multi-query example, we need, however, a peek on the entire graph for a script
(interactive mode can be handled differently).

In order to do this we will change the batch mode of the parser to:

   * Not execute the plan when we see a store (or dump, illustrate, describe, explain - which
will be ignored)
   * Alter the already existing merge functionality to allow intersecting graphs to be joined
into a single logical plan.
   * Wait until the entire script is parsed and merged before sending the plan on to do validation,
optimization, etc.

The new "load" command will simply feed all the lines of a script through the interactive

==== Explain, Dump, Describe and Illustrate (Phase 1) ====

As described above the changes are:

   * Ignore these operations in batch mode
   * Add options to explain and illustrate to work on a script file as well as a handle.
   * Add the ability to print plans as dot files and to write explain and illustrate output
to files.

There will be some work to nicely represent the graphs resulting from explain in text form.
Right now, operators with multiple outputs will result in the ancestor tree being duplicated for
each output. It might be nicer to show the ancestors once and mark the other places as copies
of that one.

==== Local Execution engine (Phase 1) ====
We need to make the local execution engine understand multiple store plans too. This might
come for free or at least cheaply. The current local engine uses a call to store() on each
physical store node to trigger the execution of the pipeline and write out the result. Split
is realized as a blocking operator that processes the entire input and hands out an iterator
over the buffered tuples.

This will give the right result, but re-processes all the dependencies once per store. In a
later phase we might want to align this better with the hadoop engine and allow a non-blocking
split, as well as separate the storing of the records from the pulling from the pipeline.
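
To illustrate the blocking behaviour described above, here is a minimal sketch in Java
(hypothetical names, not Pig's actual operator classes): the split materializes its whole
input before any child can pull from it.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class BlockingSplitSketch<T> {
    private final List<T> buffer = new ArrayList<T>();

    // Consume the entire input before anything is handed out.
    void processAll(Iterator<T> input) {
        while (input.hasNext()) {
            buffer.add(input.next());
        }
    }

    // Each child branch gets its own iterator over the buffered tuples.
    Iterator<T> newChildIterator() {
        return buffer.iterator();
    }
}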

==== Implicit split insertion (Phase 1) ====
Implicit split insertion already exists as part of the optimizer. It translates any non-split
logical node with multiple outputs into a split - split output combination. This is what we
need to put the splits for the multi-query optimization in place. Right now, however, the
parser is set up in a way that multiple stores will never end up in the same plan and thus
the insertion doesn't happen for this case. 

In short: Once we change the parser to look at the entire graph for a script instead of going
store-by-store, we will get this for free. We might actually have to add logic to suppress
this behavior in cases where the split would be slower than the duplicated processing.

==== Store/Multiple output (Phase 2) ====
If we put implicit splits in place and enhance splits to contain additional operators, or even
multiplex the split output within the same map-reduce job, then at some point either a map or a
reduce task needs to be able to produce multiple outputs. Currently there is a single output
collector that will store the results in part-* files.

Here are some options:
===== hadoop 0.19 supports MultipleOutputs =====
Link: http://hadoop.apache.org/core/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html#addNamedOutput(org.apache.hadoop.mapred.JobConf,%20java.lang.String,%20java.lang.Class,%20java.lang.Class,%20java.lang.Class)

All the output will still be in the same directory, but the developer can give names to different
sets of output data. So, in our case we might name the outputs "split1" and "split2", and the
output files would then come out roughly as:

part-00000    (the default output)
split1-r-00000
split2-r-00000

(MultipleOutputs prefixes the part files with the named output; map-side writes would produce
split1-m-00000 and so on.)
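
As a rough sketch of how the MultipleOutputs API could be wired up (the reducer logic and the
routing condition are purely illustrative, not Pig's actual code):

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;

public class SplitOutputsSketch extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {

    private MultipleOutputs outputs;

    // Job setup: declare one named output per split branch.
    public static void declareOutputs(JobConf conf) {
        MultipleOutputs.addNamedOutput(conf, "split1",
                TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(conf, "split2",
                TextOutputFormat.class, Text.class, Text.class);
    }

    public void configure(JobConf conf) {
        outputs = new MultipleOutputs(conf);
    }

    public void reduce(Text key, Iterator<Text> values,
                       OutputCollector<Text, Text> defaultOut, Reporter reporter)
            throws IOException {
        while (values.hasNext()) {
            Text value = values.next();
            // Route each record to the collector of its branch (the routing
            // condition here is made up for the example).
            String branch = value.toString().startsWith("b") ? "split1" : "split2";
            outputs.getCollector(branch, reporter).collect(key, value);
        }
    }

    public void close() throws IOException {
        outputs.close();   // flushes and closes the named outputs
    }
}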


===== Side-Effect files =====
Link: http://hadoop.apache.org/core/docs/current/mapred_tutorial.html#Task+Side-Effect+Files

With this approach each task writes extra files into its task working directory, and the
framework promotes them to the job output directory when the task commits.

===== Store operator (Phase 2) =====
In the current system the store op is removed by the JobControlCompiler and used to set up the
output directory, etc. It holds information about how and where to store the files, but doesn't
actually do anything in the pipeline. The logic for storing records in the map-reduce case
is handled by the map-only and map-reduce classes. The logic is simple: whatever comes out
of the pipeline is transformed into a key/value pair and collected using the output collector.

Using multiple outputs, it would make sense to let the store operator do the actual collection.
The job compiler would have the duty to configure the multiple output streams and then assign
the right collectors to the store operators. The actual mappers/reducers would still have the
responsibility to run the pipeline and check for errors, but the storing would be handled by the
store operator. Anything that is not stored and trickles out at the bottom of the pipeline
goes into the standard collector. After the job has run we will have to move the files to the
right directories.
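
A minimal sketch of that division of labour (hypothetical class, not Pig's actual store
operator): the job compiler hands the store its collector, and the store - not the map/reduce
class - does the collecting.

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;

class StoreOperatorSketch {
    // Assigned by the job compiler; may be one of several named collectors.
    private final OutputCollector<Text, Text> collector;

    StoreOperatorSketch(OutputCollector<Text, Text> collector) {
        this.collector = collector;
    }

    // Called by the pipeline for every tuple that reaches this store.
    void store(Text key, Text value) throws IOException {
        collector.collect(key, value);
    }
}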

It seems to make more sense to use hadoop's multiple-output functionality. Trying to build
the same functionality with side-effect files would duplicate effort already made in hadoop 0.19.
However, that way we might have to provide modes to run the queries differently depending on the
version of hadoop.

==== Split operator (Phase 2) ====

The goal is to make the split operator non-blocking. Instead of dumping a split immediately
to disk, we'll try to keep going as long as possible. The split operator will return
the same record as many times as there are children. This leaves you with multiple branches
of the operator tree in the same map or reduce stage. These are going to be realized as nested
plans inside the split.
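
As a minimal sketch of this behaviour (hypothetical names, not Pig's actual operator classes),
a non-blocking split simply offers each record to every nested child plan as it arrives:

import java.util.List;

class NonBlockingSplitSketch<T> {

    interface ChildPlan<R> {
        void accept(R record);   // one nested plan per split branch
    }

    private final List<ChildPlan<T>> children;

    NonBlockingSplitSketch(List<ChildPlan<T>> children) {
        this.children = children;
    }

    // Hand the same record to every branch; nothing is buffered to disk.
    void process(T record) {
        for (ChildPlan<T> child : children) {
            child.accept(record);
        }
    }
}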

===== Multiplex/Demultiplex (Phase 3) =====

The multiplex operation will serialize the different nested plans of the split into the same output
stream. If there are multiple distinct output streams it will add a key to distinguish between
them. If all but one of the branches are terminated by a store, it will simply stream the
single remaining one to the output.

Demuxing is done by the split operator followed by a special filter. The filter will only
accept tuples belonging to its particular pipeline.
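
A small sketch of the tagging idea (again with hypothetical names, not Pig's actual operators):
the multiplexer tags each tuple with the index of its originating branch, and each demux filter
accepts only tuples carrying its own index.

class MuxDemuxSketch {

    // A tuple tagged with the index of the split branch that produced it.
    static final class Tagged<T> {
        final int branch;
        final T tuple;

        Tagged(int branch, T tuple) {
            this.branch = branch;
            this.tuple = tuple;
        }
    }

    // The demux filter for one pipeline accepts only its own branch's tuples.
    static <T> boolean accept(Tagged<T> tagged, int myBranch) {
        return tagged.branch == myBranch;
    }
}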

In phase 2, we'll simply terminate all but one split branch and store it into a tmp dir.

===== Combiner/Reducer (Phase 3) =====

Multiplexing different branches into one stream allows us to run a combiner on the result,
reducing the amount of data that will be written. Otherwise we would have to dump everything
in a map-only job and then start one or more MR jobs to pick up the pieces.

The current plan is to split the index byte used to tag the join key between splits and joins.
That leaves a nibble (4 bits) for each, which limits the number of joins and splits per job to 16.
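
To make the arithmetic concrete, here is a small sketch (a hypothetical helper, not Pig's
actual code) of packing the two indices into the single key byte:

class KeyByteSketch {

    // Split index in the high nibble, join index in the low nibble.
    // Each nibble holds values 0..15, hence the limit of 16.
    static byte pack(int splitIndex, int joinIndex) {
        if (splitIndex > 15 || joinIndex > 15) {
            throw new IllegalArgumentException("only 16 splits/joins per job");
        }
        return (byte) ((splitIndex << 4) | (joinIndex & 0x0F));
    }

    static int splitIndex(byte key) {
        return (key >> 4) & 0x0F;
    }

    static int joinIndex(byte key) {
        return key & 0x0F;
    }
}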

===== MRCompiler (Phase 2 and 3) =====
The MR Compiler right now looks for splits, terminates the MR job at that point and connects
the remaining operators via load and store.

We'll add a new optimizer pass to look for these split scenarios. This gives us the ability
to use the combiner plan information to decide whether or not to multiplex (Phase
3), and also makes it easier to switch back to the old-style handling if multiple outputs
are not available.

===== Parallelism (Phase 3) =====

If we multiplex outputs from different split branches, we have to decide what to do with the
requested parallelism: max, sum or average?

==== Diamond problem (Phase 3) ====
What happens when different split plans come back together?

Should come for free. Need to make sure unions can handle multiple split branches.
