crunch-user mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: Multiple output channels from Crunch DoFn
Date Thu, 22 Aug 2013 22:47:16 GMT
On Thu, Aug 22, 2013 at 3:45 PM, Micah Whitacre <mkwhitacre@gmail.com> wrote:

> I view them as separate pieces of functionality.  The splitting of a
> grouping PType (Pair, Tuple) seems reusable in a number of contexts.  When
> we support Unions (or Either) we could provide similar functionality to
> split PCollection<Union<T, U>> -> Pair<PCollection<T>, PCollection<U>>.
>

That's a fair point. Okay, let's go ahead w/the PCollection<Pair<>> ->
Pair<PCollection<>> plan.
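
For readers of the archive, here is a minimal sketch of the shape of that plan, written so it runs without Crunch on the classpath. Everything in it is an assumption made for illustration: plain `java.util.List` stands in for `PCollection`, and `SimplePair`, `SplitSketch`, and `splitPairs` are hypothetical names, not Crunch APIs. It only models the "emit Pairs, then split them into two channels" idea discussed in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Crunch's Pair; not the real org.apache.crunch.Pair.
final class SimplePair<A, B> {
    private final A first;
    private final B second;

    SimplePair(A first, B second) {
        this.first = first;
        this.second = second;
    }

    A first() { return first; }
    B second() { return second; }
}

final class SplitSketch {
    private SplitSketch() {}

    // Models PCollection<Pair<T, U>> -> Pair<PCollection<T>, PCollection<U>>
    // using Lists. Null components are skipped, so an element that carries only
    // an "error" value adds nothing to the "output" side, and vice versa.
    static <T, U> SimplePair<List<T>, List<U>> splitPairs(List<SimplePair<T, U>> input) {
        List<T> firsts = new ArrayList<>();
        List<U> seconds = new ArrayList<>();
        for (SimplePair<T, U> p : input) {
            if (p.first() != null) {
                firsts.add(p.first());
            }
            if (p.second() != null) {
                seconds.add(p.second());
            }
        }
        return new SimplePair<>(firsts, seconds);
    }

    public static void main(String[] args) {
        List<SimplePair<String, String>> records = new ArrayList<>();
        records.add(new SimplePair<>("parsed-1", null));          // success only
        records.add(new SimplePair<>(null, "malformed row 2"));   // error only
        records.add(new SimplePair<>("parsed-3", null));          // success only

        SimplePair<List<String>, List<String>> channels = splitPairs(records);
        System.out.println(channels.first());   // the "standard out" side
        System.out.println(channels.second());  // the "standard error" side
    }
}
```

In a real pipeline each side would presumably be produced by its own parallelDo over the same input collection and written to its own Target, as in Brandon's pseudocode quoted below.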


>
>
> On Thu, Aug 22, 2013 at 5:33 PM, Josh Wills <jwills@cloudera.com> wrote:
>
>> I'm +1 for the PCollection<Pair<T, U>> -> Pair<PCollection<T>,
>> PCollection<U>> approach outlined by Brandon and Chao. I think the only
>> question is whether or not we want to add in the Union<T, U> (or Either<T,
>> U>?) feature as part of doing that change.
>>
>> J
>>
>>
>> On Wed, Aug 21, 2013 at 9:19 AM, Inman,Brandon <Brandon.Inman@cerner.com> wrote:
>>
>>> This is close to how I had imagined the implementation to look.  Very
>>> roughly-
>>>
>>> // Needs: org.apache.crunch.DoFn, org.apache.crunch.Emitter,
>>> // org.apache.crunch.Pair
>>> public static class FirstEmittingDoFn<T, U> extends DoFn<Pair<T, U>, T> {
>>>
>>>     // Emit only the first element of each pair, skipping nulls.
>>>     @Override
>>>     public void process(Pair<T, U> input, Emitter<T> emitter) {
>>>         final T first = input.first();
>>>         if (first != null) {
>>>             emitter.emit(first);
>>>         }
>>>     }
>>> }
>>>
>>> There would be a very similar DoFn for second() that I'll omit for
>>> brevity. I originally envisioned the utility method calling the DoFn that
>>> generated the pair, but I like the idea of a smaller utility. The utility
>>> method should be as simple as...
>>>
>>> // Needs: org.apache.crunch.PCollection, org.apache.crunch.types.PType
>>> public static <T, U> Pair<PCollection<T>, PCollection<U>> filterChannels(
>>>         final PCollection<Pair<T, U>> pCollection,
>>>         final PType<T> firstPType,
>>>         final PType<U> secondPType) {
>>>
>>>     // One pass per channel over the same input collection.
>>>     final PCollection<T> stdout =
>>>             pCollection.parallelDo(new FirstEmittingDoFn<T, U>(), firstPType);
>>>     final PCollection<U> stderr =
>>>             pCollection.parallelDo(new SecondEmittingDoFn<T, U>(), secondPType);
>>>
>>>     return Pair.of(stdout, stderr);
>>> }
>>>
>>>
>>> Disclaimer: I didn't try to compile (all) this code, so treat it as
>>> pseudocode.
>>>
>>> From:  Josh Wills <jwills@cloudera.com>
>>> Reply-To:  "user@crunch.apache.org" <user@crunch.apache.org>
>>> Date:  Tuesday, August 20, 2013 9:40 PM
>>> To:  "user@crunch.apache.org" <user@crunch.apache.org>
>>> Subject:  Re: Multiple output channels from Crunch DoFn
>>>
>>>
>>> That does sound pretty clean...
>>>
>>>
>>> On Tue, Aug 20, 2013 at 7:34 PM, Chao Shi
>>> <stepinto@live.com> wrote:
>>>
>>> Is it possible to provide a utility that transforms PCollection<Pair<A,
>>> B>> to Pair<PCollection<A>, PCollection<B>>? So one can simply emit Pairs
>>> and then write them to two Targets. This could be generalized to Tuples.
>>>
>>>
>>> 2013/8/21 Josh Wills <josh.wills@gmail.com>
>>>
>>>
>>> On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <
>>> Brandon.Inman@cerner.com>
>>> wrote:
>>>
>>> I like the flexibility of this approach, although would the idea of having
>>> some official constants defined for a small set of standard channels be
>>> reasonable (the concepts of "out" and "error" are pretty common, others
>>> may be warranted as well)?
>>>
>>>
>>>
>>>
>>>
>>>
>>> So I think the way I would handle this would be having a main output
>>> directory and an error output directory that was underneath it. Cascading
>>> does this trick within their existing flows where you can throw exceptions
>>> to "traps," which is essentially the same idea, though I'm not wild about
>>> control flow that relies on throwing exceptions.
>>>
>>>
>>>
>>> Is this something that you would see being added to core Crunch APIs (for
>>> example, directly to Pipeline), or implemented on top of Crunch with a
>>> filtering approach similar to my original post?  If it's implemented on
>>> top, shouldn't materialization work as-is?
>>>
>>>
>>>
>>>
>>>
>>>
>>> Yes, your model would be simpler. I think that mine would require a
>>> special kind of Target implementation, a custom implementation of the
>>> Emitter interface that would be used for routing the outputs of the DoFn,
>>> and possibly some post-processing code to move the data to a sensible
>>> place. I don't know if that work is strictly necessary, and your impl is
>>> certainly much more straightforward than mine. :)
>>>
>>>
>>>
>>>
>>> If the type was PTable<String, T>, could Union<S,U> be a choice for T as
>>> appropriate? In our case, we would likely be looking at a PTable<String,
>>> T extends SpecificRecordBase> and not necessarily need Union with this
>>> approach.
>>>
>>>
>>>
>>>
>>>
>>>
>>> Yeah, I think it would be fine, but we'd have to be cognizant of it when
>>> we were implementing the union type, and it would be up to the client to
>>> ensure that the right data type ended up in the right file, which is
>>> maybe less good?
>>>
>>>
>>>
>>>
>>>
>>> From: Josh Wills <jwills@cloudera.com>
>>> Reply-To: "user@crunch.apache.org" <user@crunch.apache.org>
>>> Date: Tuesday, August 20, 2013 1:00 PM
>>> To: "user@crunch.apache.org" <user@crunch.apache.org>
>>> Subject: Re: Multiple output channels from Crunch DoFn
>>>
>>>
>>> A related idea that has come up a few times has been the idea of having a
>>> way of writing values to different files based on a key: some kind of
>>> generalization of Target that would itself write multiple outputs under
>>> the covers, with the name of the output file indicated by some function
>>> of the key of the PTable.
>>>
>>> For this situation, we would have a PTable that was like PTable<String,
>>> Union<S, T>>, or just PTable<String, T> if the output types were all the
>>> same, and the String would specify the name of an output directory (that
>>> I suppose would live underneath some base output directory for the
>>> Target) that the record would be written to.
>>>
>>> There are a couple of limitations to this approach, I think: we couldn't
>>> consider this kind of PTable "materialized" w/o doing an overhaul of the
>>> materialization logic-- it would act sort of like an HTableTarget in that
>>> it would be write-only in flows. There are probably some others I can't
>>> think of off the top of my head. What do you guys think?
>>>
>>> J
>>>
>>>
>>>
>>> On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan
>>> <RBRUSH@cerner.com> wrote:
>>>
>>> I happen to have some context around this, so I wanted to expand on
>>> Brandon's question a bit.  The use case here is we're dealing with a large
>>> volume of third-party input and expect a certain percentage of bogus or
>>> malformed data. Rather than simply logging instances of bad records, we
>>> want to treat it as a signal we can learn from, both for improving our
>>> processing logic and for creating structured reports we can use to
>>> troubleshoot data sources.
>>>
>>> This leads to the "standard out" and "standard error" metaphors Brandon
>>> mentions: in most cases, our Crunch DoFns would emit a processed structure
>>> useful downstream. But we'd also like to be able to emit a structured
>>> error -- probably as an Avro object in our case -- and persist that as a
>>> byproduct of our main processing pipeline.
>>>
>>> Would it make sense for such DoFns to emit some form of "Option" object?
>>> We could then attach two consuming functions to it: one that handles the
>>> "success" case, sending the resulting Avro object downstream. Another DoFn
>>> attached to the "Option" object would be a no-op unless the Option
>>> contained an "error" structure, at which point we persist it to some
>>> well-known location for later analysis.
>>>
>>> I think this is entirely achievable using existing mechanisms...but it
>>> seems like a common enough use case (at least for us) to establish some
>>> idioms for dealing with it.
>>>
>>> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>>>
>>> >
>>> > We've been looking at ways to do multiple outputs in Crunch jobs,
>>> > specifically writing out some kind of Status or Error Avro object, based
>>> > on failures that occur processing individual records in various jobs. It
>>> > had been suggested that, rather than logging these errors to traditional
>>> > loggers, we consider them an output of the Crunch job.  After some
>>> > internal discussion, it was suggested to run the ideas past the Crunch
>>> > community.
>>> >
>>> >
>>> > A major goal we have is to end with all the error output in a location
>>> > that makes it easy to run Hive queries or perform other MapReduce-style
>>> > analysis to quickly view all errors across the larger system without the
>>> > need to go to multiple facilities.  This means standardizing on the Avro
>>> > object, but it also necessitates decoupling the storage of the object
>>> > from the "standard output" of the job.
>>> >
>>> >
>>> > As Crunch DoFns support a single Emitter per invocation of process(), the
>>> > solution that gathered the most support would be to emit an object
>>> > similar to Pair<>, where first would be the "standard out" and second
>>> > would be the "standard error".  A DoFn would generally only populate one
>>> > (nothing preventing it from populating both if appropriate, but not
>>> > really intended as a part of general use), and separate DoFns would
>>> > filter out the two components of the pair and write the values to the
>>> > appropriate targets.
>>> >
>>> > As for the emitted pairing object: the concept of a tagged union was
>>> > suggested, although there currently isn't support in Java or Avro for the
>>> > concept; it was noted that
>>> > https://issues.apache.org/jira/browse/CRUNCH-239 might be a close
>>> > candidate. Pair<> would meet the requirements, although it was suggested
>>> > that a simple object dedicated to the task could make a cleaner approach.
>>> >
>>> > Any general thoughts on this approach? Are there any other patterns that
>>> > might serve us better, or anything on the Crunch roadmap that might be
>>> > more appropriate?
>>> >
>>> >
>>> > Brandon Inman
>>> > Software Architect
>>> > www.cerner.com <http://www.cerner.com>
>>> >
>>> >
>>> --
>>> Director of Data Science
>>> Cloudera <http://www.cloudera.com>
>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>
>>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
