crunch-dev mailing list archives

From Josh Wills <josh.wi...@gmail.com>
Subject Re: Illegal State Exception when doing a union
Date Thu, 27 Feb 2014 19:48:06 GMT
Yeah, but that will require code changes; there isn't a way to do it as
currently implemented. My hypothesis is that we would need to modify Sources
to check whether they are SourceTargets that don't exist yet, figure out
which job is writing them, and then add the SourceTarget dependency
automatically. The planner would also need to do the size planning for the
job based on the estimated size of the PCollection(s) populating that target.
It's not obviously a trivial change (at least, it's not obvious to me yet),
and I wouldn't consider it a priority while Pipeline.run() exists as a
workaround.
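
A rough illustration of that idea, as a toy model in plain Java. Nothing here
is Crunch code: Job, inferDependencies, and plannedInputSize are made-up names,
and the paths and sizes are invented for the example. It only shows the shape
of the check: find inputs that don't exist yet, look up the job that writes
them, add the dependency, and plan sizes from the writer's estimate.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: none of these types are Crunch classes.
class PlannerSketch {

    /** Hypothetical job: reads some paths, writes one path, with an estimated output size. */
    static final class Job {
        final String name;
        final Set<String> inputs;
        final String output;
        final long estimatedOutputBytes;

        Job(String name, Set<String> inputs, String output, long estimatedOutputBytes) {
            this.name = name;
            this.inputs = inputs;
            this.output = output;
            this.estimatedOutputBytes = estimatedOutputBytes;
        }
    }

    /** For every input that does not exist yet, depend on the job that writes it. */
    static Map<String, Set<String>> inferDependencies(List<Job> jobs, Set<String> existingPaths) {
        Map<String, Job> writerByPath = new HashMap<>();
        for (Job job : jobs) {
            writerByPath.put(job.output, job);
        }
        Map<String, Set<String>> deps = new LinkedHashMap<>();
        for (Job job : jobs) {
            Set<String> jobDeps = new LinkedHashSet<>();
            for (String input : job.inputs) {
                if (existingPaths.contains(input)) {
                    continue; // already on disk, nothing to wait for
                }
                Job writer = writerByPath.get(input);
                if (writer == null) {
                    throw new IllegalStateException("No job produces missing input: " + input);
                }
                jobDeps.add(writer.name);
            }
            deps.put(job.name, jobDeps);
        }
        return deps;
    }

    /** Size used for planning: the real size if the path exists, else the writer's estimate. */
    static long plannedInputSize(String path, Map<String, Long> existingSizes, List<Job> jobs) {
        Long actual = existingSizes.get(path);
        if (actual != null) {
            return actual;
        }
        for (Job job : jobs) {
            if (job.output.equals(path)) {
                return job.estimatedOutputBytes;
            }
        }
        throw new IllegalStateException("Cannot size missing input: " + path);
    }

    public static void main(String[] args) {
        List<Job> jobs = Arrays.asList(
            new Job("writeJob", Collections.singleton("/in/raw"), "/out/target", 500L),
            new Job("unionJob", Collections.singleton("/out/target"), "/out/union", 900L));
        Map<String, Long> existingSizes = new HashMap<>();
        existingSizes.put("/in/raw", 1000L);

        System.out.println(inferDependencies(jobs, existingSizes.keySet()));
        System.out.println(plannedInputSize("/out/target", existingSizes, jobs));
    }
}
```

In this model the union job ends up depending on the write job instead of
failing because its input is missing, which is the behavior the real planner
would need.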

J


On Thu, Feb 27, 2014 at 11:38 AM, Jinal Shah <jinalshah2007@gmail.com> wrote:

> Hey Josh, is there no way of telling the planner to do a run up to that
> point when it hits a union, cogroup, or similar operation that needs the
> size of a location we read from right after writing to it, if that source
> still has to be generated by earlier processing, instead of planning it all
> ahead of time? Maybe I'm completely wrong, but it was just a thought.
>
>
> On Wed, Feb 26, 2014 at 9:00 PM, Josh Wills <jwills@cloudera.com> wrote:
>
> > Hey Jinal,
> >
> > Been thinking about it off-and-on all day, and I don't have a better
> > solution right now than pipeline.run()...
> >
> > J
> >
> >
> > On Wed, Feb 26, 2014 at 6:46 PM, Jinal Shah <jinalshah2007@gmail.com>
> > wrote:
> >
> > > So Josh what do you think can be done?
> > >
> > >
> > > On Wed, Feb 26, 2014 at 10:37 AM, Jinal Shah <jinalshah2007@gmail.com>
> > > wrote:
> > >
> > > > Also, it is trying to run it in parallel, so now it is failing on
> > > > that.
> > > >
> > > >
> > > > On Wed, Feb 26, 2014 at 10:30 AM, Jinal Shah <jinalshah2007@gmail.com>
> > > > wrote:
> > > >
> > > > >> I did as you said, but now it is running the DoFn twice. Since I'm
> > > > >> writing that output to HDFS after the parallelDo, it split the work
> > > > >> in two: once in the reduce phase while storing the output, and then
> > > > >> again in the map phase while doing the union.
> > > >>
> > > >>
> > > > >> On Tue, Feb 25, 2014 at 7:41 PM, Josh Wills <jwills@cloudera.com>
> > > > >> wrote:
> > > >>
> > > > >>> So my thought would be that if the DoFn in this step:
> > > > >>>
> > > > >>> beforeWrite.parallelDo(DoFn, U,
> > > > >>>     ParallelDoOptions.builder().sources(target).build());
> > > > >>>
> > > > >>> signaled that it was going to write a lot of data with a large
> > > > >>> scaleFactor, then the planner would use the output from beforeWrite
> > > > >>> as a checkpoint, and save the DoFn processing for the map phase.
> > > >>>
> > > >>>
> > > > >>> On Tue, Feb 25, 2014 at 5:08 PM, Jinal Shah <jinalshah2007@gmail.com>
> > > > >>> wrote:
> > > >>>
> > > > >>> > Yup, this is to avoid .run() ;-) . But I want the beforeWrite output
> > > > >>> > to be stored. So how do I apply the scaleFactor method, and how will
> > > > >>> > it help to make the DoFn for afterWrite run on the map side?
> > > >>> >
> > > >>> >
> > > > >>> > On Tue, Feb 25, 2014 at 6:58 PM, Josh Wills <josh.wills@gmail.com>
> > > > >>> > wrote:
> > > >>> >
> > > > >>> > > Okay. Out of curiosity, if you override the float scaleFactor()
> > > > >>> > > method on the DoFn that you apply here:
> > > > >>> > >
> > > > >>> > > PCollection<U> afterParallelDo = afterWrite.parallelDo(DoFn, U,
> > > > >>> > >     ParallelDoOptions.builder().sources(target).build());
> > > > >>> > >
> > > > >>> > > and apply it to beforeWrite, does it still insist on writing out
> > > > >>> > > beforeWrite on the reduce side?
> > > > >>> > >
> > > > >>> > > BTW, I'm assuming there is (again) some reason not to force a run()
> > > > >>> > > here. ;-)
> > > >>> > >
> > > >>> > >
> > > >>> > >
> > > > >>> > > On Tue, Feb 25, 2014 at 4:51 PM, Jinal Shah <jinalshah2007@gmail.com>
> > > > >>> > > wrote:
> > > >>> > >
> > > > >>> > > > I wanted to run that in the map phase instead of the reduce phase.
> > > > >>> > > > If I don't do that, it will run in the reduce phase.
> > > >>> > > >
> > > >>> > > >
> > > > >>> > > > On Tue, Feb 25, 2014 at 5:38 PM, Josh Wills <jwills@cloudera.com>
> > > > >>> > > > wrote:
> > > >>> > > >
> > > > >>> > > > > On Tue, Feb 25, 2014 at 3:04 PM, Jinal Shah <jinalshah2007@gmail.com>
> > > > >>> > > > > wrote:
> > > >>> > > > >
> > > >>> > > > > > Hi,
> > > >>> > > > > >
> > > > >>> > > > > > I'm trying to do a union of 3 PTables, but I'm getting this error:
> > > > >>> > > > > > http://pastebin.com/TkMPunJu
> > > > >>> > > > > >
> > > > >>> > > > > > This is where it is throwing it:
> > > > >>> > > > > >
> > > > >>> > > > > > https://github.com/apache/crunch/blob/master/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java#L66
> > > >>> > > > > >
> > > >>> > > > > > this is what I'm trying to do
> > > >>> > > > > >
> > > >>> > > > > > PCollection<U> beforeWrite = someOperation();
> > > >>> > > > > >
> > > > >>> > > > > > SourceTarget<U> target = new AvroFileTarget().asSourceTarget(U);
> > > >>> > > > > >
> > > >>> > > > > > pipeline.write(beforeWrite, target);
> > > >>> > > > > >
> > > >>> > > > > > PCollection<U> afterWrite = pipeline.read(target);
> > > >>> > > > > >
> > > >>> > > > >
> > > > >>> > > > > Why are you creating afterWrite here, instead of doing the
> > > > >>> > > > > processing in the next step (the one that yields afterParallelDo)
> > > > >>> > > > > against beforeWrite?
> > > >>> > > > >
> > > >>> > > > >
> > > > >>> > > > > > PCollection<U> afterParallelDo = afterWrite.parallelDo(DoFn, U,
> > > > >>> > > > > >     ParallelDoOptions.builder().sources(target).build());
> > > > >>> > > > > >
> > > > >>> > > > > > PTable<K,U> afterSomeOperation = someOperations();
> > > > >>> > > > > >
> > > > >>> > > > > > PTable<K,U> thatNeedsToBeAdded = comingFromHbase();
> > > > >>> > > > > >
> > > > >>> > > > > > PTable<K,U> unionNeeded = afterSomeOperation.union(thatNeedsToBeAdded);
> > > > >>> > > > > > // this is where it fails for some reason, since it is looking for
> > > > >>> > > > > > // the target, which is not generated yet.
> > > >>> > > > > >
> > > >>> > > > > >
> > > > >>> > > > > > Can anyone help me in understanding why this is happening?
> > > >>> > > > > >
> > > >>> > > > > > Thanks
> > > >>> > > > > > Jinal
> > > >>> > > > > >
> > > >>> > > > >
> > > >>> > > > >
> > > >>> > > > >
> > > >>> > > > > --
> > > >>> > > > > Director of Data Science
> > > >>> > > > > Cloudera <http://www.cloudera.com>
> > > >>> > > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
> > > >>> > > > >
> > > >>> > > >
> > > >>> > >
> > > >>> >
> > > >>>
> > > >>>
> > > >>>
> > > >>> --
> > > >>> Director of Data Science
> > > >>> Cloudera <http://www.cloudera.com>
> > > >>> Twitter: @josh_wills <http://twitter.com/josh_wills>
> > > >>>
> > > >>
> > > >>
> > > >
> > >
> >
> >
> >
> > --
> > Director of Data Science
> > Cloudera <http://www.cloudera.com>
> > Twitter: @josh_wills <http://twitter.com/josh_wills>
> >
>
