beam-commits mailing list archives

From Łukasz Gajowy (JIRA) <j...@apache.org>
Subject [jira] [Comment Edited] (BEAM-1820) Source.getDefaultOutputCoder() should be @Nullable
Date Fri, 07 Jul 2017 17:22:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-1820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078407#comment-16078407 ]

Łukasz Gajowy edited comment on BEAM-1820 at 7/7/17 5:21 PM:
-------------------------------------------------------------

Below is a list of PTransforms that assume Source.getDefaultOutputCoder() never returns null:

- UnboundedReadFromBoundedSource
- StreamingBoundedRead
- StreamingUnboundedRead
- BoundedReadFromUnboundedSource
- Read.Bounded 
- Read.Unbounded
- StreamingUnboundedRead/ReadWithIds (DataflowRunner)

In the first four PTransforms, the coder can be obtained inside the expand() method from the PCollection
returned by the Read.from() applied there. When I modify them this way, all the tests pass. Below is an
example of how I do this (based on UnboundedReadFromBoundedSource):

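{code:title=UnboundedReadFromBoundedSource.java (excerpt)|borderStyle=solid}
  // Null until expand() runs; then holds the coder of the PCollection produced by Read.from().
  private Coder<T> outputCoder;

  @Override
  public PCollection<T> expand(PBegin input) {
    PCollection<T> collection = input.getPipeline().apply(
        Read.from(new BoundedToUnboundedSourceAdapter<>(source)));

    // Capture the coder of the inner read's output instead of asking the source for it.
    outputCoder = collection.getCoder();

    return collection;
  }

  @Override
  protected Coder<T> getDefaultOutputCoder() {
    // Returns null if called before expand().
    return outputCoder;
  }
{code}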
The other three PTransforms fail the tests when I try to make a similar change, because the coder
is not set on their PCollection.

Therefore I have the following questions: 

# Is this the correct way of obtaining the coder in the first four cases (this is how I understood
the task)? Is it OK that the outputCoder field remains null until the expand() method executes?
# What about PTransforms in which the Read.from() in the expand() method does not set the
Coder on the PCollection?
# Also, there are places in Source classes (e.g. MicrobatchSource) that assume a non-null default
output coder, but I think correcting them is not part of the task because, according to the issue
description, it is the transform code that should take care of the Coders. Do I understand
correctly?
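To make the concern in question 1 concrete, here is a minimal plain-Java model (no Beam dependency) of the capture-in-expand() pattern from the snippet above. Coder and ReadOutput are hypothetical stand-ins, not Beam classes, and the fail-fast getter is an assumed design alternative, not what the snippet or Beam does: instead of silently returning null before expand() (or when the inner read set no coder, as in question 2), it throws with a descriptive message.

```java
import java.util.Objects;

public class LazyCoderCapture {

  /** Hypothetical stand-in for a Beam Coder: just a name. */
  static final class Coder {
    final String name;

    Coder(String name) {
      this.name = name;
    }
  }

  /** Hypothetical stand-in for the PCollection returned by Read.from(); its coder may be unset. */
  static final class ReadOutput {
    private Coder coder;

    void setCoder(Coder coder) {
      this.coder = coder;
    }

    Coder getCoder() {
      return coder;
    }
  }

  /** Models a transform that captures its output coder inside expand(). */
  static final class CapturingTransform {
    private Coder outputCoder; // stays null until expand() runs

    ReadOutput expand(ReadOutput readOutput) {
      // Capture whatever coder the inner read's output carries (may be null).
      outputCoder = readOutput.getCoder();
      return readOutput;
    }

    /** Fails fast with a descriptive message instead of returning null. */
    Coder getDefaultOutputCoder() {
      return Objects.requireNonNull(
          outputCoder, "output coder unknown: expand() has not run or the read set no coder");
    }
  }

  public static void main(String[] args) {
    CapturingTransform transform = new CapturingTransform();
    ReadOutput readOutput = new ReadOutput();
    readOutput.setCoder(new Coder("ElementCoder"));
    transform.expand(readOutput);
    System.out.println(transform.getDefaultOutputCoder().name); // prints ElementCoder
  }
}
```

The trade-off: a null return lets callers fall back to other coder sources, while the fail-fast variant surfaces ordering bugs early but forces every caller to run expand() first.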


> Source.getDefaultOutputCoder() should be @Nullable
> --------------------------------------------------
>
>                 Key: BEAM-1820
>                 URL: https://issues.apache.org/jira/browse/BEAM-1820
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Eugene Kirpichov
>            Assignee: Łukasz Gajowy
>              Labels: easyfix, starter
>
> Source.getDefaultOutputCoder() returns a coder for elements produced by the source.
> However, the Source objects are nearly always hidden from the user and instead encapsulated
> in a transform. Often, an enclosing transform has a better idea of what coder should be used
> to encode these elements (e.g. a user supplied a Coder to that transform's configuration).
> In that case, it'd be good if Source.getDefaultOutputCoder() could just return null, and coder
> would have to be handled by the enclosing transform or perhaps specified on the output of
> that transform explicitly.
> Right now there's a bunch of code in the SDK and runners that assumes Source.getDefaultOutputCoder()
> returns non-null. That code would need to be fixed to instead use the coder set on the collection
> produced by Read.from(source).
> It all appears pretty easy to fix, so this is a good starter item.
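The issue description has the enclosing transform, rather than the source, decide the coder once Source.getDefaultOutputCoder() may return null. A minimal plain-Java sketch of that resolution order follows. It assumes (the issue does not spell this out) that a coder supplied to the transform takes precedence over the source default, and that failing with an error is acceptable when neither exists. Source here is a hypothetical one-method stand-in, not Beam's Source class, and coders are modeled as strings.

```java
public class CoderResolution {

  /** Hypothetical stand-in for a Source whose default coder may be null. */
  public interface Source<C> {
    C getDefaultOutputCoder(); // nullable under the proposed change
  }

  /** Assumed order: transform-supplied coder, then source default, else fail. */
  public static <C> C resolveCoder(C userSuppliedCoder, Source<C> source) {
    if (userSuppliedCoder != null) {
      return userSuppliedCoder; // the enclosing transform knows best
    }
    C fromSource = source.getDefaultOutputCoder();
    if (fromSource != null) {
      return fromSource;
    }
    throw new IllegalStateException(
        "No coder: supply one to the transform or set it on the transform's output");
  }

  public static void main(String[] args) {
    Source<String> noDefault = () -> null;
    Source<String> withDefault = () -> "SourceDefaultCoder";
    System.out.println(resolveCoder("UserCoder", noDefault)); // prints UserCoder
    System.out.println(resolveCoder(null, withDefault)); // prints SourceDefaultCoder
  }
}
```

In Beam itself, the "specified on the output of that transform explicitly" option would correspond to calling PCollection.setCoder() on the transform's output rather than throwing.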



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
