crunch-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Micah Whitacre <mkwhita...@gmail.com>
Subject Re: Multiple output channels from Crunch DoFn
Date Thu, 22 Aug 2013 22:45:25 GMT
I view them as separate pieces of functionality.  The splitting of a
grouping PType (Pair, Tuple) seems reusable in a number of contexts.  When
we support Unions (or Either) we could provide similar functionality to
split PCollection<Union<T, U>> -> Pair<PCollection<T>, PCollection<U>>.


On Thu, Aug 22, 2013 at 5:33 PM, Josh Wills <jwills@cloudera.com> wrote:

> I'm +1 for the PCollection<Pair<T, U>> -> Pair<PCollection<T>,
> PCollection<U>> approach outlined by Brandon and Chao. I think the only
> question is whether or not we want to add in the Union<T, U> (or Either<T,
> U>?) feature as part of doing that change.
>
> J
>
>
> On Wed, Aug 21, 2013 at 9:19 AM, Inman,Brandon <Brandon.Inman@cerner.com>wrote:
>
>> This is close to how I had imagined the implementation to look.  Very
>> roughly-
>>
>>  public static class FirstEmittingDoFn<T extends Pair<U, ?>, U> extends
>> DoFn<Pair<U, ?>, U> {
>>
>>         @Override
>>         public void process(Pair<U, ?> input, Emitter<U> emitter) {
>>             final U first = input.first();
>>             if (first != null) {
>>                 emitter.emit(first);
>>             }
>>         }
>>     }
>> }
>>
>> There would be a very similar DoFn for second() that I'll omit for
>> brevity. I originally envisioned the utility method calling the DoFn that
>> generated the pair, but I like the idea of a smaller utility. The utility
>> method should be as simple as...
>>
>> public static <T, U> Pair<PCollection<T>,PCollection<U>>
>> filterChannels(final PCollection<Pair<T,U>> pCollection, final PType<T>
>> firstPType, final PType<U> secondPType) {
>>
>>   final PCollection<T> stdout = collection.parallelDo(new
>> FirstEmittingDoFn<T>, firstPType);
>>   final PCollection<U> stderr = collection.parallelDo(new
>> SecondEmittingDoFn<U>, secondPType);
>>
>>
>>   return Pair.of(stdout,stderr);
>>     }
>>
>>
>> Disclaimer; I didn't try to compile (all) this code, so treat as
>> pseudocode.
>>
>> From:  Josh Wills <jwills@cloudera.com>
>> Reply-To:  "user@crunch.apache.org" <user@crunch.apache.org>
>> Date:  Tuesday, August 20, 2013 9:40 PM
>> To:  "user@crunch.apache.org" <user@crunch.apache.org>
>> Subject:  Re: Multiple output channels from Crunch DoFn
>>
>>
>> That does sound pretty clean...
>>
>>
>> On Tue, Aug 20, 2013 at 7:34 PM, Chao Shi
>> <stepinto@live.com> wrote:
>>
>> Is it possible to provide a utility that transforms PCollection<Pair<A,
>> B>> to Pair<PCollection<A>, PCollection<B>>? So one can simply
emit Pairs
>> and then write them to two Targets. This could be generalized to Tuples.
>>
>>
>> 2013/8/21 Josh Wills <josh.wills@gmail.com>
>>
>>
>> On Tue, Aug 20, 2013 at 12:23 PM, Inman,Brandon <Brandon.Inman@cerner.com
>> >
>> wrote:
>>
>> I like the flexibility of this approach, although would the idea of having
>> some official constants defined for a small set of standard channels be
>> reasonable (the concepts of "out" and "error" are pretty common, others
>> may be warranted as well)?
>>
>>
>>
>>
>>
>>
>> So I think the way I would handle this would be having a main output
>> directory and an error output directory that was underneath it. Cascading
>> does this trick within their existing flows where you can throw exceptions
>> to "traps," which is essentially the
>>  same idea, though I'm not wild about control flow that relies on throwing
>> exceptions.
>>
>>
>>
>> Is this something that you would see being added to core Crunch APIs (for
>> example, directly to Pipeline), or implemented on top of Crunch with a
>> filtering approach similar to my original post?  If it's implemented on
>> top, shouldn't materialization work
>>  as-is?
>>
>>
>>
>>
>>
>>
>> Yes, your model would be simpler. I think that mine would require a
>> special kind of Target implementation, a custom implementation of the
>> Emitter interface that would be used for routing the outputs of the DoFn,
>> and possibly some post-processing code to
>>  move the data to a sensible place. I don't know if that work is strictly
>> necessary, and your impl is certainly much more straightforward than mine.
>> :)
>>
>>
>>
>>
>> If the type was PTable<String, T>, could Union<S,U> be a choice for T
as
>> appropriate? In our case, we would likely be looking at a PTable<String, T
>> extends SpecificRecordBase> and not necessarily need Union with this
>> approach.
>>
>>
>>
>>
>>
>>
>> Yeah, I think it would be fine, but we'd have to be cognizant of it when
>> we were implementing the union type, and it would be up to the client to
>> ensure that the right data type ended up in the right file, which is maybe
>> less good?
>>
>>
>>
>>
>>
>> From: Josh Wills <jwills@cloudera.com>
>> Reply-To: "user@crunch.apache.org" <user@crunch.apache.org>
>> Date: Tuesday, August 20, 2013 1:00 PM
>> To: "user@crunch.apache.org" <user@crunch.apache.org>
>> Subject: Re: Multiple output channels from Crunch DoFn
>>
>>
>> A related idea that has come up a few times has been the idea of having a
>> way of writing values to different files based on a key: some kind of
>> generalization of Target that would itself write multiple outputs under
>> the covers, with the name
>>  of the output file indicated by some function of the key of the PTable.
>>
>> For this situation, we would have a PTable that was like PTable<String,
>> Union<S, T>>, or just PTable<String, T> if the output types were all
the
>> same, and the String would specify the name of an output directory (that I
>> suppose would live underneath some base
>>  output directory for the Target) that the record would be written to.
>>
>> There are a couple of limitations to this approach, I think: we couldn't
>> consider this kind of PTable "materialized" w/o doing an overhaul of the
>> materialization logic-- it would act sort of like an HTableTarget in that
>> it would be write-only in flows.
>>  There are probably some others I can't think of off the top of my head.
>> What do you guys think?
>>
>> J
>>
>>
>>
>> On Tue, Aug 20, 2013 at 9:49 AM, Brush,Ryan
>> <RBRUSH@cerner.com> wrote:
>>
>> I happen to have some context around this, so I wanted to expand on
>> Brandon's question a bit.  The use case here is we're dealing with a large
>> volume of third-party input and expect a certain percentage of bogus or
>> malformed data. Rather than simply logging
>>  instances of bad records, we want to treat it as a signal we can learn
>> from, both for improving our processing logic and for creating structured
>> reports we can use to troubleshoot data sources.
>>
>> This leads to the "standard out" and "standard error" metaphors Brandon
>> mentions: in most cases, our Crunch DoFns would emit a processed structure
>> useful downstream. But we'd also like to be able to emit a structured
>> error -- probably as an Avro object in our
>>  case -- and persist that as a byproduct of our main processing pipeline.
>>
>> Would it make sense for such DoFn's to emit something some form of
>> "Option" object? We could then attach two consuming functions to it: one
>> that handles the "success" case, sending the resulting Avro object
>> downstream. Another DoFn attached to the "Option"
>>  object would be a no-op unless the Option contained an "error" structure,
>> at which point we persist it to some well-known location for later
>> analysis.
>>
>> I think this is entirely achievable using existing mechanisms...but it
>> seems like common enough use case (at least for us) to establish some
>> idioms for dealing it.
>>
>> On Aug 20, 2013, at 11:13 AM, Inman,Brandon wrote:
>>
>> >
>> > We've been looking at ways to do multiple outputs in Crunch jobs,
>> > specifically writing out some kind of Status or Error Avro object, based
>> > on failures that occur processing individual records in various jobs. It
>> > had been suggested that, rather than logging these errors to traditional
>> > loggers, to consider them an output of the Crunch job.  After some
>> > internal discussion, it was suggested to run the ideas past the Crunch
>> > community.
>> >
>> >
>> > A major goal we have is to end with all the error output in a location
>> > that makes it easy to run Hive queries or perform other MapReduce-style
>> > analysis to quickly view all errors across the larger system without the
>> > need go to multiple facilities.  This means standardizing on the Avro
>> > object, but it also necessitates decoupling the storage of the object
>> >from
>> > the "standard output" of the job.
>> >
>> >
>> > As Crunch DoFns support a single Emitter per invocation of process(),
>> the
>> > solution that gathered the most support would be to emit an object
>> >similar
>> > to Pair<>, where first would be the "standard out" and second would be
>> >the
>> > "standard error".  A DoFn would generally only populate one (nothing
>> > preventing it from populating both if appropriate, but not really
>> >intended
>> > as a part of general use), and separate DoFns would filter out the two
>> > components of the pair and write the values to the appropriate targets.
>> >
>> > As far as the emitted pairing object; the concept of a tagged union was
>> > suggested although there currently isn't support in Java or Avro for the
>> > concept; it was noted that
>> >
>> https://issues.apache.org/jira/browse/CRUNCH-239
>> <
>> https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/
>>
>> browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjr
>>
>> SpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMaf
>> c%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82<https://urldefense.proofpoint.com/v1/url?u=https://issues.apache.org/jira/browse/CRUNCH-239&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=dceef88f8fadf4d34b61b47e1728bc63dda36ad51151ccfceb5c84ea45be0e82>
>> >
>>  might be a close
>> > candidate. Pair<> would meet the requirements, although it was suggested
>> > that a simple object dedicated to the task could make a cleaner
>> approach.
>> >
>> > Any general thoughts on this approach? Are there any other patterns that
>> > might serve us better, or anything on the Crunch roadmap that might be
>> > more appropriate?
>> >
>> >
>> > Brandon Inman
>> > Software Architect
>> > www.cerner.com <http://www.cerner.com>
>> >
>> >
>> > CONFIDENTIALITY NOTICE This message and any included attachments are
>> >from Cerner Corporation and are intended only for the addressee. The
>> >information contained in this message is confidential and may constitute
>> >inside or non-public information under international,
>>  federal, or state securities laws. Unauthorized forwarding, printing,
>> copying, distribution, or use of such information is strictly prohibited
>> and may be unlawful. If you are not the addressee, please promptly delete
>> this message and notify the sender of the
>>  delivery error by e-mail or you may call Cerner's corporate offices in
>> Kansas City, Missouri, U.S.A at
>>
>> (+1) (816)221-1024 <tel:%28%2B1%29%20%28816%29221-1024>.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera
>> <
>> https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqf
>>
>> XspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%
>>
>> 3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f
>> 7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=508adfd2097ef3f7c9738fe9f729f47d95ae1d6568dabe09697317fd6d53f9d1>
>> >
>> Twitter:
>> @josh_wills
>> <
>> https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k
>>
>> =PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH
>>
>> 7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e2
>> 90f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=ZOuvUFJf2XiQL4mXsKMy9ArJwoDf7VP6eNKgaIHMafc%3D%0A&s=585b666e290f5104a6f13a0fcbc52f4fc6cd93365dc1d44d3e49ed09c2fe1996>
>> >
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera
>> <
>> https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqf
>>
>> XspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%
>>
>> 3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=1095ecaa17ab1e4
>> 31966b19fec39773cae0b9319fc310155b4ab636cabd4799a<https://urldefense.proofpoint.com/v1/url?u=http://www.cloudera.com&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=1095ecaa17ab1e431966b19fec39773cae0b9319fc310155b4ab636cabd4799a>
>> >
>> Twitter:
>> @josh_wills
>> <
>> https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k
>>
>> =PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH
>>
>> 7Ftlg%3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=8b9feeae6
>> 0caabb4edd6caff1fd188790717924e130b1d3533089bee9a85e9a6<https://urldefense.proofpoint.com/v1/url?u=http://twitter.com/josh_wills&k=PmKqfXspAHNo6iYJ48Q45A%3D%3D%0A&r=RiPWMqlVaSiSs74U1fVjrSpZO%2FvyTEWUW1RhCH7Ftlg%3D%0A&m=JjOKxAMa8Miu4X1FpLdnSvt5WCGlwK4xE7i92OmAex0%3D%0A&s=8b9feeae60caabb4edd6caff1fd188790717924e130b1d3533089bee9a85e9a6>
>> >
>>
>>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>

Mime
View raw message