Date: Thu, 20 Sep 2012 19:50:17 +0200
From: Gabriel Reid <gabriel.reid@gmail.com>
To: crunch-dev@incubator.apache.org
Subject: Re: Checkpointing in pipelines

On Thursday 20 September 2012 at 16:50, Josh Wills wrote:
> Hey Gabriel (and others),
>
> I think we are on the same page -- you're basically talking about
> creating a way to send hints (or perhaps, orders) to the optimizer in
> terms of how it should decide how to break a job up. I am very much on
> board with this.
>
> J

Ok, cool, I'll try to put something together for this.
- Gabriel

> On Thu, Sep 20, 2012 at 5:40 AM, Gabriel Reid wrote:
> > Hi Josh (and others),
> >
> > I'm not sure if we were on the same page about this or not -- any thoughts on it in the meantime?
> >
> > - Gabriel
> >
> > On Thursday 6 September 2012 at 16:18, Gabriel Reid wrote:
> >
> > > Hi Josh,
> > >
> > > The last thing I would be doing after completing a trans-atlantic
> > > flight is checking developer mailing lists ;-)
> > >
> > > What you're talking about (having a kind of rollback for job failures
> > > somewhere along the pipeline) could be facilitated with what I was
> > > talking about here, but it's not what I was trying to accomplish (I
> > > think you realize that, but I'm just making sure). However, it does
> > > kind of show that the name "checkpoint" isn't that descriptive for the
> > > specific use case that I was talking about (which is what I was a bit
> > > worried about).
> > >
> > > To clarify, I'm talking about making it possible to specify that
> > > a node in the execution graph of the pipeline shouldn't be merged in
> > > between two other nodes (for example, an output or a GBK). The
> > > specific use case that I'm going for is customizing the execution plan
> > > for performance, and not for failure recovery.
> > >
> > > I think we're on the same page here, but just referring to two
> > > different use cases, right?
> > >
> > > - Gabriel
> > >
> > > On Thu, Sep 6, 2012 at 4:00 PM, Josh Wills wrote:
> > > > I grok the concept and see the use case, but I was expecting that this
> > > > email was going to be about checkpointing in the sense of having Crunch
> > > > save state about the intermediate outputs of a processing pipeline and then
> > > > supporting the ability to restart a failed pipeline from a checkpointed
> > > > stage -- does that notion line up with what you had in mind here, or am I
> > > > just sleep deprived?
> > > >
> > > > Josh, who just arrived in London
> > > >
> > > > On Wed, Sep 5, 2012 at 9:16 PM, Gabriel Reid wrote:
> > > >
> > > > > Hi guys,
> > > > >
> > > > > In some instances, we want to do some kind of iterative processing in
> > > > > Crunch, and run the same (or a similar) DoFn on the same PCollection
> > > > > multiple times.
> > > > >
> > > > > For example, let's say we've got a PCollection of "grid" objects, and we
> > > > > want to iteratively divide each of these grids into four sub-grids, leading
> > > > > to exponential growth of the data. The naive way to do this would be the
> > > > > following:
> > > > >
> > > > > PCollection grids = …;
> > > > > for (…) {
> > > > >   grids = grids.parallelDo(new SubdivideFn());
> > > > > }
> > > > >
> > > > > However, the above code would be optimized into a single string of DoFns,
> > > > > without increasing the number of mappers we've got per iteration, which of
> > > > > course wouldn't work well with the exponential growth of data.
> > > > >
> > > > > The current way of getting around this is to add a call to
> > > > > materialize().iterator() on the PCollection in each iteration (this is also
> > > > > done in the PageRankIT integration test).
> > > > >
> > > > > What I propose is adding a "checkpoint" method to PCollection to signify
> > > > > that this should be an actual step in processing. This could work as
> > > > > follows:
> > > > >
> > > > > PCollection grids = …;
> > > > > for (…) {
> > > > >   grids = grids.parallelDo(new SubdivideFn()).checkpoint();
> > > > > }
> > > > >
> > > > > In the short term this could even be implemented as just a call to
> > > > > materialize().iterator(), but encapsulating it in a method like
> > > > > this would allow us to work more efficiently with it in the future,
> > > > > especially once CRUNCH-34 is merged.
> > > > >
> > > > > Any thoughts on this? The actual name of the method is my biggest concern;
> > > > > I'm not sure if "checkpoint" is the best name for it, but I can't think of
> > > > > anything better at the moment.
> > > > >
> > > > > - Gabriel
> > > >
> > > > --
> > > > Director of Data Science
> > > > Cloudera
> > > > Twitter: @josh_wills
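For readers following the thread, here is a minimal toy model of the behaviour under discussion. This is plain Java, not the real Crunch API: all names (ToyPCollection, stagesRun, subdivideFn) are invented for illustration. It sketches how parallelDo only queues a DoFn (so chained calls fuse into one stage), while the proposed checkpoint() runs the queued work and materializes the result, creating a real stage boundary per iteration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model (NOT the real Crunch API) of DoFn fusion vs. checkpointing.
public class CheckpointSketch {
    static int stagesRun = 0; // counts "jobs" that actually executed

    static class ToyPCollection<T> {
        final List<T> materialized;                       // last materialized state
        final List<Function<T, List<T>>> pending = new ArrayList<>();

        ToyPCollection(List<T> data) { this.materialized = data; }

        // Lazy: just records the DoFn; nothing runs yet, so chained
        // parallelDo calls are "fused" into one pending stage.
        ToyPCollection<T> parallelDo(Function<T, List<T>> doFn) {
            ToyPCollection<T> out = new ToyPCollection<>(materialized);
            out.pending.addAll(pending);
            out.pending.add(doFn);
            return out;
        }

        // The proposed boundary: run all fused DoFns as one stage and
        // materialize the output so the next stage starts from real data.
        ToyPCollection<T> checkpoint() {
            List<T> current = materialized;
            for (Function<T, List<T>> fn : pending) {
                List<T> next = new ArrayList<>();
                for (T t : current) next.addAll(fn.apply(t));
                current = next;
            }
            stagesRun++; // one stage, however many DoFns were fused into it
            return new ToyPCollection<>(current);
        }
    }

    public static void main(String[] args) {
        // "Subdivide" each grid (modeled as a string) into four sub-grids.
        Function<String, List<String>> subdivideFn = g ->
                List.of(g + ".0", g + ".1", g + ".2", g + ".3");

        ToyPCollection<String> grids = new ToyPCollection<>(List.of("root"));
        for (int i = 0; i < 3; i++) {
            grids = grids.parallelDo(subdivideFn).checkpoint();
        }
        System.out.println(stagesRun);                 // 3 stage boundaries
        System.out.println(grids.materialized.size()); // 1 * 4^3 = 64 grids
    }
}
```

Dropping the checkpoint() call in the loop would leave all three subdivideFn applications queued in one pending list, i.e. a single fused stage -- which is exactly the mapper-count problem the thread describes.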