flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9141) Calling getSideOutput() and split() on one DataStream causes NPE
Date Wed, 11 Apr 2018 09:20:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433602#comment-16433602 ]

ASF GitHub Bot commented on FLINK-9141:

GitHub user zentol opened a pull request:


    [FLINK-9141][datastream] Fail early when using both split and side-outputs

    ## What is the purpose of the change
    With this PR we fail early if a user attempts to use both split() and side-outputs on a single DataStream. Previously this led to a NullPointerException at runtime.
    ## Brief change log
    * keep track of split() calls in `SingleOutputStreamOperator` by overriding split() and setting the `wasSplitApplied` flag
    * add checks to split() and getSideOutput() that throw an exception if the other method was already called
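
    The mutual check described in the change log can be sketched roughly as below. This is a simplified stand-alone illustration, not the code in this PR: the class `GuardedStreamSketch`, the flag `wasSideOutputRequested`, the String return values and the choice of `UnsupportedOperationException` are assumptions made for the example. Only the `wasSplitApplied` flag name comes from the change log.

```java
// Hypothetical stand-in for SingleOutputStreamOperator; it has no Flink dependency
// and only models the two guarded methods.
class GuardedStreamSketch {
    // Set once split() has been called (flag name taken from the change log).
    private boolean wasSplitApplied = false;
    // Assumed counterpart flag, set once getSideOutput() has been called.
    private boolean wasSideOutputRequested = false;

    String split() {
        if (wasSideOutputRequested) {
            // Fail while the job is being defined instead of with an NPE at runtime.
            throw new UnsupportedOperationException(
                "split() cannot be used after getSideOutput() on the same stream");
        }
        wasSplitApplied = true;
        return "split";
    }

    String getSideOutput(String tagId) {
        if (wasSplitApplied) {
            throw new UnsupportedOperationException(
                "getSideOutput() cannot be used after split() on the same stream");
        }
        wasSideOutputRequested = true;
        return "side-output:" + tagId;
    }

    public static void main(String[] args) {
        GuardedStreamSketch stream = new GuardedStreamSketch();
        stream.getSideOutput("tag");
        try {
            stream.split(); // second kind of call on the same stream is rejected
        } catch (UnsupportedOperationException e) {
            System.out.println("Rejected early: " + e.getMessage());
        }
    }
}
```

    Each method checks the flag set by the other, so the call order does not matter: whichever of the two is called second is the one that throws.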
    ## Verifying this change
    This change added tests and can be verified as follows:
    * run SplitSideOutputTest
    ## Does this pull request potentially affect one of the following parts:
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    ## Documentation
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 9141

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5836
commit ed3ec8716c6d26eee31c4d0ff02c8bfdd70a19d4
Author: zentol <chesnay@...>
Date:   2018-04-11T09:13:52Z

    [FLINK-9141][datastream] Fail early when using both split and side-outputs


> Calling getSideOutput() and split() on one DataStream causes NPE
> ----------------------------------------------------------------
>                 Key: FLINK-9141
>                 URL: https://issues.apache.org/jira/browse/FLINK-9141
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Critical
> Calling both {{getSideOutput()}} and {{split()}} on one DataStream causes a {{NullPointerException}} to be thrown at runtime.
> As a work-around one can add a no-op map function before the split() call.
> Exception:
> {code}
> Caused by: java.lang.NullPointerException
> 	at org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:79)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:326)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:128)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:274)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> Reproducer:
> {code}
> private static final OutputTag<String> tag = new OutputTag<String>("tag") {};
> public static void main(String[] args) throws Exception {
> 	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 	DataStream<String> dataStream1 = env.fromElements("foo");
> 	SingleOutputStreamOperator<String> processedStream = dataStream1
> 		.process(new ProcessFunction<String, String>() {
> 			@Override
> 			public void processElement(String value, Context ctx, Collector<String> out) {
> 				// intentionally left empty
> 			}
> 		});
> 	processedStream.getSideOutput(tag)
> 		.print();
> 	processedStream
> 		.split(Collections::singletonList)
> 		.select("bar")
> 		.print();
> 	env.execute();
> }
> {code}

This message was sent by Atlassian JIRA
