From: Josh Wills <jwills@cloudera.com>
To: dev@crunch.apache.org
Date: Thu, 27 Feb 2014 12:30:40 -0800
Subject: Re: Illegal State Exception when doing a union

Oh, absolutely-- go right ahead.

J

On Thu, Feb 27, 2014 at 12:26 PM, Jinal Shah wrote:

Can we at least log a JIRA for that? That way, whoever is available and
interested can work on it.

On Thu, Feb 27, 2014 at 1:48 PM, Josh Wills wrote:

Yeah, but it will require changing code to do that; there isn't a way to do it
as currently implemented. My hypothesis is that we would need to modify
Sources to check whether they are SourceTargets that don't exist yet, figure
out which job was writing them, add the SourceTarget dependency automatically,
and then do the size planning for the job based on the estimated size of the
PCollection(s) that were populating that target. It's not obviously a trivial
change (at least, it's not obvious to me yet), and I wouldn't consider it a
priority while Pipeline.run() exists as a workaround.

J
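A minimal sketch of the Pipeline.run() workaround referenced above, reusing
the placeholder names from Jinal's example further down the thread
(someOperation(), the Avro path, and the String payload type are illustrative,
not real code from the job):

    // Uses org.apache.crunch.{PCollection, SourceTarget},
    // org.apache.crunch.io.avro.AvroFileTarget, and
    // org.apache.crunch.types.avro.Avros.

    // Write the intermediate result, force it to materialize with run(), and
    // only then read it back, so the planner sees files that actually exist.
    PCollection<String> beforeWrite = someOperation();   // placeholder step
    SourceTarget<String> target =
        new AvroFileTarget("/tmp/before-write").asSourceTarget(Avros.strings());
    pipeline.write(beforeWrite, target);
    pipeline.run();   // the workaround: finish the write job before planning the union
    PCollection<String> afterWrite = pipeline.read(target);
    // afterWrite now points at data that exists on disk, so the later
    // union/co-group can be planned normally, at the cost of an extra run.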
On Thu, Feb 27, 2014 at 11:38 AM, Jinal Shah wrote:

Hey Josh, is there no way of telling the planner, when it is doing a union or
co-group or some other operation where it needs the size of the location we
are reading from after doing a write, to run the pipeline up to that point
when the source is something that has to be generated by earlier processing,
instead of planning it all ahead of time? Maybe I'm completely wrong, but it
was just a thought.

On Wed, Feb 26, 2014 at 9:00 PM, Josh Wills wrote:

Hey Jinal,

Been thinking about it off-and-on all day, and I don't have a better solution
right now than pipeline.run()...

J

On Wed, Feb 26, 2014 at 6:46 PM, Jinal Shah wrote:

So Josh, what do you think can be done?

On Wed, Feb 26, 2014 at 10:37 AM, Jinal Shah <jinalshah2007@gmail.com> wrote:

And it is also trying to run them in parallel, so now it is failing on that.

On Wed, Feb 26, 2014 at 10:30 AM, Jinal Shah <jinalshah2007@gmail.com> wrote:

I did as you said, but now it is running the DoFn twice. Since I write the
output of that parallelDo to HDFS, the planner split the work in two: the DoFn
runs once in the reduce phase while storing the output, and then again in the
map phase while doing the union.

On Tue, Feb 25, 2014 at 7:41 PM, Josh Wills <jwills@cloudera.com> wrote:

So my thought would be that if the DoFn in this step:

    beforeWrite.parallelDo(DoFn, U, ParallelDoOptions.builder().sources(target).build());

signaled that it was going to write a lot of data with a large scaleFactor,
then the planner would use the output from beforeWrite as a checkpoint, and
save the DoFn processing for the map phase.

On Tue, Feb 25, 2014 at 5:08 PM, Jinal Shah <jinalshah2007@gmail.com> wrote:

Yup, this is to avoid .run() ;-). But I want the beforeWrite output to be
stored. So how do I apply the scaleFactor() method, and how will it help make
the DoFn for afterWrite run on the map side?

On Tue, Feb 25, 2014 at 6:58 PM, Josh Wills <josh.wills@gmail.com> wrote:

Okay. Out of curiosity, if you override the float scaleFactor() method on the
DoFn that you apply here:

    PCollection afterParallelDo = afterWrite.parallelDo(DoFn, U,
        ParallelDoOptions.builder().sources(target).build());

and apply it to beforeWrite, does it still insist on writing out beforeWrite
on the reduce side?

BTW, I'm assuming there is (again) some reason not to force a run() here. ;-)
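For anyone following along in the archive, a rough sketch of what overriding
scaleFactor() looks like; the class name, the String types, and the 10x value
are illustrative, not taken from Jinal's actual job:

    import org.apache.crunch.DoFn;
    import org.apache.crunch.Emitter;

    // A DoFn that reports a large scaleFactor(), i.e. it tells the planner
    // that its output will be much bigger than its input. That is the signal
    // being discussed above for getting the planner to checkpoint the fn's
    // input and defer the fn itself to the map side of the following job.
    public class ExpandingFn extends DoFn<String, String> {
      @Override
      public void process(String input, Emitter<String> emitter) {
        emitter.emit(input);   // stand-in for the real per-record processing
      }

      @Override
      public float scaleFactor() {
        return 10.0f;          // illustrative: "output is roughly 10x the input"
      }
    }

It would then be passed in place of the bare DoFn placeholder in the snippet
above, e.g. beforeWrite.parallelDo(new ExpandingFn(), U,
ParallelDoOptions.builder().sources(target).build()).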
On Tue, Feb 25, 2014 at 4:51 PM, Jinal Shah <jinalshah2007@gmail.com> wrote:

I wanted to run that in the map phase instead of the reduce phase. If I don't
do that, it will run in the reduce phase.

On Tue, Feb 25, 2014 at 5:38 PM, Josh Wills <jwills@cloudera.com> wrote:

> On Tue, Feb 25, 2014 at 3:04 PM, Jinal Shah <jinalshah2007@gmail.com> wrote:
>
> Hi,
>
> I'm trying to do a union of 3 PTables, but I'm getting this error:
> http://pastebin.com/TkMPunJu
>
> This is where it is thrown:
> https://github.com/apache/crunch/blob/master/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java#L66
>
> This is what I'm trying to do:
>
>     PCollection beforeWrite = someOperation();
>
>     SourceTarget target = new AvroFileTarget().asSourceTarget(U);
>
>     pipeline.write(beforeWrite, target);
>
>     PCollection afterWrite = pipeline.read(target);

Why are you creating afterWrite here, instead of doing the processing in the
next step (the one that yields afterParallelDo) against beforeWrite?

>     PCollection afterParallelDo = afterWrite.parallelDo(DoFn, U,
>         ParallelDoOptions.builder().sources(target).build());
>
>     PTable afterSomeOperation = someOperations();
>
>     PTable thatNeedsToBeAdded = comingFromHbase();
>
>     PTable unionNeeded = afterSomeOperation.union(thatNeedsToBeAdded);
>     // this is where it fails for some reason, since it is looking for the
>     // target, which is not generated yet
>
> Can anyone help me understand why this is happening?
>
> Thanks,
> Jinal
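To make Josh's inline suggestion concrete, here is a sketch of the
restructured flow: keep the write as a side output, but chain the next
parallelDo off beforeWrite directly instead of reading the target back.
MyDoFn, someOperation(), the path, and the String/Avros.strings() types are
hypothetical stand-ins for whatever the real job uses:

    PCollection<String> beforeWrite = someOperation();   // placeholder, as above
    pipeline.write(beforeWrite, new AvroFileTarget("/tmp/before-write"));

    // Chain directly off beforeWrite; there is no pipeline.read(target), so
    // the planner never has to size a file that has not been written yet.
    PCollection<String> afterParallelDo =
        beforeWrite.parallelDo(new MyDoFn(), Avros.strings());

    // someOperations(), comingFromHbase(), and the union then proceed as before.

As the rest of the thread discusses, the trade-off is that the DoFn may then
run on the reduce side of the job that produces beforeWrite, which is what the
scaleFactor() suggestion above is trying to address.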
--
Director of Data Science
Cloudera
Twitter: @josh_wills