crunch-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Josh Wills <josh.wi...@gmail.com>
Subject Re: Multiple Reduces in a Single Crunch Job
Date Wed, 26 Nov 2014 01:23:29 GMT
Oh, dumb question-- if you put like a dummy function between the
secondarySort and the groupByKey, like an IdentityFn or something, do
things work again? That would help w/diagnosing the problem.

On Tue, Nov 25, 2014 at 5:15 PM, Josh Wills <jwills@cloudera.com> wrote:

> So if you're getting it quickly, it might be b/c the job isn't recognizing
> the dependency between the two separate phases of the job for some reason
> (e.g., it's not realizing that one job has to be run before the other one.)
> That's an odd situation, but we have had bugs like that in the past; let me
> see if I can re-create the situation in an integration test. Which version
> of Crunch?
>
> J
>
> On Tue, Nov 25, 2014 at 4:40 PM, Danny Morgan <unluckyboy@hotmail.com>
> wrote:
>
>> No that's definitely not it. I get this issue if I write to a single
>> output as well.
>>
>> If I remove the groupByKey().combineValues() line and just write out the
>> output from the SecondarySort it works. Seems to only complain about the
>> temp path not existing when I have multiple reduce phases in the pipeline.
>> Also the error seems to happen immediately during the setup or planning
>> phase, I assume this because the yarn jobs get created but they don't do
>> anything, and instead of FAILED the error message is "Application killed by
>> user."
>>
>> -Danny
>>
>> ------------------------------
>> From: jwills@cloudera.com
>> Date: Tue, 25 Nov 2014 16:30:58 -0800
>>
>> Subject: Re: Multiple Reduces in a Single Crunch Job
>> To: user@crunch.apache.org
>>
>> Ack, sorry-- it's this: https://issues.apache.org/jira/browse/CRUNCH-481
>> <https://issues.apache.org/jira/browse/CRUNCH-481-->
>>
>> On Tue, Nov 25, 2014 at 4:24 PM, Danny Morgan <unluckyboy@hotmail.com>
>> wrote:
>>
>> Hello Again Josh,
>>
>> The link to the Jira issue you sent out seems to be cut off, could you
>> please resend it?
>>
>> I deleted the line where I write the collection to a text file, and
>> retried it but it didn't work either. Also tried writing the collection out
>> as Avro instead of Parquet, but got the same error.
>>
>> Here's the rest of the stracktrace:
>>
>> org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path
>> does not exist: hdfs:///tmp/crunch-2008950085/p1
>>         at
>> org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
>>         at
>> org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:217)
>>         at
>> org.apache.crunch.impl.mr.run.CrunchInputFormat.getSplits(CrunchInputFormat.java:65)
>>         at
>> org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:491)
>>         at
>> org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:508)
>>         at
>> org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:392)
>>         at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
>>         at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at javax.security.auth.Subject.doAs(Subject.java:415)
>>         at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1528)
>>         at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
>>         at
>> org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:340)
>>         at
>> org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:277)
>>         at
>> org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(CrunchJobControl.java:316)
>>         at
>> org.apache.crunch.impl.mr.exec.MRExecutor.monitorLoop(MRExecutor.java:113)
>>         at
>> org.apache.crunch.impl.mr.exec.MRExecutor.access$000(MRExecutor.java:55)
>>         at
>> org.apache.crunch.impl.mr.exec.MRExecutor$1.run(MRExecutor.java:84)
>>         at java.lang.Thread.run(Thread.java:744)
>>
>> Thanks Josh!
>>
>> ------------------------------
>> From: jwills@cloudera.com
>> Date: Tue, 25 Nov 2014 16:10:33 -0800
>> Subject: Re: Multiple Reduces in a Single Crunch Job
>> To: user@crunch.apache.org
>>
>>
>> Hey Danny,
>>
>> I'm wondering if this is caused by
>> https://issues.apache.org/jira/browse/CRUNCH-481-- I think we use
>> different output committers for text files vs. parquet files, so at least
>> one of the outputs won't be written properly-- does that make sense?
>>
>> Josh
>>
>> On Tue, Nov 25, 2014 at 4:07 PM, Danny Morgan <unluckyboy@hotmail.com>
>> wrote:
>>
>> Hi Crunchers,
>>
>> I've attached a pdf of what my plan looks like. I've run into this
>> problem before where I have multiple reduce steps chained together in a
>> single pipeline and always get the same error.
>>
>> In the case of the attached pdf the error is "org.apache.hadoop.mapreduce.lib.input.InvalidInputException:
>> Input path does not exist: hdfs:///tmp/crunch-1279941375/p1"
>>
>> That's the temp directory the crunch planner set up for the first reduce
>> phase.
>>
>> Can I run multiple chained reduces within the same pipeline? Do I have to
>> manually write out the output from the first reduce?
>>
>> Here's what the code looks like:
>>
>>       // Simple mapper
>>       PTable<String, Pair<Long, Log>> first = Danny.filterForDanny(logs);
>>       // Secondary sort happens here
>>       PTable<Danny, Long> second = Danny.extractDannys(first);
>>       // Regular group by
>>       PTable<Danny, Long> third =
>> second.groupByKey().combineValues(Aggregators.SUM_LONGS());
>>       // simple function that populates some fields in the Danny object
>> with the aggregate results
>>       PCollection<Pair<Danny, String>> done = Danny.finalize(third);
>>       Pair<PCollection<Danny>, PCollection<String>> splits =
>> Channels.split(done);
>>       splits.second().write(To.textFile(mypath, WriteMode.OVERWRITE);
>>       Target pq_danny = new AvroParquetFileTarget(pqPath));
>>       splits.first().write(pq_danny, WriteMode.OVERWRITE)
>>
>> Thanks!
>>
>> -Danny
>>
>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>

Mime
View raw message