crunch-dev mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: Illegal State Exception when doing a union
Date Thu, 27 Feb 2014 03:00:47 GMT
Hey Jinal,

Been thinking about it off-and-on all day, and I don't have a better
solution right now than pipeline.run()...

J
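
A minimal sketch of the pipeline.run() workaround mentioned above. It assumes only the Crunch calls already used in this thread (Pipeline.write, Pipeline.read, PCollection.parallelDo) plus Pipeline.run() and PipelineResult.succeeded(). Forcing a run between the write and the read means the target's files should already exist before anything downstream, such as the union, tries to look at them. The class CheckpointWithRun, the method writeRunAndRead, and its parameters are hypothetical names. fn and ptype stand in for the DoFn and U placeholders in the original code.

```java
import org.apache.crunch.DoFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.types.PType;

/** Hypothetical helper illustrating the "force a run()" workaround. */
public final class CheckpointWithRun {

  private CheckpointWithRun() {}

  /**
   * Writes beforeWrite to target, executes everything planned so far so the
   * target is actually materialized, then reads it back and applies fn.
   */
  public static <U> PCollection<U> writeRunAndRead(
      Pipeline pipeline,
      PCollection<U> beforeWrite,
      SourceTarget<U> target,
      DoFn<U, U> fn,
      PType<U> ptype) {
    pipeline.write(beforeWrite, target);

    // Run the jobs planned up to this point; when this returns successfully,
    // the target's files have been written.
    PipelineResult result = pipeline.run();
    if (!result.succeeded()) {
      throw new IllegalStateException("Pipeline failed before " + target + " was written");
    }

    // The read-back now starts from data that exists, in a new plan.
    PCollection<U> afterWrite = pipeline.read(target);
    return afterWrite.parallelDo(fn, ptype);
  }
}
```

The trade-off is an extra job boundary. Everything written before run() executes first, and the read-back (and anything unioned with it afterwards) is planned separately.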


On Wed, Feb 26, 2014 at 6:46 PM, Jinal Shah <jinalshah2007@gmail.com> wrote:

> So, Josh, what do you think can be done?
>
>
> On Wed, Feb 26, 2014 at 10:37 AM, Jinal Shah <jinalshah2007@gmail.com> wrote:
>
> > On top of that, it is trying to run it in parallel, so now it is failing
> > on that.
> >
> >
> > On Wed, Feb 26, 2014 at 10:30 AM, Jinal Shah <jinalshah2007@gmail.com> wrote:
> >
> >> I did as you said, but now it is running the DoFn twice. After that
> >> parallelDo I'm writing the output to HDFS, so the work gets split in two:
> >> the DoFn runs once in the reduce phase while storing the output, and again
> >> in the map phase while doing the union.
> >>
> >>
> >> On Tue, Feb 25, 2014 at 7:41 PM, Josh Wills <jwills@cloudera.com> wrote:
> >>
> >>> So my thought would be that if the DoFn in this step:
> >>>
> >>> beforeWrite.parallelDo(DoFn, U,
> >>>     ParallelDoOptions.builder().sources(target).build());
> >>>
> >>> signaled, via a large scaleFactor, that it was going to write a lot of
> >>> data, then the planner would use the output from beforeWrite as a
> >>> checkpoint and save the DoFn processing for the map phase.
> >>>
> >>>
> >>> On Tue, Feb 25, 2014 at 5:08 PM, Jinal Shah <jinalshah2007@gmail.com> wrote:
> >>>
> >>> > Yup, this is to avoid .run() ;-). But I want the beforeWrite output to
> >>> > be stored. So how do I apply the scaleFactor method, and how will it
> >>> > help make the DoFn for afterWrite run on the map side?
> >>> >
> >>> >
> >>> > On Tue, Feb 25, 2014 at 6:58 PM, Josh Wills <josh.wills@gmail.com> wrote:
> >>> >
> >>> > > Okay. Out of curiosity, if you override the float scaleFactor() method
> >>> > > of the DoFn that you apply here:
> >>> > >
> >>> > > PCollection<U> afterParallelDo = afterWrite.parallelDo(DoFn, U,
> >>> > >     ParallelDoOptions.builder().sources(target).build());
> >>> > >
> >>> > > and apply it to beforeWrite instead, does it still insist on writing
> >>> > > out beforeWrite on the reduce side?
> >>> > >
> >>> > > BTW, I'm assuming there is (again) some reason not to force a run()
> >>> > > here. ;-)
> >>> > >
> >>> > >
> >>> > >
> >>> > > On Tue, Feb 25, 2014 at 4:51 PM, Jinal Shah <jinalshah2007@gmail.com> wrote:
> >>> > >
> >>> > > > I wanted to run that in the map phase instead of the reduce phase.
> >>> > > > If I don't do that, it will run in the reduce phase.
> >>> > > >
> >>> > > >
> >>> > > > On Tue, Feb 25, 2014 at 5:38 PM, Josh Wills <jwills@cloudera.com> wrote:
> >>> > > >
> >>> > > > > On Tue, Feb 25, 2014 at 3:04 PM, Jinal Shah <jinalshah2007@gmail.com> wrote:
> >>> > > > >
> >>> > > > > > Hi,
> >>> > > > > >
> >>> > > > > > I'm trying to do a union of 3 PTables, but I'm getting this error:
> >>> > > > > > http://pastebin.com/TkMPunJu
> >>> > > > > >
> >>> > > > > > This is where it is thrown:
> >>> > > > > >
> >>> > > > > > https://github.com/apache/crunch/blob/master/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java#L66
> >>> > > > > >
> >>> > > > > > This is what I'm trying to do:
> >>> > > > > >
> >>> > > > > > PCollection<U> beforeWrite = someOperation();
> >>> > > > > >
> >>> > > > > > SourceTarget<U> target = new AvroFileTarget().asSourceTarget(U);
> >>> > > > > >
> >>> > > > > > pipeline.write(beforeWrite, target);
> >>> > > > > >
> >>> > > > > > PCollection<U> afterWrite = pipeline.read(target);
> >>> > > > > >
> >>> > > > >
> >>> > > > > Why are you creating afterWrite here, instead of doing the
> >>> > > > > processing in the next step (the one that yields afterParallelDo)
> >>> > > > > against beforeWrite?
> >>> > > > >
> >>> > > > >
> >>> > > > > > PCollection<U> afterParallelDo = afterWrite.parallelDo(DoFn, U,
> >>> > > > > >     ParallelDoOptions.builder().sources(target).build());
> >>> > > > > >
> >>> > > > > > PTable<K,U> afterSomeOperation = someOperations();
> >>> > > > > >
> >>> > > > > > PTable<K,U> thatNeedsToBeAdded = comingFromHbase();
> >>> > > > > >
> >>> > > > > > PTable<K,U> unionNeeded = afterSomeOperation.union(thatNeedsToBeAdded);
> >>> > > > > > // This is where it fails, apparently because it is looking for
> >>> > > > > > // the target, which has not been generated yet.
> >>> > > > > >
> >>> > > > > >
> >>> > > > > > Can anyone help me understand why this is happening?
> >>> > > > > >
> >>> > > > > > Thanks
> >>> > > > > > Jinal
> >>> > > > > >
> >>> > > > >
> >>> > > > >
> >>> > > > >
> >>> > > > > --
> >>> > > > > Director of Data Science
> >>> > > > > Cloudera <http://www.cloudera.com>
> >>> > > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
> >>> > > > >
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>>
> >>>
> >>>
> >>
> >>
> >
>
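Josh's earlier suggestion, quoted above, was to apply the DoFn directly to beforeWrite and override its scaleFactor() so the planner sees that it emits much more data than it reads. The hope is that the planner would then checkpoint beforeWrite and run the DoFn on the map side. Below is a minimal sketch of that idea. It assumes an expanding DoFn whose output really is larger than its input. ExpandingFn, copies, and writeAndProcess are hypothetical names. As Jinal reported, the planner is not guaranteed to place the DoFn map-side and may instead run it twice.

```java
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.types.PType;

/** Hypothetical illustration of hinting output size via DoFn.scaleFactor(). */
public final class ScaleFactorHint {

  private ScaleFactorHint() {}

  /** Hypothetical DoFn that emits each input value `copies` times. */
  public static final class ExpandingFn<U> extends DoFn<U, U> {
    private final int copies;

    public ExpandingFn(int copies) {
      this.copies = copies;
    }

    @Override
    public void process(U input, Emitter<U> emitter) {
      for (int i = 0; i < copies; i++) {
        emitter.emit(input);
      }
    }

    /** Tells the planner the output is roughly `copies` times the input size. */
    @Override
    public float scaleFactor() {
      return (float) copies;
    }
  }

  /**
   * Stores beforeWrite as requested, but applies the DoFn to beforeWrite itself
   * instead of to a collection read back from the target.
   */
  public static <U> PCollection<U> writeAndProcess(
      Pipeline pipeline, PCollection<U> beforeWrite, SourceTarget<U> target,
      PType<U> ptype, int copies) {
    pipeline.write(beforeWrite, target);
    return beforeWrite.parallelDo(new ExpandingFn<U>(copies), ptype);
  }
}
```

DoFn's default scaleFactor() returns a value slightly below 1, so overriding it is the only planner-visible change here. Whether that is enough to move the DoFn to the map phase depends on the planner.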



