From: Danny Morgan <unluckyboy@hotmail.com>
To: user@crunch.apache.org
Subject: RE: Multiple Reduces in a Single Crunch Job
Date: Fri, 5 Dec 2014 00:23:19 +0000
Hi Josh,

Sorry, I mixed up pipelines; there is no S3 write in this case.

So you are correct: the intermediate Avro file that is the output of the SecondarySort is labeled "/tmp". I don't manually create this local file; the Crunch planner seems to insert that materialization phase. If you refer back to my original email, the error I get is:

"org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs:///tmp/crunch-1279941375/p1"

So the dot plan labels the file as "/tmp/crunch-*", but when the job runs it expects to find "hdfs:///tmp/crunch-*". Is this a labeling issue in the plan output, or might this be the bug?

-Danny


From: jwills@cloudera.com
Date: Thu, 4 Dec 2014 15:44:37 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Hey Danny,

Inlined.

On Thu, Dec 4, 2014 at 3:20 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Hi Josh,

Thanks for taking the time to look into this.

I do get a PCollection<Object, String> and split it. I write the Avro objects as Parquet to HDFS, and I write the String collection out to s3n://. I have noticed that the s3n:// targets copy their files to the local filesystem's /tmp and then copy each file up to S3. This happens serially and is very slow; I'm not sure whether it's a Crunch issue or a general HDFS one.
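
(A minimal sketch of the split-and-write pattern described above, reusing the done collection and targets from the code at the bottom of the thread; the HDFS and s3n paths are illustrative:)

    // One Channels.split, two filesystems: Parquet to HDFS, text to S3 via s3n.
    // (uses org.apache.crunch.lib.Channels, org.apache.crunch.io.To,
    //  org.apache.crunch.io.parquet.AvroParquetFileTarget, org.apache.hadoop.fs.Path)
    Pair<PCollection<Danny>, PCollection<String>> splits = Channels.split(done);
    splits.first().write(new AvroParquetFileTarget(new Path("hdfs:///data/danny")),
        WriteMode.OVERWRITE);
    splits.second().write(To.textFile("s3n://my-bucket/danny-text"), WriteMode.OVERWRITE);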

I'm not following; I'm referring to the second_phase.pdf plan file, which has a bunch of Avro inputs that are being merged together and secondary sorted (some sort of sessionization, I assume), followed by a GBK/combineValues and then the write to Parquet. Where does the PCollection<Object, String> fit in? And is the S3 write part of the same Pipeline instance? I'm wondering whether the multiple FileSystems are confusing the planner with respect to where it should create the temp file.

Let me know if I can help debug further. As I mentioned, calling pipeline.cache() and pipeline.run() between the reduces did solve my problem, although I guess it is a hack.

BTW, Spotify's crunch-lib looks great; any integration plans?

I also really like it and would like to incorporate basically all of it; I will start a thread on dev@ about it and see if David is up for it.

-Danny


From: jwills@cloudera.com
Date: Thu, 4 Dec 2014 14:21:55 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Danny,

Spent a couple of hours today banging on this by hacking on some integration tests, but I couldn't replicate it. However, I just took a closer look at the plan you posted, and I noticed that all of the files you are writing out are prefixed with "hdfs:/" except for the /tmp/crunch-* file that Crunch is creating; is it possible that Crunch is creating the temp file locally on your client machine for some reason? I can't think of why that would happen off the top of my head, but if that is the problem, I'll at least be able to figure out where to look.
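
(One way to test the local-temp-file theory would be to pin Crunch's scratch directory to a fully qualified HDFS URI; a minimal sketch, assuming the crunch.tmp.dir configuration key read by MRPipeline -- treat the key name and the namenode address as assumptions:)

    // Hypothetical diagnostic: force Crunch's temp/scratch space onto HDFS explicitly
    // so it cannot be resolved against the local filesystem.
    // (uses org.apache.hadoop.conf.Configuration, org.apache.crunch.Pipeline,
    //  org.apache.crunch.impl.mr.MRPipeline)
    Configuration conf = new Configuration();
    conf.set("crunch.tmp.dir", "hdfs://namenode:8020/tmp");  // assumed key; defaults to /tmp
    Pipeline pipeline = new MRPipeline(MyApp.class, conf);   // MyApp is an illustrative class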

Josh


On Tue, Nov 25, 2014 at 6:30 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

No problem, Happy Thanksgiving!

Gobble Gobble...


From: jwills@cloudera.com
Date: Tue, 25 Nov 2014 18:23:14 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Very useful -- thank you. Will dig into it and report back, although I'm heading out for the holiday so it likely won't be until early next week.

J

On Tue, Nov 25, 2014 at 6:16 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Having a single pipeline in the application didn't fix it. Sticking a pipeline.run() in the middle didn't matter either; the plan looks as if the planner is completely ignoring the second run() I added.

However, what DOES work is if I do:

collection = secondarySort()
pipeline.cache(collection)
pipeline.run()
newcollection = collection.groupByKey()

If I try adding the cache() without calling run() in between, it doesn't work. Hope that's enough info for you to fix the possible planner bug.
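
(A slightly fuller sketch of the workaround above, written against the PTable types from the code at the bottom of the thread; the exact cache API may vary by Crunch version, so treat the cache() call shape as an assumption:)

    // cache() + run() materializes the secondary-sort output before the planner
    // lays out the second reduce (the groupByKey/combineValues).
    // (uses org.apache.crunch.PTable, org.apache.crunch.Aggregators)
    PTable<Danny, Long> second = Danny.extractDannys(first);  // reduce #1: secondary sort
    second.cache();                                           // persist the intermediate output
    pipeline.run();                                           // execute reduce #1 now
    PTable<Danny, Long> third =
        second.groupByKey().combineValues(Aggregators.SUM_LONGS());  // reduce #2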

Thanks for the help Josh!

From: unluckyboy@hotmail.com
To: user@crunch.apache.org
Subject: RE: Multiple Reduces in a Single Crunch Job
Date: Wed, 26 Nov 2014 01:58:11 +0000

I tried doing a Sample() instead of an identity function, but that got fused into the reduce as well and didn't work.

The first thing I tried was sticking a pipeline.run() in between; I was surprised, but it didn't work either -- same error. I'll rerun that config now and try to get the dot files for the plan.

Not sure if this is affecting it, but in the same Crunch application I have a completely independent pipeline that runs before this one executes. I'll turn that off as well and see if it's causing the issue.


From: jwills@cloudera.com
Date: Tue, 25 Nov 2014 17:43:52 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Drat, I was hoping it was something simple. You could manually fix it by injecting a pipeline.run() call between the secondarySort and the groupByKey(), but of course we'd like to handle this situation correctly by default.

J

On Tue, Nov 25, 2014 at 5:39 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

I did a parallelDo with the IdentityFn on the output of the secondarySort; the IdentityFn was just fused into the reduce phase of the secondarySort, and I got the same error message.

I think you want me to somehow force a map phase in between the two reduces?
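
(A minimal sketch of the identity pass-through described above, assuming Crunch's IdentityFn and the PTable types from the code at the bottom of the thread; as noted, the planner fuses it into the preceding reduce, so it does not force a separate map phase:)

    // Identity parallelDo between the two reduces; Crunch fuses this into the
    // reduce phase of the secondary sort rather than starting a new job.
    // (uses org.apache.crunch.fn.IdentityFn, org.apache.crunch.Pair)
    PTable<Danny, Long> passthrough = second.parallelDo(
        IdentityFn.<Pair<Danny, Long>>getInstance(),
        second.getPTableType());
    PTable<Danny, Long> third =
        passthrough.groupByKey().combineValues(Aggregators.SUM_LONGS());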

-Danny


From: josh.wills@gmail.com
Date: Tue, 25 Nov 2014 17:23:29 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Oh, dumb question -- if you put a dummy function between the secondarySort and the groupByKey, like an IdentityFn or something, do things work again? That would help with diagnosing the problem.

On Tue, Nov 25, 2014 at 5:15 PM, Josh Wills <jwills@cloudera.com> wrote:

So if you're getting it quickly, it might be because the job isn't recognizing the dependency between the two separate phases of the job for some reason (e.g., it's not realizing that one job has to be run before the other one). That's an odd situation, but we have had bugs like that in the past; let me see if I can re-create the situation in an integration test. Which version of Crunch?

J

On Tue, Nov 25, 2014 at 4:40 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

No, that's definitely not it. I get this issue if I write to a single output as well.

If I remove the groupByKey().combineValues() line and just write out the output from the SecondarySort, it works. It seems to only complain about the temp path not existing when I have multiple reduce phases in the pipeline. Also, the error seems to happen immediately, during the setup or planning phase; I assume this because the YARN jobs get created but they don't do anything, and instead of FAILED the error message is "Application killed by user."

-Danny


From: jwills@cloudera.com
Date: Tue, 25 Nov 2014 16:30:58 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Ack, sorry -- it's this: https://issues.apache.org/jira/browse/CRUNCH-481

On Tue, Nov 25, 2014 at 4:24 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Hello again Josh,

The link to the Jira issue you sent out seems to be cut off; could you please resend it?

I deleted the line where I write the collection to a text file and retried, but it didn't work either. I also tried writing the collection out as Avro instead of Parquet, but got the same error.

Here's the rest of the stack trace:

org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs:///tmp/crunch-2008950085/p1
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:217)
    at org.apache.crunch.impl.mr.run.CrunchInputFormat.getSplits(CrunchInputFormat.java:65)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:491)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:508)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:392)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1528)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
    at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:340)
    at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:277)
    at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(CrunchJobControl.java:316)
    at org.apache.crunch.impl.mr.exec.MRExecutor.monitorLoop(MRExecutor.java:113)
    at org.apache.crunch.impl.mr.exec.MRExecutor.access$000(MRExecutor.java:55)
    at org.apache.crunch.impl.mr.exec.MRExecutor$1.run(MRExecutor.java:84)
    at java.lang.Thread.run(Thread.java:744)

Thanks Josh!


From: jwills@cloudera.com
Date: Tue, 25 Nov 2014 16:10:33 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Hey Danny,

I'm wondering if this is caused by https://issues.apache.org/jira/browse/CRUNCH-481 -- I think we use different output committers for text files vs. Parquet files, so at least one of the outputs won't be written properly -- does that make sense?

Josh

On Tue, Nov 25, 2014 at 4:07 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Hi Crunchers,

I've attached a PDF of what my plan looks like. I've run into this problem before: whenever I have multiple reduce steps chained together in a single pipeline, I always get the same error.

In the case of the attached PDF, the error is "org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs:///tmp/crunch-1279941375/p1"

That's the temp directory the Crunch planner set up for the first reduce phase.

Can I run multiple chained reduces within the same pipeline? Do I have to manually write out the output from the first reduce?

Here's what the code looks like:

    // Simple mapper
    PTable<String, Pair<Long, Log>> first = Danny.filterForDanny(logs);
    // Secondary sort happens here
    PTable<Danny, Long> second = Danny.extractDannys(first);
    // Regular group by
    PTable<Danny, Long> third = second.groupByKey().combineValues(Aggregators.SUM_LONGS());
    // Simple function that populates some fields in the Danny object with the aggregate results
    PCollection<Pair<Danny, String>> done = Danny.finalize(third);
    Pair<PCollection<Danny>, PCollection<String>> splits = Channels.split(done);
    splits.second().write(To.textFile(mypath), WriteMode.OVERWRITE);
    Target pq_danny = new AvroParquetFileTarget(pqPath);
    splits.first().write(pq_danny, WriteMode.OVERWRITE);
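
(For context on where the first reduce comes from, a hypothetical sketch of what the secondary-sort step inside Danny.extractDannys might look like; it assumes Crunch's SecondarySort.sortAndApply with a PTableType overload and Avro-backed types, and ExtractDannysFn is an illustrative name, not something from the thread:)

    // Hypothetical shape of the secondary-sort step (the first reduce in the plan).
    // ExtractDannysFn would be a DoFn<Pair<String, Iterable<Pair<Long, Log>>>, Pair<Danny, Long>>.
    // (uses org.apache.crunch.lib.SecondarySort, org.apache.crunch.types.avro.Avros)
    PTable<Danny, Long> second = SecondarySort.sortAndApply(
        first,                              // PTable<String, Pair<Long, Log>>
        new ExtractDannysFn(),
        Avros.tableOf(Avros.records(Danny.class), Avros.longs()));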

Thanks!

-Danny

-- 
Director of Data Science
Cloudera
Twitter: @josh_wills


