crunch-user mailing list archives

From Everett Anderson <ever...@nuna.com>
Subject Re: Reuse PCollection / fork processing
Date Fri, 23 Oct 2015 17:09:26 GMT
Hi Rushi,

What's happening inside your filter() method? What's the boolean flag? Is
it calling pipeline.run()?

It seems like, unless you call pipeline.run() or pipeline.done(), Crunch
won't actually run anything or write the tables out to disk, so the files
don't exist yet at the calls to processFinal, where it tries to read them
back from disk.
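
To illustrate the idea, here is a toy model in plain Java. It is not the
Crunch API: ToyPipeline, its methods, and the /tmp/toy-a path are all
hypothetical names made up for this sketch. It only shows how a write that
is planned but not yet run leaves nothing on disk to read back. The toy
reads eagerly for simplicity; where exactly the real error surfaces in
Crunch may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of deferred execution. NOT the Crunch API. */
public class LazyPipelineSketch {

    /** Hypothetical stand-in for a pipeline that plans writes and runs them later. */
    static class ToyPipeline {
        // Pretend filesystem: path -> records that were actually materialized.
        private final Map<String, List<String>> disk = new HashMap<>();
        private final List<Runnable> plannedWrites = new ArrayList<>();

        /** Only records the write; nothing reaches the pretend disk yet. */
        void write(String path, List<String> records) {
            plannedWrites.add(() -> disk.put(path, new ArrayList<>(records)));
        }

        /** Executes every planned write, like an explicit run between stages. */
        void run() {
            plannedWrites.forEach(Runnable::run);
            plannedWrites.clear();
        }

        /** Fails if the path was never materialized by run(). */
        List<String> read(String path) {
            List<String> records = disk.get(path);
            if (records == null) {
                throw new IllegalStateException("File does not exist: " + path);
            }
            return records;
        }
    }

    public static void main(String[] args) {
        ToyPipeline pipeline = new ToyPipeline();
        pipeline.write("/tmp/toy-a", Arrays.asList("x", "y"));

        try {
            pipeline.read("/tmp/toy-a"); // planned, but not executed yet
        } catch (IllegalStateException e) {
            System.out.println("before run: " + e.getMessage());
        }

        pipeline.run(); // materialize the planned output
        System.out.println("after run: " + pipeline.read("/tmp/toy-a"));
    }
}
```

In this toy, the first read fails and the second succeeds only because
run() was called in between, which is the ordering I'd check in your
filter() and processFinal() code.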





On Fri, Oct 23, 2015 at 9:41 AM, Rushi <hrishi.engineer@gmail.com> wrote:

> Does anyone have any idea why this might be happening? Is it possible that
> after 'done' is called, one of the paths completes processing first and the
> staging data gets cleared, causing the exception to be thrown for the
> other path?
>
> Thanks.
>
> On Wed, Oct 21, 2015 at 3:01 PM, Rushi <hrishi.engineer@gmail.com> wrote:
>
>> Thanks for replying.
>>
>> Actually, I'm not calling done in between the sections, only at the
>> end. The getPipeline().run() call in the processIntermediate() method is
>> commented out (I was trying to see if that would help, but it didn't, so I
>> commented it out).
>>
>>
>> On Wed, Oct 21, 2015 at 2:05 PM, David Ortiz <dpo5003@gmail.com> wrote:
>>
>>> Don't call done in between sections where you use the PCollection and it
>>> should work.
>>>
>>> On Wed, Oct 21, 2015 at 2:57 PM Rushi <hrishi.engineer@gmail.com> wrote:
>>>
>>>> In Crunch, is it possible to reuse a PCollection multiple times for
>>>> different purposes in the same pipeline run? My pseudocode looks something
>>>> like the following, but I get the error "File does not exist:
>>>> /tmp/crunch-.." when I run it. If I comment out the second processing
>>>> path (the processFinal(paths.second()) line in the process() method), I do
>>>> not get the error and the pipeline executes successfully.
>>>>
>>>> // 1. Entry point
>>>> public int run() {
>>>>   process();
>>>>
>>>>   getPipeline().done();
>>>>   return 0;
>>>> } // end of run()
>>>>
>>>>
>>>> // 2.
>>>> private PTable<String, String> process() {
>>>>
>>>>   Pair<Path, Path> paths = processIntermediate();
>>>>
>>>>   PTable<String, String> ret1 = processFinal(paths.first());
>>>>   PTable<String, String> ret2 = processFinal(paths.second());
>>>>
>>>>   return ret1.union(ret2);
>>>>
>>>> } // end of process()
>>>>
>>>>
>>>> // 3.
>>>> private Pair<Path, Path> processIntermediate() {
>>>>
>>>>   PTable<String, Integer> data = ...; // read data from wherever
>>>>
>>>>   // filter data from the input
>>>>   // filter() writes a PCollection to an AvroFileSourceTarget and returns
>>>>   // its path, which is used later to read the collection back and do
>>>>   // further processing.
>>>>   Path path1 = filter(data, fs, true);
>>>>   Path path2 = filter(data, fs, false);
>>>>
>>>>   // getPipeline().run();
>>>>
>>>>   return Pair.of(path1, path2);
>>>>
>>>> } // end of processIntermediate()
>>>>
>>>>
>>>> // 4.
>>>> private PTable<String, String> processFinal(Path path) {
>>>>
>>>>   PCollection<String> table = getPipeline().read(new
>>>> AvroFileSource<>(path), records(Strings));
>>>>
>>>>   return table.parallelDo(...);
>>>>
>>>> } // end of processFinal()
>>>>
>>>>
>>>> I imagine I could probably use Oozie workflow actions to simplify the
>>>> processing but if this is just a matter of syntax/rearranging the code, I
>>>> would like to know it.
>>>>
>>>> *Thanks in advance!*
>>>>
>>>
>>
>

