crunch-user mailing list archives

From Danny Morgan <>
Subject Multiple Writes in a single pipeline.
Date Fri, 22 Aug 2014 00:19:14 GMT
I seem to be misunderstanding write() in pipelines somehow.

MyClass extends CrunchTool {

 PTable<String, String> lines =;
 PCollection<Log> parsed = lines.parallelDo("initial-parsing", new myParser(), Avros.specifics(Log.class));

 PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing",
     new VisitsExtractor(),
     Avros.tableOf(Avros.specifics(Visit.class), Avros.pairs(Avros.longs(), Avros.longs())));
 PTable<Visit, Pair<Long, Long>> agg = visits.groupByKey().combineValues(Aggregators.pairAggregator(Aggregators.SUM_LONGS(),

agg.write(To.avroFile(outputPath+"/visits"), WriteMode.OVERWRITE);
parsed.write(To.avroFile(outputPath+"/raw"), WriteMode.OVERWRITE);;

This runs fine; however, at the end of it all the "/raw" directory contains:

but the "/visits" directory is missing its avro files:

Now if I change the above code to this:
agg.write(To.avroFile(outputPath+"/visits"), WriteMode.OVERWRITE);;
parsed.write(To.avroFile(outputPath+"/raw"), WriteMode.OVERWRITE);

The only change is when I call run(). Both directories now contain their respective avro
files and _SUCCESS files.

What's going on? Can I have multiple writes in a single pipeline? Would I ever call run()
on a pipeline more than once? How does the point at which I call run() matter, especially since
I try to write to "/visits" first?

BTW My crunch version is crunch-0.10.0-hadoop2
