crunch-user mailing list archives

From Danny Morgan <>
Subject RE: Multiple Writes in a single pipeline.
Date Fri, 22 Aug 2014 14:26:44 GMT
This issue looks similar to

It turns out even if I get rid of the reduce phase and do just this:

 PTable<String, String> lines =;
 PCollection<Log> parsed = lines.parallelDo("initial-parsing", new myParser(), Avros.specifics(Log.class));

 PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing",
          new VisitsExtractor(),
          Avros.tableOf(Avros.specifics(Visit.class), Avros.pairs(Avros.longs(), Avros.longs())));

visits.write(To.avroFile(outputPath+"/visits"), WriteMode.OVERWRITE);
parsed.write(To.avroFile(outputPath+"/raw"), WriteMode.OVERWRITE);

The plan shows I should be writing to two different targets in a single map phase; however,
only "/raw" has data written out to it, and "/visits" just contains a _SUCCESS file and no data.

Might this be an issue writing out to two different Avro types in the same phase?

Thanks Again,


Subject: RE: Multiple Writes in a single pipeline.
Date: Fri, 22 Aug 2014 02:02:20 +0000

Hi Josh,

Date: Thu, 21 Aug 2014 17:40:25 -0700
Subject: Re: Multiple Writes in a single pipeline.
The two different executions you have are doing different things, however. In the first one,
Crunch is running a single MapReduce job where the /raw directory is written as a mapper side-output,
and the /visits directory is being written out on the reduce side (or at least, should be--
is there any evidence of a failure in the job in the logs? Are bytes being written out from
the reducer?)

No evidence of any failures in the logs; the single mapper and the reducers both succeed. The
mapper definitely writes to HDFS, but the reducer does not. Here are the relevant counters from
the reducer:

FILE: Number of bytes read               6
FILE: Number of bytes written            91811
FILE: Number of large read operations    0
FILE: Number of read operations          0
FILE: Number of write operations         0
HDFS: Number of bytes read               6205
HDFS: Number of bytes written            0
HDFS: Number of large read operations    0
HDFS: Number of read operations          4
HDFS: Number of write operations         2

I couldn't find anything related on the Crunch JIRA.

For this problem, I think it would be more efficient to write the parsed -> /raw output
first, call run(), then do the agg -> /visits output followed by done(), which would mean
that you would only need to parse the raw input once, instead of twice.

Would the first option be more efficient if it worked?

A helpful trick for seeing how the Crunch planner is mapping your logic into MapReduce jobs
is to look at the plan dot file via one of the following mechanisms:
1) Instead of calling, call Pipeline.runAsync() and then call the getPlanDotFile()
method on the returned PipelineExecution object. You can print the dot file to a file and
use a dot file viewer to look at how the DoFns are broken up into MR jobs and map/reduce phases.

2) Call MRPipeline.plan() directly, which returns a MRExecutor object that also implements
PipelineExecution. (The difference being that calling MRPipeline.plan will not start the jobs
running, whereas calling runAsync will.)
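
As a rough illustration of "print the dot file to a file", here is a minimal sketch that uses only the Java standard library. It assumes getPlanDotFile() hands back the plan as a String of DOT text; the class name PlanDotWriter, the helper writeDot, the output name plan.dot, and the sample DOT string are all hypothetical stand-ins, and the Crunch call itself appears only in a comment.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class PlanDotWriter {

    // Writes the DOT text to the given path as UTF-8, replacing any existing file.
    static Path writeDot(String dot, Path target) throws IOException {
        return Files.write(target, dot.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical stand-in for the real plan. In an actual job this would be
        // something like: String dot = pipeline.runAsync().getPlanDotFile();
        // (assumption: the method returns the DOT text as a String).
        String dot = "digraph G { \"initial-parsing\" -> \"visits-parsing\"; }";

        Path out = writeDot(dot, Paths.get("plan.dot"));
        System.out.println("Wrote " + out.toAbsolutePath());
    }
}
```

If Graphviz is installed, the resulting file can then be rendered with something like `dot -Tpng plan.dot -o plan.png`.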
I ran the two different versions through dot and you're right, they are two completely different
executions. Pretty cool!

