From: Everett Anderson
To: user@crunch.apache.org
Date: Fri, 23 Oct 2015 10:09:26 -0700
Subject: Re: Reuse PCollection / fork processing

Hi Rushi,

What's happening inside your filter() method? What's the boolean flag? Is it calling pipeline.run()?

It seems that unless you call pipeline.run() or pipeline.done(), Crunch won't actually perform the work and write the tables out to disk before the calls to processFinal(), which tries to read them back from disk.
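Everett's explanation rests on deferred execution. A Crunch pipeline only builds up a plan until run() or done() triggers the jobs. Below is a minimal sketch of that behaviour as a hypothetical plain-Java toy model. ToyLazyPipeline, write(), read() and run() are invented for illustration and are not the Crunch API. It shows why reading a path back before run() fails with a "File does not exist" style error, and why the same read succeeds afterwards. The sketch needs Java 9+ for List.of.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical toy model (NOT the Crunch API): writes are only recorded in a
// plan, and nothing reaches "disk" until run() is called.
class ToyLazyPipeline {
    // Stands in for HDFS: path -> file contents.
    private final Map<String, List<String>> fileSystem = new HashMap<>();
    // Writes that have been planned but not yet executed.
    private final Map<String, Supplier<List<String>>> pendingWrites = new HashMap<>();

    // Like writing a PCollection to a target: the data is not computed yet.
    void write(String path, Supplier<List<String>> data) {
        pendingWrites.put(path, data);
    }

    // Reading a path only sees files that already exist on "disk".
    List<String> read(String path) {
        List<String> contents = fileSystem.get(path);
        if (contents == null) {
            throw new IllegalStateException("File does not exist: " + path);
        }
        return contents;
    }

    // Executes every pending write, in the spirit of Pipeline.run().
    void run() {
        pendingWrites.forEach((path, data) -> fileSystem.put(path, data.get()));
        pendingWrites.clear();
    }

    public static void main(String[] args) {
        ToyLazyPipeline pipeline = new ToyLazyPipeline();
        pipeline.write("/tmp/toy/path1", () -> List.of("a", "b"));
        try {
            pipeline.read("/tmp/toy/path1"); // fails: the write is still only planned
        } catch (IllegalStateException e) {
            System.out.println("Before run(): " + e.getMessage());
        }
        pipeline.run();
        System.out.println("After run(): " + pipeline.read("/tmp/toy/path1"));
    }
}
```

In this model the fix is purely about ordering. Materialize the branch outputs with run() before any code reads those paths back.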




On Fri, Oct 23, 2015 at 9:41 AM, Rushi <hrishi.engineer@gmail.com> wrote:
Does anyone have any idea why this might be happening? Is it possible that after done() is called, one of the paths completes processing first and the staging data gets cleared, causing the exception to be thrown for the other path?

Thanks.

On Wed, Oct 21, 2015 at 3:01 PM, Rushi <hrishi.engineer@gmail.com> wrote:
Thanks for replying.

Actually, I'm not calling done() between the sections, only at the end. The getPipeline().run() call in the processIntermediate() method is commented out (I tried it to see if it would help, but it didn't, so I commented it out).


On Wed, Oct 21, 2015 at 2:05 PM, David Ortiz <dpo5003@gmail.com> wrote:
Don't call done() between the sections where you use the PCollection, and it should work.
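David's advice follows from what done() is for. As the Crunch javadoc describes it, Pipeline.done() runs any remaining jobs and then cleans up the intermediate data files created during the run. The sketch below models only that cleanup step, again as a hypothetical plain-Java toy. ToyPipelineWithCleanup and its methods are invented here and are not part of Crunch. Once done() has run, anything that still depends on those temporary paths can no longer be read. The sketch needs Java 9+ for List.of.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model (NOT the Crunch API): intermediate data lives under
// temporary paths, and done() deletes those paths when the run is finished.
class ToyPipelineWithCleanup {
    private final Map<String, List<String>> fileSystem = new HashMap<>();
    private final Set<String> tempPaths = new HashSet<>();
    private int nextId = 0;

    // Stores intermediate data under a fresh temporary path (cf. /tmp/crunch-...).
    String writeTemp(List<String> data) {
        String path = "/tmp/toy-crunch-" + nextId++;
        fileSystem.put(path, new ArrayList<>(data));
        tempPaths.add(path);
        return path;
    }

    boolean exists(String path) {
        return fileSystem.containsKey(path);
    }

    // Reading a deleted temp path fails, like the error reported in this thread.
    List<String> read(String path) {
        if (!exists(path)) {
            throw new IllegalStateException("File does not exist: " + path);
        }
        return fileSystem.get(path);
    }

    // Models only the cleanup half of done(); the real method also runs any
    // outstanding jobs first, which this toy omits.
    void done() {
        tempPaths.forEach(fileSystem::remove);
        tempPaths.clear();
    }

    public static void main(String[] args) {
        ToyPipelineWithCleanup pipeline = new ToyPipelineWithCleanup();
        String tmp = pipeline.writeTemp(List.of("x"));
        System.out.println("Before done(): " + pipeline.read(tmp));
        pipeline.done();
        System.out.println("After done(), exists? " + pipeline.exists(tmp));
    }
}
```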

On Wed, Oct 21, 2015 at 2:57 PM, Rushi <hrishi.engineer@gmail.com> wrote:
In Crunch, is it possible to reuse a PCollection multiple times for different purposes in the same pipeline run? My pseudocode looks something like the following, but I get the error "File does not exist: /tmp/crunch-.." when I run it. If I comment out the second processing path (the processFinal(paths.second()) line in the process() method), I do not get the error and the pipeline executes successfully.

// 1. Entry point
public int run() {
  process();

  getPipeline().done();
  return 0;
} // end of run()

// 2.
private PTable<String, String> process() {

  Pair<Path, Path> paths = processIntermediate();

  PTable<String, String> ret1 = processFinal(paths.first());
  PTable<String, String> ret2 = processFinal(paths.second());

  return ret1.union(ret2);

} // end of process()


// 3.
private Pair<Path, Path> processIntermediate() {

  PTable<String, Integer> data = ...; // read data from wherever

  // Filter data from the input.
  // filter() writes a PCollection to an AvroFileSourceTarget and returns its path,
  // which is used later to read the collection back and do further processing.
  Path path1 = filter(data, fs, true);
  Path path2 = filter(data, fs, false);

  // getPipeline().run();

  return Pair.of(path1, path2);

} // end of processIntermediate


// 4.
private PTable<String, String> processFinal(Path path) {

  PCollection<String> table = getPipeline().read(new AvroFileSource<>(path, Avros.strings()));

  return table.parallelDo(...);

} // end of processFinal


I imagine I could probably use Oozie workflow actions to simplify the processing, but if this is just a matter of syntax or rearranging the code, I would like to know.

Thanks in advance!
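The fork-and-union shape of process() can be illustrated with a plain-Java analogue. Here both branches reuse one in-memory input handle, so neither branch is written to a path and read back. This is a sketch that uses java.util.stream in place of Crunch's PCollection and is not Crunch code. branch(), the even/odd split, and the "even:"/"odd:" tags are hypothetical stand-ins for filter() and processFinal(). The sketch needs Java 9+ for List.of and Map.entry.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java analogue (NOT Crunch code): one input, two filtered branches,
// each transformed, then concatenated (the "union").
class ForkAndUnionSketch {
    // Hypothetical split rule standing in for filter(data, fs, flag):
    // flag = true keeps even values, flag = false keeps odd values.
    static List<Map.Entry<String, Integer>> branch(List<Map.Entry<String, Integer>> data, boolean flag) {
        return data.stream()
                .filter(e -> (e.getValue() % 2 == 0) == flag)
                .collect(Collectors.toList());
    }

    // Hypothetical per-branch transform standing in for processFinal().
    static List<Map.Entry<String, String>> processFinal(List<Map.Entry<String, Integer>> rows, String tag) {
        return rows.stream()
                .map(e -> Map.entry(e.getKey(), tag + ":" + e.getValue()))
                .collect(Collectors.toList());
    }

    // Both branches read the same input; there is no intermediate write/read round trip.
    static List<Map.Entry<String, String>> process(List<Map.Entry<String, Integer>> data) {
        List<Map.Entry<String, String>> ret1 = processFinal(branch(data, true), "even");
        List<Map.Entry<String, String>> ret2 = processFinal(branch(data, false), "odd");
        return Stream.concat(ret1.stream(), ret2.stream()).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> data =
                List.of(Map.entry("a", 1), Map.entry("b", 2), Map.entry("c", 4));
        System.out.println(process(data));
    }
}
```

The design choice the sketch highlights is keeping a handle to the shared input and deriving each branch from it. This avoids persisting each branch to a path and re-reading it later.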




DISCLAIMER: The contents of this email, including any attachments, may contain information that is confidential, proprietary in nature, protected health information (PHI), or otherwise protected by law from disclosure, and is solely for the use of the intended recipient(s). If you are not the intended recipient, you are hereby notified that any use, disclosure or copying of this email, including any attachments, is unauthorized and strictly prohibited. If you have received this email in error, please notify the sender of this email. Please delete this and all copies of this email from your system. Any opinions either expressed or implied in this email and all attachments, are those of its author only, and do not necessarily reflect those of Nuna Health, Inc.