crunch-dev mailing list archives

From Chao Shi <stepi...@live.com>
Subject Re: Execution Control
Date Sun, 16 Feb 2014 14:30:04 GMT
Hi Jinal,

You can set crunch.max.running.jobs to 1 to force all independent jobs to
be executed sequentially. The default value is 5, which means there will be
at most 5 concurrently running jobs per pipeline instance.
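
For instance, here is a minimal sketch of wiring that up (the class name
is hypothetical; MRPipeline just needs the Configuration that carries the
property):

import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.hadoop.conf.Configuration;

public class SequentialDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Cap concurrently running jobs at 1, so independent jobs in the
    // plan execute one after another.
    conf.setInt("crunch.max.running.jobs", 1);
    Pipeline pipeline = new MRPipeline(SequentialDemo.class, conf);
    // ... construct the pipeline, then pipeline.done() ...
  }
}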

There is no automatic way to bulk load data into HBase. Crunch only
produces HFiles, and you will have to call LoadIncrementalHFiles#doBulkLoad
after the pipeline completes. HFileTargetIT [1] is an example.

[1]
https://github.com/apache/crunch/blob/master/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
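
Roughly, the write-then-load flow boils down to something like this (a
sketch only; kvs, table, outputPath, and conf are placeholders rather than
names from the test):

// Write the PCollection<KeyValue> out as HFiles under outputPath.
HFileUtils.writeToHFilesForIncrementalLoad(kvs, table, outputPath);
pipeline.run();  // the HFiles must be on disk before the bulk load

// Hand the completed HFiles to HBase; this is plain HBase API, not Crunch.
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
loader.doBulkLoad(outputPath, table);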

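If you only need one stage to wait for another (rather than running the
whole pipeline one job at a time), the marker-file pattern Josh describes
below can be sketched as follows; all paths and names are made up for
illustration, and the builder call mirrors his snippet:

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.ParallelDoOptions;
import org.apache.crunch.Pipeline;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.At;
import org.apache.crunch.types.writable.Writables;

public class MarkerDemo {
  public static void main(String[] args) {
    Pipeline pipeline = new MRPipeline(MarkerDemo.class);

    // Hypothetical marker target: stage 1 writes it when it finishes.
    SourceTarget<String> marker =
        At.textFile("/tmp/stage1-done", Writables.strings());
    PCollection<String> stage1 = pipeline.readTextFile("/data/in");
    pipeline.write(stage1, marker);

    // Stage 2 may not run until the marker target exists, which
    // sequences it after stage 1 despite no data dependency.
    PCollection<String> stage2 =
        pipeline.readTextFile("/data/in").parallelDo(
            new DoFn<String, String>() {
              @Override
              public void process(String input, Emitter<String> emitter) {
                emitter.emit(input);  // identity DoFn, for illustration
              }
            },
            Writables.strings(),
            ParallelDoOptions.builder().sourceTarget(marker).build());
    pipeline.writeTextFile(stage2, "/data/out");
    pipeline.done();
  }
}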

2014-02-15 3:47 GMT+08:00 Jinal Shah <jinalshah2007@gmail.com>:

> Hi Chao,
>
> Thanks for the reply. By sequential I mean running the jobs in sequence
> rather than in parallel. Is there a way in Crunch to bulk load data into
> HBase directly?
>
> Thanks
> Jinal
>
> On Thu, Feb 13, 2014 at 9:36 PM, Chao Shi <stepinto@live.com> wrote:
>
> > Hi Jinal,
> >
> > > So how do I tell the planner in this
> > > case to run in sequential format? Here is what the code looks like
> >
> > I didn't get your question. Did you mean "sequence file" format? HBase
> > cannot load sequence files natively. You have to transform them into
> > HFiles via MR, then bulk load them into HBase.
> >
> > The code piece you show looks good to me. One potential problem is that
> > running MR on the same cluster as HBase will degrade serving quality
> > (i.e., cause higher latency).
> >
> > In our production deployment, we have two clusters: one running HBase
> > and serving online requests, and the other for offline MR jobs. So we
> > run a pipeline to produce HFiles, distcp them to the online cluster
> > (with a limited number of mappers), then perform the HBase bulk load.
> >
> > > Is there a way in Crunch itself to do incremental reads from HFiles
> > > which are stored in HDFS?
> >
> > No, there is no way to incrementally read HFiles. I think you have two
> > options:
> > 1) Use FromHBase#table(String, Scan) and specify the timestamp range you
> > are interested in. Note that this will issue read RPCs to the region
> > servers, which may produce heavy traffic and degrade your online serving
> > quality.
> > 2) Copy the HFiles to another location and use HFileUtils#scanHFiles().
> > You can also specify the time range, but internally it does a full scan.
> >
> >
> > 2014-02-14 2:30 GMT+08:00 Jinal Shah <jinalshah2007@gmail.com>:
> >
> > > Is there a way in Crunch itself to do incremental reads from HFiles
> > > which are stored in HDFS?
> > >
> > > Thanks
> > >
> > >
> > > On Thu, Feb 13, 2014 at 11:50 AM, Josh Wills <jwills@cloudera.com> wrote:
> > >
> > > > The only option I have for you in that case is pipeline.run or
> > > > pipeline.done; LoadIncrementalHFiles isn't Crunch code, so we can't
> > > > incorporate it into the planner's decision making process. Does
> > > > LoadIncrementalHFiles even run an MR job?
> > > >
> > > >
> > > > On Thu, Feb 13, 2014 at 9:27 AM, Jinal Shah <jinalshah2007@gmail.com> wrote:
> > > >
> > > > > Hi Josh,
> > > > > I tried the option you suggested and it worked perfectly fine
> > > > > where I'm doing parallelDo. But for HBase I'm using
> > > > > HFileUtils.writeToHFilesForIncrementalLoad() to write and then
> > > > > loading using the LoadIncrementalHFiles class. So how do I tell
> > > > > the planner in this case to run in sequential format? Here is
> > > > > what the code looks like:
> > > > >
> > > > > HFileUtils.writeToHFilesForIncrementalLoad(PCollection<KeyValue>,
> > > > > table, path)
> > > > > pipeline.run()
> > > > >
> > > > > LoadIncrementalHFiles loadIncremental = new
> > > > >     LoadIncrementalHFiles(config);
> > > > >
> > > > > loadIncremental.doBulkLoad(path, table);
> > > > >
> > > > > Thanks
> > > > > Jinal
> > > > >
> > > > >
> > > > > On Wed, Feb 12, 2014 at 11:27 AM, Josh Wills <jwills@cloudera.com> wrote:
> > > > >
> > > > > > I'm not sure what you want here -- there is a mechanism to
> > > > > > force the planner to run stages sequentially (even if the input
> > > > > > to stage 2 does not directly depend on the output from stage 1)
> > > > > > by using ParallelDoOptions to introduce such a dependency, as I
> > > > > > indicated before:
> > > > > >
> > > > > > SourceTarget marker = ...;
> > > > > > HBase.read()
> > > > > > doSomeChangesOnData
> > > > > > dummyDoFnToCreateMarker
> > > > > > HBase.write()
> > > > > > marker.write()
> > > > > > HBase.read().parallelDo(DoFn, PType,
> > > > > > ParallelDoOptions.builder().sourceTarget(marker).build());
> > > > > >
> > > > > > That's less of a hint to the planner and more of a command.
> > > > > > Another option would be to set the maximum number of
> > > > > > simultaneously running jobs in Crunch to 1 using the
> > > > > > crunch.max.running.jobs configuration parameter, which would
> > > > > > run everything in the pipeline sequentially, one job at a time.
> > > > > >
> > > > > > On Wed, Feb 12, 2014 at 9:15 AM, Jinal Shah <jinalshah2007@gmail.com> wrote:
> > > > > >
> > > > > > > Can I get some comment on this?
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Feb 6, 2014 at 11:00 AM, Jinal Shah <jinalshah2007@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Josh and Micah,
> > > > > > > >
> > > > > > > > In both scenarios I can easily do a pipeline.run() and get
> > > > > > > > going from there. But my main question is: why should I do
> > > > > > > > a pipeline.run() in between just to make the planner run
> > > > > > > > something sequentially rather than the way it would have
> > > > > > > > planned otherwise? What I'm getting at is that there should
> > > > > > > > be some mechanism to tell the planner to do something in a
> > > > > > > > certain way, to some extent. Take Apache Hive as an
> > > > > > > > example: until the 0.7 release, Hive used to provide a
> > > > > > > > mechanism called a HINT, which would tell the query planner
> > > > > > > > to run something as indicated in the hint rather than the
> > > > > > > > way it would have otherwise. I know you might say it may
> > > > > > > > not create an optimized plan, but at this point the
> > > > > > > > consumer is more focused on the way it should be planned
> > > > > > > > than on the optimization.
> > > > > > > >
> > > > > > > > Maybe there is already an option in Crunch that I have not
> > > > > > > > explored, but I just wanted to put my point out there. If
> > > > > > > > there is an option, I would love to learn about it.
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Feb 6, 2014 at 10:44 AM, Josh Wills <josh.wills@gmail.com> wrote:
> > > > > > > >
> > > > > > > >> Hey Jinal,
> > > > > > > >>
> > > > > > > >> On scenario 2, the easiest way to do this is to force a
> > > > > > > >> run() between the write and the second read, a la:
> > > > > > > >>
> > > > > > > >> HBase.read()
> > > > > > > >> doSomeChangesOnData
> > > > > > > >> HBase.write()
> > > > > > > >> Pipeline.run()
> > > > > > > >> HBase.read()
> > > > > > > >>
> > > > > > > >> If that isn't possible for some reason, you'll need to
> > > > > > > >> add an output file to the first phase that can be used to
> > > > > > > >> indicate that the HBase.write is complete, and then have
> > > > > > > >> the second read depend on that file existing before it
> > > > > > > >> can run, which can be done via ParallelDoOptions, e.g.,
> > > > > > > >>
> > > > > > > >> SourceTarget marker = ...;
> > > > > > > >> HBase.read()
> > > > > > > >> doSomeChangesOnData
> > > > > > > >> dummyDoFnToCreateMarker
> > > > > > > >> HBase.write()
> > > > > > > >> marker.write()
> > > > > > > >> HBase.read().parallelDo(DoFn, PType,
> > > > > > > >> ParallelDoOptions.builder().sourceTarget(marker).build());
> > > > > > > >>
> > > > > > > >> but that's obviously uglier and more complicated.
> > > > > > > >>
> > > > > > > >> J
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Wed, Feb 5, 2014 at 7:14 PM, Jinal Shah <jinalshah2007@gmail.com> wrote:
> > > > > > > >>
> > > > > > > >> > Hi Josh,
> > > > > > > >> >
> > > > > > > >> > Here is a small example of what I am looking for. So
> > > > > > > >> > here is what I'm doing.
> > > > > > > >> >
> > > > > > > >> > Scenario 1:
> > > > > > > >> >
> > > > > > > >> > PCollection<Something> s = FunctionDoingSomething();
> > > > > > > >> > pipeline.write(s, path);
> > > > > > > >> > doSomeFilteringOn(s);
> > > > > > > >> >
> > > > > > > >> > I want the filtering to be done in the map phase, but
> > > > > > > >> > instead it is done in the reduce phase, due to which I
> > > > > > > >> > have to introduce a pipeline.run(). Now this is what the
> > > > > > > >> > code looks like:
> > > > > > > >> >
> > > > > > > >> > PCollection<Something> s = FunctionDoingSomething();
> > > > > > > >> > pipeline.write(s, path);
> > > > > > > >> > pipeline.run()
> > > > > > > >> > doSomeFilteringOn(s);
> > > > > > > >> >
> > > > > > > >> > Scenario 2:
> > > > > > > >> >
> > > > > > > >> > I'm doing an operation on HBase and here is how it looks.
> > > > > > > >> >
> > > > > > > >> > HBase.read()
> > > > > > > >> > doSomeChangesOnData
> > > > > > > >> > HBase.write()
> > > > > > > >> > HBase.read()
> > > > > > > >> >
> > > > > > > >> > Now Crunch at this point considers both reads as
> > > > > > > >> > separate and tries to run them in parallel, so it reads
> > > > > > > >> > before I even write my changes. I have to again put in a
> > > > > > > >> > pipeline.run() in order to break it into two separate
> > > > > > > >> > flows and execute them in sequence.
> > > > > > > >> >
> > > > > > > >> > So I'm asking: is there any way to send a hint to the
> > > > > > > >> > planner about how it should create the plan, instead of
> > > > > > > >> > it deciding by itself, or some way to have more control
> > > > > > > >> > over making the planner understand certain situations?
> > > > > > > >> >
> > > > > > > >> > Thanks
> > > > > > > >> > Jinal
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > On Thu, Jan 30, 2014 at 11:10 AM, Josh Wills <jwills@cloudera.com> wrote:
> > > > > > > >> >
> > > > > > > >> > > On Thu, Jan 30, 2014 at 7:09 AM, Jinal Shah <jinalshah2007@gmail.com> wrote:
> > > > > > > >> > >
> > > > > > > >> > > > Hi everyone,
> > > > > > > >> > > >
> > > > > > > >> > > > This is Jinal Shah; I'm new to the group. I had a
> > > > > > > >> > > > question about execution control in Crunch. Is there
> > > > > > > >> > > > any way we can force Crunch to do certain operations
> > > > > > > >> > > > in parallel or certain operations sequentially? For
> > > > > > > >> > > > example, let's say we want the pipeline to execute a
> > > > > > > >> > > > particular DoFn in the map phase instead of the
> > > > > > > >> > > > reduce phase, or vice versa. Or execute a particular
> > > > > > > >> > > > flow only after another flow has completed, as
> > > > > > > >> > > > opposed to running it in parallel.
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> > > Forcing a DoFn to operate in a map or reduce phase is
> > > > > > > >> > > tough for the planner to do right now; we sort of
> > > > > > > >> > > rely on the developer to have a mental model of how
> > > > > > > >> > > the jobs will proceed. The place where you usually
> > > > > > > >> > > want to force a DoFn to execute in the reduce vs. the
> > > > > > > >> > > map phase is when you have dependent groupByKey
> > > > > > > >> > > operations, and you can use cache() or materialize()
> > > > > > > >> > > on the intermediate output that you want to split on,
> > > > > > > >> > > and the planner will respect that.
> > > > > > > >> > >
> > > > > > > >> > > On the latter question, the thing to look for is
> > > > > > > >> > > org.apache.crunch.ParallelDoOptions, which isn't
> > > > > > > >> > > something I've doc'd in the user guide yet (it's on
> > > > > > > >> > > the todo list, I promise). You can give a parallelDo
> > > > > > > >> > > call an additional argument that specifies one or
> > > > > > > >> > > more SourceTargets that have to exist before a
> > > > > > > >> > > particular DoFn is allowed to run. In this way, you
> > > > > > > >> > > can force aspects of the pipeline to be sequential
> > > > > > > >> > > instead of parallel. We make use of ParallelDoOptions
> > > > > > > >> > > inside of the MapsideJoinStrategy code, to ensure
> > > > > > > >> > > that the data set that we'll be loading in-memory
> > > > > > > >> > > actually exists in the file system before we run the
> > > > > > > >> > > code that reads it into memory.
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > >
> > > > > > > >> > > > Maybe this has been asked before, so sorry if it
> > > > > > > >> > > > comes up again. If you have further questions on
> > > > > > > >> > > > the details, do let me know.
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > > Thanks everyone, and have a great day.
> > > > > > > >> > > >
> > > > > > > >> > > > Thanks
> > > > > > > >> > > > Jinal
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > --
> > > > > > > >> > > Director of Data Science
> > > > > > > >> > > Cloudera <http://www.cloudera.com>
> > > > > > > >> > > Twitter: @josh_wills <http://twitter.com/josh_wills>
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Director of Data Science
> > > > > > Cloudera <http://www.cloudera.com>
> > > > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Director of Data Science
> > > > Cloudera <http://www.cloudera.com>
> > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
> > > >
> > >
> >
>
