flink-dev mailing list archives

From "Aljoscha Krettek (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-4207) WindowOperator becomes very slow with allowed lateness
Date Wed, 13 Jul 2016 08:56:20 GMT
Aljoscha Krettek created FLINK-4207:
---------------------------------------

             Summary: WindowOperator becomes very slow with allowed lateness
                 Key: FLINK-4207
                 URL: https://issues.apache.org/jira/browse/FLINK-4207
             Project: Flink
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.1.0
            Reporter: Aljoscha Krettek


In this simple example, the throughput (as measured by the count the window emits) becomes
very low when an allowed lateness is set (the allowedLateness(Time.seconds(1)) call):

{code}
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowWordCount {

	public static void main(String[] args) throws Exception {

		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
		env.setParallelism(1);

		env.addSource(new InfiniteTupleSource(100_000))
				.keyBy(0)
				.timeWindow(Time.seconds(3))
				.allowedLateness(Time.seconds(1))
				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
					@Override
					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
							Tuple2<String, Integer> value2) throws Exception {
						return Tuple2.of(value1.f0, value1.f1 + value2.f1);
					}
				})
				.filter(new FilterFunction<Tuple2<String, Integer>>() {
					private static final long serialVersionUID = 1L;

					@Override
					public boolean filter(Tuple2<String, Integer> value) throws Exception {
						return value.f0.startsWith("Tuple 0");
					}
				})
				.print();

		// execute program
		env.execute("WindowWordCount");
	}

	public static class InfiniteTupleSource implements ParallelSourceFunction<Tuple2<String, Integer>> {
		private static final long serialVersionUID = 1L;

		private int numGroups;

		public InfiniteTupleSource(int numGroups) {
			this.numGroups = numGroups;
		}

		@Override
		public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception {
			long index = 0;
			while (true) {
				Tuple2<String, Integer> tuple = new Tuple2<>("Tuple " + (index % numGroups), 1);
				out.collect(tuple);
				index++;
			}
		}

		@Override
		public void cancel() {
		}
	}
}
{code}
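
To compare runs with and without the allowed lateness, the printed per-key count can be turned into a rough records-per-second figure. The sketch below is not part of the reproduction. It assumes that the source spreads records evenly over all keys (it cycles through them with index % numGroups), that only the key "Tuple 0" passes the filter, and that each window fires once. The class name ThroughputEstimate, the method recordsPerSecond, and the sample count are hypothetical and chosen only for illustration.

```java
public class ThroughputEstimate {

    /**
     * Rough throughput estimate from the count printed for a single key.
     * Assumption: every key receives about the same number of records per
     * window, so total records per window is countForOneKey * numGroups.
     */
    public static double recordsPerSecond(long countForOneKey, int numGroups, double windowSeconds) {
        if (windowSeconds <= 0) {
            throw new IllegalArgumentException("windowSeconds must be positive");
        }
        return countForOneKey * (double) numGroups / windowSeconds;
    }

    public static void main(String[] args) {
        // Hypothetical printed count for "Tuple 0"; not a measurement from this report.
        long printedCount = 5;
        // 100_000 groups and a 3 second window, as in the example job.
        System.out.println(recordsPerSecond(printedCount, 100_000, 3.0));
    }
}
```

The first window of a run may cover less than the full three seconds, so its count is best ignored when comparing.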



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
