crunch-dev mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: Execution Control
Date Thu, 13 Feb 2014 17:50:24 GMT
The only option I have for you in that case is pipeline.run or
pipeline.done; LoadIncrementalHFiles isn't Crunch code, so we can't
incorporate it into the planner's decision-making process. Does
LoadIncrementalHFiles even run an MR job?
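
For reference, here's a minimal sketch of that ordering. The collection,
table, path, and config names are placeholders, not tested code:

// Write the HFiles, then block until the Crunch-planned MR jobs finish.
HFileUtils.writeToHFilesForIncrementalLoad(keyValues, table, path);
PipelineResult result = pipeline.run();
if (!result.succeeded()) {
  throw new RuntimeException("HFile write failed; skipping the bulk load");
}

// Bulk-load the HFiles only after run() has returned successfully; the
// load happens outside of Crunch's planner, so nothing else enforces
// this ordering.
LoadIncrementalHFiles loadIncremental = new LoadIncrementalHFiles(config);
loadIncremental.doBulkLoad(path, table);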


On Thu, Feb 13, 2014 at 9:27 AM, Jinal Shah <jinalshah2007@gmail.com> wrote:

> Hi Josh,
> I tried the option you suggested and it worked perfectly fine where I'm
> doing parallelDo. But for HBase I'm using
> HFileUtils.writeToHFilesForIncrementalLoad() to write, and then loading
> using the LoadIncrementalHFiles class. So how do I tell the planner in
> this case to run sequentially? Here is what the code looks like:
>
> HFileUtils.writeToHFilesForIncrementalLoad(PCollection<KeyValue>, table,
> path)
> pipeline.run()
>
> LoadIncrementalHFiles loadIncremental = new LoadIncrementalHFiles(config);
>
> loadIncremental.doBulkLoad(path, table);
>
> Thanks
> Jinal
>
>
> On Wed, Feb 12, 2014 at 11:27 AM, Josh Wills <jwills@cloudera.com> wrote:
>
> > I'm not sure what you want here -- there is a mechanism to force the
> > planner to run stages sequentially (even if the input to stage 2 does
> > not directly depend on the output from stage 1) by using
> > ParallelDoOptions to introduce such a dependency, as I indicated before:
> >
> > SourceTarget marker = ...;
> > HBase.read()
> > doSomeChangesOnData
> > dummyDoFnToCreateMarker
> > HBase.write()
> > marker.write()
> > HBase.read().parallelDo(DoFn, PType,
> > ParallelDoOptions.builder().sourceTarget(marker).build());
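> >
> > Fleshed out a bit, that might look like the sketch below. The DoFns,
> > PTypes, and sources are hypothetical placeholders, and I believe the
> > builder method is the plural sourceTargets() in current releases:
> >
> > // Hedged sketch -- changeFn/markerFn/readFn and the PTypes are made up.
> > SourceTarget<String> marker = At.textFile("/tmp/marker");
> > PCollection<Put> changed = pipeline.read(hbaseSource)   // HBase.read()
> >     .parallelDo(changeFn, puts);               // doSomeChangesOnData
> > pipeline.write(changed, hbaseTarget);                  // HBase.write()
> > pipeline.write(changed.parallelDo(markerFn, strings), marker);
> >
> > // The second read's DoFn can't run until the marker target exists:
> > pipeline.read(hbaseSource).parallelDo(readFn, ptype,
> >     ParallelDoOptions.builder().sourceTargets(marker).build());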
> >
> > That's less of a hint to the planner and more of a command. Another
> > option would be to set the maximum number of simultaneously running
> > jobs in Crunch to 1 using the crunch.max.running.jobs configuration
> > parameter, which would run everything in the pipeline sequentially,
> > one job at a time.
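> >
> > A minimal sketch of setting that, assuming an MRPipeline and a
> > placeholder MyApp class:
> >
> > Configuration conf = new Configuration();
> > conf.setInt("crunch.max.running.jobs", 1);  // one MR job at a time
> > Pipeline pipeline = new MRPipeline(MyApp.class, conf);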
> >
> >
> >
> > On Wed, Feb 12, 2014 at 9:15 AM, Jinal Shah <jinalshah2007@gmail.com>
> > wrote:
> >
> > > Can I get some comment on this?
> > >
> > >
> > > On Thu, Feb 6, 2014 at 11:00 AM, Jinal Shah <jinalshah2007@gmail.com>
> > > wrote:
> > >
> > > > Hi Josh and Micah,
> > > >
> > > > In both the scenarios I can easily do a Pipeline.run() and get going
> > > > from there. But my main question would be: why should I do a
> > > > pipeline.run() in between just to make the planner run something in
> > > > a sequential format, rather than the way it would have planned
> > > > otherwise? What I'm getting at is that there should be some
> > > > mechanism that tells the planner to do something in a certain way,
> > > > to some extent. You can take the example of Apache Hive: up to the
> > > > 0.7 release, Hive used to provide a mechanism called a HINT, which
> > > > would tell the query planner to run something as indicated in the
> > > > HINT rather than the way it would have been planned otherwise. I
> > > > know you might say it might not create an optimized plan, but at
> > > > this point the consumer is more focused on the way it should be
> > > > planned rather than on the optimization.
> > > >
> > > > Maybe there is already an option in Crunch that I have not explored,
> > > > but I just wanted to put my point out there. If there is such an
> > > > option, I would love to learn about it.
> > > >
> > > >
> > > > On Thu, Feb 6, 2014 at 10:44 AM, Josh Wills <josh.wills@gmail.com>
> > > > wrote:
> > > >
> > > >> Hey Jinal,
> > > >>
> > > >> On scenario 2, the easiest way to do this is to force a run()
> > > >> between the write and the second read, a la:
> > > >>
> > > >> HBase.read()
> > > >> doSomeChangesOnData
> > > >> HBase.write()
> > > >> Pipeline.run()
> > > >> HBase.read()
> > > >>
> > > >> If that isn't possible for some reason, you'll need to add an
> > > >> output file to the first phase that can be used to indicate that
> > > >> the HBase.write is complete, and then have the second read depend
> > > >> on that file existing before it can run, which can be done via
> > > >> ParallelDoOptions, e.g.,
> > > >>
> > > >> SourceTarget marker = ...;
> > > >> HBase.read()
> > > >> doSomeChangesOnData
> > > >> dummyDoFnToCreateMarker
> > > >> HBase.write()
> > > >> marker.write()
> > > >> HBase.read().parallelDo(DoFn, PType,
> > > >> ParallelDoOptions.builder().sourceTarget(marker).build());
> > > >>
> > > >> but that's obviously uglier and more complicated.
> > > >>
> > > >> J
> > > >>
> > > >>
> > > >> On Wed, Feb 5, 2014 at 7:14 PM, Jinal Shah <jinalshah2007@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Hi Josh,
> > > >> >
> > > >> > Here is a small example of what I am looking for. So here is
> > > >> > what I'm doing:
> > > >> >
> > > >> > Scenario 1:
> > > >> >
> > > >> > PCollection<Something> s = FunctionDoingSomething();
> > > >> > pipeline.write(s, path);
> > > >> > doSomeFilteringOn(s);
> > > >> >
> > > >> > I want the filtering to be done in the map phase, but instead it
> > > >> > is being done in the reduce phase, due to which I have to
> > > >> > introduce a pipeline.run(). Now this is what the code looks like:
> > > >> >
> > > >> > PCollection<Something> s = FunctionDoingSomething();
> > > >> > pipeline.write(s, path);
> > > >> > pipeline.run()
> > > >> > doSomeFilteringOn(s);
> > > >> >
> > > >> > Scenario 2:
> > > >> >
> > > >> > I'm doing an operation on HBase and here is how it looks.
> > > >> >
> > > >> > Hbase.read()
> > > >> > doSomeChangesOnData
> > > >> > HBase.write()
> > > >> > HBase.read()
> > > >> >
> > > >> > Now Crunch at this point considers both reads as separate and
> > > >> > tries to run them in parallel, so before I even write my changes
> > > >> > it reads those changes. So I have to again put in a
> > > >> > pipeline.run() in order to break it into 2 separate flows and
> > > >> > execute them in sequence.
> > > >> >
> > > >> > So I'm asking: is there any way to send a HINT to the planner
> > > >> > about how it should create the plan, instead of it deciding by
> > > >> > itself, or some way to have more control over how the planner
> > > >> > behaves in certain situations?
> > > >> >
> > > >> > Thanks
> > > >> > Jinal
> > > >> >
> > > >> >
> > > >> > On Thu, Jan 30, 2014 at 11:10 AM, Josh Wills <jwills@cloudera.com>
> > > >> > wrote:
> > > >> >
> > > >> > > On Thu, Jan 30, 2014 at 7:09 AM, Jinal Shah
> > > >> > > <jinalshah2007@gmail.com> wrote:
> > > >> > >
> > > >> > > > Hi everyone,
> > > >> > > >
> > > >> > > > This is Jinal Shah; I'm new to the group. I had a question
> > > >> > > > about execution control in Crunch. Is there any way we can
> > > >> > > > force Crunch to do certain operations in parallel, or certain
> > > >> > > > operations sequentially? For example, let's say we want the
> > > >> > > > pipeline to execute a particular DoFn in the map phase
> > > >> > > > instead of the reduce phase, or vice versa. Or to execute a
> > > >> > > > particular flow only after another flow has completed, as
> > > >> > > > opposed to running them in parallel.
> > > >> > > >
> > > >> > >
> > > >> > > Forcing a DoFn to operate in a map or reduce phase is tough
> > > >> > > for the planner to do right now; we sort of rely on the
> > > >> > > developer to have a mental model of how the jobs will proceed.
> > > >> > > The place where you usually want to force a DoFn to execute in
> > > >> > > the reduce vs. the map phase is when you have dependent
> > > >> > > groupByKey operations, and you can use cache() or materialize()
> > > >> > > on the intermediate output that you want to split on, and the
> > > >> > > planner will respect that.
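> > > >> > >
> > > >> > > A rough sketch of that split, assuming a hypothetical
> > > >> > > PCollection<String> named words as the input:
> > > >> > >
> > > >> > > // Two dependent groupByKey stages: count the words, then count
> > > >> > > // how often each count occurs.
> > > >> > > PTable<String, Long> wordCounts = words.count();
> > > >> > > wordCounts.materialize();  // marks the intermediate output for
> > > >> > >                            // writing, so the planner splits here
> > > >> > > PTable<Long, Long> countsOfCounts = wordCounts.values().count();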
> > > >> > >
> > > >> > > On the latter question, the thing to look for is
> > > >> > > org.apache.crunch.ParallelDoOptions, which isn't something I've
> > > >> > > doc'd in the user guide yet (it's on the todo list, I promise).
> > > >> > > You can give a parallelDo call an additional argument that
> > > >> > > specifies one or more SourceTargets that have to exist before a
> > > >> > > particular DoFn is allowed to run. In this way, you can force
> > > >> > > aspects of the pipeline to be sequential instead of parallel.
> > > >> > > We make use of ParallelDoOptions inside of the
> > > >> > > MapsideJoinStrategy code, to ensure that the data set that
> > > >> > > we'll be loading in-memory actually exists in the file system
> > > >> > > before we run the code that reads it into memory.
> > > >> > >
> > > >> > >
> > > >> > > >
> > > >> > > > Maybe this has been asked before, so sorry if it has come up
> > > >> > > > again. If you have further questions on the details, do let
> > > >> > > > me know.
> > > >> > > >
> > > >> > > >
> > > >> > > > Thanks everyone, and have a great day.
> > > >> > > >
> > > >> > > > Thanks
> > > >> > > > Jinal
> > > >> > > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > --
> > > >> > > Director of Data Science
> > > >> > > Cloudera <http://www.cloudera.com>
> > > >> > > Twitter: @josh_wills <http://twitter.com/josh_wills>
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > Director of Data Science
> > Cloudera <http://www.cloudera.com>
> > Twitter: @josh_wills <http://twitter.com/josh_wills>
> >
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
