crunch-dev mailing list archives

From Frances Perry <...@google.com.INVALID>
Subject Re: Alternative strategy for incorporating Java 8 lambdas into Crunch
Date Fri, 18 Dec 2015 05:19:58 GMT
Would love to discuss -- only caveat being that I know FlumeJava and
Dataflow, with only a passing familiarity with Crunch. But I hear it's
similar ;-)

As you might guess based on Dataflow, I'm a huge fan of encapsulating
transformations as objects. We found two main benefits:

(1) Extensibility -- There is no distinction between 'built-in' and
user-defined composite transformations. Users can create their own
libraries that behave just like the ones that happen to be provided by us.
PCollection doesn't have 8675309 methods defined on it. (Note an unintended
downside here for discoverability: Eclipse autocomplete does not help you
ruminate on what things you can do to a PCollection...)

(2) Modularity -- Any good developer writes pipelines modularly by using
java methods, etc. But if that structure disappears at execution time, the
graph quickly gets huge and impossible to follow. And reasoning about the
execution of the smaller optimized graph isn't much better because nothing
is where you expect, particularly when your program logic DoFns are hidden
in amongst all the trivial DoFns tagging elements within join libraries and
the like. But by defining these building blocks in a way that the system
can understand, we are able to preserve that structure in our monitoring
and execution experience.
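
A minimal Java sketch of the transformations-as-objects idea described
above. The names Transform, Coll, map, and FormatLengths are hypothetical
stand-ins, not Dataflow or Crunch classes, and the in-memory list only
imitates a PCollection. It shows a "built-in" and a user-defined composite
going through the same extension point, with the composite showing up as a
single named step:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class TransformSketch {

    /** Hypothetical stand-in for a PTransform: a named step from one collection to another. */
    interface Transform<I, O> {
        String name();
        Coll<O> expand(Coll<I> input);
    }

    /** Toy in-memory collection with one extension point, apply(), instead of a method per operation. */
    static final class Coll<T> {
        final List<T> elements;
        final List<String> steps; // names of the top-level transforms applied so far

        Coll(List<T> elements, List<String> steps) {
            this.elements = elements;
            this.steps = steps;
        }

        static <T> Coll<T> of(List<T> elements) {
            return new Coll<>(elements, new ArrayList<>());
        }

        <O> Coll<O> apply(Transform<T, O> transform) {
            // Expand on a copy with an empty log so the transform's inner steps stay encapsulated.
            Coll<O> inner = transform.expand(new Coll<>(elements, new ArrayList<>()));
            List<String> log = new ArrayList<>(steps);
            log.add(transform.name());
            return new Coll<>(inner.elements, log);
        }
    }

    /** A "built-in" element-wise transform; nothing about it is special to the framework. */
    static <I, O> Transform<I, O> map(String name, Function<I, O> fn) {
        return new Transform<I, O>() {
            @Override public String name() { return name; }
            @Override public Coll<O> expand(Coll<I> input) {
                List<O> out = new ArrayList<>();
                for (I element : input.elements) {
                    out.add(fn.apply(element));
                }
                return Coll.of(out);
            }
        };
    }

    /** A user-defined composite, built from other transforms and applied the same way. */
    static final class FormatLengths implements Transform<String, String> {
        @Override public String name() { return "FormatLengths"; }
        @Override public Coll<String> expand(Coll<String> input) {
            return input
                .apply(map("Length", (String s) -> s.length()))
                .apply(map("Format", (Integer n) -> "len=" + n));
        }
    }

    public static void main(String[] args) {
        Coll<String> out = Coll.of(Arrays.asList("a", "bcd")).apply(new FormatLengths());
        System.out.println(out.elements); // [len=1, len=3]
        System.out.println(out.steps);    // [FormatLengths]
    }
}
```

The collection type here needs only apply(); everything else lives in
Transform objects, which is the trade-off against autocomplete
discoverability mentioned in (1).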

Other implications / tradeoffs of transformations as objects:

(1) We removed PTable because GroupByKey no longer needed a home there once
it became a PTransform, and there wasn't as much point with it gone. But I
do still miss the type compactness of PTables every time I have to type
that extra KV<...>.

(2) We were hoping PTransforms would be useful even beyond
transformations intended as reusable libraries because of the monitoring
benefits (see our "best practices" version of WordCount
<https://github.com/GoogleCloudPlatform/DataflowJavaSDK-examples/blob/b4a10dc2b044745f48f7e1c30a4c7783a5694288/src/main/java/com/google/cloud/dataflow/examples/WordCount.java>).
But many users are still writing java methods for organization instead
because PTransforms feel heavyweight. (I blame Java.)

(3) Transforms with multiple PCollection inputs & outputs get a little
hairy.

We really, really, really, really wanted lambda support, but wanted to
maintain Java 7 compatibility without having to completely fork things. So
we decided to define our own basic function interface and overload common
PTransforms to take that. Sounded simple, but we hit a snag with coder
inference. When possible, we try to avoid having users specify the coder
(a.k.a. your PType) for the output PCollection by inferring it from the Java
type information. We couldn't make that work with lambdas due to erasure,
but we also didn't want to muddy things for Java 7 users.

The solution we went with was to define a SerializableFunction
<https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/f25aa39e94cc1401fba734c22204c36ad1a557b4/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/SerializableFunction.java>
interface and have an abstract class SimpleFunction
<https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/f25aa39e94cc1401fba734c22204c36ad1a557b4/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/SimpleFunction.java>
that implements it (and captures typing info). Then transformations can
provide either option or both.
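
To illustrate why the abstract class helps, here is a small sketch using
toy types (SerializableFn and SimpleFn are hypothetical, not the SDK classes
linked above, whose implementation may differ). An anonymous subclass
records its type arguments in class metadata where reflection can read them,
while a lambda's generated class typically carries no such information:

```java
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypeCaptureSketch {

    /** Lambda-friendly functional interface (toy version). */
    interface SerializableFn<I, O> extends Serializable {
        O apply(I input);
    }

    /**
     * Abstract base class. Assumes it is subclassed directly with concrete
     * type arguments, e.g. new SimpleFn<String, Integer>() { ... }.
     */
    abstract static class SimpleFn<I, O> implements SerializableFn<I, O> {
        Type outputType() {
            ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[1];
        }
    }

    /** Returns the output type if reflection can recover it, otherwise null. */
    static Type recoverOutputType(SerializableFn<?, ?> fn) {
        if (fn instanceof SimpleFn) {
            return ((SimpleFn<?, ?>) fn).outputType();
        }
        // Named classes implementing SerializableFn<I, O> directly keep their type arguments;
        // a lambda's class usually reports only the raw interface, so nothing matches.
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == SerializableFn.class) {
                return ((ParameterizedType) t).getActualTypeArguments()[1];
            }
        }
        return null;
    }

    public static void main(String[] args) {
        SerializableFn<String, Integer> anonymous = new SimpleFn<String, Integer>() {
            @Override public Integer apply(String s) { return s.length(); }
        };
        SerializableFn<String, Integer> lambda = s -> s.length();

        System.out.println(recoverOutputType(anonymous)); // class java.lang.Integer
        System.out.println(recoverOutputType(lambda));    // expected to be null
    }
}
```

When nothing can be recovered, the caller has to supply the output type
explicitly, which is the role an explicit type descriptor plays for lambdas.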

Here's our MapElements
<https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/f25aa39e94cc1401fba734c22204c36ad1a557b4/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/MapElements.java>
PTransform.

MinimalWordCount in Java 7
<https://github.com/GoogleCloudPlatform/DataflowJavaSDK-examples/blob/b4a10dc2b044745f48f7e1c30a4c7783a5694288/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCount.java>
calls MapElements via a SimpleFunction and infers output coders from the
types:

     .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
                       @Override
                       public String apply(KV<String, Long> input) {
                         return input.getKey() + ": " + input.getValue();
                       }
                     }))

MinimalWordCount in Java 8
<https://github.com/GoogleCloudPlatform/DataflowJavaSDK-examples/blob/b4a10dc2b044745f48f7e1c30a4c7783a5694288/src/main/java8/com/google/cloud/dataflow/examples/MinimalWordCountJava8.java>
calls MapElements via a SerializableFunction and an explicitly provided
type descriptor:

     .apply(MapElements
         .via((KV<String, Long> wordCount) ->
             wordCount.getKey() + ": " + wordCount.getValue())
         .withOutputType(new TypeDescriptor<String>() {}))

None of us are particularly happy with having to provide the type
descriptor, but at least there are lambdas.

Hope some of that was helpful. There are plenty more things Dataflow does
differently than in FlumeJava. Some day I hope to actually write down the
reasons for them all...

Frances

On Wed, Dec 16, 2015 at 9:34 PM, Josh Wills <josh.wills@gmail.com> wrote:

> I remember thinking when I first read the Dataflow Javadoc about
> PTransforms something along the lines of "Damn. That is the right way to do
> this." It is a stupendously elegant API, the kind that only an API
> developer could really appreciate (as I think David lays out nicely below.)
> I'll be amused if we end up converging to the same approach that they took
> with Dataflow, given that Crunch is based on the way FlumeJava looked when
> I left Google in 2011.
>
> I'm taking advantage of the fact that I know Frances Perry (TL of the
> Dataflow project) to cc her on this email and ask if there is a way that we
> can collaborate with them on this. I'd be sort of shocked if the folks at
> Google hadn't thought about it already.
>
> J
>
> On Wed, Dec 16, 2015 at 2:25 PM, David Whiting <davidwhiting@gmail.com>
> wrote:
>
>> So I've done a few more iterations on this and I'm back with even more
>> comments (I really want to get this right, can you tell? :-P).
>>
>> The "wrap the whole collection" approach gives you a really usable API,
>> but with minor inconvenience to the developer (wrapping PCollections into
>> LCollections before they can start using it). I tried both (a) making
>> LCollection<S> implement PCollection<S> and delegate _everything_
>> (wrapping the relevant return types into LCollections) and (b) having a
>> cut-back functional-first API (like Scrunch), which requires digging down
>> to the underlying PCollection and re-promoting afterwards to do more
>> complex stuff.
>>
>> The problem with (a) is that you have an unwieldy mega-interface in your
>> LThings, increasing your maintenance workload and requiring updates
>> with every change to PThings, and just shifting the problem with the
>> existing IDoFn/IMapFn implementation into the lambda project instead - not
>> really providing much of an improvement over the current master branch and
>> increasing code volume significantly.
>>
>> The problem with (b) is that existing tools and library functions will
>> require unwrapping and wrapping to use, and the whole notion of
>> compatibility becomes awkward. We don't have the "luxury" of being able to
>> promote and demote types from wrapped versions implicitly like we do in
>> Scala, so it again becomes slightly awkward to use when stuff gets more
>> complex. Also, one has to define what the subset of allowable operations
>> is, and that would end up being somewhat arbitrary. There was one
>> interesting development to come out of this investigation, and that's the
>> idea of cutting the size of the interface in half by not having separate
>> "String name," prefixed versions of every method, and instead making it a
>> separate method call to set it, i.e.:
>>
>> someCollection.id("Only adult users").filter(user -> user.getAge() >= 18)
>>               .id("Extract username").by(User::getUsername, strings())
>>
>> But anyway I digress, back to topic: That leaves us with 3 different
>> versions of Java lambda support, all of which I find slightly unsatisfying
>> in their own way. The master branch implementation of IFns leads to
>> exploding PThings interfaces and the corresponding burden on implementers,
>> as well as not allowing other Java 8 language things; the
>> static-import-wrap solution leaves the code looking awkward and makes it
>> necessary to give lots of options kinda arbitrary names; and the "wrap the
>> whole interface" solution essentially creates a divergence in the APIs,
>> reduces compatibility with existing code and/or increases the maintenance
>> burden.
>>
>> So, here comes option 4. Bear with me on this one, because it's a bit of
>> a shift in mindset, inspired in part by what Google did in the time between
>> the FlumeJava paper and Dataflow on GCP.
>>
>> The fundamental reason it's difficult to implement this change is that we
>> can't just "add a few things" to the PCollection interface. If we extend
>> it via inheritance or composition we must extend and wrap everything so
>> that we return the extended type, and if we want to add things to the
>> interface itself, they must be implemented and propagated through all the
>> implementations. If we could use default methods in the PCollection
>> interface it would help, but there's a different approach we could take.
>>
>> If we encapsulate the transformation itself (map, flatMap, filter,...)
>> into an object, consisting of a function to operate on the PCollection
>> itself, then everyone can extend this in whatever way they like. In a
>> sense we add an extension point so that people can add user-defined
>> functions without breaking the fluency of the interface (in the way that
>> many of the library functions now require you to).
>>
>> This Gist probably explains it better than I can in words:
>> https://gist.github.com/DavW/02d935366bc610c1cbb3
>>
>> The latter part of it is also very interesting because it solves another
>> problem. We can potentially (subject to a deprecation cycle) remove many
>> of the methods that have been added to PThings for fluency reasons (like
>> max(), min(), first(), length(), ...) and just use them via Transforms
>> instead, keeping only lower-level primitives as part of the PCollection
>> interface itself, making it easier for people wishing to implement Crunch
>> for future data technologies (which will be crucial to the long-term
>> survival of this project).
>>
>> I'm not sure that this is definitely better than all of the other 3
>> options, but consider this an RFC on the topic.
>>
>>
>>
>>
>>
>> On 15 December 2015 at 10:34, Gabriel Reid <gabriel.reid@gmail.com>
>> wrote:
>>
>> > Yeah, looking at the two next to each other I'm going for the
>> > collections approach as well. +1
>> >
>> > On Tue, Dec 15, 2015 at 2:04 AM, Josh Wills <josh.wills@gmail.com> wrote:
>> > > On Mon, Dec 14, 2015 at 4:15 PM, David Whiting <davidwhiting@gmail.com> wrote:
>> > >
>> > >> 1) Not at all, just some leftover working names for stuff.
>> > >>
>> > >> 2) Not for a totally minimal implementation, but some of the features I
>> > >> would like to include would rely on Java 8 things, for example adapting
>> > >> the GroupedTable stuff to use Streams rather than Iterables because of
>> > >> a) the extra expressivity and b) the implied once-only traversal. We
>> > >> could have a filterMap which applies a Function<S, Optional<T>> (my most
>> > >> common use case for a DoFn instead of a MapFn at the moment). We can also
>> > >> potentially utilise Collectors for collapsing values in reduce-side stuff
>> > >> and finally, it'll make the implementation of it a fair bit easier. The
>> > >> maven overhead is pretty low, so I guess it's just the existence of an
>> > >> extra artifact to consider. The way I see it is that it's a push to make
>> > >> the API feel more like Java streams and be more immediately usable by
>> > >> someone who knows Java streams but not necessarily big data, so the more
>> > >> we can replicate that feel by integrating with other familiar Java 8
>> > >> features, the better.
>> > >>
>> > >
>> > > Makes sense to me. +1 for a new crunch-lambda module.
>> > >
>> > >
>> > >>
>> > >> On 15 December 2015 at 00:51, Josh Wills <josh.wills@gmail.com> wrote:
>> > >>
>> > >> > I think I lean towards the collections approach, but that's probably
>> > >> > because of my Scrunch experience. Two questions:
>> > >> >
>> > >> > 1) Is mapToTable necessary? I would think map(SFunction, PTableType)
>> > >> > would be distinguishable from map(SFunction, PType) by the compiler in
>> > >> > the same way it is for parallelDo.
>> > >> > 2) Does the collections approach need a separate maven target at all,
>> > >> > or could it just be part of crunch-core as a replacement for the IFn
>> > >> > stuff? Or is there Java 8-only stuff we'll want to add in to its API?
>> > >> >
>> > >> > On Mon, Dec 14, 2015 at 3:13 PM, David Whiting <davw@apache.org> wrote:
>> > >> >
>> > >> > > Ok, so I've implemented a few iterations of this. I went forward with
>> > >> > > the "wrap the functions" method, which seemed to work alright, but
>> > >> > > finding good names for functions which essentially just wrap functions
>> > >> > > but which aren't ambiguous in erasure and read nicely was a real
>> > >> > > challenge. I showed some sample code to some of my fellow data
>> > >> > > engineers and the consensus seemed to be that it was definitely better
>> > >> > > than anonymous inner classes, but it still felt kind of awkward and
>> > >> > > strange to use.
>> > >> > >
>> > >> > > So here's a 3rd option: wrap the collection types rather than the
>> > >> > > function types, and present an API which feels truly Java 8 native
>> > >> > > whilst still being able to dig back to the underlying PCollections
>> > >> > > (doing pretty much what Scrunch does, but with less implicit Scala
>> > >> > > magic).
>> > >> > >
>> > >> > > Here's a super-minimal proof-of-concept for that:
>> > >> > > https://gist.github.com/DavW/7efe484ea0c00cf6e66b
>> > >> > >
>> > >> > > and a comparison of the two approaches in usage:
>> > >> > > https://gist.github.com/DavW/997a92b31d55c5317fb7
>> > >> > >
>> > >> > >
>> > >> > > On 13 December 2015 at 16:14, Gabriel Reid <gabriel.reid@gmail.com> wrote:
>> > >> > >
>> > >> > > > This looks very cool. As long as we can keep things compatible with
>> > >> > > > Java 7 using whatever kind of maven voodoo that's necessary, I'm all
>> > >> > > > for it.
>> > >> > > >
>> > >> > > > I'd say no real reason to keep the IFn stuff if this goes in.
>> > >> > > >
>> > >> > > > - Gabriel
>> > >> > > >
>> > >> > > > On Fri, Dec 11, 2015 at 11:18 PM, Josh Wills <josh.wills@gmail.com> wrote:
>> > >> > > > > It seems like a net positive over the IFn stuff, so I could make an
>> > >> > > > > argument for replacing it, but if there's anyone out there in love
>> > >> > > > > w/IFns, they should speak up now. :)
>> > >> > > > >
>> > >> > > > > J
>> > >> > > > >
>> > >> > > > > On Fri, Dec 11, 2015 at 2:17 PM, David Whiting <davw@apache.org> wrote:
>> > >> > > > >
>> > >> > > > >> I *think* you can set language level and target jdk on a per-module
>> > >> > > > >> basis, so it should be relatively easy. I'll experiment at some
>> > >> > > > >> point over the weekend. Would this complement or replace the I*Fn
>> > >> > > > >> stuff do you think? 14.0 is not yet released, so I guess it's not
>> > >> > > > >> too late to change if we want to.
>> > >> > > > >>
>> > >> > > > >> On 11 December 2015 at 22:57, Josh Wills <josh.wills@gmail.com> wrote:
>> > >> > > > >>
>> > >> > > > >> > That's the sexiest thing I've seen in some time. +1 for a lambda
>> > >> > > > >> > module, but how does that work in Maven-fu? Is it like a
>> > >> > > > >> > conditional compile or something?
>> > >> > > > >> >
>> > >> > > > >> > On Fri, Dec 11, 2015 at 1:20 PM, David Whiting <davw@apache.org> wrote:
>> > >> > > > >> >
>> > >> > > > >> > > Oops, my bad. Here's a Gist:
>> > >> > > > >> > > https://gist.github.com/DavW/e2588e42c45ad8c06038
>> > >> > > > >> > >
>> > >> > > > >> > > On 11 December 2015 at 18:43, Josh Wills <josh.wills@gmail.com> wrote:
>> > >> > > > >> > >
>> > >> > > > >> > > > I think it's kind of awesome, but the attachment didn't go
>> > >> > > > >> > > > through - PR or gist?
>> > >> > > > >> > > > On Fri, Dec 11, 2015 at 7:42 AM David Whiting <davw@apache.org> wrote:
>> > >> > > > >> > > >
>> > >> > > > >> > > > > While fixing the bug where the IFn version of mapValues on
>> > >> > > > >> > > > > PGroupedTable was missing, I got thinking that this is quite an
>> > >> > > > >> > > > > inefficient way of including support for lambdas and method
>> > >> > > > >> > > > > references, and it still didn't actually support quite a few of
>> > >> > > > >> > > > > the features that would make it easy to code against.
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > Negative parts of existing lambda implementation:
>> > >> > > > >> > > > > 1) Explosion of already-crowded PCollection, PTable and
>> > >> > > > >> > > > > PGroupedTable interfaces, and having to implement those methods
>> > >> > > > >> > > > > in all implementations.
>> > >> > > > >> > > > > 2) Not supporting flatMap to Optional or Stream types.
>> > >> > > > >> > > > > 3) Not exposing convenient types for reduce-type operations
>> > >> > > > >> > > > > (Stream instead of Iterable, for example).
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > Something that would solve all three of these is to build lambda
>> > >> > > > >> > > > > support as a separate artifact (so we can use all java8 types),
>> > >> > > > >> > > > > and instead of the API being directly on the PSomething
>> > >> > > > >> > > > > interfaces, we just have convenient ways to wrap up lambdas into
>> > >> > > > >> > > > > DoFns or MapFns via statically-imported methods.
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > The usage then becomes
>> > >> > > > >> > > > > import static org.apache.crunch.Lambda.*;
>> > >> > > > >> > > > > ...
>> > >> > > > >> > > > > someCollection.parallelDo(flatMap(d -> someFnOf(d)), pt)
>> > >> > > > >> > > > > ...
>> > >> > > > >> > > > > otherGroupedTable.mapValue(reduce(seq -> seq.mapToInt(i -> i).sum()), ints())
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > Where flatMap and reduce are static methods on Lambda, and Lambda
>> > >> > > > >> > > > > goes in its own artifact (to preserve compatibility with 6 and 7
>> > >> > > > >> > > > > for the rest of Crunch).
>> > >> > > > >> > > > > I've attached a basic proof-of-concept implementation which I've
>> > >> > > > >> > > > > tested a few things with, and I'm very happy to sketch out a more
>> > >> > > > >> > > > > substantial implementation if people here think it's a good idea
>> > >> > > > >> > > > > in general.
>> > >> > > > >> > > > >
>> > >> > > > >> > > > > Thoughts? Ideas? Suggestions? Please tell me if this is crazy.
>> > >> > > > >> > > > >
>> > >> > > > >> > > > >
>> > >> > > > >> > > >
>> > >> > > > >> > >
>> > >> > > > >> >
>> > >> > > > >>
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> >
>>
>
>
