crunch-user mailing list archives

From Josh Wills <>
Subject Re: Multiple Writes in a single pipeline.
Date Fri, 22 Aug 2014 00:40:25 GMT
You can have multiple outputs in a single pipeline-- that shouldn't be a
problem.

The two different executions you have are doing different things, however.
In the first one, Crunch is running a single MapReduce job where the /raw
directory is written as a mapper side-output, and the /visits directory is
being written out on the reduce side (or at least, should be-- is there any
evidence of a failure in the job in the logs? Are bytes being written out
from the reducer?)

The second execution, where you call run(), parsed.write(), and then
done(), is running two distinct MR jobs: one MapReduce job that does the
aggregation and writes the /visits table, and then a second map-only job
that re-processes the input data and generates the /raw directory.

For this problem, I think it would be more efficient to write the parsed ->
/raw output first, call run(), then do the agg -> /visits output followed
by done(), which would mean that you would only need to parse the raw input
once, instead of twice.
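A minimal sketch of that ordering, reusing the identifiers from the quoted
code below (the read call is elided in the original mail and stays elided
here; this is only an illustration of the write/run/done sequence, not a
complete program):

```java
// Write the map-side output first and run it as its own job; after run()
// the parsed PCollection is materialized in /raw, so the aggregation job
// can read it back instead of re-parsing the raw input.
parsed.write(To.avroFile(outputPath + "/raw"), WriteMode.OVERWRITE);
run();   // executes the map-only job; /raw now exists on disk
agg.write(To.avroFile(outputPath + "/visits"), WriteMode.OVERWRITE);
done();  // executes the remaining MapReduce job for the aggregation
```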

A helpful trick for seeing how the Crunch planner is mapping your logic
into MapReduce jobs is to look at the plan dot file via one of the
following mechanisms:

1) Instead of calling Pipeline.run(), call Pipeline.runAsync() and then
call the getPlanDotFile() method on the returned PipelineExecution object.
You can write the dot output to a file and use a dot file viewer to look at
how the DoFns are broken up into MR jobs and map/reduce phases.
2) Call MRPipeline.plan() directly, which returns an MRExecutor object that
also implements PipelineExecution. (The difference being that calling
MRPipeline.plan() will not start the jobs running, whereas calling
runAsync() will.)

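One way to capture the plan from option 1 (a sketch; the output path
"plan.dot" is arbitrary -- getPlanDotFile() returns the dot source as a
String, which you can write anywhere):

```java
PipelineExecution exec = pipeline.runAsync();
try (java.io.FileWriter w = new java.io.FileWriter("plan.dot")) {
  w.write(exec.getPlanDotFile());  // dot-format description of the job plan
}
exec.waitUntilDone();
// Render with e.g. Graphviz: dot -Tpng plan.dot -o plan.png
```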

On Thu, Aug 21, 2014 at 5:19 PM, Danny Morgan <> wrote:

> I seem to be misunderstanding write() in pipelines somehow.
>
> MyClass extends CrunchTool {
>   PTable<String, String> lines =;
>   PCollection<Log> parsed = lines.parallelDo("initial-parsing",
>       new myParser(), Avros.specifics(Log.class));
>   PTable<Visit, Pair<Long, Long>> visits =
>       parsed.parallelDo("visits-parsing", new VisitsExtractor(),
>           Avros.tableOf(Avros.specifics(Visit.class),
>               Avros.pairs(Avros.longs(), Avros.longs())));
>   PTable<Visit, Pair<Long, Long>> agg =
>       visits.groupByKey().combineValues(
>           Aggregators.pairAggregator(Aggregators.SUM_LONGS(),
>               Aggregators.MIN_LONGS()));
>   agg.write(To.avroFile(outputPath + "/visits"), WriteMode.OVERWRITE);
>   parsed.write(To.avroFile(outputPath + "/raw"), WriteMode.OVERWRITE);
>   this.done();
> So this runs fine; however, at the end of it the "/raw" directory contains:
> part-m-00000.avro
> but the "/visits" directory is missing its avro files.
> Now if I change the above code to this:
> agg.write(To.avroFile(outputPath + "/visits"), WriteMode.OVERWRITE);
> this.run();
> parsed.write(To.avroFile(outputPath + "/raw"), WriteMode.OVERWRITE);
> this.done();
> The change being the call to run(); with it, both directories contain
> their respective avro files and _SUCCESS files.
> What's going on? Can I have multiple writes in a single pipeline? Would I
> ever call run() on a pipeline more than once? How does the order in which I
> call run() matter, especially since I try to write to "/visits" first?
> BTW My crunch version is crunch-0.10.0-hadoop2
> Thanks!

Director of Data Science
Cloudera <>
Twitter: @josh_wills <>
