crunch-dev mailing list archives

From Chao Shi <stepi...@live.com>
Subject Re: Execution Control
Date Fri, 14 Feb 2014 03:36:42 GMT
Hi Jinal,

> So how do I tell the planner in this
> case to run in Sequential format. Here is how the code looks:
I didn't get your question. Did you mean "sequence file" format? HBase
cannot load sequence files natively. You have to transform them into HFiles
via MR, then bulk load them into HBase.
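
For reference, here is a rough, untested sketch of that flow in Java. The
input path, output path, table name, column family "f", qualifier "q", and
the string-pair record type are all placeholders I made up, and I'm assuming
HBaseTypes.keyValues() for the PType (older Crunch versions spelled this
Writables.writables(KeyValue.class)); check the exact signatures against your
Crunch and HBase versions.

import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.hbase.HBaseTypes;
import org.apache.crunch.io.hbase.HFileUtils;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class BulkLoadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Pipeline pipeline = new MRPipeline(BulkLoadSketch.class, conf);

    // 1. Read the sequence file (assumed here to hold String key/value pairs).
    PTable<String, String> rows = pipeline.read(From.sequenceFile(
        new Path("/in/seqfile"), Writables.strings(), Writables.strings()));

    // 2. Turn each record into an HBase KeyValue.
    PCollection<KeyValue> kvs = rows.parallelDo(
        new MapFn<Pair<String, String>, KeyValue>() {
          @Override
          public KeyValue map(Pair<String, String> in) {
            return new KeyValue(Bytes.toBytes(in.first()), Bytes.toBytes("f"),
                Bytes.toBytes("q"), Bytes.toBytes(in.second()));
          }
        }, HBaseTypes.keyValues());

    // 3. Write HFiles partitioned to match the table's regions, then run
    //    the MR job that actually produces them.
    HTable table = new HTable(conf, "my_table");
    Path hfileDir = new Path("/tmp/hfiles");
    HFileUtils.writeToHFilesForIncrementalLoad(kvs, table, hfileDir);
    pipeline.run();

    // 4. Bulk load the finished HFiles into the live table.
    new LoadIncrementalHFiles(conf).doBulkLoad(hfileDir, table);
    pipeline.done();
  }
}

The important bit is that pipeline.run() must complete before doBulkLoad()
is called, since the HFiles have to exist on disk first.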

The code you show looks good to me. One potential problem is that running
MR on the same cluster as HBase will degrade serving quality (i.e., cause
greater latency).

In our production deployment, we have two clusters: one runs HBase and
serves online requests; the other runs offline MR jobs. So we run a
pipeline to produce the HFiles, distcp them to the online cluster (with a
limited number of mappers), then perform the HBase bulk load.
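
If it helps, the copy step can also be driven programmatically. Below is an
untested sketch using the Hadoop 2 DistCp API (assumes the hadoop-distcp
artifact on the classpath; the cluster URIs, paths, and mapper cap are
placeholders I made up):

import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;

public class CopyHFilesSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Copy the finished HFiles from the offline cluster to the online one.
    DistCpOptions opts = new DistCpOptions(
        Collections.singletonList(new Path("hdfs://offline-nn/tmp/hfiles")),
        new Path("hdfs://online-nn/tmp/hfiles"));
    // Cap the number of copy mappers so the transfer doesn't hurt serving.
    opts.setMaxMaps(10);
    new DistCp(conf, opts).execute();
  }
}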

> Is there a way in Crunch itself to do incremental reads from HFiles
> which are stored in HDFS?

No, there is no way to incrementally read HFiles. I think you have two
options:
1) Use FromHBase#table(String, Scan) and specify the timestamp range you are
interested in. Note that this will issue read RPCs to the region servers,
which may produce heavy traffic and damage your online serving quality.
2) Copy the HFiles to another location and use HFileUtils#scanHFiles(). You
can also specify the time range there, but internally it does a full scan.
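
To make the two options concrete, here is an untested sketch. The table
name, HFile path, and timestamp bounds are placeholders, and I'm going from
memory on the scanHFiles overload that takes a Scan, so check HFileUtils in
your Crunch version:

import java.io.IOException;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.io.hbase.FromHBase;
import org.apache.crunch.io.hbase.HFileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

public class IncrementalReadSketch {
  public static void readIncrement(Pipeline pipeline, long startTs, long endTs)
      throws IOException {
    Scan scan = new Scan();
    scan.setTimeRange(startTs, endTs); // only cells written in this window

    // Option 1: scan the live table. Simple, but every read is an RPC
    // against the region servers, so it competes with online serving.
    PTable<ImmutableBytesWritable, Result> live =
        pipeline.read(FromHBase.table("my_table", scan));

    // Option 2: scan HFiles that were copied to a side location. No region
    // server traffic, but internally it is a full scan that merely filters
    // by the Scan's time range.
    PCollection<Result> offline =
        HFileUtils.scanHFiles(pipeline, new Path("/copied/hfiles"), scan);
  }
}

Option 1 trades region server load for simplicity; option 2 keeps the
serving path untouched at the cost of scanning every copied HFile.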

2014-02-14 2:30 GMT+08:00 Jinal Shah <jinalshah2007@gmail.com>:

> Is there a way in Crunch itself to do incremental reads from HFiles
> which are stored in HDFS?
>
> Thanks
>
>
> On Thu, Feb 13, 2014 at 11:50 AM, Josh Wills <jwills@cloudera.com> wrote:
>
> > The only option I have for you in that case is pipeline.run or
> > pipeline.done; LoadIncrementalHFiles isn't Crunch code, so we can't
> > incorporate it into the planner's decision-making process. Does
> > LoadIncrementalHFiles even run an MR job?
> >
> >
> > On Thu, Feb 13, 2014 at 9:27 AM, Jinal Shah <jinalshah2007@gmail.com>
> > wrote:
> >
> > > Hi Josh,
> > > I tried the option you said and it worked perfectly fine where I'm
> > > doing parallelDo. But for HBase I'm using
> > > HFileUtils.writeToHFilesForIncrementalLoad() to write and then reading
> > > using the LoadIncrementalHFiles class. So how do I tell the planner in
> > > this case to run in Sequential format. Here is how the code looks:
> > >
> > > HFileUtils.writeToHFilesForIncrementalLoad(PCollection<KeyValue>,
> > > table, path)
> > > pipeline.run()
> > >
> > > LoadIncrementalHFiles loadIncremental = new LoadIncrementalHFiles(config);
> > >
> > > loadIncremental.doBulkLoad(path, table);
> > >
> > > Thanks
> > > Jinal
> > >
> > >
> > > On Wed, Feb 12, 2014 at 11:27 AM, Josh Wills <jwills@cloudera.com>
> > > wrote:
> > >
> > > > I'm not sure what you want here -- there is a mechanism to force the
> > > > planner to run stages sequentially (even if the input to stage 2 does
> > > > not directly depend on the output from stage 1) by using
> > > > ParallelDoOptions to introduce such a dependency, as I indicated
> > > > before:
> > > >
> > > > SourceTarget marker = ...;
> > > > HBase.read()
> > > > doSomeChangesOnData
> > > > dummyDoFnToCreateMarker
> > > > HBase.write()
> > > > marker.write()
> > > > HBase.read().parallelDo(DoFn, PType,
> > > > ParallelDoOptions.builder().sourceTarget(marker).build());
> > > >
> > > > That's less of a hint to the planner and more of a command. Another
> > > > option would be to set the maximum number of simultaneously running
> > > > jobs in Crunch to 1 using the crunch.max.running.jobs configuration
> > > > parameter, which would run everything in the pipeline sequentially,
> > > > one job at a time.
> > > >
> > > >
> > > >
> > > > On Wed, Feb 12, 2014 at 9:15 AM, Jinal Shah <jinalshah2007@gmail.com>
> > > > wrote:
> > > >
> > > > > Can I get some comment on this?
> > > > >
> > > > >
> > > > > On Thu, Feb 6, 2014 at 11:00 AM, Jinal Shah <jinalshah2007@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Josh and Micah,
> > > > > >
> > > > > > In both scenarios I can easily do a Pipeline.run() and get going
> > > > > > from there. But my main question would be: why should I do a
> > > > > > pipeline.run() in between just to make the planner run something
> > > > > > sequentially, rather than the way it would have planned otherwise?
> > > > > > What I'm getting at is that there should be some mechanism that
> > > > > > tells the planner, to some extent, to do something in a certain
> > > > > > way. Take Apache Hive as an example: up to the 0.7 release, Hive
> > > > > > provided a mechanism called a HINT, which would tell the query
> > > > > > planner to run something as indicated in the HINT rather than the
> > > > > > way it would have run otherwise. I know you might say this may not
> > > > > > create an optimized plan, but at this point the consumer is more
> > > > > > focused on how the work is planned than on the optimization.
> > > > > >
> > > > > > Maybe there is already an option in Crunch that I have not
> > > > > > explored, but I just wanted to put my point out there. If there is
> > > > > > such an option, I would love to learn about it.
> > > > > >
> > > > > >
> > > > > > On Thu, Feb 6, 2014 at 10:44 AM, Josh Wills <josh.wills@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > >> Hey Jinal,
> > > > > >>
> > > > > >> On scenario 2, the easiest way to do this is to force a run()
> > > > > >> between the write and the second read, a la:
> > > > > >>
> > > > > >> HBase.read()
> > > > > >> doSomeChangesOnData
> > > > > >> HBase.write()
> > > > > >> Pipeline.run()
> > > > > >> HBase.read()
> > > > > >>
> > > > > >> If that isn't possible for some reason, you'll need to add an
> > > > > >> output file to the first phase that can be used to indicate that
> > > > > >> the HBase.write is complete, and then have the second read depend
> > > > > >> on that file existing before it can run, which can be done via
> > > > > >> ParallelDoOptions, e.g.,
> > > > > >>
> > > > > >> SourceTarget marker = ...;
> > > > > >> HBase.read()
> > > > > >> doSomeChangesOnData
> > > > > >> dummyDoFnToCreateMarker
> > > > > >> HBase.write()
> > > > > >> marker.write()
> > > > > >> HBase.read().parallelDo(DoFn, PType,
> > > > > >> ParallelDoOptions.builder().sourceTarget(marker).build());
> > > > > >>
> > > > > >> but that's obviously uglier and more complicated.
> > > > > >>
> > > > > >> J
> > > > > >>
> > > > > >>
> > > > > >> On Wed, Feb 5, 2014 at 7:14 PM, Jinal Shah <jinalshah2007@gmail.com>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > Hi Josh,
> > > > > >> >
> > > > > >> > Here is a small example of what I am looking for. So here is
> > > > > >> > what I'm doing.
> > > > > >> >
> > > > > >> > Scenario 1:
> > > > > >> >
> > > > > >> > PCollection<Something> s = FunctionDoingSomething();
> > > > > >> > pipeline.write(s, path);
> > > > > >> > doSomeFilteringOn(s);
> > > > > >> >
> > > > > >> > I want the filtering to be done in the map phase, but instead
> > > > > >> > it is being done in the reduce phase, due to which I have to
> > > > > >> > introduce a pipeline.run(). Now this is what the code looks
> > > > > >> > like:
> > > > > >> >
> > > > > >> > PCollection<Something> s = FunctionDoingSomething();
> > > > > >> > pipeline.write(s, path);
> > > > > >> > pipeline.run()
> > > > > >> > doSomeFilteringOn(s);
> > > > > >> >
> > > > > >> > Scenario 2:
> > > > > >> >
> > > > > >> > I'm doing an operation on HBase, and here is how it looks:
> > > > > >> >
> > > > > >> > HBase.read()
> > > > > >> > doSomeChangesOnData
> > > > > >> > HBase.write()
> > > > > >> > HBase.read()
> > > > > >> >
> > > > > >> > Now Crunch at this point considers both reads as separate and
> > > > > >> > tries to run them in parallel, so before I even write my
> > > > > >> > changes it reads them. So I have to again put a pipeline.run()
> > > > > >> > in between to break it into two separate flows and execute
> > > > > >> > them in sequence.
> > > > > >> >
> > > > > >> > So I'm asking: is there any way to send a HINT to the planner
> > > > > >> > about how it creates the plan, instead of it deciding by
> > > > > >> > itself, or some way to have more control over what the planner
> > > > > >> > does in certain situations?
> > > > > >> >
> > > > > >> > Thanks
> > > > > >> > Jinal
> > > > > >> >
> > > > > >> >
> > > > > >> > On Thu, Jan 30, 2014 at 11:10 AM, Josh Wills <jwills@cloudera.com>
> > > > > >> > wrote:
> > > > > >> >
> > > > > >> > > On Thu, Jan 30, 2014 at 7:09 AM, Jinal Shah <jinalshah2007@gmail.com>
> > > > > >> > > wrote:
> > > > > >> > >
> > > > > >> > > > Hi everyone,
> > > > > >> > > >
> > > > > >> > > > This is Jinal Shah; I'm new to the group. I had a question
> > > > > >> > > > about Execution Control in Crunch. Is there any way we can
> > > > > >> > > > force Crunch to do certain operations in parallel, or
> > > > > >> > > > certain operations sequentially? For example, let's say we
> > > > > >> > > > want the pipeline to execute a particular DoFn in the map
> > > > > >> > > > phase instead of the reduce phase, or vice versa. Or to
> > > > > >> > > > execute a particular flow only after another flow has
> > > > > >> > > > completed, as opposed to running them in parallel.
> > > > > >> > > >
> > > > > >> > >
> > > > > >> > > Forcing a DoFn to operate in a map or reduce phase is tough
> > > > > >> > > for the planner to do right now; we sort of rely on the
> > > > > >> > > developer to have a mental model of how the jobs will proceed.
> > > > > >> > > The place where you usually want to force a DoFn to execute in
> > > > > >> > > the reduce vs. the map phase is when you have dependent
> > > > > >> > > groupByKey operations, and you can use cache() or materialize()
> > > > > >> > > on the intermediate output that you want to split on, and the
> > > > > >> > > planner will respect that.
> > > > > >> > >
> > > > > >> > > On the latter question, the thing to look for is
> > > > > >> > > org.apache.crunch.ParallelDoOptions, which isn't something I've
> > > > > >> > > doc'd in the user guide yet (it's on the todo list, I promise).
> > > > > >> > > You can give a parallelDo call an additional argument that
> > > > > >> > > specifies one or more SourceTargets that have to exist before a
> > > > > >> > > particular DoFn is allowed to run. In this way, you can force
> > > > > >> > > aspects of the pipeline to be sequential instead of parallel.
> > > > > >> > > We make use of ParallelDoOptions inside of the
> > > > > >> > > MapsideJoinStrategy code, to ensure that the data set that
> > > > > >> > > we'll be loading in-memory actually exists in the file system
> > > > > >> > > before we run the code that reads it into memory.
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > >
> > > > > >> > > > Maybe this has been asked before, so sorry if it comes up
> > > > > >> > > > again. If you guys have further questions on the details,
> > > > > >> > > > do let me know.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > Thanks everyone, and have a great day.
> > > > > >> > > >
> > > > > >> > > > Thanks
> > > > > >> > > > Jinal
> > > > > >> > > >
> > > > > >> > >
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > --
> > > > > >> > > Director of Data Science
> > > > > >> > > Cloudera <http://www.cloudera.com>
> > > > > >> > > Twitter: @josh_wills <http://twitter.com/josh_wills>
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Director of Data Science
> > > > Cloudera <http://www.cloudera.com>
> > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
> > > >
> > >
> >
> >
> >
> > --
> > Director of Data Science
> > Cloudera <http://www.cloudera.com>
> > Twitter: @josh_wills <http://twitter.com/josh_wills>
> >
>
