crunch-dev mailing list archives

From Micah Whitacre <mkw...@gmail.com>
Subject Re: Execution Control
Date Thu, 06 Feb 2014 03:46:06 GMT
Jinal,

In Scenario #1, whether functions are applied in the map or the reduce phase
is typically a function of how the graph looks prior to that processing.  So
if you had a flow that looked like the following:

PCollection<T> p1 = pipeline.read(...);
PCollection<T> filteredP1 = p1.filter(...);
pipeline.write(filteredP1, ...);

This would all occur in the map phase.  If, however, you sprinkled in a
cogroup or a GBK like the following:

PTable<K, V> p1 = pipeline.read(...);
PGroupedTable<K, V> pGBK = p1.groupByKey();
PCollection<Pair<K, Iterable<V>>> filteredP = pGBK.filter(...);
pipeline.write(filteredP, ...);

In that case the filtering would be done in a reduce phase.  If you have
something like the above and want the filtering in the map phase, you have
the following two options (though Josh might know of another):

1. Rewrite the filter to be based on the value before the operation that
forces a reduce (this might not be possible if that value doesn't provide
enough context).
2. Write out the pGBK to disk (or cache it); a rough sketch is below.  The
question in this case is whether that is really a performance gain, as you
are hoping that the speedup from having more mappers will outweigh the
serialization/deserialization + JVM spin-up cost of shifting the data into a
new map phase.
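
Here is roughly what option 2 might look like.  I'm persisting the post-GBK
output after an example combine; the source, target, and filter logic are
placeholders just to show the shape of it (assume the usual org.apache.crunch
imports, e.g. FilterFn, Pair, and Aggregators):

PTable<String, Long> counts = pipeline.read(someTableSource)
    .groupByKey()
    .combineValues(Aggregators.SUM_LONGS());
counts.cache();  // or counts.materialize(); forces the reduce output to be persisted
PTable<String, Long> filtered = counts.filter(new FilterFn<Pair<String, Long>>() {
  @Override
  public boolean accept(Pair<String, Long> input) {
    return input.second() > 10L;  // example predicate; now runs in the map
                                  // phase of the follow-on job
  }
});
pipeline.write(filtered, someTarget);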

For Scenario #2...

A pipeline instance is essentially a DAG.  In your case you actually have a
cycle: read -> process -> write -> read.  I'm not sure it makes sense to have
that inside a single pipeline; it should probably be spread across two
pipelines, which is essentially what you are doing.  Also, I'm curious why
you need to go back and read the table a second time.  You should have two
PCollections, the original table and your deltas to the table, so couldn't
you create a PCollection that represents the updated table without performing
another read?
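
For instance, something along these lines (the types and the deriveDeltas
step are made up; how you reconcile keys that appear in both collections
would depend on your data):

PTable<String, String> original = pipeline.read(hbaseSource);
PTable<String, String> deltas = deriveDeltas(original);  // your existing processing
// A view of the table after the changes, with no second read:
PTable<String, String> updated = original.union(deltas);
// If a key can show up in both collections, follow the union with a
// groupByKey and a reduce that prefers the delta value over the original.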

This doesn't necessarily answer your question about whether there are
explicit hints to do what you are asking, but rather raises the question of
whether there are other ways of designing your pipeline that don't require
fine-grained control over how it is planned.



On Wed, Feb 5, 2014 at 9:14 PM, Jinal Shah <jinalshah2007@gmail.com> wrote:

> Hi Josh,
>
> Here is a small example of what I am looking for. So here is what I'm doing
>
> Scenario 1:
>
> PCollection<Something> s = FunctionDoingSomething();
> pipeline.write(s, path);
> doSomeFilteringOn(s);
>
> I want the filtering to be done in the map phase, but instead it is being
> done in the Reduce phase, because of which I have to introduce a
> pipeline.run(), and now this is what the code looks like:
>
> PCollection<Something> s = FunctionDoingSomething();
> pipeline.write(s, path);
> pipeline.run();
> doSomeFilteringOn(s);
>
> Scenario 2:
>
> I'm doing an operation on HBase and here is how it looks.
>
> HBase.read()
> doSomeChangesOnData
> HBase.write()
> HBase.read()
>
> Now Crunch at this point considers both reads as separate and tries to run
> them in parallel, so before I even write my changes it reads those changes.
> So I have to again put in a pipeline.run() in order to break it into two
> separate flows and execute them in sequence.
>
> So I'm asking: is there any way to send a hint to the planner about how it
> creates the plan, instead of it deciding by itself, or some way to have
> more control in making the planner understand certain situations?
>
> Thanks
> Jinal
>
>
> On Thu, Jan 30, 2014 at 11:10 AM, Josh Wills <jwills@cloudera.com> wrote:
>
> > On Thu, Jan 30, 2014 at 7:09 AM, Jinal Shah <jinalshah2007@gmail.com>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > This is Jinal Shah, I'm new to the group. I had a question about
> > > Execution Control in Crunch. Is there any way we can force Crunch to do
> > > certain operations in parallel or certain operations sequentially? For
> > > example, let's say we want the pipeline to execute a particular DoFn in
> > > the Map phase instead of the Reduce phase, or vice versa. Or execute a
> > > particular flow only after another flow is completed, as opposed to
> > > running them in parallel.
> > >
> >
> > Forcing a DoFn to operate in a map or reduce phase is tough for the
> > planner to do right now; we sort of rely on the developer to have a
> > mental model of how the jobs will proceed. The place where you usually
> > want to force a DoFn to execute in the reduce vs. the map phase is when
> > you have dependent groupByKey operations, and you can use cache() or
> > materialize() on the intermediate output that you want to split on, and
> > the planner will respect that.
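> >
> > For example (the collection and types here are made up, just to show the
> > shape of it):
> >
> > // a word count followed by a histogram of the counts -- two dependent GBKs
> > PTable<String, Long> counts = words.count();
> > counts.cache();  // or counts.materialize(); marks the split point for the planner
> > PTable<Long, Long> histogram = counts.values().count();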
> >
> > On the latter question, the thing to look for is
> > org.apache.crunch.ParallelDoOptions, which isn't something I've doc'd in
> > the user guide yet (it's on the todo list, I promise.) You can give a
> > parallelDo call an additional argument that specifies one or more
> > SourceTargets that have to exist before a particular DoFn is allowed to
> > run. In this way, you can force aspects of the pipeline to be sequential
> > instead of parallel. We make use of ParallelDoOptions inside of the
> > MapsideJoinStrategy code, to ensure that the data set that we'll be
> > loading in-memory actually exists in the file system before we run the
> > code that reads it into memory.
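> >
> > It looks roughly like this (the names and path are placeholders, and
> > MyDoFn stands in for whatever processing you want to hold back):
> >
> > SourceTarget<String> lookup = At.textFile("/tmp/lookup");
> > pipeline.write(lookupData, lookup);
> > ParallelDoOptions opts = ParallelDoOptions.builder()
> >     .sourceTargets(lookup)
> >     .build();
> > PCollection<String> out = bigData.parallelDo("after-lookup", new MyDoFn(),
> >     Writables.strings(), opts);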
> >
> >
> > >
> > > Maybe this has been asked before, so sorry if it's a repeat. If you
> > > have further questions on the details, do let me know.
> > >
> > >
> > > Thanks everyone, and have a great day.
> > >
> > > Thanks
> > > Jinal
> > >
> >
> >
> >
> > --
> > Director of Data Science
> > Cloudera <http://www.cloudera.com>
> > Twitter: @josh_wills <http://twitter.com/josh_wills>
> >
>
