From: Danny Morgan <>
Subject: RE: Multiple Reduces in a Single Crunch Job
Date: Wed, 26 Nov 2014 01:58:11 GMT
I tried doing a Sample() instead of the identity function, but that got fused into the reduce as
well and didn't work.
The first thing I tried was sticking a in between there, and to my surprise
it didn't work either (same error). I'll rerun that config now and try to get the dot files
for the plan.
Not sure if this is affecting it, but in the same Crunch application I have a completely independent
pipeline that runs before this one executes. I'll turn that off as well and see if it's causing
the issue.

Date: Tue, 25 Nov 2014 17:43:52 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job

Drat, I was hoping it was something simple. You could manually fix it by injecting a
call between the secondarySort and the groupByKey(), but of course, we'd like to handle this
situation correctly by default.
On Tue, Nov 25, 2014 at 5:39 PM, Danny Morgan <> wrote:

I applied a parallelDo with the IdentityFn to the output of the secondarySort, and the IdentityFn
was just fused into the reduce phase of the secondarySort; I got the same error message.
I think you want me to somehow force a map phase between the two reduces?
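To make the fusion behaviour concrete, here is a toy planner in plain Java. It is an assumption-laden sketch, not Crunch's real planner, and the operation names are hypothetical strings ("groupByKey" plus "sortValues" stand in for the secondarySort). It fuses every operation that follows a shuffle into that shuffle's reduce phase, so inserting an identity step between two group-bys still yields exactly two jobs, which matches what happened with IdentityFn and Sample().

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of fusing map-side operations into the reduce phase of the
// preceding group-by. Operation names are hypothetical; NOT Crunch's planner.
class FusionSketch {

    // Splits a linear chain of operations into jobs. The second and later
    // "groupByKey" each start a new job; every other operation is appended
    // to the current job, i.e. fused into the preceding reduce phase.
    static List<List<String>> planJobs(List<String> ops) {
        List<List<String>> jobs = new ArrayList<>();
        List<String> current = new ArrayList<>();
        boolean seenGroupBy = false;
        for (String op : ops) {
            boolean isGroupBy = op.equals("groupByKey");
            if (isGroupBy && seenGroupBy) {
                // A second shuffle needs a new job whose input is the
                // previous job's (temporary) output.
                jobs.add(current);
                current = new ArrayList<>();
            }
            if (isGroupBy) {
                seenGroupBy = true;
            }
            current.add(op);
        }
        if (!current.isEmpty()) {
            jobs.add(current);
        }
        return jobs;
    }
}
```

Both chains plan to two jobs, and the identity step ends up on the first job's reduce side rather than creating a separate map phase.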

Date: Tue, 25 Nov 2014 17:23:29 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job

Oh, dumb question-- if you put like a dummy function between the secondarySort and the groupByKey,
like an IdentityFn or something, do things work again? That would help w/diagnosing the problem.
On Tue, Nov 25, 2014 at 5:15 PM, Josh Wills <> wrote:
So if you're getting it quickly, it might be b/c the job isn't recognizing the dependency
between the two separate phases of the job for some reason (e.g., it's not realizing that
one job has to be run before the other one.) That's an odd situation, but we have had bugs
like that in the past; let me see if I can re-create the situation in an integration test.
Which version of Crunch?
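The missing-dependency hypothesis can be illustrated with a toy scheduler. This sketch uses hypothetical job names and paths and is not Crunch's CrunchJobControl: it derives producer-to-consumer edges from shared paths and orders jobs with Kahn's topological sort. If the consumer's input path fails to match the producer's output path, no edge is recorded and the consumer can be scheduled first, which is the kind of unrecognized dependency described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy job graph: each job reads and writes some paths. Names and paths are
// hypothetical; a job reading a temp path must run after the job writing it.
class JobOrderSketch {

    static final class Job {
        final String name;
        final Set<String> inputs;
        final Set<String> outputs;

        Job(String name, Set<String> inputs, Set<String> outputs) {
            this.name = name;
            this.inputs = inputs;
            this.outputs = outputs;
        }
    }

    // Kahn's algorithm over edges inferred from exact path matches.
    static List<String> schedule(List<Job> jobs) {
        Map<String, Job> writerOf = new HashMap<>();
        for (Job j : jobs) {
            for (String out : j.outputs) {
                writerOf.put(out, j);
            }
        }
        Map<Job, Integer> pending = new LinkedHashMap<>(); // unmet dependencies
        Map<Job, List<Job>> consumers = new HashMap<>();
        for (Job j : jobs) {
            pending.put(j, 0);
            consumers.put(j, new ArrayList<>());
        }
        for (Job j : jobs) {
            for (String in : j.inputs) {
                Job producer = writerOf.get(in);
                if (producer != null && producer != j) {
                    pending.merge(j, 1, Integer::sum);
                    consumers.get(producer).add(j);
                }
            }
        }
        Deque<Job> ready = new ArrayDeque<>();
        pending.forEach((job, count) -> { if (count == 0) ready.add(job); });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            Job next = ready.poll();
            order.add(next.name);
            for (Job c : consumers.get(next)) {
                if (pending.merge(c, -1, Integer::sum) == 0) {
                    ready.add(c);
                }
            }
        }
        if (order.size() != jobs.size()) {
            throw new IllegalStateException("cycle in job graph");
        }
        return order;
    }
}
```

If job2 declares "tmp/p1/" as its input while job1 writes "tmp/p1", no edge is found and job2 is scheduled first; on a cluster that ordering would surface as "Input path does not exist".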
On Tue, Nov 25, 2014 at 4:40 PM, Danny Morgan <> wrote:

No, that's definitely not it. I get this issue if I write to a single output as well.
If I remove the groupByKey().combineValues() line and just write out the output from the SecondarySort,
it works. It seems to complain about the temp path not existing only when I have multiple reduce
phases in the pipeline. Also, the error seems to happen immediately during the setup or planning
phase. I assume this because the YARN jobs get created but don't do anything, and instead
of FAILED the error message is "Application killed by user."
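The stack trace in this thread shows the exception coming from FileInputFormat.listStatus while JobSubmitter computes input splits, that is, at submission time before any task runs. That would be consistent with jobs appearing and being killed without doing any work. The sketch below models that check with an in-memory map standing in for HDFS; SubmitCheckSketch, its methods, and the paths are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of submit-time input validation. The in-memory map stands in for
// HDFS; the class, its methods, and the paths are all hypothetical.
class SubmitCheckSketch {

    // path -> records stored at that path
    private final Map<String, List<String>> fs = new HashMap<>();

    // A producer job writes its output, e.g. a planner temp dir such as .../p1.
    void runProducer(String outputPath, List<String> records) {
        fs.put(outputPath, records);
    }

    // As with split computation at submission, the input is checked before
    // any task launches, so a missing path fails immediately with no work done.
    int submitConsumer(String inputPath) {
        List<String> records = fs.get(inputPath);
        if (records == null) {
            throw new IllegalStateException("Input path does not exist: " + inputPath);
        }
        return records.size(); // stand-in for "records the job would process"
    }
}
```

Submitting the consumer before the producer has run fails right away; once the producer has written the path, the same submission succeeds.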

Date: Tue, 25 Nov 2014 16:30:58 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job

Ack, sorry-- it's this:
On Tue, Nov 25, 2014 at 4:24 PM, Danny Morgan <> wrote:

Hello Again Josh,
The link to the Jira issue you sent out seems to be cut off, could you please resend it?
I deleted the line where I write the collection to a text file, and retried it but it didn't
work either. Also tried writing the collection out as Avro instead of Parquet, but got the
same error.
Here's the rest of the stack trace:
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs:///tmp/crunch-2008950085/p1
       at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(
       at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(
       at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(
       at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(
       at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(
       at org.apache.hadoop.mapreduce.Job$
       at org.apache.hadoop.mapreduce.Job$
       at Method)
       at
       at org.apache.hadoop.mapreduce.Job.submit(
       at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(
       at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(
       at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(
       at$000(
       at$
Thanks Josh!
Date: Tue, 25 Nov 2014 16:10:33 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job

Hey Danny,
I'm wondering if this is caused by I think
we use different output committers for text files vs. Parquet files, so at least one of the
outputs won't be written properly. Does that make sense?
On Tue, Nov 25, 2014 at 4:07 PM, Danny Morgan <> wrote:

Hi Crunchers,
I've attached a PDF of what my plan looks like. I've run into this problem before: whenever I
have multiple reduce steps chained together in a single pipeline, I always get the same error.
In the case of the attached pdf the error is "org.apache.hadoop.mapreduce.lib.input.InvalidInputException:
Input path does not exist: hdfs:///tmp/crunch-1279941375/p1"
That's the temp directory the crunch planner set up for the first reduce phase.
Can I run multiple chained reduces within the same pipeline? Do I have to manually write out
the output from the first reduce?
Here's what the code looks like:
      // Simple mapper
      PTable<String, Pair<Long, Log>> first = Danny.filterForDanny(logs);
      // Secondary sort happens here
      PTable<Danny, Long> second = Danny.extractDannys(first);
      // Regular group by
      PTable<Danny, Long> third = second.groupByKey().combineValues(Aggregators.SUM_LONGS());
      // Simple function that populates some fields in the Danny object with the aggregate results
      PCollection<Pair<Danny, String>> done = Danny.finalize(third);
      Pair<PCollection<Danny>, PCollection<String>> splits = Channels.split(done);
      splits.second().write(To.textFile(mypath), WriteMode.OVERWRITE);
      Target pq_danny = new AvroParquetFileTarget(pqPath);
      splits.first().write(pq_danny, WriteMode.OVERWRITE);
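For reference, the semantics of the grouping and split steps in that snippet can be sketched locally with plain Java collections. This is an in-memory analog assuming String keys and Long values; it is not Crunch code and involves no planner. sumByKey mirrors groupByKey().combineValues(Aggregators.SUM_LONGS()), and split mirrors what Channels.split does to a collection of pairs.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory analog of groupByKey().combineValues(SUM_LONGS) and
// Channels.split(...). Plain collections stand in for PTable/PCollection.
class LocalPipelineSketch {

    // Group (key, value) pairs by key and sum the values for each key.
    static Map<String, Long> sumByKey(List<Map.Entry<String, Long>> pairs) {
        Map<String, Long> sums = new TreeMap<>(); // sorted for stable output
        for (Map.Entry<String, Long> p : pairs) {
            sums.merge(p.getKey(), p.getValue(), Long::sum);
        }
        return sums;
    }

    // Turn a collection of pairs into two collections: all lefts, all rights.
    static <T, U> Map.Entry<List<T>, List<U>> split(List<Map.Entry<T, U>> pairs) {
        List<T> firsts = new ArrayList<>();
        List<U> seconds = new ArrayList<>();
        for (Map.Entry<T, U> p : pairs) {
            firsts.add(p.getKey());
            seconds.add(p.getValue());
        }
        return new SimpleEntry<>(firsts, seconds);
    }
}
```

Locally this all runs in one pass; the point of the thread is that on a cluster the two shuffles become two MapReduce jobs linked by a temp path.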

