crunch-user mailing list archives

From Chao Shi <>
Subject Re: Multiple output channels from Crunch DoFn
Date Wed, 21 Aug 2013 03:43:25 GMT
Yes, I think so. Do we generally allow nulls in Crunch APIs? I'm afraid
that it would be confusing if some exclude null values while others don't.

2013/8/21 Josh Wills <>

> ...with the assumption that we would exclude null values in the Pair<A, B>?
> On Tue, Aug 20, 2013 at 7:40 PM, Josh Wills <> wrote:
>> That does sound pretty clean...
>> On Tue, Aug 20, 2013 at 7:34 PM, Chao Shi <> wrote:
>>> Is it possible to provide a utility that transforms
>>> PCollection<Pair<A, B>> to Pair<PCollection<A>, PCollection<B>>? So one can
>>> simply emit Pairs and then write them to two Targets. This could be
>>> generalized to Tuples.
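A minimal plain-Java sketch of the proposed utility, for illustration only: `List` stands in for `PCollection`, and `PairSplitter`, its nested `Pair`, and `split` are hypothetical names, not Crunch classes. It skips null components, following the null-exclusion assumption raised at the top of this thread, so a producer can fill in only one side of each pair.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical stand-in for a "PCollection<Pair<A, B>> to
 * Pair<PCollection<A>, PCollection<B>>" utility. Plain Lists play the
 * role of PCollections; this is not Crunch API code.
 */
final class PairSplitter {

    private PairSplitter() {}

    /** Minimal pair type; null marks an unused side (an assumption). */
    static final class Pair<A, B> {
        final A first;
        final B second;

        private Pair(A first, B second) {
            this.first = first;
            this.second = second;
        }

        static <A, B> Pair<A, B> of(A first, B second) {
            return new Pair<>(first, second);
        }
    }

    /** Splits a list of pairs into a pair of lists, dropping null components. */
    static <A, B> Pair<List<A>, List<B>> split(List<Pair<A, B>> pairs) {
        List<A> firsts = new ArrayList<>();
        List<B> seconds = new ArrayList<>();
        for (Pair<A, B> p : pairs) {
            if (p.first != null) {
                firsts.add(p.first);
            }
            if (p.second != null) {
                seconds.add(p.second);
            }
        }
        return Pair.of(Collections.unmodifiableList(firsts),
                       Collections.unmodifiableList(seconds));
    }
}
```

Generalizing to tuples would repeat the same null check once per component.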
>>> 2013/8/21 Josh Wills <>
>>>> On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <> wrote:
>>>>>  I like the flexibility of this approach, although would the idea of
>>>>> having some official constants defined for a small set of standard channels
>>>>> be reasonable (the concepts of "out" and "error" are pretty common, others
>>>>> may be warranted as well)?
>>>> So I think the way I would handle this would be having a main output
>>>> directory and an error output directory that was underneath it. Cascading
>>>> does this trick within their existing flows where you can throw exceptions
>>>> to "traps," which is essentially the same idea, though I'm not wild about
>>>> control flow that relies on throwing exceptions.
>>>>> Is this something that you would see being added to core Crunch APIs
>>>>> (for example, directly to Pipeline), or implemented on top of Crunch with a
>>>>> filtering approach similar to my original post? If it's implemented on
>>>>> top, shouldn't materialization work as-is?
>>>> Yes, your model would be simpler. I think that mine would require a
>>>> special kind of Target implementation, a custom implementation of the
>>>> Emitter interface that would be used for routing the outputs of the DoFn,
>>>> and possibly some post-processing code to move the data to a sensible
>>>> place. I don't know if that work is strictly necessary, and your impl is
>>>> certainly much more straightforward than mine. :)
>>>>> If the type were PTable<String, T>, could Union<S,U> be a choice for
>>>>> T as appropriate? In our case, we would likely be looking at a
>>>>> PTable<String, T extends SpecificRecordBase> and would not necessarily need
>>>>> Union with this approach.
>>>> Yeah, I think it would be fine, but we'd have to be cognizant of it
>>>> when we were implementing the union type, and it would be up to the client
>>>> to ensure that the right data type ended up in the right file, which is
>>>> maybe less good?
>>>>>   From: Josh Wills <>
>>>>> Reply-To: "" <>
>>>>> Date: Tuesday, August 20, 2013 1:00 PM
>>>>> To: "" <>
>>>>> Subject: Re: Multiple output channels from Crunch DoFn
>>>>> A related idea that has come up a few times has been the idea of
>>>>> having a way of writing values to different files based on a key: some
>>>>> sort of generalization of Target that would itself write multiple outputs
>>>>> under the covers, with the name of the output file indicated by some
>>>>> function of the key of the PTable.
>>>>> For this situation, we would have a PTable like
>>>>> PTable<String, Union<S, T>>, or just PTable<String, T> if the output types
>>>>> were all the same, and the String would specify the name of an output
>>>>> directory (that I suppose would live underneath some base output directory
>>>>> for the Target) that the record would be written to.
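The routing rule described above, where a key names a subdirectory of a base output directory, can be sketched in plain Java. This is not a Crunch `Target`: `KeyedOutputRouter`, `directoryFor`, and `route` are hypothetical names, and the in-memory map only stands in for the files such a Target would write. Rejecting keys that resolve outside the base directory is an added assumption, not something proposed in the thread.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of key-based output routing: each record's key names
 * a subdirectory under a base output directory. Records are grouped in
 * memory here; a real multi-output Target would write them to files.
 */
final class KeyedOutputRouter {

    private final Path baseDir;

    KeyedOutputRouter(Path baseDir) {
        this.baseDir = baseDir.toAbsolutePath().normalize();
    }

    /** Maps a key such as "errors" to baseDir/errors; rejects keys like "../x". */
    Path directoryFor(String key) {
        Path dir = baseDir.resolve(key).normalize();
        if (dir.equals(baseDir) || !dir.startsWith(baseDir)) {
            throw new IllegalArgumentException("key must name a subdirectory: " + key);
        }
        return dir;
    }

    /** Groups (key, value) records by destination directory, in first-seen order. */
    <T> Map<Path, List<T>> route(List<? extends Map.Entry<String, T>> records) {
        Map<Path, List<T>> byDir = new LinkedHashMap<>();
        for (Map.Entry<String, T> entry : records) {
            byDir.computeIfAbsent(directoryFor(entry.getKey()), dir -> new ArrayList<>())
                 .add(entry.getValue());
        }
        return byDir;
    }
}
```

With a key such as `errors`, this gives the layout suggested in the reply above: an error output directory underneath the main output directory.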
>>>>> There are a couple of limitations to this approach, I think: we
>>>>> couldn't consider this kind of PTable "materialized" without an overhaul
>>>>> of the materialization logic; it would act sort of like an HTableTarget,
>>>>> in that it would be write-only in flows. There are probably some others I
>>>>> can't think of off the top of my head. What do you guys think?
>>>>>  J
>>>>> On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan <> wrote:
>>>>>> I happen to have some context around this, so I wanted to expand on
>>>>>> Brandon's question a bit. The use case here is that we're dealing with a
>>>>>> large volume of third-party input and expect a certain percentage of bogus
>>>>>> or malformed data. Rather than simply logging instances of bad records, we
>>>>>> want to treat them as a signal we can learn from, both for improving our
>>>>>> processing logic and for creating structured reports we can use to
>>>>>> troubleshoot data sources.
>>>>>> This leads to the "standard out" and "standard error" metaphors
>>>>>> Brandon mentions: in most cases, our Crunch DoFns would emit a processed
>>>>>> structure useful downstream. But we'd also like to be able to emit a
>>>>>> structured error (probably as an Avro object in our case) and persist
>>>>>> that as a byproduct of our main processing pipeline.
>>>>>> Would it make sense for such DoFns to emit some form of "Option"
>>>>>> object? We could then attach two consuming functions to it: one that
>>>>>> handles the "success" case, sending the resulting Avro object
>>>>>> downstream. Another DoFn attached to the "Option" object would be a no-op
>>>>>> unless the Option contained an "error" structure, at which point we would
>>>>>> persist it to some well-known location for later analysis.
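A minimal plain-Java sketch of the "Option" idea, assuming a hypothetical `Outcome<S, E>` type (not part of Crunch) that holds either a result or an error structure. Strictly speaking, a type that carries an error payload is closer to an Either or Try than to an Option, which only signals presence or absence. The static `successes` and `errors` methods play the role of the two consuming functions, and `parseInt` is an illustrative producer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical result holder: exactly one of value or error is non-null. */
final class Outcome<S, E> {

    private final S value;
    private final E error;

    private Outcome(S value, E error) {
        this.value = value;
        this.error = error;
    }

    static <S, E> Outcome<S, E> success(S value) {
        return new Outcome<>(Objects.requireNonNull(value, "value"), null);
    }

    static <S, E> Outcome<S, E> failure(E error) {
        return new Outcome<>(null, Objects.requireNonNull(error, "error"));
    }

    boolean isSuccess() {
        return error == null;
    }

    /** "Success" consumer: passes results downstream and ignores errors. */
    static <S, E> List<S> successes(List<Outcome<S, E>> outcomes) {
        List<S> out = new ArrayList<>();
        for (Outcome<S, E> o : outcomes) {
            if (o.isSuccess()) {
                out.add(o.value);
            }
        }
        return out;
    }

    /** "Error" consumer: a no-op for successes, collects error structures otherwise. */
    static <S, E> List<E> errors(List<Outcome<S, E>> outcomes) {
        List<E> out = new ArrayList<>();
        for (Outcome<S, E> o : outcomes) {
            if (!o.isSuccess()) {
                out.add(o.error);
            }
        }
        return out;
    }

    /** Example producer: parses an integer, reporting malformed input as an error string. */
    static Outcome<Integer, String> parseInt(String raw) {
        try {
            return success(Integer.parseInt(raw.trim()));
        } catch (NumberFormatException e) {
            return failure("malformed record: " + raw);
        }
    }
}
```

In a real pipeline the error type would be the standardized Avro error record rather than a `String`.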
>>>>>> I think this is entirely achievable using existing mechanisms... but
>>>>>> it seems like a common enough use case (at least for us) to establish
>>>>>> idioms for dealing with it.
>>>>>> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>>>>>> >
>>>>>> > We've been looking at ways to do multiple outputs in Crunch,
>>>>>> > specifically writing out some kind of Status or Error Avro object based
>>>>>> > on failures that occur while processing individual records in various
>>>>>> > jobs. It had been suggested that, rather than logging these errors to
>>>>>> > traditional loggers, we consider them an output of the Crunch job. After
>>>>>> > internal discussion, it was suggested that we run the ideas past the
>>>>>> > Crunch community.
>>>>>> >
>>>>>> >
>>>>>> > A major goal we have is to end up with all the error output in a
>>>>>> > location that makes it easy to run Hive queries or perform other
>>>>>> > MapReduce-style analysis to quickly view all errors across the larger
>>>>>> > system without the need to go to multiple facilities. This means
>>>>>> > standardizing on an Avro object, but it also necessitates decoupling the
>>>>>> > storage of the object from the "standard output" of the job.
>>>>>> >
>>>>>> >
>>>>>> > As Crunch DoFns support a single Emitter per invocation of process(),
>>>>>> > the solution that gathered the most support would be to emit an object
>>>>>> > similar to Pair<>, where first would be the "standard out" and second
>>>>>> > would be the "standard error". A DoFn would generally only populate one
>>>>>> > (nothing would prevent it from populating both if appropriate, but that
>>>>>> > is not really intended as part of general use), and separate DoFns would
>>>>>> > filter out the two components of the pair and write the values to the
>>>>>> > appropriate targets.
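The single-Emitter pattern described above can be sketched in plain Java. The `Emitter` interface (exposing only an `emit` method) and the `Pair` class here are local, hypothetical stand-ins, not Crunch's classes, and using null to mark the unpopulated side is an assumption (the same null question raised at the top of this thread). `process` emits exactly one pair per input line; `standardOut` and `standardError` play the role of the separate filtering DoFns.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a DoFn with one Emitter producing (out, err) pairs. */
final class OutErrSketch {

    private OutErrSketch() {}

    /** Stand-in for a single-output emitter. */
    interface Emitter<T> {
        void emit(T value);
    }

    /** first = "standard out", second = "standard error"; null marks the unused side. */
    static final class Pair<F, S> {
        final F first;
        final S second;

        Pair(F first, S second) {
            this.first = first;
            this.second = second;
        }
    }

    /** The "DoFn" body: expects "key,value" lines and populates exactly one side. */
    static void process(String line, Emitter<Pair<String, String>> emitter) {
        String[] fields = line.split(",", -1);
        if (fields.length == 2) {
            emitter.emit(new Pair<>(fields[0] + "=" + fields[1], null));
        } else {
            emitter.emit(new Pair<>(null, "expected 2 fields but got " + fields.length + ": " + line));
        }
    }

    /** Filter DoFn for the "standard out" channel. */
    static List<String> standardOut(List<Pair<String, String>> pairs) {
        List<String> out = new ArrayList<>();
        for (Pair<String, String> p : pairs) {
            if (p.first != null) {
                out.add(p.first);
            }
        }
        return out;
    }

    /** Filter DoFn for the "standard error" channel. */
    static List<String> standardError(List<Pair<String, String>> pairs) {
        List<String> err = new ArrayList<>();
        for (Pair<String, String> p : pairs) {
            if (p.second != null) {
                err.add(p.second);
            }
        }
        return err;
    }
}
```

Because each filter walks the full emitted collection, both channels see every record, and the producer never needs a second emitter.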
>>>>>> >
>>>>>> > As for the emitted pairing object, the concept of a tagged union was
>>>>>> > suggested, although there currently isn't support in Java or … for the
>>>>>> > concept; it was noted that <> might be a close candidate. Pair<> would
>>>>>> > meet the requirements, although it was suggested that a simple object
>>>>>> > dedicated to the task could make for a cleaner approach.
>>>>>> >
>>>>>> > Any general thoughts on this approach? Are there any other patterns
>>>>>> > that might serve us better, or anything on the Crunch roadmap that might
>>>>>> > be more appropriate?
>>>>>> >
>>>>>> >
>>>>>> > Brandon Inman
>>>>>> > Software Architect
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > CONFIDENTIALITY NOTICE This message and any included attachments
>>>>>> > are from Cerner Corporation and are intended only for the addressee.
>>>>>> > The information contained in this message is confidential and may
>>>>>> > constitute inside or non-public information under international,
>>>>>> > federal, or state securities laws. Unauthorized forwarding, printing,
>>>>>> > copying, distribution, or use of such information is strictly prohibited
>>>>>> > and may be unlawful. If you are not the addressee, please promptly
>>>>>> > delete this message and notify the sender of the delivery error by
>>>>>> > e-mail or you may call Cerner's corporate offices in Kansas City,
>>>>>> > Missouri, U.S.A at (+1) (816)221-1024.
>>>>>  --
>>>>> Director of Data Science
>>>>> Cloudera<>
>>>>> Twitter: @josh_wills<>
