crunch-user mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: Multiple output channels from Crunch DoFn
Date Wed, 21 Aug 2013 02:40:02 GMT
That does sound pretty clean...


On Tue, Aug 20, 2013 at 7:34 PM, Chao Shi <stepinto@live.com> wrote:

> Is it possible to provide a utility that transforms PCollection<Pair<A,
> B>> to Pair<PCollection<A>, PCollection<B>>? So one can simply emit Pairs
> and then write them to two Targets. This could be generalized to Tuples.
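
A rough sketch of the shape such a utility could take, in plain Java rather
than against the Crunch API: java.util.List stands in for PCollection, and the
Pair and PairSplitter classes are hypothetical stand-ins invented for
illustration. It also assumes that a null slot means "nothing emitted on this
channel", which the thread does not settle.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model only: List stands in for PCollection, and this Pair is a
// hypothetical stand-in, not org.apache.crunch.Pair.
final class PairSplitter {
    static final class Pair<A, B> {
        final A first;
        final B second;

        Pair(A first, B second) {
            this.first = first;
            this.second = second;
        }

        static <A, B> Pair<A, B> of(A first, B second) {
            return new Pair<>(first, second);
        }
    }

    // Turns a list of pairs into a pair of lists. Null slots are skipped
    // (an assumption), so a record that populated only one channel ends up
    // only in that channel's list.
    static <A, B> Pair<List<A>, List<B>> split(List<Pair<A, B>> in) {
        List<A> firsts = new ArrayList<>();
        List<B> seconds = new ArrayList<>();
        for (Pair<A, B> p : in) {
            if (p.first != null) {
                firsts.add(p.first);
            }
            if (p.second != null) {
                seconds.add(p.second);
            }
        }
        return Pair.of(firsts, seconds);
    }
}
```

Each of the two resulting collections could then be written to its own
Target, which is the "emit Pairs, write to two Targets" flow described above.
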
>
>
> 2013/8/21 Josh Wills <josh.wills@gmail.com>
>
>>
>> On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <Brandon.Inman@cerner.com> wrote:
>>
>>>  I like the flexibility of this approach, although would the idea of
>>> having some official constants defined for a small set of standard channels
>>> be reasonable (the concepts of "out" and "error" are pretty common, others
>>> may be warranted as well)?
>>>
>>
>> So I think the way I would handle this would be having a main output
>> directory and an error output directory that was underneath it. Cascading
>> does this trick within their existing flows where you can throw exceptions
>> to "traps," which is essentially the same idea, though I'm not wild about
>> control flow that relies on throwing exceptions.
>>
>>
>>>  Is this something that you would see being added to core Crunch APIs
>>> (for example, directly to Pipeline), or implemented on top of Crunch with a
>>> filtering approach similar to my original post?  If it's implemented on
>>> top, shouldn't materialization work as-is?
>>>
>>
>> Yes, your model would be simpler. I think that mine would require a
>> special kind of Target implementation, a custom implementation of the
>> Emitter interface that would be used for routing the outputs of the DoFn,
>> and possibly some post-processing code to move the data to a sensible
>> place. I don't know if that work is strictly necessary, and your impl is
>> certainly much more straightforward than mine. :)
>>
>>
>>>
>>>  If the type was PTable<String, T>, could Union<S,U> be a choice for T
>>> as appropriate? In our case, we would likely be looking at a PTable<String,
>>> T extends SpecificRecordBase> and not necessarily need Union with this
>>> approach.
>>>
>>
>> Yeah, I think it would be fine, but we'd have to be cognizant of it when
>> we were implementing the union type, and it would be up to the client to
>> ensure that the right data type ended up in the right file, which is maybe
>> less good?
>>
>>
>>>
>>>
>>>   From: Josh Wills <jwills@cloudera.com>
>>> Reply-To: "user@crunch.apache.org" <user@crunch.apache.org>
>>> Date: Tuesday, August 20, 2013 1:00 PM
>>> To: "user@crunch.apache.org" <user@crunch.apache.org>
>>> Subject: Re: Multiple output channels from Crunch DoFn
>>>
>>>   A related idea that has come up a few times has been the idea of
>>> having a way of writing values to different files based on a key: some kind
>>> of generalization of Target that would itself write multiple outputs under
>>> the covers, with the name of the output file indicated by some function of
>>> the key of the PTable.
>>>
>>> For this situation, we would have a PTable that was like PTable<String,
>>> Union<S, T>>, or just PTable<String, T> if the output types were all the
>>> same, and the String would specify the name of an output directory (that I
>>> suppose would live underneath some base output directory for the Target)
>>> that the record would be written to.
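
To make the routing idea concrete, here is a minimal plain-Java model, not
Crunch code: a list of Map.Entry<String, T> stands in for the PTable, and the
hypothetical route helper groups records under baseDir + "/" + key, assuming
the key is used directly as the directory name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model only: the entry list stands in for PTable<String, T>, and
// the returned map stands in for "one output directory per key".
final class KeyedOutputs {
    // Groups each record under baseDir/<key>, preserving the order in which
    // keys are first seen. The path layout is an assumption for illustration.
    static <T> Map<String, List<T>> route(String baseDir,
                                          List<Map.Entry<String, T>> table) {
        Map<String, List<T>> byDir = new LinkedHashMap<>();
        for (Map.Entry<String, T> e : table) {
            String dir = baseDir + "/" + e.getKey();
            byDir.computeIfAbsent(dir, k -> new ArrayList<>()).add(e.getValue());
        }
        return byDir;
    }
}
```

With keys such as "out" and "error", the records would land in two sibling
directories under the base output directory.
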
>>>
>>>  There are a couple of limitations to this approach, I think: we
>>> couldn't consider this kind of PTable "materialized" w/o doing an overhaul
>>> of the materialization logic-- it would act sort of like an HTableTarget in
>>> that it would be write-only in flows. There are probably some others I
>>> can't think of off the top of my head. What do you guys think?
>>>
>>>  J
>>>
>>>
>>> On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan <RBRUSH@cerner.com> wrote:
>>>
>>>> I happen to have some context around this, so I wanted to expand on
>>>> Brandon's question a bit.  The use case here is we're dealing with a large
>>>> volume of third-party input and expect a certain percentage of bogus or
>>>> malformed data. Rather than simply logging instances of bad records, we
>>>> want to treat it as a signal we can learn from, both for improving our
>>>> processing logic and for creating structured reports we can use to
>>>> troubleshoot data sources.
>>>>
>>>> This leads to the "standard out" and "standard error" metaphors Brandon
>>>> mentions: in most cases, our Crunch DoFns would emit a processed structure
>>>> useful downstream. But we'd also like to be able to emit a structured error
>>>> -- probably as an Avro object in our case -- and persist that as a
>>>> byproduct of our main processing pipeline.
>>>>
>>>> Would it make sense for such DoFns to emit some form of
>>>> "Option" object? We could then attach two consuming functions to it: one
>>>> that handles the "success" case, sending the resulting Avro object
>>>> downstream. Another DoFn attached to the "Option" object would be a no-op
>>>> unless the Option contained an "error" structure, at which point we persist
>>>> it to some well-known location for later analysis.
>>>>
>>>> I think this is entirely achievable using existing mechanisms...but it
>>>> seems like a common enough use case (at least for us) to establish some
>>>> idioms for dealing with it.
>>>>
>>>> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>>>>
>>>> >
>>>> > We've been looking at ways to do multiple outputs in Crunch jobs,
>>>> > specifically writing out some kind of Status or Error Avro object, based
>>>> > on failures that occur processing individual records in various jobs. It
>>>> > had been suggested that, rather than logging these errors to traditional
>>>> > loggers, we consider them an output of the Crunch job.  After some
>>>> > internal discussion, it was suggested to run the ideas past the Crunch
>>>> > community.
>>>> >
>>>> >
>>>> > A major goal we have is to end with all the error output in a location
>>>> > that makes it easy to run Hive queries or perform other MapReduce-style
>>>> > analysis to quickly view all errors across the larger system without the
>>>> > need to go to multiple facilities.  This means standardizing on the Avro
>>>> > object, but it also necessitates decoupling the storage of the object from
>>>> > the "standard output" of the job.
>>>> >
>>>> >
>>>> > As Crunch DoFns support a single Emitter per invocation of process(), the
>>>> > solution that gathered the most support would be to emit an object similar
>>>> > to Pair<>, where first would be the "standard out" and second would be the
>>>> > "standard error".  A DoFn would generally only populate one (nothing
>>>> > preventing it from populating both if appropriate, but not really intended
>>>> > as a part of general use), and separate DoFns would filter out the two
>>>> > components of the pair and write the values to the appropriate targets.
>>>> >
>>>> > As for the emitted pairing object, the concept of a tagged union was
>>>> > suggested, although there currently isn't support in Java or Avro for the
>>>> > concept; it was noted that
>>>> > https://issues.apache.org/jira/browse/CRUNCH-239 might be a close
>>>> > candidate. Pair<> would meet the requirements, although it was suggested
>>>> > that a simple object dedicated to the task could make a cleaner approach.
>>>> >
>>>> > Any general thoughts on this approach? Are there any other patterns that
>>>> > might serve us better, or anything on the Crunch roadmap that might be
>>>> > more appropriate?
>>>> >
>>>> >
>>>> > Brandon Inman
>>>> > Software Architect
>>>> > www.cerner.com
>>>> >
>>>> >
>>>>
>>>>
>>>>
>>>
>>>
>>>  --
>>> Director of Data Science
>>> Cloudera <http://www.cloudera.com>
>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>
>>
>>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
