beam-user mailing list archives

From Lukasz Cwik <lc...@google.com>
Subject Re: [Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression
Date Wed, 19 Sep 2018 20:52:25 GMT
Thanks for the proposal; it does seem to make the API cleaner for building
anonymous composite transforms.

In your experience, have you had issues where the API doesn't work out well
because the PTransform:
* is not able to override how the output coder is inferred?
* can't supply display data?
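To make the question concrete: a lambda-based factory produces an anonymous subclass that overrides only `expand`, so every other overridable hook keeps its base-class default. The dependency-free sketch here models this with a hypothetical `HookedTransform` class whose `displayData()` method loosely stands in for hooks such as display data or output-coder inference. It is an illustration under those assumptions, not Beam's API.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for PTransform with one extra overridable hook,
// loosely modeled on display data; this is NOT Beam's actual API.
abstract class HookedTransform<InputT, OutputT> {
  public abstract OutputT expand(InputT input);

  // Default hook: subclasses may override it to describe themselves.
  public Map<String, String> displayData() {
    return Collections.emptyMap();
  }

  // Lambda-based factory: the anonymous subclass overrides only expand(),
  // so displayData() always falls back to the default above.
  static <InputT, OutputT> HookedTransform<InputT, OutputT> of(Function<InputT, OutputT> fn) {
    return new HookedTransform<InputT, OutputT>() {
      @Override
      public OutputT expand(InputT input) {
        return fn.apply(input);
      }
    };
  }
}

// A named subclass can override both expand() and the extra hook.
class Truncate extends HookedTransform<String, String> {
  private final int maxLength;

  Truncate(int maxLength) {
    this.maxLength = maxLength;
  }

  @Override
  public String expand(String input) {
    return input.length() <= maxLength ? input : input.substring(0, maxLength);
  }

  @Override
  public Map<String, String> displayData() {
    return Collections.singletonMap("maxLength", String.valueOf(maxLength));
  }
}
```

Both versions compute the same result, but only the named subclass can report anything through the extra hook; a factory would need additional parameters (or a builder) to expose such hooks for lambda-defined transforms.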

+user@beam.apache.org <user@beam.apache.org>, do users think that the
provided API would be useful enough to be added to the core SDK, or
would the addition of the method add noise to or detract from the existing
API?

On Mon, Sep 17, 2018 at 12:57 PM Jeff Klukas <jklukas@mozilla.com> wrote:

> I've gone ahead and filed a JIRA Issue and GitHub PR to follow up on this
> suggestion and make it more concrete:
>
> https://issues.apache.org/jira/browse/BEAM-5413
> https://github.com/apache/beam/pull/6414
>
> On Fri, Sep 14, 2018 at 1:42 PM Jeff Klukas <jklukas@mozilla.com> wrote:
>
>> Hello all, I'm a data engineer at Mozilla working on a first project
>> using Beam. I've been impressed with the usability of the API as there are
>> good built-in solutions for handling many simple transformation cases with
>> minimal code, and I wanted to discuss one bit of ergonomics that seems to be
>> missing.
>>
>> It appears that none of the existing PTransform factories are generic
>> enough to take in or output a PCollectionTuple, but we've found many use
>> cases where it's convenient to apply a few transforms on a PCollectionTuple
>> in a lambda expression.
>>
>> For example, we've defined several PTransforms that return main and error
>> output streams bundled in a PCollectionTuple. We defined a
>> CompositeTransform interface so that we could handle the error output in a
>> lambda expression like:
>>
>> pipeline
>>     .apply("attempt to deserialize messages", new MyDeserializationTransform())
>>     .apply("write deserialization errors",
>>         CompositeTransform.of((PCollectionTuple input) -> {
>>             // Send the error output to its own sink...
>>             input.get(errorTag).apply(new MyErrorOutputTransform());
>>             // ...and continue downstream with only the main output.
>>             return input.get(mainTag);
>>         }))
>>     .apply("more processing on the deserialized messages", new MyOtherTransform());
>>
>> I'd be interested in contributing a patch to add this functionality,
>> perhaps as a static method PTransform.compose(). Would that patch be
>> welcome? Are there other thoughts on naming?
>>
>> The full code of the CompositeTransform interface we're currently using
>> is included below.
>>
>>
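>> import org.apache.beam.sdk.transforms.PTransform;
>> import org.apache.beam.sdk.values.PInput;
>> import org.apache.beam.sdk.values.POutput;
>>
>> /** Single-method interface, so a lambda expression can implement it. */
>> @FunctionalInterface
>> public interface CompositeTransform<InputT extends PInput, OutputT extends POutput> {
>>   OutputT expand(InputT input);
>>
>>   /**
>>    * The public factory method that serves as the entrypoint for users to create a composite
>>    * PTransform.
>>    */
>>   static <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> of(
>>       CompositeTransform<InputT, OutputT> transform) {
>>     // Wrap the lambda in an anonymous PTransform subclass whose expand() delegates to it.
>>     return new PTransform<InputT, OutputT>() {
>>       @Override
>>       public OutputT expand(InputT input) {
>>         return transform.expand(input);
>>       }
>>     };
>>   }
>> }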
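The core of the proposal is a Java language detail: a lambda can only target an interface with a single abstract method, and `PTransform` is an abstract class, so the factory wraps the lambda in an anonymous subclass. A minimal, dependency-free sketch of that adaptation follows. `MiniTransform` and `ExpandFn` are hypothetical stand-ins for `PTransform` and `CompositeTransform`, and `compose` only borrows the method name suggested in the thread, so this is not Beam's actual API.

```java
// Hypothetical stand-in for Beam's abstract PTransform. Because it is an
// abstract class, it cannot be the target type of a lambda expression.
abstract class MiniTransform<InputT, OutputT> {
  public abstract OutputT expand(InputT input);

  // Hypothetical factory mirroring the proposed PTransform.compose():
  // it adapts a functional interface into an anonymous subclass.
  static <InputT, OutputT> MiniTransform<InputT, OutputT> compose(ExpandFn<InputT, OutputT> fn) {
    return new MiniTransform<InputT, OutputT>() {
      @Override
      public OutputT expand(InputT input) {
        // Delegate to the lambda supplied by the caller.
        return fn.expand(input);
      }
    };
  }
}

// Hypothetical stand-in for the CompositeTransform interface from the thread:
// a single abstract method, so a lambda expression can implement it.
@FunctionalInterface
interface ExpandFn<InputT, OutputT> {
  OutputT expand(InputT input);
}
```

With this in place, `MiniTransform.compose((String s) -> s.length())` yields a transform without any anonymous-class boilerplate at the call site. In the thread's real code the input type would be `PCollectionTuple` and the wrapped type `PTransform`; the stand-ins here only model the lambda-to-abstract-class adaptation.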
