crunch-user mailing list archives

From Rushi <hrishi.engin...@gmail.com>
Subject Re: Reuse PCollection / fork processing
Date Fri, 23 Oct 2015 16:41:27 GMT
Does anyone have any idea why this might be happening? Is it possible that,
after 'done' is called, one of the paths completes processing first and the
staging data gets cleared, causing the exception to be thrown for the other
path?

Thanks.

On Wed, Oct 21, 2015 at 3:01 PM, Rushi <hrishi.engineer@gmail.com> wrote:

> Thanks for replying.
>
> Actually, I'm not calling done in between the sections, only at the
> end. The getPipeline().run() call in the processIntermediate() method is
> commented out (I was trying to see if that would help, but it didn't, so I
> commented it out).
>
>
> On Wed, Oct 21, 2015 at 2:05 PM, David Ortiz <dpo5003@gmail.com> wrote:
>
>> Don't call done in between sections where you use the PCollection and it
>> should work.
>>
>> On Wed, Oct 21, 2015 at 2:57 PM Rushi <hrishi.engineer@gmail.com> wrote:
>>
>>> In Crunch, is it possible to reuse a PCollection multiple times for
>>> different purposes in the same pipeline run? My pseudocode looks something
>>> like the following, but I get the error "File does not exist:
>>> /tmp/crunch-.." when I run it. If I comment out the second processing
>>> path (the processFinal(paths.second()) line in the process() method), I do
>>> not get the error and the pipeline executes successfully.
>>>
>>> // 1. Entry point
>>> public int run() {
>>>   process();
>>>
>>>   // done() returns a PipelineResult; map it to an exit code
>>>   return getPipeline().done().succeeded() ? 0 : 1;
>>> } // end of run()
>>>
>>>
>>> // 2.
>>> private PTable<String, String> process() {
>>>
>>>   Pair<Path, Path> paths = processIntermediate();
>>>
>>>   PTable<String, String> ret1 = processFinal(paths.first());
>>>   PTable<String, String> ret2 = processFinal(paths.second());
>>>
>>>   return ret1.union(ret2);
>>>
>>> } // end of process()
>>>
>>>
>>> // 3.
>>> private Pair<Path, Path> processIntermediate() {
>>>
>>>   PTable<String, Integer> data = ...; // read data from wherever
>>>
>>>   // filter data from the input
>>>   // filter() will write a PCollection to an AvroFileSourceTarget and
>>>   // return its path, which will be used later to read the collection
>>>   // back and do further processing.
>>>   Path path1 = filter(data, fs, true);
>>>   Path path2 = filter(data, fs, false);
>>>
>>>   // getPipeline().run();
>>>
>>>   return Pair.of(path1, path2);
>>>
>>> } // end of processIntermediate()
>>>
>>>
>>> // 4.
>>> private PTable<String, String> processFinal(Path path) {
>>>
>>>   PCollection<String> table = getPipeline().read(
>>>       new AvroFileSource<>(path), records(Strings));
>>>
>>>   return table.parallelDo(...);
>>>
>>> } // end of processFinal()
>>>
>>>
>>> I imagine I could probably use Oozie workflow actions to simplify the
>>> processing but if this is just a matter of syntax/rearranging the code, I
>>> would like to know it.
>>>
>>> *Thanks in advance!*
>>>
>>
>
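
The advice quoted above is to keep using the collection handle itself and to call done only once, at the end, rather than writing each branch to a path and reading it back as a new source. The sketch below only illustrates that shape with a plain-Java stand-in for a lazily evaluated collection. `ForkSketch`, `Lazy`, and the even/odd split are hypothetical names made up for this illustration and are not part of Crunch; it makes no claim about how Crunch plans or cleans up its temporary files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Toy model of forking one deferred collection into two branches. NOT Crunch's API. */
public class ForkSketch {

    /** A deferred collection: nothing is computed until materialize() is called. */
    static final class Lazy<T> {
        private final Supplier<List<T>> plan;

        private Lazy(Supplier<List<T>> plan) {
            this.plan = plan;
        }

        static <T> Lazy<T> of(List<T> data) {
            return new Lazy<>(() -> new ArrayList<>(data));
        }

        /** Records a filter step; the parent plan is reused, not consumed. */
        Lazy<T> filter(Predicate<T> keep) {
            return new Lazy<>(() -> {
                List<T> out = new ArrayList<>();
                for (T item : plan.get()) {
                    if (keep.test(item)) {
                        out.add(item);
                    }
                }
                return out;
            });
        }

        /** Records a per-element transformation step. */
        <R> Lazy<R> map(Function<T, R> fn) {
            return new Lazy<>(() -> {
                List<R> out = new ArrayList<>();
                for (T item : plan.get()) {
                    out.add(fn.apply(item));
                }
                return out;
            });
        }

        /** Records a concatenation of this branch followed by the other one. */
        Lazy<T> union(Lazy<T> other) {
            return new Lazy<>(() -> {
                List<T> out = new ArrayList<>(plan.get());
                out.addAll(other.plan.get());
                return out;
            });
        }

        /** Evaluates the whole recorded plan in one go. */
        List<T> materialize() {
            return plan.get();
        }
    }

    /** Forks one input into two branches, processes each, and evaluates once at the end. */
    static List<String> run(List<Integer> input) {
        Lazy<Integer> data = Lazy.of(input);

        // Both branches start from the same handle; no intermediate write/read.
        Lazy<Integer> evens = data.filter(n -> n % 2 == 0);
        Lazy<Integer> odds = data.filter(n -> n % 2 != 0);

        Lazy<String> ret1 = evens.map(n -> "even:" + n);
        Lazy<String> ret2 = odds.map(n -> "odd:" + n);

        // Single evaluation point, analogous to calling done only at the end.
        return ret1.union(ret2).materialize();
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3, 4)));
    }
}
```

In terms of the pseudocode in this thread, the corresponding change would be for filter() to return the filtered collection rather than a Path, and for processFinal() to take that collection as its argument.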
