crunch-dev mailing list archives

From Micah Whitacre <>
Subject Re: A different strategy for file-per-key output
Date Wed, 25 Nov 2015 13:44:19 GMT
So with the exception of sequence files and protobufs, I think what you are
trying to achieve is very similar to what Kite does with Partitioned
Datasets[1].  That once again gets you back to Avro, though.  Its API is a
little simpler, but it may be something to emulate if we wanted to build
similar functionality into Crunch or add a sequence-file-per-key option.
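
For illustration, here is a minimal sketch of how a category-and-time
directory for one event could be derived. The class name, the method name
and the name=value directory style are assumptions made for this example;
they are not taken from Kite or Crunch.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;

class PartitionPaths {
    // Builds a relative directory for one event from its category and its
    // timestamp (milliseconds since the epoch, interpreted in UTC).
    // The name=value layout is an assumed convention, used only as an example.
    static String partitionPath(String category, long epochMillis) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        return String.format(Locale.ROOT,
                "category=%s/year=%04d/month=%02d/day=%02d",
                category, t.getYear(), t.getMonthValue(), t.getDayOfMonth());
    }
}
```

Calling partitionPath("clicks", 1448459059000L) yields
"category=clicks/year=2015/month=11/day=25". Two events in the same category
on the same day share a directory, so the leaf filename still has to be
unique within it.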

When you are writing the output, does the use of time in the output path
guarantee that the output is unique?  I'm just wondering because the rename
might behave oddly if there are collisions.
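
One way to surface such collisions before any file is moved is to check the
temporary-to-final map for final paths that occur more than once. The sketch
below is plain Java with hypothetical class and method names; it does not
check for files that already exist at the destination. In Hadoop,
FileSystem.rename reports failure through its boolean return value, so that
value would still need to be checked for that case.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class RenameCollisions {
    // Returns every final path that more than one temporary file maps to.
    // An empty result means the planned renames do not collide with each other.
    static Set<String> duplicateFinalPaths(Map<String, String> temporaryToFinal) {
        Map<String, Integer> counts = new HashMap<>();
        for (String finalPath : temporaryToFinal.values()) {
            counts.merge(finalPath, 1, Integer::sum);
        }
        Set<String> duplicates = new TreeSet<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() > 1) {
                duplicates.add(entry.getKey());
            }
        }
        return duplicates;
    }
}
```

If the returned set is non-empty, the job can fail fast and leave the
temporary files untouched for inspection.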

[1] -

On Wed, Nov 25, 2015 at 6:02 AM, David Whiting <>

> I have the requirement at the moment to write out event data split into a
> directory tree by the category and time of the event (something I think is
> a fairly common use case). There is already support for this in Crunch in
> the Avro world with the AvroFilePerKeyTarget, but we're using protobufs in
> sequence files round here. After investigating the built-in Hadoop
> MultipleOutputs mechanism, I decided it was a) very non-intuitive to use, b)
> difficult to ensure atomicity with, and c) difficult to integrate with Crunch.
> What I did instead was to open the files and write them from within the
> reduce tasks themselves and output from the reduce tasks simply a
> Pair<String, String> of temporary filename and final filename. That way, if
> any part fails you won't have any output. After it completes, you simply
> materialize the output and perform a series of HDFS move operations to move
> the files into their final location. If something fails, you can delete all
> the temporary files and start again.
> Usage looks a bit like this:
> private Map<String, String> splitEventsToTemporaryDirectory(
>         Path temporaryDirectory,
>         PTable<String, Pair<Long, ByteBuffer>> events) {
>     // Final output files will be written to hdfs://{keyValue}/{leafFilename}
>     String leafFilename = "run_" +
>     return events
>             .groupByKey()
>             .parallelDo(
>                     new SequenceFilePerKeyOutputFn<>(
>                             temporaryDirectory.toString(),
>                             leafFilename,
>                             longs(), bytes()),
>                     tableOf(strings(), strings()))
>             .materializeToMap();
> }
> private void moveFilesToFinalLocation(Map<String, String> temporaryFinalPaths)
>         throws IOException {
>     for (Map.Entry<String, String> temporaryFinalPath :
>             temporaryFinalPaths.entrySet()) {
>         Path temporaryPath = new Path(temporaryFinalPath.getKey());
>         Path finalPath = new Path(temporaryFinalPath.getValue());
>         info("Renaming " + temporaryPath + " to " + finalPath);
>         fs.mkdirs(finalPath.getParent());
>         fs.rename(temporaryPath, finalPath);
>     }
> }
> This is all working nicely in production at the moment, but I have 2
> follow-up questions for the Crunch community:
> * Is this a sane thing to do? Will it cause any problems that I haven't
> thought about?
> * Would this be a useful thing to contribute to Crunch itself? If so, what
> would the API look like?
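
The write-to-temporary-then-move pattern described above can be sketched on
a local filesystem with java.nio.file, used here only as a stand-in for HDFS.
The class and method names are hypothetical. Note that the loop of moves is
not atomic as a whole: if one move fails, the earlier moves have already
taken effect.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

class TempThenMove {
    // Moves each temporary file to its final location, creating parent
    // directories as needed. Files.move is called without REPLACE_EXISTING,
    // so an existing target raises FileAlreadyExistsException instead of
    // being silently overwritten.
    static void commit(Map<String, String> temporaryToFinal) throws IOException {
        for (Map.Entry<String, String> entry : temporaryToFinal.entrySet()) {
            Path temporary = Paths.get(entry.getKey());
            Path target = Paths.get(entry.getValue());
            if (target.getParent() != null) {
                Files.createDirectories(target.getParent());
            }
            Files.move(temporary, target);
        }
    }

    // Deletes any temporary files that are still present, so a failed run
    // can be started again from scratch.
    static void rollback(Map<String, String> temporaryToFinal) throws IOException {
        for (String temporary : temporaryToFinal.keySet()) {
            Files.deleteIfExists(Paths.get(temporary));
        }
    }
}
```

A caller would run commit once the job has finished and call rollback if the
job or the commit throws. Files that were already moved before a failure stay
in their final location and would need separate handling.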
