crunch-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Josh Wills <jwi...@cloudera.com>
Subject Re: Illegal State Exception when doing a union
Date Thu, 27 Feb 2014 20:30:40 GMT
Oh, absolutely-- go right ahead.

J


On Thu, Feb 27, 2014 at 12:26 PM, Jinal Shah <jinalshah2007@gmail.com>wrote:

> Can we atleast log a jira for that? So in that case who ever is available
> and interested can work on it.
>
>
> On Thu, Feb 27, 2014 at 1:48 PM, Josh Wills <josh.wills@gmail.com> wrote:
>
> > Yeah, but it will require changing code to do that; there isn't a way to
> do
> > it as currently implemented. My hypothesis would be that we would need to
> > modify Sources to check to see if they were SourceTargets that didn't
> exist
> > yet, figure out which job was writing them, and then add the sourceTarget
> > dependency automatically, and be able to do the size planning for the job
> > based on the estimated size of the PCollection(s) that were populating
> that
> > target. It's not obviously a trivial change (at least, it's not obvious
> to
> > me yet), and I wouldn't consider it a priority while Pipeline.run()
> exists
> > as a workaround.
> >
> > J
> >
> >
> > On Thu, Feb 27, 2014 at 11:38 AM, Jinal Shah <jinalshah2007@gmail.com
> > >wrote:
> >
> > > Hey Josh, Is there no way of telling the planner when it is trying to
> do
> > > union or co-group or some operation where it is trying to find the size
> > > from the location we are reading from after doing a write for planning
> >  to
> > > do a run till there if the source is something that needs to be
> generated
> > > through the processing prior to planning it ahead of time. May be I'm
> > > completely wrong but it was just a thought.
> > >
> > >
> > > On Wed, Feb 26, 2014 at 9:00 PM, Josh Wills <jwills@cloudera.com>
> wrote:
> > >
> > > > Hey Jinal,
> > > >
> > > > Been thinking about it off-and-on all day, and I don't have a better
> > > > solution right now than pipeline.run()...
> > > >
> > > > J
> > > >
> > > >
> > > > On Wed, Feb 26, 2014 at 6:46 PM, Jinal Shah <jinalshah2007@gmail.com
> >
> > > > wrote:
> > > >
> > > > > So Josh what do you think can be done?
> > > > >
> > > > >
> > > > > On Wed, Feb 26, 2014 at 10:37 AM, Jinal Shah <
> > jinalshah2007@gmail.com
> > > > > >wrote:
> > > > >
> > > > > > As well as it is trying to run it in parallel so now it is
> failing
> > on
> > > > > that.
> > > > > >
> > > > > >
> > > > > > On Wed, Feb 26, 2014 at 10:30 AM, Jinal Shah <
> > > jinalshah2007@gmail.com
> > > > > >wrote:
> > > > > >
> > > > > >> I did as you said but now it is running the DoFn twice since
> after
> > > > that
> > > > > >> parallel do I'm writing that output to HDFS so it divided
that
> > both
> > > > work
> > > > > >> into 2 once while storing the output it is running it in
the
> > reduce
> > > > > phase
> > > > > >> and then while doing the union it is running it in the map
> phase.
> > > > > >>
> > > > > >>
> > > > > >> On Tue, Feb 25, 2014 at 7:41 PM, Josh Wills <
> jwills@cloudera.com>
> > > > > wrote:
> > > > > >>
> > > > > >>> So my thought would be that if the DoFn in this step:
> > > > > >>>
> > > > > >>> beforeWrite.parallelDo(DoFn, U, ParallelDoOptions.builder().
> > > > > >>> sources(target).build());
> > > > > >>>
> > > > > >>> signaled that it was going to write a lot of data with
a large
> > > > > >>> scaleFactor,
> > > > > >>> then the planner would use the output from beforeWrite
as a
> > > > checkpoint,
> > > > > >>> and
> > > > > >>> save the DoFn processing for the map phase.
> > > > > >>>
> > > > > >>>
> > > > > >>> On Tue, Feb 25, 2014 at 5:08 PM, Jinal Shah <
> > > jinalshah2007@gmail.com
> > > > >
> > > > > >>> wrote:
> > > > > >>>
> > > > > >>> > Yup this is to avoid .run() ;-) . But I want the
beforeWrite
> > > output
> > > > > to
> > > > > >>> be
> > > > > >>> > stored. So how do I apply the scaleFactor method
and how will
> > > help
> > > > to
> > > > > >>> make
> > > > > >>> > the DoFn for afterWrite run in Mapside.
> > > > > >>> >
> > > > > >>> >
> > > > > >>> > On Tue, Feb 25, 2014 at 6:58 PM, Josh Wills <
> > > josh.wills@gmail.com>
> > > > > >>> wrote:
> > > > > >>> >
> > > > > >>> > > Okay. Out of curiosity, if you override the
float
> > scaleFactor()
> > > > > >>> method
> > > > > >>> > that
> > > > > >>> > > you apply here:
> > > > > >>> > >
> > > > > >>> > > PCollection<U> afterParallelDo =
> afterWrite.parallelDo(DoFn,
> > U,
> > > > > >>> > > ParallelDoOptions.builder().sources(target).build());
> > > > > >>> > >
> > > > > >>> > > and apply it to beforeWrite, does it still
insist on
> writing
> > > out
> > > > > >>> > > beforeWrite on the reduce side?
> > > > > >>> > >
> > > > > >>> > > BTW, I'm assuming there is (again) some reason
not to
> force a
> > > > run()
> > > > > >>> here.
> > > > > >>> > > ;-)
> > > > > >>> > >
> > > > > >>> > >
> > > > > >>> > >
> > > > > >>> > > On Tue, Feb 25, 2014 at 4:51 PM, Jinal Shah
<
> > > > > jinalshah2007@gmail.com
> > > > > >>> >
> > > > > >>> > > wrote:
> > > > > >>> > >
> > > > > >>> > > > I wanted to run that in the map phase
instead of reduce.
> > If I
> > > > > >>> don't do
> > > > > >>> > > that
> > > > > >>> > > > it will run in the reduce phase.
> > > > > >>> > > >
> > > > > >>> > > >
> > > > > >>> > > > On Tue, Feb 25, 2014 at 5:38 PM, Josh
Wills <
> > > > jwills@cloudera.com
> > > > > >
> > > > > >>> > wrote:
> > > > > >>> > > >
> > > > > >>> > > > > On Tue, Feb 25, 2014 at 3:04 PM,
Jinal Shah <
> > > > > >>> jinalshah2007@gmail.com
> > > > > >>> > >
> > > > > >>> > > > > wrote:
> > > > > >>> > > > >
> > > > > >>> > > > > > Hi,
> > > > > >>> > > > > >
> > > > > >>> > > > > > I'm trying to do an union of
3 PTables but I'm
> getting
> > > this
> > > > > >>> error
> > > > > >>> > > > > > http://pastebin.com/TkMPunJu
> > > > > >>> > > > > >
> > > > > >>> > > > > > this is where it is throwing
it
> > > > > >>> > > > > >
> > > > > >>> > > > > >
> > > > > >>> > > > >
> > > > > >>> > > >
> > > > > >>> > >
> > > > > >>> >
> > > > > >>>
> > > > >
> > > >
> > >
> >
> https://github.com/apache/crunch/blob/master/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java#L66
> > > > > >>> > > > > >
> > > > > >>> > > > > > this is what I'm trying to
do
> > > > > >>> > > > > >
> > > > > >>> > > > > > PCollection<U> beforeWrite
= someOperation();
> > > > > >>> > > > > >
> > > > > >>> > > > > > SourceTarget<U> target
= new
> > > > > AvroFileTarget().asSourceTaget(U);
> > > > > >>> > > > > >
> > > > > >>> > > > > > pipeline.write(beforeWrite,
target);
> > > > > >>> > > > > >
> > > > > >>> > > > > > PCollection<U> afterWrite
= pipeline.read(target);
> > > > > >>> > > > > >
> > > > > >>> > > > >
> > > > > >>> > > > > Why are you creating afterWrite
here, instead of doing
> > the
> > > > > >>> processing
> > > > > >>> > > in
> > > > > >>> > > > > the next step (the one that yields
afterParallelDo)
> > against
> > > > > >>> > > beforeWrite?
> > > > > >>> > > > >
> > > > > >>> > > > >
> > > > > >>> > > > > > PCollection<U> afterParallelDo
=
> > > > afterWrite.parallelDo(DoFn,
> > > > > U,
> > > > > >>> > > > > > ParallelDoOptions.builder().sources(target).build());
> > > > > >>> > > > > >
> > > > > >>> > > > > > PTable<K,U> afterSomeOperation
= someOperations();
> > > > > >>> > > > > >
> > > > > >>> > > > > > PTable<K,U> thatNeedsToBeAdded
= comingFromHbase();
> > > > > >>> > > > > >
> > > > > >>> > > > > > PTable<K,U> unionNeeded
=
> > > > > >>> > > >  afterSomeOperation.union(thatNeedsToBeAdded);
> > > > > >>> > > > > //
> > > > > >>> > > > > > this is where it fails for
some reason since it is
> > > looking
> > > > > for
> > > > > >>> the
> > > > > >>> > > > target
> > > > > >>> > > > > > which is not generated yet.
> > > > > >>> > > > > >
> > > > > >>> > > > > >
> > > > > >>> > > > > > Can anyone help me in understanding
why this is
> > > happening?
> > > > > >>> > > > > >
> > > > > >>> > > > > > Thanks
> > > > > >>> > > > > > Jinal
> > > > > >>> > > > > >
> > > > > >>> > > > >
> > > > > >>> > > > >
> > > > > >>> > > > >
> > > > > >>> > > > > --
> > > > > >>> > > > > Director of Data Science
> > > > > >>> > > > > Cloudera <http://www.cloudera.com>
> > > > > >>> > > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
> > > > > >>> > > > >
> > > > > >>> > > >
> > > > > >>> > >
> > > > > >>> >
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>> --
> > > > > >>> Director of Data Science
> > > > > >>> Cloudera <http://www.cloudera.com>
> > > > > >>> Twitter: @josh_wills <http://twitter.com/josh_wills>
> > > > > >>>
> > > > > >>
> > > > > >>
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Director of Data Science
> > > > Cloudera <http://www.cloudera.com>
> > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
> > > >
> > >
> >
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message