flink-issues mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6990) Poor performance with Sliding Time Windows
Date Fri, 23 Jun 2017 07:50:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16060530#comment-16060530
] 

Fabian Hueske commented on FLINK-6990:
--------------------------------------

There are two reasons for the poor performance:

1) The implementation of time-based sliding windows in Flink. Flink treats each window individually
and replicates records to each window. For a window of 10 minutes size that slides by 1 second,
each record is replicated 600-fold (10 minutes / 1 second = 600 overlapping windows).
2) Your choice of a WindowFunction instead of a ReduceFunction or AggregateFunction.
A WindowFunction requires collecting all elements of a window and applies the function only at the
end of the window. If you implement this with a ReduceFunction (or AggregateFunction), the aggregation
is applied incrementally whenever a new record is assigned to a window. Consequently,
each window only holds a single aggregated record instead of a list of all records.

Count-based sliding windows are implemented differently and avoid replicating records.
However, they cannot leverage the eager aggregation of a ReduceFunction; like a WindowFunction,
they apply the function only at the end of a window.
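
For illustration (not part of the original comment), here is a minimal sketch of how the counting job
from the example quoted below could use incremental aggregation. It reuses the StreamExecutionEnvironment
see and the TestObject source from that example ("source" is a placeholder for it), maps each event to a
(key, 1) pair, and counts with a ReduceFunction, so each of the ~600 overlapping windows keeps only one
running sum per key instead of buffering every record:

{code:java}
// Sketch only: "source" stands for the 10k events/sec TestObject source from the example
// quoted below; MapFunction, ReduceFunction and Tuple2 are the standard Flink Java API classes.
see.addSource(source)
    // map every event to (key, 1) so the window state is a single running count per key
    .map(new MapFunction<TestObject, Tuple2<Integer, Long>>() {
        @Override
        public Tuple2<Integer, Long> map(TestObject value) {
            return new Tuple2<>(value.getKey(), 1L);
        }
    })
    .keyBy(0)
    .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(1)))
    // the ReduceFunction is applied as records are assigned to windows, so each of the
    // ~600 overlapping windows stores one aggregated value instead of a list of records
    .reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
        @Override
        public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> a, Tuple2<Integer, Long> b) {
            return new Tuple2<>(a.f0, a.f1 + b.f1);
        }
    })
    .print();
{code}

Counting via (key, 1) pairs and a ReduceFunction keeps the per-window state constant-size; the record
replication across the 600 windows described in 1) still happens, but only one aggregated value is stored per window.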

> Poor performance with Sliding Time Windows
> ------------------------------------------
>
>                 Key: FLINK-6990
>                 URL: https://issues.apache.org/jira/browse/FLINK-6990
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.3.0
>         Environment: OSX 10.11.4
> 2.8 GHz Intel Core i7
> 16 GB 1600 MHz DDR3
>            Reporter: Brice Bingman
>
> I'm experiencing poor performance when using sliding time windows. Here is a simple example that performs poorly for me:
> {code:java}
> public class FlinkPerfTest {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
>         //Streaming 10,000 events per second
>         see.addSource(new SourceFunction<TestObject>() {
>             transient ScheduledExecutorService executor;
>             @Override
>             public synchronized void run(final SourceContext<TestObject> ctx) throws Exception {
>                 executor = Executors.newSingleThreadScheduledExecutor();
>                 executor.scheduleAtFixedRate(new Runnable() {
>                     @Override
>                     public void run() {
>                         for (int k = 0; k < 10; k++) {
>                             for (int i = 0; i < 1000; i++) {
>                                 TestObject obj = new TestObject();
>                                 obj.setKey(k);
>                                 ctx.collect(obj);
>                             }
>                         }
>                     }
>                 }, 0, 1, TimeUnit.SECONDS);
>                 this.wait();
>             }
>             @Override
>             public synchronized void cancel() {
>                 executor.shutdown();
>                 this.notify();
>             }
>         }).keyBy("key")
>         .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(1))).apply(new WindowFunction<TestObject, String, Tuple, TimeWindow>() {
>             @Override
>             public void apply(Tuple key, TimeWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
>                 int count = 0;
>                 for (Object obj : input) {
>                     count++;
>                 }
>                 out.collect(key.getField(0) + ": " + count);
>             }
>         })
>         .print();
>         see.execute();
>     }
>     public static class TestObject {
>         private Integer key;
>         public Integer getKey() {
>             return key;
>         }
>         public void setKey(Integer key) {
>             this.key = key;
>         }
>     }
> }
> {code}
> When running this, Flink periodically pauses for long periods of time. I would expect a steady stream of output at 1-second intervals. For comparison, you can switch to a count window of similar size, which performs just fine:
> {code:java}
>    .countWindow(600000, 1000).apply(new WindowFunction<FlinkPerfTest.TestObject, String, Tuple, GlobalWindow>() {
>                     @Override
>                     public void apply(Tuple key, GlobalWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
>                         int count = 0;
>                         for (Object obj : input) {
>                             count++;
>                         }
>                         out.collect(key.getField(0) + ": " + count);
>                     }
>                 })
> {code}
> I would expect the sliding time window to perform similarly to a count window. The sliding time window also uses significantly more CPU and memory than the count window. I would also expect resource consumption to be similar.
> A possible cause could be that the SystemProcessingTimeService.TriggerTask is locking on the checkpointLock, which acts like a global lock. There should be a lock per key or, preferably, a lock-less solution.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
