flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2495) Add a null point check in API DataStream.union
Date Fri, 07 Aug 2015 12:58:45 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661799#comment-14661799
] 

ASF GitHub Bot commented on FLINK-2495:
---------------------------------------

Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/999#discussion_r36515671
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
---
    @@ -256,9 +256,11 @@ public ExecutionConfig getExecutionConfig() {
     		DataStream<OUT> returnStream = this.copy();
     
     		for (DataStream<OUT> stream : streams) {
    -			for (DataStream<OUT> ds : stream.unionedStreams) {
    -				validateUnion(ds.getId());
    -				returnStream.unionedStreams.add(ds.copy());
    +			if (stream != null) {
    --- End diff --
    
    Hi,
    you may look at this problem with my test above.
    As you see, if I ignore the null, code "temp2.union(temp1)" will just copy a new DataStream.
    And this new DataStream has absolutely no change comparing with temp2.
    So, If the temp2 is good, the new DataStream is good too.
    And the test or other case similar to this test will execute successfully.
    Instead, there just will be a error.



> Add a null point check in API DataStream.union
> ----------------------------------------------
>
>                 Key: FLINK-2495
>                 URL: https://issues.apache.org/jira/browse/FLINK-2495
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 0.10
>            Reporter: Huang Wei
>             Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> The API(public DataStream<OUT> union(DataStream<OUT>... streams)) is a  external
interface for user.
> The parameter "streams" maybe null and it will throw NullPointerException error.
> This test below can be intuitive to explain this problem:
> package org.apache.flink.streaming.api;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
> import org.junit.Test;
> /**
>  * Created by HuangWHWHW on 2015/8/7.
>  */
> public class test {
> 	public static class sourceFunction extends RichParallelSourceFunction<String>
{
> 		public sourceFunction() {
> 		}
> 		@Override
> 		public void run(SourceContext<String> sourceContext) throws Exception {
> 			sourceContext.collect("a");
> 		}
> 		@Override
> 		public void cancel() {
> 		}
> 	}
> 	@Test
> 	public void testUnion(){
> 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 		env.setParallelism(1);
> 		DataStream<String> source = env.addSource(new sourceFunction());
> 		DataStream<String> temp1 = null;
> 		DataStream<String> temp2 = source.map(new MapFunction<String, String>()
{
> 			@Override
> 			public String map(String value) throws Exception {
> 				if (value == "a") {
> 					return "This is for test temp2.";
> 				}
> 				return null;
> 			}
> 		});
> 		DataStream<String> sink = temp2.union(temp1);
> 		sink.print();
> 		try {
> 			env.execute();
> 		}catch (Exception e){
> 			e.printStackTrace();
> 		}
> 	}
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message