crunch-user mailing list archives

From Josh Wills <josh.wi...@gmail.com>
Subject Re: Multiple Reduces in a Single Crunch Job
Date Fri, 05 Dec 2014 00:59:00 GMT
I think it's a bug, or at least, a configuration issue. When you construct
the MRPipeline, are you explicitly passing it a Configuration object?
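
To illustrate why the Configuration matters for the question above: in Hadoop, a path with no scheme (such as /tmp/crunch-*) is interpreted relative to the default filesystem (the fs.defaultFS setting), so the same string can point at HDFS in one context and at the local disk in another. The sketch below only mimics that resolution with java.net.URI from the standard library; it is not Hadoop's or Crunch's actual code, and the class name, the helper name and the host "namenode:8020" are hypothetical.

```java
import java.net.URI;

// Hypothetical helper, for illustration only: mimics how an unqualified
// path picks up the scheme and authority of a default filesystem URI.
class PathQualification {

    // Resolve a possibly unqualified path against a default filesystem URI.
    // A path that already carries a scheme is returned unchanged.
    static URI qualify(String defaultFs, String path) {
        return URI.create(defaultFs).resolve(path);
    }

    public static void main(String[] args) {
        String tmp = "/tmp/crunch-1279941375/p1";

        // "namenode:8020" is an assumed address, not taken from the thread.
        URI onHdfs = qualify("hdfs://namenode:8020/", tmp);
        URI onLocal = qualify("file:///", tmp);

        // Same path string, two different filesystems.
        System.out.println(onHdfs.getScheme() + " -> " + onHdfs.getPath());
        System.out.println(onLocal.getScheme() + " -> " + onLocal.getPath());
    }
}
```

If the client that plans the job and the tasks that run it see different default filesystems, a temp directory written under one could be looked up under the other, which would be consistent with the "Input path does not exist" error quoted below.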
On Thu, Dec 4, 2014 at 6:24 PM Danny Morgan <unluckyboy@hotmail.com> wrote:

> Hi Josh,
>
> Sorry, I mixed up pipelines; there is no S3 write in this case.
>
> So you are correct: the intermediate Avro file that's the output of the
> SecondarySort is labeled "/tmp". I don't manually create this local file;
> the Crunch planner seems to insert that materialization phase. If you
> refer back to my original email, the error I get is:
>
> "org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path
> does not exist: hdfs:///tmp/crunch-1279941375/p1"
>
> So the dot plan has the file labeled as "/tmp/crunch-*"; however, when the
> job runs it expects to find "hdfs:///tmp/crunch-*". Is this a
> labeling issue with the plan output, or might this be the bug?
>
> -Danny
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Thu, 4 Dec 2014 15:44:37 -0800
>
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Hey Danny,
>
> Inlined.
>
> On Thu, Dec 4, 2014 at 3:20 PM, Danny Morgan <unluckyboy@hotmail.com>
> wrote:
>
> Hi Josh,
>
> Thanks for taking the time to look into this.
>
> I do get a PCollection<Object, String> and split it. I write the Avro
> objects as parquet to HDFS and I get the String collection and write it out
> to s3n://. I have noticed that the s3n:// targets copy their files to the
> local filesystem's /tmp and then copy the file up to S3. This process
> happens serially and is super slow; I'm not sure if it's a Crunch issue or
> a general HDFS one.
>
>
> I'm not following; I'm referring to the second_phase.pdf plan file, which
> has a bunch of Avro inputs that are being merged together and secondary
> sorted (some sort of sessionization, I assume) followed by a
> GBK/combineValues and then the write to Parquet. Where does the
> PCollection<Object, String> fit in? And is the S3 write part of the same
> Pipeline instance? I'm wondering if the multiple FileSystems are confusing
> the planner w/respect to where it should create the temp file.
>
>
>
> Let me know if I can help debug further. As I mentioned, calling
> pipeline.cache() and pipeline.run() between the reduces did solve my
> problem, although I guess it is a hack.
>
> BTW Spotify's crunch-lib looks great, any integration plans?
>
>
> I also really like it and would like to incorporate basically all of it;
> will start a thread on dev@ about it and see if David is up for it.
>
>
>
> -Danny
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Thu, 4 Dec 2014 14:21:55 -0800
>
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Danny,
>
> Spent a couple of hours today banging on this by hacking on some
> integration tests but couldn't replicate it. However, I just took a closer
> look at the plan you posted, and I noticed that all of the files you are
> writing out are prefixed w/ "hdfs:/" except for the /tmp/crunch-* file that
> Crunch is creating; is it possible that Crunch is creating the temp file
> locally on your client machine for some reason? I can't think of why that
> would happen off the top of my head, but if that is the problem, I'll at
> least be able to figure out where to look.
>
> Josh
>
>
> On Tue, Nov 25, 2014 at 6:30 PM, Danny Morgan <unluckyboy@hotmail.com>
> wrote:
>
> No problem, Happy Thanksgiving!
>
> Gobble Gobble...
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Tue, 25 Nov 2014 18:23:14 -0800
>
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Very useful-- thank you. Will dig into it and report back, although I'm
> heading out for the holiday so it likely won't be until early next week.
>
> J
>
> On Tue, Nov 25, 2014 at 6:16 PM, Danny Morgan <unluckyboy@hotmail.com>
> wrote:
>
> Having a single pipeline in the application didn't fix it. Sticking a
> pipeline.run() in the middle didn't matter either; the plan looks as if
> the planner is completely ignoring the second run() I added.
>
> However what DOES WORK is if I do:
>
> collection = secondarySort()
> pipeline.cache(collection)
> pipeline.run()
> newcollection = collection.groupByKey()
>
> If I try adding the cache() without calling run() in between it doesn't
> work. Hope that's enough info for you to fix the possible planner bug.
>
> Thanks for the help Josh!
>
> ------------------------------
> From: unluckyboy@hotmail.com
> To: user@crunch.apache.org
> Subject: RE: Multiple Reduces in a Single Crunch Job
> Date: Wed, 26 Nov 2014 01:58:11 +0000
>
>
> I tried doing a Sample() instead of identity function, but that got fused
> into the reduce as well and didn't work.
>
> First thing I tried was sticking a pipeline.run() in between there and I
> was surprised but it didn't work either, same error. I'll rerun that config
> now and try to get the dot files for the plan.
>
> Not sure if this is affecting it but in the same crunch application I have
> a completely independent pipeline that runs before this one executes. I'll
> turn that off as well and see if it's causing the issue.
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Tue, 25 Nov 2014 17:43:52 -0800
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Drat, I was hoping it was something simple. You could manually fix it by
> injecting a pipeline.run() call between the secondarySort and the
> groupByKey(), but of course, we'd like to handle this situation correctly
> by default.
>
> J
>
> On Tue, Nov 25, 2014 at 5:39 PM, Danny Morgan <unluckyboy@hotmail.com>
> wrote:
>
> I did a parallelDo with the IdentityFn of the output of the secondarySort
> and the IdentityFn was just fused into the reduce phase of the
> secondarySort and I got the same error message.
>
> I think you want me to somehow force a map phase in between the two
> reduces?
>
> -Danny
>
> ------------------------------
> From: josh.wills@gmail.com
> Date: Tue, 25 Nov 2014 17:23:29 -0800
>
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Oh, dumb question-- if you put like a dummy function between the
> secondarySort and the groupByKey, like an IdentityFn or something, do
> things work again? That would help w/diagnosing the problem.
>
> On Tue, Nov 25, 2014 at 5:15 PM, Josh Wills <jwills@cloudera.com> wrote:
>
> So if you're getting it quickly, it might be b/c the job isn't recognizing
> the dependency between the two separate phases of the job for some reason
> (e.g., it's not realizing that one job has to be run before the other one.)
> That's an odd situation, but we have had bugs like that in the past; let me
> see if I can re-create the situation in an integration test. Which version
> of Crunch?
>
> J
>
> On Tue, Nov 25, 2014 at 4:40 PM, Danny Morgan <unluckyboy@hotmail.com>
> wrote:
>
> No that's definitely not it. I get this issue if I write to a single
> output as well.
>
> If I remove the groupByKey().combineValues() line and just write out the
> output from the SecondarySort it works. Seems to only complain about the
> temp path not existing when I have multiple reduce phases in the pipeline.
> Also the error seems to happen immediately during the setup or planning
> phase, I assume this because the yarn jobs get created but they don't do
> anything, and instead of FAILED the error message is "Application killed by
> user."
>
> -Danny
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Tue, 25 Nov 2014 16:30:58 -0800
>
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Ack, sorry-- it's this: https://issues.apache.org/jira/browse/CRUNCH-481
>
> On Tue, Nov 25, 2014 at 4:24 PM, Danny Morgan <unluckyboy@hotmail.com>
> wrote:
>
> Hello Again Josh,
>
> The link to the Jira issue you sent out seems to be cut off, could you
> please resend it?
>
> I deleted the line where I write the collection to a text file, and
> retried it but it didn't work either. Also tried writing the collection out
> as Avro instead of Parquet, but got the same error.
>
> Here's the rest of the stack trace:
>
> org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs:///tmp/crunch-2008950085/p1
>         at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
>         at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:217)
>         at org.apache.crunch.impl.mr.run.CrunchInputFormat.getSplits(CrunchInputFormat.java:65)
>         at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:491)
>         at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:508)
>         at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:392)
>         at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
>         at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:415)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1528)
>         at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
>         at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:340)
>         at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:277)
>         at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(CrunchJobControl.java:316)
>         at org.apache.crunch.impl.mr.exec.MRExecutor.monitorLoop(MRExecutor.java:113)
>         at org.apache.crunch.impl.mr.exec.MRExecutor.access$000(MRExecutor.java:55)
>         at org.apache.crunch.impl.mr.exec.MRExecutor$1.run(MRExecutor.java:84)
>         at java.lang.Thread.run(Thread.java:744)
>
> Thanks Josh!
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Tue, 25 Nov 2014 16:10:33 -0800
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
>
> Hey Danny,
>
> I'm wondering if this is caused by
> https://issues.apache.org/jira/browse/CRUNCH-481 -- I think we use
> different output committers for text files vs. parquet files, so at least
> one of the outputs won't be written properly-- does that make sense?
>
> Josh
>
> On Tue, Nov 25, 2014 at 4:07 PM, Danny Morgan <unluckyboy@hotmail.com>
> wrote:
>
> Hi Crunchers,
>
> I've attached a pdf of what my plan looks like. I've run into this problem
> before where I have multiple reduce steps chained together in a single
> pipeline and always get the same error.
>
> In the case of the attached pdf the error is "org.apache.hadoop.mapreduce.lib.input.InvalidInputException:
> Input path does not exist: hdfs:///tmp/crunch-1279941375/p1"
>
> That's the temp directory the crunch planner set up for the first reduce
> phase.
>
> Can I run multiple chained reduces within the same pipeline? Do I have to
> manually write out the output from the first reduce?
>
> Here's what the code looks like:
>
>       // Simple mapper
>       PTable<String, Pair<Long, Log>> first = Danny.filterForDanny(logs);
>       // Secondary sort happens here
>       PTable<Danny, Long> second = Danny.extractDannys(first);
>       // Regular group by
>       PTable<Danny, Long> third =
>           second.groupByKey().combineValues(Aggregators.SUM_LONGS());
>       // Simple function that populates some fields in the Danny object
>       // with the aggregate results
>       PCollection<Pair<Danny, String>> done = Danny.finalize(third);
>       Pair<PCollection<Danny>, PCollection<String>> splits =
>           Channels.split(done);
>       splits.second().write(To.textFile(mypath), WriteMode.OVERWRITE);
>       Target pq_danny = new AvroParquetFileTarget(pqPath);
>       splits.first().write(pq_danny, WriteMode.OVERWRITE);
>
> Thanks!
>
> -Danny
>
>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
