flink-issues mailing list archives

From "Aljoscha Krettek (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-5031) Consecutive DataStream.split() ignored
Date Tue, 25 Jul 2017 15:55:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16100255#comment-16100255 ]

Aljoscha Krettek edited comment on FLINK-5031 at 7/25/17 3:54 PM:
------------------------------------------------------------------

In that case, I would suggest creating an {{OutputTag}} for every combination of tags in
your values that you're interested in. Say {{OutputTag("a")}} would receive elements that
have {{"tag1", "tag2"}} and {{OutputTag("b")}} would receive elements with tags {{"tag3", "tag5"}}.
Inside the {{ProcessFunction}} you do the filtering based on the element's tags and emit to the
correct output tags.

This is even more efficient than split/select because split/select creates more objects under
the hood.

Also, the output type of your {{ProcessFunction}} can be {{Void}} if you never need the
main output.
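
For illustration, here is a minimal sketch of what that could look like with side outputs (the {{TaggedEvent}} type, its {{getTags()}} accessor, and the {{input}} stream are made-up placeholders, not taken from the program below):

{code}
import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// One side output per tag combination of interest; the anonymous subclasses
// let Flink capture the element type of the side outputs.
final OutputTag<TaggedEvent> tagA = new OutputTag<TaggedEvent>("a") {};
final OutputTag<TaggedEvent> tagB = new OutputTag<TaggedEvent>("b") {};

// "input" is assumed to be a DataStream<TaggedEvent>. The main output type is
// Void because only the side outputs are used.
SingleOutputStreamOperator<Void> routed = input
	.process(new ProcessFunction<TaggedEvent, Void>() {
		@Override
		public void processElement(TaggedEvent value, Context ctx, Collector<Void> out) throws Exception {
			// Filter on the element's tags and emit to the matching output tag.
			if (value.getTags().containsAll(Arrays.asList("tag1", "tag2"))) {
				ctx.output(tagA, value);
			}
			if (value.getTags().containsAll(Arrays.asList("tag3", "tag5"))) {
				ctx.output(tagB, value);
			}
		}
	});

DataStream<TaggedEvent> streamA = routed.getSideOutput(tagA);
DataStream<TaggedEvent> streamB = routed.getSideOutput(tagB);
{code}

Each of {{streamA}} and {{streamB}} can then be consumed independently, which avoids chaining {{split()}} calls altogether.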


> Consecutive DataStream.split() ignored
> --------------------------------------
>
>                 Key: FLINK-5031
>                 URL: https://issues.apache.org/jira/browse/FLINK-5031
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Fabian Hueske
>            Assignee: Renkai Ge
>
> The output of the following program 
> {code}
> static final class ThresholdSelector implements OutputSelector<Long> {
> 	long threshold;
> 	public ThresholdSelector(long threshold) {
> 		this.threshold = threshold;
> 	}
> 	@Override
> 	public Iterable<String> select(Long value) {
> 		if (value < threshold) {
> 			return Collections.singletonList("Less");
> 		} else {
> 			return Collections.singletonList("GreaterEqual");
> 		}
> 	}
> }
> public static void main(String[] args) throws Exception {
> 	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 	env.setParallelism(1);
> 	SplitStream<Long> split1 = env.generateSequence(1, 11)
> 		.split(new ThresholdSelector(6));
> 	// stream11 should be [1,2,3,4,5]
> 	DataStream<Long> stream11 = split1.select("Less");
> 	SplitStream<Long> split2 = stream11
> //		.map(new MapFunction<Long, Long>() {
> //			@Override
> //			public Long map(Long value) throws Exception {
> //				return value;
> //			}
> //		})
> 		.split(new ThresholdSelector(3));
> 	DataStream<Long> stream21 = split2.select("Less");
> 	// stream21 should be [1,2]
> 	stream21.print();
> 	env.execute();
> }
> {code}
> should be {{1, 2}}, however it is {{1, 2, 3, 4, 5}}. It seems that the second {{split}} operation is ignored.
> The program is evaluated correctly if the identity {{MapFunction}} is added to the program.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
