crunch-dev mailing list archives

From Jinal Shah <jinalshah2...@gmail.com>
Subject Re: Illegal State Exception when doing a union
Date Thu, 27 Feb 2014 02:46:10 GMT
So, Josh, what do you think can be done?


On Wed, Feb 26, 2014 at 10:37 AM, Jinal Shah <jinalshah2007@gmail.com> wrote:

> It is also trying to run them in parallel, so now it is failing on that.
>
>
> On Wed, Feb 26, 2014 at 10:30 AM, Jinal Shah <jinalshah2007@gmail.com> wrote:
>
>> I did as you said, but now it is running the DoFn twice. Since I write the
>> output of that parallelDo to HDFS, the work got divided in two: the DoFn
>> runs once in the reduce phase while storing the output, and again in the
>> map phase while doing the union.
>>
>>
>> On Tue, Feb 25, 2014 at 7:41 PM, Josh Wills <jwills@cloudera.com> wrote:
>>
>>> So my thought would be that if the DoFn in this step:
>>>
>>> beforeWrite.parallelDo(DoFn, U, ParallelDoOptions.builder().sources(target).build());
>>>
>>> signaled that it was going to write a lot of data with a large scaleFactor,
>>> then the planner would use the output from beforeWrite as a checkpoint, and
>>> save the DoFn processing for the map phase.
>>>
>>>
>>> On Tue, Feb 25, 2014 at 5:08 PM, Jinal Shah <jinalshah2007@gmail.com> wrote:
>>>
>>> > Yup, this is to avoid .run() ;-). But I want the beforeWrite output to be
>>> > stored. So how do I apply the scaleFactor method, and how will it help make
>>> > the DoFn for afterWrite run on the map side?
>>> >
>>> >
>>> > On Tue, Feb 25, 2014 at 6:58 PM, Josh Wills <josh.wills@gmail.com> wrote:
>>> >
>>> > > Okay. Out of curiosity, if you override the float scaleFactor() method
>>> > > in the DoFn that you apply here:
>>> > >
>>> > > PCollection<U> afterParallelDo = afterWrite.parallelDo(DoFn, U,
>>> > >     ParallelDoOptions.builder().sources(target).build());
>>> > >
>>> > > and apply it to beforeWrite, does it still insist on writing out
>>> > > beforeWrite on the reduce side?
>>> > >
>>> > > BTW, I'm assuming there is (again) some reason not to force a run() here. ;-)
>>> > >
>>> > >
>>> > >
>>> > > On Tue, Feb 25, 2014 at 4:51 PM, Jinal Shah <jinalshah2007@gmail.com> wrote:
>>> > >
>>> > > > I wanted to run that in the map phase instead of the reduce phase. If I
>>> > > > don't do that, it will run in the reduce phase.
>>> > > >
>>> > > >
>>> > > > On Tue, Feb 25, 2014 at 5:38 PM, Josh Wills <jwills@cloudera.com> wrote:
>>> > > >
>>> > > > > On Tue, Feb 25, 2014 at 3:04 PM, Jinal Shah <jinalshah2007@gmail.com> wrote:
>>> > > > >
>>> > > > > > Hi,
>>> > > > > >
>>> > > > > > I'm trying to do a union of 3 PTables, but I'm getting this error:
>>> > > > > > http://pastebin.com/TkMPunJu
>>> > > > > >
>>> > > > > > This is where it is throwing it:
>>> > > > > > https://github.com/apache/crunch/blob/master/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java#L66
>>> > > > > >
>>> > > > > > This is what I'm trying to do:
>>> > > > > >
>>> > > > > > PCollection<U> beforeWrite = someOperation();
>>> > > > > >
>>> > > > > > SourceTarget<U> target = new AvroFileTarget().asSourceTarget(U);
>>> > > > > >
>>> > > > > > pipeline.write(beforeWrite, target);
>>> > > > > >
>>> > > > > > PCollection<U> afterWrite = pipeline.read(target);
>>> > > > > >
>>> > > > >
>>> > > > > Why are you creating afterWrite here, instead of doing the processing in
>>> > > > > the next step (the one that yields afterParallelDo) against beforeWrite?
>>> > > > >
>>> > > > >
>>> > > > > > PCollection<U> afterParallelDo = afterWrite.parallelDo(DoFn, U,
>>> > > > > >     ParallelDoOptions.builder().sources(target).build());
>>> > > > > >
>>> > > > > > PTable<K,U> afterSomeOperation = someOperations();
>>> > > > > >
>>> > > > > > PTable<K,U> thatNeedsToBeAdded = comingFromHbase();
>>> > > > > >
>>> > > > > > PTable<K,U> unionNeeded = afterSomeOperation.union(thatNeedsToBeAdded);
>>> > > > > > // This is where it fails for some reason, since it is looking for the
>>> > > > > > // target, which is not generated yet.
>>> > > > > >
>>> > > > > >
>>> > > > > > Can anyone help me understand why this is happening?
>>> > > > > >
>>> > > > > > Thanks
>>> > > > > > Jinal
>>> > > > > >
>>> > > > >
>>> > > > >
>>> > > > >
>>> > > > > --
>>> > > > > Director of Data Science
>>> > > > > Cloudera <http://www.cloudera.com>
>>> > > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>>
>>
>>
>
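
Josh's scaleFactor() suggestion in the thread above depends on the planner comparing estimated output sizes when it decides where one MapReduce job ends and the next begins. The class below is a toy model of that idea in plain Java. It is not Crunch's actual planner code. It assumes that each step's estimated output equals its input estimate multiplied by the step's scale factor, and that the job boundary (the checkpoint) goes wherever that estimate is smallest. SplitPointToy, Step, cheapestSplit, and the byte counts are hypothetical names and numbers chosen for illustration.

```java
import java.util.List;

/**
 * Toy model, NOT Crunch's planner: estimates the output size of each step in
 * a linear chain and picks the step after which materializing data between
 * two MapReduce jobs would be cheapest under that estimate.
 */
public class SplitPointToy {

    /** Hypothetical stand-in for a DoFn: a name plus its scaleFactor() estimate. */
    static final class Step {
        final String name;
        final float scaleFactor;

        Step(String name, float scaleFactor) {
            this.name = name;
            this.scaleFactor = scaleFactor;
        }
    }

    /**
     * Returns the index of the step with the smallest estimated output.
     * Each estimate is the previous estimate multiplied by the step's scale
     * factor, starting from inputBytes. Ties keep the earliest step.
     */
    static int cheapestSplit(double inputBytes, List<Step> steps) {
        if (steps.isEmpty()) {
            throw new IllegalArgumentException("no steps to split after");
        }
        double size = inputBytes;
        double best = Double.POSITIVE_INFINITY;
        int bestIndex = -1;
        for (int i = 0; i < steps.size(); i++) {
            size *= steps.get(i).scaleFactor;   // propagate the size estimate
            if (size < best) {                  // strictly smaller wins
                best = size;
                bestIndex = i;
            }
        }
        return bestIndex;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: someOperation() keeps the data size unchanged,
        // while the later DoFn declares that it inflates its input tenfold.
        List<Step> chain = List.of(
                new Step("someOperation", 1.0f),
                new Step("DoFn", 10.0f));
        int split = cheapestSplit(1_000_000, chain);
        System.out.println("Materialize after: " + chain.get(split).name);
    }
}
```

Under these assumptions, the example prints `Materialize after: someOperation`. In the thread's terms, that corresponds to checkpointing beforeWrite and running the DoFn in the next job's map phase. If the DoFn's scale factor is changed to 0.5f, cheapestSplit returns 1 instead, and the toy model places the cut after the DoFn.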
