flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Yun Gao" <yungao...@aliyun.com>
Subject Re: [QUESTION] How to parallelize with explicit punctuation in Flink?
Date Tue, 08 Oct 2019 06:19:24 GMT

      Hi Filip,
           I have one question on the problem: what is the expected behavior when the  parallelism
of the FlatMapFunction is increased to more than 1? Should each subtask maintains the partial
sum of all values received, and whenever the barrier is received, then it just outputs the
partial sum of the received value ? 

          Another question is that I think in Flink the watermark mechanism has provided the
functionality similar to punctuation,  therefore is it possible to implement the same logic
with the Flink Window directly?
    Best,
    Yun


------------------------------------------------------------------
From:Filip Niksic <fniksic@seas.upenn.edu>
Send Time:2019 Oct. 8 (Tue.) 08:56
To:user <user@flink.apache.org>
Subject:[QUESTION] How to parallelize with explicit punctuation in Flink?

Hi all,
What would be a natural way to implement a parallel version of the following Flink program?
Suppose I have a stream of items of type DataItem with two concrete implementations of DataItem:
Value and Barrier. Let’s say that Value holds an integer value, and Barrier acts as explicit
punctuation.
public interface DataItem {}
public class Value implements DataItem {
 private final int val;
 public Value(int val) { this.val = val; }
 public int getVal() { return val; }
}
public class Barrier implements DataItem {}
The program should maintain a sum of values seen since the beginning of the stream. On each
Barrier, the program should output the sum seen so far.
An obvious way to implement this would be with a FlatMapFunction, maintaining the sum as state
and emitting it on each Barrier.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<DataItem> stream = env.fromElements(DataItem.class,
 new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());
stream.flatMap(new FlatMapFunction<DataItem, Integer>() {
 private int sum = 0;
 @Override
 public void flatMap(DataItem dataItem, Collector<Integer> collector) throws Exception
{
 if (dataItem instanceof Value) {
 sum += ((Value) dataItem).getVal();
       } else {
           collector.collect(sum);
       }
   }
}).setParallelism(1).print().setParallelism(1);
env.execute();
// We should see 1 followed by 3 as output
However, such an operator cannot be parallelized, since the order of Values and Barriers matters.
That’s why I need to set parallelism to 1 above. Is there a way to rewrite this to exploit
parallelism?
(Another reason to set parallelism to 1 above is that I’m assuming there is a single instance
of the FlatMapFunction. A proper implementation would take more care in using state. Feel
free to comment on that as well.)

Best regards,

Filip Niksic

Mime
View raw message