crunch-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chao Shi <>
Subject Re: Multiple output channels from Crunch DoFn
Date Wed, 21 Aug 2013 02:34:54 GMT
Is it possible to provide a utility that transforms PCollection<Pair<A, B>>
to Pair<PCollection<A>, PCollection<B>>? So one can simply emit Pairs and
then write them to two Targets. This could be generalized to Tuples.

2013/8/21 Josh Wills <>

> On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <>wrote:
>>  I like the flexibility of this approach, although would the idea of
>> having some official constants defined for a small set of standard channels
>> be reasonable (the concepts of "out" and "error" are pretty common, others
>> may be warranted as well)?
> So I think the way I would handle this would be having a main output
> directory and an error output directory that was underneath it. Cascading
> does this trick within their existing flows where you can throw exceptions
> to "traps," which is essentially the same idea, though I'm not wild about
> control flow that relies on throwing exceptions.
>>  Is this something that you would see being added to core Crunch APIs
>> (for example, directly to Pipeline), or implemented on top of Crunch with a
>> filtering approach similar to my original post?  If it's implemented on
>> top, shouldn't materialization work as-is?
> Yes, your model would be simpler. I think that mine would require a
> special kind of Target implementation, a custom implementation of the
> Emitter interface that would be used for routing the outputs of the DoFn,
> and possibly some post-processing code to move the data to a sensible
> place. I don't know if that work is strictly necessary, and your impl is
> certainly much more straightforward than mine. :)
>>  If the type was PTable<String, T>, could Union<S,U> be a choice for
>> as appropriate? In our case, we would likely be looking at a PTable<String,
>> T extends SpecificRecordBase> and not necessarily need Union with this
>> approach.
> Yeah, I think it would be fine, but we'd have to be cognizant of it when
> we were implementing the union type, and it would be up to the client to
> ensure that the right data type ended up in the right file, which is maybe
> less good?
>>   From: Josh Wills <>
>> Reply-To: "" <>
>> Date: Tuesday, August 20, 2013 1:00 PM
>> To: "" <>
>> Subject: Re: Multiple output channels from Crunch DoFn
>>   A related idea that has come up a few times has been the idea of
>> having a way of writing values to different files based on a key: some kind
>> of generalization of Target that would itself write multiple outputs under
>> the covers, with the name of the output file indicated by some function of
>> the key of the PTable.
>> For this situation, we would have a PTable that was like PTable<String,
>> Union<S, T>>, or just PTable<String, T> if the output types were all
>> same, and the String would specify the name of an output directory (that I
>> suppose would live underneath some base output directory for the Target)
>> that the record would be written to.
>>  There are a couple of limitations to this approach, I think: we
>> couldn't consider this kind of PTable "materialized" w/o doing an overhaul
>> of the materialization logic-- it would act sort of like an HTableTarget in
>> that it would be write-only in flows. There are probably some others I
>> can't think of off the top of my head. What do you guys think?
>>  J
>> On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan <> wrote:
>>> I happen to have some context around this, so I wanted to expand on
>>> Brandon's question a bit.  The use case here is we're dealing with a large
>>> volume of third-party input and expect a certain percentage of bogus or
>>> malformed data. Rather than simply logging instances of bad records, we
>>> want to treat it as a signal we can learn from, both for improving our
>>> processing logic and for creating structured reports we can use to
>>> troubleshoot data sources.
>>> This leads to the "standard out" and "standard error" metaphors Brandon
>>> mentions: in most cases, our Crunch DoFns would emit a processed structure
>>> useful downstream. But we'd also like to be able to emit a structured error
>>> -- probably as an Avro object in our case -- and persist that as a
>>> byproduct of our main processing pipeline.
>>> Would it make sense for such DoFn's to emit something some form of
>>> "Option" object? We could then attach two consuming functions to it: one
>>> that handles the "success" case, sending the resulting Avro object
>>> downstream. Another DoFn attached to the "Option" object would be a no-op
>>> unless the Option contained an "error" structure, at which point we persist
>>> it to some well-known location for later analysis.
>>> I think this is entirely achievable using existing mechanisms...but it
>>> seems like common enough use case (at least for us) to establish some
>>> idioms for dealing it.
>>> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>>> >
>>> > We've been looking at ways to do multiple outputs in Crunch jobs,
>>> > specifically writing out some kind of Status or Error Avro object,
>>> based
>>> > on failures that occur processing individual records in various jobs.
>>> It
>>> > had been suggested that, rather than logging these errors to
>>> traditional
>>> > loggers, to consider them an output of the Crunch job.  After some
>>> > internal discussion, it was suggested to run the ideas past the Crunch
>>> > community.
>>> >
>>> >
>>> > A major goal we have is to end with all the error output in a location
>>> > that makes it easy to run Hive queries or perform other MapReduce-style
>>> > analysis to quickly view all errors across the larger system without
>>> the
>>> > need go to multiple facilities.  This means standardizing on the Avro
>>> > object, but it also necessitates decoupling the storage of the object
>>> from
>>> > the "standard output" of the job.
>>> >
>>> >
>>> > As Crunch DoFns support a single Emitter per invocation of process(),
>>> the
>>> > solution that gathered the most support would be to emit an object
>>> similar
>>> > to Pair<>, where first would be the "standard out" and second would
>>> the
>>> > "standard error".  A DoFn would generally only populate one (nothing
>>> > preventing it from populating both if appropriate, but not really
>>> intended
>>> > as a part of general use), and separate DoFns would filter out the two
>>> > components of the pair and write the values to the appropriate targets.
>>> >
>>> > As far as the emitted pairing object; the concept of a tagged union was
>>> > suggested although there currently isn't support in Java or Avro for
>>> the
>>> > concept; it was noted that
>>> ><>might
be a close
>>> > candidate. Pair<> would meet the requirements, although it was
>>> suggested
>>> > that a simple object dedicated to the task could make a cleaner
>>> approach.
>>> >
>>> > Any general thoughts on this approach? Are there any other patterns
>>> that
>>> > might serve us better, or anything on the Crunch roadmap that might be
>>> > more appropriate?
>>> >
>>> >
>>> > Brandon Inman
>>> > Software Architect
>>> >
>>> >
>>> >
>>> > CONFIDENTIALITY NOTICE This message and any included attachments are
>>> from Cerner Corporation and are intended only for the addressee. The
>>> information contained in this message is confidential and may constitute
>>> inside or non-public information under international, federal, or state
>>> securities laws. Unauthorized forwarding, printing, copying, distribution,
>>> or use of such information is strictly prohibited and may be unlawful. If
>>> you are not the addressee, please promptly delete this message and notify
>>> the sender of the delivery error by e-mail or you may call Cerner's
>>> corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024.
>>  --
>> Director of Data Science
>> Cloudera<>
>> Twitter: @josh_wills<>

View raw message