crunch-dev mailing list archives

From David Whiting <davidwhit...@gmail.com>
Subject A different strategy for file-per-key output
Date Wed, 25 Nov 2015 12:02:21 GMT
I have the requirement at the moment to write out event data split into a
directory tree by the category and time of the event (something I think is
a fairly common use case). There is already support for this in Crunch in the
Avro world with the AvroFilePerKeyTarget, but we're using protobufs in
sequence files round here. After investigating the built-in Hadoop
MultipleOutputs mechanism, I decided it was a) very non-intuitive to use, b)
difficult to make atomic, and c) difficult to integrate with Crunch.
What I did instead was to open and write the files from within the reduce
tasks themselves, and have each reduce task emit only a Pair<String, String>
of temporary filename and final filename. That way, if any part of the job
fails, nothing has been written to the final location. After the job
completes, you simply materialize the output and perform a series of HDFS
move operations to move the files into their final location. If something
fails, you can delete all the temporary files and start again.

Usage looks a bit like this:

private Map<String, String> splitEventsToTemporaryDirectory(
        Path temporaryDirectory,
        PTable<String, Pair<Long, ByteBuffer>> events) {

    // Final output files will be written to hdfs://{keyValue}/{leafFilename}
    String leafFilename = "run_"
            + DateTime.now(DateTimeZone.UTC).toString("yyyy-MM-dd_HH-mm-ss");
    return events
            .groupByKey()
            .parallelDo(
                    new SequenceFilePerKeyOutputFn<>(
                            temporaryDirectory.toString(),
                            leafFilename,
                            longs(), bytes()),
                    tableOf(strings(), strings()))
            .materializeToMap();
}


private void moveFilesToFinalLocation(Map<String, String> temporaryFinalPaths)
        throws IOException {
    for (Map.Entry<String, String> temporaryFinalPath : temporaryFinalPaths.entrySet()) {
        Path temporaryPath = new Path(temporaryFinalPath.getKey());
        Path finalPath = new Path(temporaryFinalPath.getValue());
        info("Renaming " + temporaryPath + " to " + finalPath);
        fs.mkdirs(finalPath.getParent());
        // FileSystem.rename can signal failure by returning false, so check it.
        if (!fs.rename(temporaryPath, finalPath)) {
            throw new IOException("Failed to rename " + temporaryPath + " to " + finalPath);
        }
    }
}
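
For anyone who wants to try the idea without a cluster, here is a minimal
sketch of the same two-phase pattern on a local filesystem using
java.nio.file instead of HDFS and Crunch. It is an analogy, not the
SequenceFilePerKeyOutputFn itself: the class name, method names, and the
"part-N.tmp" naming are all hypothetical, and plain text lines stand in for
the protobuf records. On HDFS the cleanup step would correspond to
FileSystem.delete(path, true).

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Local-filesystem analogy of the approach above; every name here is hypothetical.
final class TwoPhaseFilePerKey {

    // Phase 1: write one temporary file per key and return temp -> final paths.
    // Nothing is created under finalRoot in this phase.
    static Map<Path, Path> writeTemporaryFiles(
            Path tempDir, Path finalRoot, String leafFilename,
            Map<String, List<String>> recordsByKey) throws IOException {
        Files.createDirectories(tempDir);
        Map<Path, Path> tempToFinal = new LinkedHashMap<>();
        int counter = 0;
        for (Map.Entry<String, List<String>> entry : recordsByKey.entrySet()) {
            Path temp = tempDir.resolve("part-" + (counter++) + ".tmp");
            Files.write(temp, entry.getValue(), StandardCharsets.UTF_8);
            // Final layout mirrors {keyValue}/{leafFilename}.
            tempToFinal.put(temp, finalRoot.resolve(entry.getKey()).resolve(leafFilename));
        }
        return tempToFinal;
    }

    // Phase 2: run only after every write has succeeded.
    static void moveToFinalLocation(Map<Path, Path> tempToFinal) throws IOException {
        for (Map.Entry<Path, Path> move : tempToFinal.entrySet()) {
            Files.createDirectories(move.getValue().getParent());
            // ATOMIC_MOVE assumes source and target are on the same filesystem.
            Files.move(move.getKey(), move.getValue(), StandardCopyOption.ATOMIC_MOVE);
        }
    }

    // Failure path: remove the temporary directory and everything under it.
    static void deleteRecursively(Path tempDir) throws IOException {
        if (!Files.exists(tempDir)) {
            return;
        }
        List<Path> deepestFirst;
        try (Stream<Path> paths = Files.walk(tempDir)) {
            // Reverse order puts children before their parent directories.
            deepestFirst = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : deepestFirst) {
            Files.delete(path);
        }
    }
}
```

A caller would run writeTemporaryFiles, then moveToFinalLocation only if the
first step returned normally, and call deleteRecursively on the temporary
directory if anything threw. Note that the individual moves are atomic but
the batch as a whole is not: a failure halfway through phase 2 leaves some
files already in place.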

This is all working nicely in production at the moment, but I have 2
follow-up questions for the Crunch community:
* Is this a sane thing to do? Will it cause any problems that I haven't
thought about?
* Would this be a useful thing to contribute to Crunch itself? If so, what
would the API look like?
